关联规则二项集hadoop实现

标签: 关联规则 hadoop | 发表时间:2012-11-08 10:20 | 作者:fansy1990
出处:http://blog.csdn.net

近期看mahout的关联规则源码,颇为头痛,本来打算写一个系列分析关联规则的源码的,但是后面看到有点乱了,可能是稍微有点复杂吧,所以就打算先实现最简单的二项集关联规则。

算法的思想还是参考上次的图片:

这里实现分为五个步骤:

  1. 针对原始输入计算每个项目出现的次数;
  2. 按出现次数从大到小(排除出现次数小于阈值的项目)生成frequence list file;
  3. 针对原始输入的事务进行按frequence list file进行排序并剪枝;
  4. 生成二项集规则;
  5. 计算二项集规则出现的次数,并删除小于阈值的二项集规则;

第一步的实现:包括步骤1和步骤2,代码如下:

GetFlist.java:

package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//  the specific comparator
class MyComparator implements Comparator<String>{
	private String splitter=",";
	public MyComparator(String splitter){
		this.splitter=splitter;
	}
	@Override
	public int compare(String o1, String o2) {
		// TODO Auto-generated method stub
		String[] str1=o1.toString().split(splitter);
		String[] str2=o2.toString().split(splitter);
		int num1=Integer.parseInt(str1[1]);
		int num2=Integer.parseInt(str2[1]);
		if(num1>num2){
			return -1;
		}else if(num1<num2){
			return 1;
		}else{
			return str1[0].compareTo(str2[0]);
		}
	}
}

public class GetFList {
	/**
	 *  the program is based on the picture 
	 */
	// Mapper
	public static class  MapperGF extends Mapper<LongWritable ,Text ,Text,IntWritable>{
		private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
		private final IntWritable newvalue=new IntWritable(1);
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String [] items=splitter.split(value.toString());
			for(String item:items){
				context.write(new Text(item), newvalue);
			}
		}
	}
	// Reducer
	public static class ReducerGF extends Reducer<Text,IntWritable,Text ,IntWritable>{
		public void reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException{
			int temp=0;
			for(IntWritable v:value){
				temp+=v.get();
			}
			context.write(key, new IntWritable(temp));
		}
	}
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub

		if(args.length!=3){
			System.out.println("Usage: <input><output><min_support>");
			System.exit(1);
		}
		String input=args[0];
		String output=args[1];
		int minSupport=0;
		try {
			minSupport=Integer.parseInt(args[2]);
		} catch (NumberFormatException e) {
			// TODO Auto-generated catch block
			minSupport=3;
		}
		Configuration conf=new Configuration();
		String temp=args[1]+"_temp";
		Job job=new Job(conf,"the get flist job");
		job.setJarByClass(GetFList.class);
		job.setMapperClass(MapperGF.class);
		job.setCombinerClass(ReducerGF.class);
		job.setReducerClass(ReducerGF.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);		
		FileInputFormat.setInputPaths(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(temp));		
		boolean succeed=job.waitForCompletion(true);
		if(succeed){		
			//  read the temp output and write the data to the final output
			List<String> list=readFList(temp+"/part-r-00000",minSupport);
			System.out.println("the frequence list has generated ... ");
			// generate the frequence file
			generateFList(list,output);
			System.out.println("the frequence file has generated ... ");
		}else{
			System.out.println("the job is failed");
			System.exit(1);
		}				
	}
	//  read the temp_output and return the frequence list
	public static List<String> readFList(String input,int minSupport) throws IOException{
		// read the hdfs file
		Configuration conf=new Configuration();
		Path path=new Path(input);
	       FileSystem fs=FileSystem.get(path.toUri(),conf);
		FSDataInputStream in1=fs.open(path);
		PriorityQueue<String> queue=new PriorityQueue<String>(15,new MyComparator("\t"));	
		InputStreamReader isr1=new InputStreamReader(in1);
		BufferedReader br=new BufferedReader(isr1);
		String line;
		while((line=br.readLine())!=null){
			int num=0;
			try {
					num=Integer.parseInt(line.split("\t")[1]);
			} catch (NumberFormatException e) {
				// TODO Auto-generated catch block
				num=0;
			}
			if(num>minSupport){
				queue.add(line);
			}
		}
		br.close();
		isr1.close();
		in1.close();
		List<String> list=new ArrayList<String>();
		while(!queue.isEmpty()){
			list.add(queue.poll());
		}
		return list;
	}
	// generate the frequence file
	public static void generateFList(List<String> list,String output) throws IOException{
		Configuration conf=new Configuration();
		Path path=new Path(output);
		FileSystem fs=FileSystem.get(path.toUri(),conf);
		FSDataOutputStream writer=fs.create(path);
		Iterator<String> i=list.iterator();
		while(i.hasNext()){
			writer.writeBytes(i.next()+"\n");//  in the last line add a \n which is not supposed to exist
		}
		writer.close();
	}
}
步骤1的实现其实就是最简单的wordcount程序的实现,在步骤2中涉及到HDFS文件的读取以及写入。在生成frequence list file时排序时用到了PriorityQueue类,同时自定义了一个类用来定义排序规则;

