mahout贝叶斯算法开发思路(拓展篇)1

标签: mahout 算法 开发 | 发表时间:2013-09-14 22:28 | 作者:fansy1990
出处:http://blog.csdn.net

首先说明一点,此篇blog解决的问题是就下面的数据如何应用mahout中的贝叶斯算法?(这个问题是在上篇(。。。完结篇)blog最后留的问题,如果想直接使用该工具,可以在 mahout贝叶斯算法拓展下载):

0.2	0.3	0.4:1
0.32	0.43	0.45:1
0.23	0.33	0.54:1
2.4	2.5	2.6:2
2.3	2.2	2.1:2
5.4	7.2	7.2:3
5.6	7	6:3
5.8	7.1	6.3:3
6	6	5.4:3
11	12	13:4

前篇blog上面的数据在最后的空格使用冒号代替(因为样本向量和标识的解析需要不同的解析符号,同一个的话解析就会出问题)。关于上面的数据其实就是说样本[0.2,0.3,0.4]被贴上了标签1,其他依次类推,然后这个作为训练数据训练贝叶斯模型,最后通过上面的数据进行分类建议模型的准确度。

处理的过程大概可以分为7个步骤:1.转换原始数据到贝叶斯算法可以使用的数据格式;2. 把所有的标识转换为数值型格式;3.对原始数据进行处理获得贝叶斯模型的属性参数值1;4.对原始数据进行处理获得贝叶斯模型的属性参数值2;5.根据3、4的结果把贝叶斯模型写入文件;6.对原始数据进行自分类;7.根据6的结果对贝叶斯模型进行评价。

下面分别介绍:

1. 数据格式转换:

代码如下:

package mahout.fansy.bayes.transform;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class TFText2VectorWritable extends AbstractJob {
	/**
	 * 处理把
	 * [2.1,3.2,1.2:a
	 * 2.1,3.2,1.3:b]
	 * 这样的数据转换为 key:new Text(a),value:new VectorWritable(2.1,3.2,1.2:a) 的序列数据
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new TFText2VectorWritable(),args);
	}
	@Override
	public int run(String[] args) throws Exception {
		addInputOption();
	    addOutputOption();
	    // 增加向量之间的分隔符,默认为逗号;
	    addOption("splitCharacterVector","scv", "Vector split character,default is ','", ",");
	    // 增加向量和标示的分隔符,默认为冒号;
	    addOption("splitCharacterLabel","scl", "Vector and Label split character,default is ':'", ":");
	    if (parseArguments(args) == null) {
		      return -1;
		}
	    Path input = getInputPath();
	    Path output = getOutputPath();
	    String scv=getOption("splitCharacterVector");
	    String scl=getOption("splitCharacterLabel");
	    Configuration conf=getConf();
	//    FileSystem.get(output.toUri(), conf).deleteOnExit(output);//如果输出存在,删除输出
	    HadoopUtil.delete(conf, output);
	    conf.set("SCV", scv);
	    conf.set("SCL", scl);
	    Job job=new Job(conf);
	    job.setJobName("transform text to vector by input:"+input.getName());
	    job.setJarByClass(TFText2VectorWritable.class); 
	    
	    job.setInputFormatClass(TextInputFormat.class);
	    job.setOutputFormatClass(SequenceFileOutputFormat.class);
	    
	    job.setMapperClass(TFMapper.class);
	    job.setMapOutputKeyClass(Text.class);
	    job.setMapOutputValueClass(VectorWritable.class);
	    job.setNumReduceTasks(0);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(VectorWritable.class);
	    TextInputFormat.setInputPaths(job, input);
	    SequenceFileOutputFormat.setOutputPath(job, output);
	   
	   
	    if(job.waitForCompletion(true)){
	    	return 0;
	    }
		return -1;
	}

	
	public static class TFMapper extends Mapper<LongWritable,Text,Text,VectorWritable>{
		private String SCV;
		private String SCL;
		/**
		 * 初始化分隔符参数 
		 */
		@Override
		public void setup(Context ctx){
			SCV=ctx.getConfiguration().get("SCV");
			SCL=ctx.getConfiguration().get("SCL");
		}
		/**
		 * 解析字符串,并输出
		 * @throws InterruptedException 
		 * @throws IOException 
		 */
		@Override
		public void map(LongWritable key,Text value,Context ctx) throws IOException, InterruptedException{
			String[] valueStr=value.toString().split(SCL);
			if(valueStr.length!=2){
				return;  // 没有两个说明解析错误,退出
			}
			String name=valueStr[1];
			String[] vector=valueStr[0].split(SCV);
			Vector v=new RandomAccessSparseVector(vector.length);
			for(int i=0;i<vector.length;i++){
				double item=0;
				try{
					item=Double.parseDouble(vector[i]);
				}catch(Exception e){
					return; // 如果不可以转换,说明输入数据有问题
				}
				v.setQuick(i, item);
			}
			NamedVector nv=new NamedVector(v,name);
			VectorWritable vw=new VectorWritable(nv);
			ctx.write(new Text(name), vw);
		}
		
	}
}
上面的代码只使用了Mapper对数据进行处理即可,把原始数据的Text格式使用分隔符进行解析输出<Text,VectorWritable>对应<标识,样本向量>,贝叶斯算法处理的数据格式是VectorWritable的,所以要进行转换。其中的解析符号是根据传入的参数进行设置的。如果要单独运行该类,传入的参数如下:

