Implementing 2-Itemset Association Rules on Hadoop
I have been reading Mahout's association-rule source code recently and finding it quite painful. I originally planned to write a series of posts analyzing that source code, but further in it got rather messy, probably because the code is somewhat complex, so I decided to first implement the simplest case: association rules over 2-itemsets.
The idea of the algorithm still follows the picture from my last post. In outline, the implementation breaks down into five steps:

1. Scan the raw input and count the number of occurrences of each item.
2. Sort the items by count in descending order (dropping items whose count falls below the support threshold) and generate the frequence list file.
3. Re-order every transaction according to the frequence list, cutting out the infrequent items.
4. For each re-ordered transaction, emit every item together with the prefix of items that precede it.
5. For each item, count how often every prefix item co-occurs with it, and output the pairs above the threshold.

These five steps are implemented as three MapReduce stages:
Stage 1 covers steps 1 and 2. The code is as follows:
GetFList.java:
package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// orders "item<splitter>count" strings by descending count, ties broken by item name
class MyComparator implements Comparator<String> {
    private String splitter = ",";

    public MyComparator(String splitter) {
        this.splitter = splitter;
    }

    @Override
    public int compare(String o1, String o2) {
        String[] str1 = o1.split(splitter);
        String[] str2 = o2.split(splitter);
        int num1 = Integer.parseInt(str1[1]);
        int num2 = Integer.parseInt(str2[1]);
        if (num1 > num2) {
            return -1;
        } else if (num1 < num2) {
            return 1;
        } else {
            return str1[0].compareTo(str2[0]);
        }
    }
}

public class GetFList {
    // Mapper: emit <item, 1> for every item in a transaction (step 1, a plain word count)
    public static class MapperGF extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Pattern splitter = Pattern.compile("[ ]*[ ,|\t]");
        private final IntWritable newvalue = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] items = splitter.split(value.toString());
            for (String item : items) {
                context.write(new Text(item), newvalue);
            }
        }
    }

    // Reducer: sum up the counts per item
    public static class ReducerGF extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
            int temp = 0;
            for (IntWritable v : value) {
                temp += v.get();
            }
            context.write(key, new IntWritable(temp));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 3) {
            System.out.println("Usage: <input><output><min_support>");
            System.exit(1);
        }
        String input = args[0];
        String output = args[1];
        int minSupport = 0;
        try {
            minSupport = Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            minSupport = 3; // fall back to a default threshold
        }
        Configuration conf = new Configuration();
        String temp = args[1] + "_temp";
        Job job = new Job(conf, "the get flist job");
        job.setJarByClass(GetFList.class);
        job.setMapperClass(MapperGF.class);
        job.setCombinerClass(ReducerGF.class);
        job.setReducerClass(ReducerGF.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(temp));
        boolean succeed = job.waitForCompletion(true);
        if (succeed) {
            // read the temp output and write the data to the final output
            List<String> list = readFList(temp + "/part-r-00000", minSupport);
            System.out.println("the frequence list has generated ... ");
            // generate the frequence file
            generateFList(list, output);
            System.out.println("the frequence file has generated ... ");
        } else {
            System.out.println("the job is failed");
            System.exit(1);
        }
    }

    // read the temp output and return the frequence list (step 2)
    public static List<String> readFList(String input, int minSupport) throws IOException {
        // read the HDFS file
        Configuration conf = new Configuration();
        Path path = new Path(input);
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        FSDataInputStream in1 = fs.open(path);
        PriorityQueue<String> queue = new PriorityQueue<String>(15, new MyComparator("\t"));
        InputStreamReader isr1 = new InputStreamReader(in1);
        BufferedReader br = new BufferedReader(isr1);
        String line;
        while ((line = br.readLine()) != null) {
            int num = 0;
            try {
                num = Integer.parseInt(line.split("\t")[1]);
            } catch (NumberFormatException e) {
                num = 0;
            }
            if (num > minSupport) { // keep only items above the threshold
                queue.add(line);
            }
        }
        br.close();
        isr1.close();
        in1.close();
        List<String> list = new ArrayList<String>();
        while (!queue.isEmpty()) { // drain the queue in descending-count order
            list.add(queue.poll());
        }
        return list;
    }

    // write the frequence list to the frequence file on HDFS
    public static void generateFList(List<String> list, String output) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(output);
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        FSDataOutputStream writer = fs.create(path);
        Iterator<String> i = list.iterator();
        while (i.hasNext()) {
            writer.writeBytes(i.next() + "\n"); // note: this also leaves a trailing \n after the last line
        }
        writer.close();
    }
}

Step 1 is essentially the plain WordCount program; step 2 involves reading from and writing to HDFS files. When generating the frequence list file, the sorting uses the PriorityQueue class, together with a custom comparator class that defines the ordering.
Stage 2 covers step 3. The code is as follows:
SortAndCut.java:
package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortAndCut {
    // map-only job: re-order each transaction by the frequence list and cut infrequent items
    public static class M extends Mapper<LongWritable, Text, NullWritable, Text> {
        // LinkedHashSet preserves insertion order, i.e. the descending-count order of the frequence file
        private LinkedHashSet<String> list = new LinkedHashSet<String>();
        private Pattern splitter = Pattern.compile("[ ]*[ ,|\t]");

        public void setup(Context context) throws IOException {
            String input = context.getConfiguration().get("FLIST");
            FileSystem fs = FileSystem.get(URI.create(input), context.getConfiguration());
            Path path = new Path(input);
            FSDataInputStream in1 = fs.open(path);
            InputStreamReader isr1 = new InputStreamReader(in1);
            BufferedReader br = new BufferedReader(isr1);
            String line;
            while ((line = br.readLine()) != null) {
                String[] str = line.split("\t");
                if (str.length > 0) {
                    list.add(str[0]);
                }
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] items = splitter.split(value.toString());
            Set<String> set = new HashSet<String>();
            for (String s : items) {
                set.add(s);
            }
            // walk the frequence list in order, keeping only the items present in this transaction
            Iterator<String> iter = list.iterator();
            StringBuffer sb = new StringBuffer();
            int num = 0;
            while (iter.hasNext()) {
                String item = iter.next();
                if (set.contains(item)) {
                    sb.append(item + ",");
                    num++;
                }
            }
            if (num > 0) {
                context.write(NullWritable.get(), new Text(sb.toString()));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 3) {
            System.out.println("Usage: <input><output><fListPath>");
            System.exit(1);
        }
        String input = args[0];
        String output = args[1];
        String fListPath = args[2];
        Configuration conf = new Configuration();
        conf.set("FLIST", fListPath);
        Job job = new Job(conf, "the sort and cut the items job");
        job.setJarByClass(SortAndCut.class);
        job.setMapperClass(M.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean succeed = job.waitForCompletion(true);
        if (succeed) {
            System.out.println(job.getJobName() + " succeed ... ");
        }
    }
}

In this stage's Mapper setup, the frequence file is read into a LinkedHashSet (which preserves the original insertion order). The map method then outputs, for each transaction, the items of this LinkedHashSet, restricted to those that actually appear in that transaction.
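To make the cut-and-reorder behavior concrete, here is a minimal standalone sketch of the same filtering loop; the frequence list and the transaction are made-up sample data, with "eggs" standing in for an item that fell below the support threshold:

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class SortAndCutDemo {
    public static void main(String[] args) {
        // the frequence list in descending-count order, as loaded in setup()
        LinkedHashSet<String> flist = new LinkedHashSet<String>();
        flist.add("milk");
        flist.add("beer");
        flist.add("bread");

        // one raw transaction; "eggs" is not in the frequence list
        Set<String> transaction = new HashSet<String>();
        transaction.add("bread");
        transaction.add("eggs");
        transaction.add("milk");

        // walk the frequence list in order, keeping only items present in the transaction
        StringBuffer sb = new StringBuffer();
        for (String item : flist) {
            if (transaction.contains(item)) {
                sb.append(item + ",");
            }
        }
        System.out.println(sb); // prints: milk,bread,
    }
}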
Stage 3 covers steps 4 and 5. The code is as follows:
OutRules.java:
package org.fansy.date1108.fpgrowth.twodimension;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Stack;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutRules {
    // Mapper: for each sorted transaction, emit every item together with the prefix before it (step 4)
    public static class M extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String str = value.toString();
            String[] s = str.split(",");
            if (s.length <= 1) {
                return;
            }
            Stack<String> stack = new Stack<String>();
            for (int i = 0; i < s.length; i++) {
                stack.push(s[i]);
            }
            int num = str.length();
            while (stack.size() > 1) {
                String last = stack.pop();
                // shrink the prefix by the popped item plus its trailing comma
                // (a fixed num = num - 2 would only work for single-character items)
                num = num - last.length() - 1;
                context.write(new Text(last), new Text(str.substring(0, num)));
            }
        }
    }

    // Reducer: count how often each item co-occurs with the key (step 5)
    public static class R extends Reducer<Text, Text, Text, Text> {
        private int minConfidence = 0;

        public void setup(Context context) {
            String str = context.getConfiguration().get("MIN");
            try {
                minConfidence = Integer.parseInt(str);
            } catch (NumberFormatException e) {
                minConfidence = 3; // fall back to a default threshold
            }
        }

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            HashMap<String, Integer> hm = new HashMap<String, Integer>();
            for (Text v : values) {
                String[] str = v.toString().split(",");
                for (int i = 0; i < str.length; i++) {
                    if (hm.containsKey(str[i])) {
                        int temp = hm.get(str[i]);
                        hm.put(str[i], temp + 1);
                    } else {
                        hm.put(str[i], 1);
                    }
                }
            }
            // sort the co-occurring items by descending count before output
            TreeSet<String> sss = new TreeSet<String>(new MyComparator(" "));
            Iterator<Entry<String, Integer>> iter = hm.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, Integer> k = iter.next();
                // despite the name, this thresholds the raw co-occurrence count
                if (k.getValue() > minConfidence && !key.toString().equals(k.getKey())) {
                    sss.add(k.getKey() + " " + k.getValue());
                }
            }
            Iterator<String> iters = sss.iterator();
            StringBuffer sb = new StringBuffer();
            while (iters.hasNext()) {
                sb.append(iters.next() + "|");
            }
            context.write(key, new Text(":\t" + sb.toString()));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 3) {
            System.out.println("Usage: <input><output><min_confidence>");
            System.exit(1);
        }
        String input = args[0];
        String output = args[1];
        String minConfidence = args[2];
        Configuration conf = new Configuration();
        conf.set("MIN", minConfidence);
        Job job = new Job(conf, "the out rules job");
        job.setJarByClass(OutRules.class);
        job.setMapperClass(M.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(R.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean succeed = job.waitForCompletion(true);
        if (succeed) {
            System.out.println(job.getJobName() + " succeed ... ");
        }
    }
}

The map phase uses a Stack and string operations to achieve something like the following:
input: p,x,z,y,a,b
output:
b : p,x,z,y,a
a : p,x,z,y
y : p,x,z
z : p,x
x : p

The reduce phase just counts how many times each item appears alongside the key, using a HashMap. Since the output reads better when the items are sorted by descending count, a TreeSet (again with MyComparator) is used as well.
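Coming back to the map phase: because real items span more than one character (39, 125, ...), the prefix loop shrinks the substring by the popped item's length plus one for its comma, as in the map code above. A minimal standalone sketch with a made-up transaction:

import java.util.Stack;

public class PrefixDemo {
    public static void main(String[] args) {
        // one sorted-and-cut transaction as written by SortAndCut (note the trailing comma)
        String str = "39,125,48,";
        String[] s = str.split(",");
        Stack<String> stack = new Stack<String>();
        for (String item : s) {
            stack.push(item);
        }
        int num = str.length();
        while (stack.size() > 1) {
            String last = stack.pop();
            num = num - last.length() - 1; // drop the popped item and its comma
            System.out.println(last + " : " + str.substring(0, num));
        }
        // prints: 48 : 39,125,   and then   125 : 39,
    }
}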
All of the output file formats above are just concatenated strings, so the format can be changed to whatever you need.
For example, my output looks like this:
0 : 39 125|48 99|32 44|41 37|38 26|310 17|5 16|65 14|1 13|89 13|1144 12|225 12|60 11|604 11|1327 10|237 10|101 9|147 9|270 9|533 9|9 9|107 8|11 8|117 8|170 8|271 8|334 8|549 8|62 8|812 8|10 7|1067 7|12925 7|23 7|255 7|279 7|548 7|783 7|14098 6|2 6|208 6|22 6|36 6|413 6|789 6|824 6|961 6|110 5|120 5|12933 5|201 5|2238 5|2440 5|2476 5|251 5|286 5|2879 5|3 5|4105 5|415 5|438 5|467 5|475 5|479 5|49 5|592 5|675 5|715 5|740 5|791 5|830 5|921 5|9555 5|976 5|979 5|1001 4|1012 4|1027 4|1055 4|1146 4|12 4|13334 4|136 4|1393 4|16 4|1600 4|165 4|167 4|1819 4|1976 4|2051 4|2168 4|2215 4|2284 4|2353 4|2524 4|261 4|267 4|269 4|27 4|2958 4|297 4|3307 4|338 4|420 4|4336 4|4340 4|488 4|4945 4|5405 4|58 4|589 4|75 4|766 4|795 4|809 4|880 4|8978 4|916 4|94 4|956 4|

The value before the colon is the item; in each "item count" entry after it, 39 is an item and 125 is the number of times the pair <0,39> occurred, <0,48> occurred 99 times, and so on.
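For reference, a minimal sketch of reading one such line back into pairs. The layout (key, tab, ":", tab, "|"-separated "item count" entries) follows from the context.write call in OutRules; the sample line here is a shortened excerpt:

public class ParseRuleLine {
    public static void main(String[] args) {
        // one (shortened) output line from OutRules; the fields are tab-separated
        String line = "0\t:\t39 125|48 99|32 44";
        String[] parts = line.split("\t");
        String head = parts[0]; // the item before the colon
        // parts[1] is the ":" marker; parts[2] carries the co-occurrence entries
        for (String entry : parts[2].split("\\|")) {
            String[] kv = entry.split(" ");
            System.out.println("<" + head + "," + kv[0] + "> appears " + kv[1] + " times");
        }
    }
}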
To sum up: Mahout's source code really is hard to chew through, so getting very familiar with the algorithm first should make reading the source considerably easier.
Share, enjoy, grow.