第二步:步骤3,代码如下:

SortAndCut.java:

package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortAndCut {
	/**
	 *  sort and cut the items
	 */	
	public static class M extends Mapper<LongWritable,Text,NullWritable,Text>{
		private LinkedHashSet<String> list=new LinkedHashSet<String>();
		private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
		
		public void setup(Context context) throws IOException{
			String input=context.getConfiguration().get("FLIST");
			 FileSystem fs=FileSystem.get(URI.create(input),context.getConfiguration());
				Path path=new Path(input);
				FSDataInputStream in1=fs.open(path);
				InputStreamReader isr1=new InputStreamReader(in1);
				BufferedReader br=new BufferedReader(isr1);
				String line;
				while((line=br.readLine())!=null){
					String[] str=line.split("\t");
					if(str.length>0){
						list.add(str[0]);
					}
				}
		}
		// map
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String [] items=splitter.split(value.toString());
			Set<String> set=new HashSet<String>();
			set.clear();
			for(String s:items){
				set.add(s);
			}
			Iterator<String> iter=list.iterator();
			StringBuffer sb=new StringBuffer();
			sb.setLength(0);
			int num=0;
			while(iter.hasNext()){
				String item=iter.next();
				if(set.contains(item)){
					sb.append(item+",");
					num++;
				}
			}
			if(num>0){
				context.write(NullWritable.get(), new Text(sb.toString()));
			}
		}
	}
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		if(args.length!=3){
			System.out.println("Usage: <input><output><fListPath>");
			System.exit(1);
		}
		String input=args[0];
		String output=args[1];
		String fListPath=args[2];
		Configuration conf=new Configuration();
		conf.set("FLIST", fListPath);
		Job job=new Job(conf,"the sort and cut  the items  job");
		job.setJarByClass(SortAndCut.class);
		job.setMapperClass(M.class);
		job.setNumReduceTasks(0);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);	
		FileInputFormat.setInputPaths(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));	
		boolean succeed=job.waitForCompletion(true);
		if(succeed){
			System.out.println(job.getJobName()+" succeed ... ");
		}
	}
}
在本阶段的Mapper的setup中读取frequence file到一个LinkedHashSet(可以保持原始的插入顺序)中,然后在map中针对一个事务输出这个LinkedHashSet,不过限制输出是在这个事务中出现的项目而已。

第三步:步骤4和步骤5,代码如下:

OutRules.java