usage: <command> [Generic Options] [Job-Specific Options]
Generic Options:
 -archives <paths>              comma separated archives to be unarchived
                                on the compute machines.
 -conf <configuration file>     specify an application configuration file
 -D <property=value>            use value for given property
 -files <paths>                 comma separated files to be copied to the
                                map reduce cluster
 -fs <local|namenode:port>      specify a namenode
 -jt <local|jobtracker:port>    specify a job tracker
 -libjars <paths>               comma separated jar files to include in
                                the classpath.
 -tokenCacheFile <tokensFile>   name of the file with the tokens
Job-Specific Options:                                                           
  --input (-i) input                                    Path to job input       
                                                        directory.              
  --output (-o) output                                  The directory pathname  
                                                        for output.             
  --splitCharacterVector (-scv) splitCharacterVector    Vector split            
                                                        character,default is    
                                                        ','                     
  --splitCharacterLabel (-scl) splitCharacterLabel      Vector and Label split  
                                                        character,default is    
                                                        ':'                     
  --help (-h)                                           Print out help          
  --tempDir tempDir                                     Intermediate output     
                                                        directory               
  --startPhase startPhase                               First phase to run      
  --endPhase endPhase                                   Last phase to run 
其中-scv和-scl参数是自己加的,其他参考mahout中的AbstractJob的默认设置;

2.转换标识

这一步的主要操作是把输入文件的所有标识全部读取出来,然后进行转换,转换为数值型,代码如下:

package mahout.fansy.bayes;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;

import com.google.common.io.Closeables;

public class WriteIndexLabel {

	/**
	 * @param args
	 * @throws IOException 
	 */
	public static void main(String[] args) throws IOException {
		String inputPath="hdfs://ubuntu:9000/user/mahout/output_bayes/part-m-00000";
		String labPath="hdfs://ubuntu:9000/user/mahout/output_bayes/index.bin";
		Configuration conf=new Configuration();
		conf.set("mapred.job.tracker", "ubuntu:9001");
		long t=writeLabelIndex(inputPath,labPath,conf);
		System.out.println(t);
	}
	/**
	 * 从输入文件中读出全部标识,并加以转换,然后写入文件
	 * @param inputPath
	 * @param labPath
	 * @param conf
	 * @return
	 * @throws IOException
	 */
	public static long writeLabelIndex(String inputPath,String labPath,Configuration conf) throws IOException{
		long labelSize=0;
		Path p=new Path(inputPath);
		Path lPath=new Path(labPath);
		SequenceFileDirIterable<Text, IntWritable> iterable =
	              new SequenceFileDirIterable<Text, IntWritable>(p, PathType.LIST, PathFilters.logsCRCFilter(), conf);
		labelSize = writeLabel(conf, lPath, iterable);
		return labelSize;
	}
	
	/**
	 * 把数字和标识的映射写入文件
	 * @param conf
	 * @param indexPath
	 * @param labels
	 * @return
	 * @throws IOException
	 */
	public static long writeLabel(Configuration conf,Path indexPath,Iterable<Pair<Text,IntWritable>> labels) throws IOException{
		FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
	    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class);
	    Collection<String> seen = new HashSet<String>();
	    int i = 0;
	    try {
	      for (Object label : labels) {
	        String theLabel = ((Pair<?,?>) label).getFirst().toString();
	        if (!seen.contains(theLabel)) {
	          writer.append(new Text(theLabel), new IntWritable(i++));
	          seen.add(theLabel);
	        }
	      }
	    } finally {
	      Closeables.closeQuietly(writer);
	    }
	    System.out.println("labels number is : "+i);
	    return i;
	}
}

这一步要返回一个参数,即标识的一共个数,用于后面的处理需要。

3. 获得贝叶斯模型属性值1:

这个相当于 TrainNaiveBayesJob的第一个prepareJob,本来是可以直接使用mahout中的mapper和reducer的,但是其中mapper关于key的解析和我使用的不同,所以解析也不同,所以这一步骤的mapper可以认为就是TrainNaiveBayesJob中第一个prepareJob的mapper,只是做了很少的修改。此步骤的代码如下:

package mahout.fansy.bayes;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.classifier.naivebayes.BayesUtils;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.mapreduce.VectorSumReducer;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenObjectIntHashMap;
/**
 * 贝叶斯算法第一个job任务相当于 TrainNaiveBayesJob的第一个prepareJob
 * 只用修改Mapper即可,Reducer还用原来的
 * @author Administrator
 *
 */
public class BayesJob1 extends AbstractJob {
	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new BayesJob1(),args);
	}
	
	@Override
	public int run(String[] args) throws Exception {
		addInputOption();
	    addOutputOption();
	    addOption("labelIndex","li", "The path to store the label index in");
	    if (parseArguments(args) == null) {
		      return -1;
		}
	    Path input = getInputPath();
	    Path output = getOutputPath();
	    String labelPath=getOption("labelIndex");
	    Configuration conf=getConf();
	    HadoopUtil.cacheFiles(new Path(labelPath), getConf());
	    HadoopUtil.delete(conf, output);
	    Job job=new Job(conf);
	    job.setJobName("job1 get scoreFetureAndLabel by input:"+input.getName());
	    job.setJarByClass(BayesJob1.class); 
	    
	    job.setInputFormatClass(SequenceFileInputFormat.class);
	    job.setOutputFormatClass(SequenceFileOutputFormat.class);
	    
	    job.setMapperClass(BJMapper.class);
	    job.setMapOutputKeyClass(IntWritable.class);
	    job.setMapOutputValueClass(VectorWritable.class);
	    job.setCombinerClass(VectorSumReducer.class);
	    job.setReducerClass(VectorSumReducer.class);
	    job.setOutputKeyClass(IntWritable.class);
	    job.setOutputValueClass(VectorWritable.class);
	    SequenceFileInputFormat.setInputPaths(job, input);
	    SequenceFileOutputFormat.setOutputPath(job, output);
	    
	    if(job.waitForCompletion(true)){
	    	return 0;
	    }
		return -1;
	}
	/**
	 * 自定义Mapper,只是解析的地方有改动而已
	 * @author Administrator
	 *
	 */
	public static class BJMapper extends Mapper<Text, VectorWritable, IntWritable, VectorWritable>{
		public enum Counter { SKIPPED_INSTANCES }

		  private OpenObjectIntHashMap<String> labelIndex;

		  @Override
		  protected void setup(Context ctx) throws IOException, InterruptedException {
		    super.setup(ctx);
		    labelIndex = BayesUtils.readIndexFromCache(ctx.getConfiguration()); //
		  }

		  @Override
		  protected void map(Text labelText, VectorWritable instance, Context ctx) throws IOException, InterruptedException {
		    String label = labelText.toString(); 
		    if (labelIndex.containsKey(label)) {
		      ctx.write(new IntWritable(labelIndex.get(label)), instance);
		    } else {
		      ctx.getCounter(Counter.SKIPPED_INSTANCES).increment(1);
		    }
		  }
	}

}
如果要单独使用此类,可以参考下面的调用方式:

usage: <command> [Generic Options] [Job-Specific Options]
Generic Options:
 -archives <paths>              comma separated archives to be unarchived
                                on the compute machines.
 -conf <configuration file>     specify an application configuration file
 -D <property=value>            use value for given property
 -files <paths>                 comma separated files to be copied to the
                                map reduce cluster
 -fs <local|namenode:port>      specify a namenode
 -jt <local|jobtracker:port>    specify a job tracker
 -libjars <paths>               comma separated jar files to include in
                                the classpath.
 -tokenCacheFile <tokensFile>   name of the file with the tokens
Job-Specific Options:                                                           
  --input (-i) input               Path to job input directory.                 
  --output (-o) output             The directory pathname for output.           
  --labelIndex (-li) labelIndex    The path to store the label index in         
  --help (-h)                      Print out help                               
  --tempDir tempDir                Intermediate output directory                
  --startPhase startPhase          First phase to run                           
  --endPhase endPhase              Last phase to run 