package org.fansy.date1108.fpgrowth.twodimension;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Stack;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OutRules {
	
	public static class M extends Mapper<LongWritable,Text,Text,Text>{
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String str=value.toString();
			String[] s=str.split(",");
			if(s.length<=1){
				return;
			}
			Stack<String> stack=new Stack<String>();
			for(int i=0;i<s.length;i++){
				stack.push(s[i]);
			}
			int num=str.length();
			while(stack.size()>1){
				num=num-2;
				context.write(new Text(stack.pop()),new Text(str.substring(0,num)));
			}
		}
	}
	// Reducer
	public static class R extends Reducer<Text ,Text,Text,Text>{
		private int minConfidence=0;
		public void setup(Context context){
			String str=context.getConfiguration().get("MIN");
			try {
				minConfidence=Integer.parseInt(str);
			} catch (NumberFormatException e) {
				// TODO Auto-generated catch block
				minConfidence=3;
			}
		}
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			HashMap<String,Integer> hm=new HashMap<String ,Integer>();
			for(Text v:values){
				String[] str=v.toString().split(",");
				for(int i=0;i<str.length;i++){
					if(hm.containsKey(str[i])){
						int temp=hm.get(str[i]);
						hm.put(str[i], temp+1);
					}else{
						hm.put(str[i], 1);
					}
				}
			}
			//  end of for
			TreeSet<String> sss=new TreeSet<String>(new MyComparator(" "));
			Iterator<Entry<String,Integer>> iter=hm.entrySet().iterator();
			while(iter.hasNext()){
				Entry<String,Integer> k=iter.next();
				if(k.getValue()>minConfidence&&!key.toString().equals(k.getKey())){
					sss.add(k.getKey()+" "+k.getValue());
				}
			}
			Iterator<String> iters=sss.iterator();
			StringBuffer sb=new StringBuffer();
			while(iters.hasNext()){
				sb.append(iters.next()+"|");
			}
			context.write(key, new Text(":\t"+sb.toString()));
		}
	}
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		if(args.length!=3){
			System.out.println("Usage: <input><output><min_confidence>");
			System.exit(1);
		}
		String input=args[0];
		String output=args[1];
		String minConfidence=args[2];	
		Configuration conf=new Configuration();
		conf.set("MIN", minConfidence);
		Job job=new Job(conf,"the out rules   job");
		job.setJarByClass(OutRules.class);
		job.setMapperClass(M.class);
		job.setNumReduceTasks(1);
		job.setReducerClass(R.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));	
		boolean succeed=job.waitForCompletion(true);
		if(succeed){
			System.out.println(job.getJobName()+" succeed ... ");
		}
	}
}
在map阶段使用了Stack 和字符串操作实现类似下面的功能:
input:p,x,z,y,a,b
output:
b:p,x,z,y,a
a:p,x,z,y
y:p,x,z
z:p,x
x:p
在reduce阶段只是统计下项目出现的次数而已,用到了一个HashMap,又如果输出是根据项目出现的次数从大到小的一个排序那就更好了,所以又用到了TreeSet.

其中上面所有的输出文件中的格式都只是拼串而已,所以其中的格式可以按照自己的要求进行更改。

比如,我的输出如下:

0	:	39 125|48 99|32 44|41 37|38 26|310 17|5 16|65 14|1 13|89 13|1144 12|225 12|60 11|604 11|
1327 10|237 10|101 9|147 9|270 9|533 9|9 9|107 8|11 8|117 8|170 8|271 8|334 8|549 8|62 8|812 8|10 7|
1067 7|12925 7|23 7|255 7|279 7|548 7|783 7|14098 6|2 6|208 6|22 6|36 6|413 6|789 6|824 6|961 6|110 5|
120 5|12933 5|201 5|2238 5|2440 5|2476 5|251 5|286 5|2879 5|3 5|4105 5|415 5|438 5|467 5|475 5|479 5|49 5|
592 5|675 5|715 5|740 5|791 5|830 5|921 5|9555 5|976 5|979 5|1001 4|1012 4|1027 4|1055 4|1146 4|12 4|13334 4|
136 4|1393 4|16 4|1600 4|165 4|167 4|1819 4|1976 4|2051 4|2168 4|2215 4|2284 4|2353 4|2524 4|261 4|267 4|269 4|
27 4|2958 4|297 4|3307 4|338 4|420 4|4336 4|4340 4|488 4|4945 4|5405 4|58 4|589 4|75 4|766 4|795 4|809 4|880 4|8978 4|916 4|94 4|956 4|
冒号前面是项目,后面的39是项目再后面是<0,39>出现的次数,即125次,<0,48>出现的次数是99次;

总结,mahout的源代码确实比较难啃,所以要先对算法非常熟悉,然后去看源码的话 应该会比较容易点;



分享,快乐,成长






作者:fansy1990 发表于2012-11-8 10:20:15 原文链接
阅读:0 评论:0 查看评论

相关 [关联规则 hadoop] 推荐:

关联规则二项集hadoop实现

- - CSDN博客推荐文章
近期看mahout的关联规则源码,颇为头痛,本来打算写一个系列分析关联规则的源码的,但是后面看到有点乱了,可能是稍微有点复杂吧,所以就打算先实现最简单的二项集关联规则. 算法的思想还是参考上次的图片:. 针对原始输入计算每个项目出现的次数;. 按出现次数从大到小(排除出现次数小于阈值的项目)生成frequence list file;.

关联规则的常用解法

- - 刘思喆@贝吉塔行星
Apriori算法是一种最有影响的挖掘 0-1 布尔关联规则频繁项集的算法. 这种算法利用了频繁项集性质的先验知识(因此叫做priori). Apriori使用了自底向上的实现方式(如果集合 I 不是频繁项集,那么包含 I 的更大的集合也不可能是频繁项集),k - 1 项集用于探索 k 项集. 首先,找出频繁 1 项集的集合($L_1$),$L_1$用于找频繁 2 项集的集合 $L_2$,而 $L_2$ 用于找 $L_3$,如此下去,直到不能找到满足条件的频繁 k 项集.

基于storm的在线关联规则

- - CSDN博客互联网推荐文章
    基于storm的在线视频推荐算法, 算法依据youtube的推荐算法  算法相对简单,可以认为是关联规则只挖掘频繁二项集. 下面给出与storm的结合实现在线实时算法 ,. 首先给出数据流图(不同颜色的线条代表不同的数据流. 在storm里面bolt也是可以声明数据流的.     关联规则挖掘数据项的时候,有事务的概念,这里的事务的定义为:给定时间窗口内用户看过的视频集.

关联规则推荐算法的原理及实现

- - 蓝鲸的网站分析笔记
关联规则用来发现数据间潜在的关联,最典型的应用是电商网站的购物车分析. 本文将通过一个简单的例子来说明关联规则中各个术语的含义以及具体的计算方法. 这是一些用户的购物数据,uid是用户的ID,后面是每个用户具体购买的商品名称,我们使用字母进行标识. 下面我们将使用关联规则对这些数据进行分析,挖掘不同商品间的联系.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Hadoop使用(一)

- Pei - 博客园-首页原创精华区
Hadoop使用主/从(Master/Slave)架构,主要角色有NameNode,DataNode,secondary NameNode,JobTracker,TaskTracker组成. 其中NameNode,secondary NameNode,JobTracker运行在Master节点上,DataNode和TaskTracker运行在Slave节点上.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

Hadoop TaskScheduler浅析

- - kouu&#39;s home
TaskScheduler,顾名思义,就是MapReduce中的任务调度器. 在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务. 然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务.

HADOOP安装

- - OracleDBA Blog---三少个人自留地
最近有时间看看hadoop的一些东西,而且在测试的环境上做了一些搭建的工作. 首先,安装前需要做一些准备工作. 使用一台pcserver作为测试服务器,同时使用Oracle VM VirtualBox来作为虚拟机的服务器. 新建了三个虚拟机以后,安装linux,我安装的linux的版本是redhat linux 5.4 x64版本.

Hadoop Corona介绍

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/hadoop-corona/hadoop-corona/. Hadoop Corona是facebook开源的下一代MapReduce框架. 其基本设计动机和Apache的YARN一致,在此不再重复,读者可参考我的这篇文章 “下一代Apache Hadoop MapReduce框架的架构”.