其中的-li参数是自己加的,其实就是第2步骤中求得的标识的总个数,其他参考AbstractJob默认参数。


分享,成长,快乐

转载请注明blog地址: http://blog.csdn.net/fansy1990



作者:fansy1990 发表于2013-9-14 14:28:04 原文链接
阅读:106 评论:0 查看评论

相关 [mahout 算法 开发] 推荐:

mahout贝叶斯算法开发思路(拓展篇)2

- - CSDN博客云计算推荐文章
如果想直接下面算法调用包,可以直接在 mahout贝叶斯算法拓展下载,该算法调用的方式如下:. 这一步骤相当于 TrainNaiveBayesJob的第二个prepareJob,其中mapper和reducer都是参考这个job的,基本没有修改代码;代码如下:. 其实也就是设置一个标识的个数而已,其他参考AbstractJob的默认参数;.

mahout贝叶斯算法开发思路(拓展篇)1

- - CSDN博客云计算推荐文章
首先说明一点,此篇blog解决的问题是就下面的数据如何应用mahout中的贝叶斯算法. 完结篇)blog最后留的问题,如果想直接使用该工具,可以在 mahout贝叶斯算法拓展下载):. 前篇blog上面的数据在最后的空格使用冒号代替(因为样本向量和标识的解析需要不同的解析符号,同一个的话解析就会出问题).

Mahout: SVDRecommender SVD推荐算法

- -

Mahout实现的机器学习算法

- - ITeye博客
使用命令:mahout -h.   在Mahout实现的机器学习算法见下表:. EM聚类(期望最大化聚类). 并行FP Growth算法. 并行化了Watchmaker框架. 非Map-Reduce算法. 扩展了java的Collections类. Mahout最大的优点就是基于hadoop实现,把很多以前运行于单机上的算法,转化为了MapReduce模式,这样大大提升了算法可处理的数据量和处理性能.

[转]Mahout推荐算法基础

- - 小鸥的博客
Mahout推荐算法分为以下几大类. 2.相近的用户定义与数量. 2.用户数较少时计算速度快. 1.基于item的相似度. 1.item较少时就算速度更快. 2.当item的外部概念易于理解和获得是非常有用. 1基于SlopeOne算法(打分差异规则). 当item数目十分少了也很有效. 需要限制diffs的存储数目否则内存增长太快.

Mahout推荐算法API详解

- - zzm
Mahout推荐算法API详解. Hadoop家族系列文章, 主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等.

Mahout实现的机器学习算法一览表

- - ITeye博客
         Apache Mahout 是 ApacheSoftware Foundation (ASF) 旗下的一个开源项目,提供一些可扩展的机器学习领域经典算法的实现,旨在帮助开发人员更加方便快捷地创建智能应用程序,并且,在 Mahout 的最近版本中还加入了对Apache Hadoop 的支持,使这些算法可以更高效的运行在云计算环境中.

利用Mahout实现在Hadoop上运行K-Means算法

- - CSDN博客云计算推荐文章
    K-Means算法是基于分划分的最基本的聚类算法,是学习机器学习、数据挖掘等技术的最基本的 知识,所以掌握其运行原理是很重要的.     转载请注明出处:  http://hanlaiming.freetzi.com/?p=144.      一、介绍Mahout.     Mahout是Apache下的开源机器学习软件包,目前实现的机器学习算法主要包含有 协同过滤/推荐引擎, 聚类和 分类三个部分.

【甘道夫】Mahout推荐算法编程实践

- - CSDN博客云计算推荐文章
Taste是曾经风靡一时的推荐算法框架,后来被并入Mahout中,Mahout的部分推荐算法基于Taste实现. 下文介绍基于Taste实现最常用的UserCF和ItemCF. 本文不涉及UserCF和ItemCF算法的介绍,这方面网上资料很多,本文仅介绍如何基于Mahout编程实现. UserCF和ItemCF算法的输入数据是用户偏好,用户偏好数据可以有两种形式:.

Mahout介绍

- - 互联网 - ITeye博客
Mahout 是机器学习和数据挖掘的一个分布式框架,区别于其他的开源数据挖掘软件,它是基于hadoop之上的; 所以hadoop的优势就是Mahout的优势. http://mahout.apache.org/ 上说的Scalable就是指hadoop的可扩展性. Mahout用map-reduce实现了部分数据挖掘算法,解决了并行挖掘的问题.