[转]基于mapreduce的Hadoop join实现

标签: | 发表时间:2011-11-03 22:12 | 作者:HEYUTAO007
出处:http://blog.csdn.net/heyutao007
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现.
我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:
人员ID 人员名称 地址ID
1 张三 1
2 李四 2
3 王五 1
4 赵六 3
5 马七 3
另外一组为地址信息:
地址ID 地址名称
1 北京
2 上海
3 广州
这里给出了一个很简单的例子,而且数据量很小,就这么用眼睛就能看过来的几行,当然,实际的情况可能是几十万上百万甚至上亿的数据量.要实现的功能很简单,就是将人员信息与地址信息进行join,将人员的地址ID完善成为地址名称.对于Hadoop文件系统的应用,目前看来,很多数据的存储都是基于文本的,而且都是将数据放在一个文件目录中进行处理.因此我们这里也采用这种模式来完成.
对于mapreduce程序来说,最主要的就是将要做的工作转化为map以及reduce两个部分.我们可以将地址以及人员都采用同样的数据结构来存储,通过一个flag标志来指定该数据结构里面存储的是地址信息还是人员信息.经过map后,使用地址ID作为key,将所有的具有相同地址的地址信息和人员信息放入一个key->value list数据结构中传送到reduce中进行处理.在reduce过程中,由于key是地址的ID,所以value list中只有一个是地址信息,其他的都是人员信息,因此,找到该地址信息后,其他的人员信息的地址就是该地址所指定的地址名称.

上面提到了存储人员和地址信息的数据结构,可以说这个数据结构是改程序的重要的数据载体之一.我们先来看看该数据结构:


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;


public class Record implements WritableComparable {
int type; //数据类型的定义,1为人员,2为地址

String empName="";
String empId="";
String locId="";
String locationName="";
public Record(){
super();
}

public Record(Record record){
this.type = record.type;
this.empName = record.empName;
this.empId = record.empId;
this.locId = record.locId;
this.locationName = record.locationName;
}

public String toString(){
if(type == 1)
return empId+","+empName+","+locationName;
else if(type == 2)
return locId+","+locationName;
return "uninit data!";
}


public void readFields(DataInput in) throws IOException {
type = in.readInt();
empName = in.readUTF();
empId = in.readUTF();
locId = in.readUTF();
locationName = in.readUTF();
}


public void write(DataOutput out) throws IOException {
out.writeInt(type);
out.writeUTF(empName);
out.writeUTF(empId);
out.writeUTF(locId);
out.writeUTF(locationName);
}


public int compareTo(Object arg0) {
return 0;
}
}
上面的Record的实现了WritableComparable,对于Mapreduce的中间结果类来说,必须要实现Writable,从而在map完成输出中间结果时能够将中间结果写入到运行job的node文件系统中,至于Comparable接口的实现,对于作为Key的中间结果来说需要实现该接口,从而能够完成基于key的排序功能.
接下来是Join的主程序,就是mapreduce的主程序.基本的主程序如下:
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;


public class Join {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
JobConf conf = new JobConf(Join.class);
conf.setJobName("Join");

FileSystem fstm = FileSystem.get(conf);
Path outDir = new Path("/Users/hadoop/outputtest");
fstm.delete(outDir, true);


conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputValueClass(Record.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);

conf.setMapperClass(JoinMapper.class);
conf.setReducerClass(JoinReducer.class);
FileInputFormat.setInputPaths(conf, new Path(
"/user/hadoop/input/join"));
FileOutputFormat.setOutputPath(conf, outDir);


JobClient.runJob(conf);


Path outPutFile = new Path(outDir, "part-00000");
SequenceFile.Reader reader = new SequenceFile.Reader(fstm, outPutFile,
conf);
org.apache.hadoop.io.Text numInside = new Text();
LongWritable numOutside = new LongWritable();
while (reader.next(numOutside, numInside)) {
System.out.println(numInside.toString() + " "
+ numOutside.toString());
}
reader.close();
}


}
程序主体很简单,开始将输出目录删除,中间进行一系列的JobConf设定工作,将输出格式设为SequenceFile,最后读出程序结果到控制台.接下来我们看看Mapper的实现:
import java.io.IOException;


import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.*;


public class JoinMapper extends MapReduceBase 
implements Mapper<LongWritable, Text, LongWritable, Record> {


public void map(LongWritable key, Text value,
OutputCollector<LongWritable, Record> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] values = line.split(",");
if(values.length == 2){ //这里使用记录的长度来区别地址信息与人员信息,当然可以通过其他方式(如文件名等)来实现
Record reco = new Record();
reco.locId = values[0];
reco.type = 2;
reco.locationName = values[1];
output.collect(new LongWritable(Long.parseLong(values[0])), reco);
}else{
Record reco = new Record();
reco.empId = values[0];
reco.empName = values[1];
reco.locId = values[2];
reco.type = 1;
output.collect(new LongWritable(Long.parseLong(values[2])), reco);
}
}
}
对于maper来说,就是从输入文件中读取相应数据构造key->value(地址id->地址或者人员对象)的数据对,并交给hadoop框架完成shuffle等工作.经过hadoop框架完成suffle之后便会将具有想同地址ID的人员信息以及地址信息交给reducer来进行处理.
好啦,剩下就是最后一步了,其实也是最重要的一步就是reduce端的join工作了.还是来看看代码吧:


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;


public class JoinReducer  extends MapReduceBase implements
Reducer<LongWritable, Record, LongWritable, Text> {
public void reduce(LongWritable key, Iterator<Record> values,
            OutputCollector<LongWritable, Text> output, 
            Reporter reporter) throws IOException {
System.out.println("reducer for key "+key.toString());
Record thisLocation= new Record();
List<Record> employees= new Vector<Record>();

while (values.hasNext()){
Record reco = values.next();
if(reco.type ==  2){ //2 is the location
thisLocation = new Record(reco);
//thisLocation = reco;
System.out.println("location is "+ thisLocation.locationName);
}else{  //1 is employee
Record recoClone = new Record(reco);
employees.add(recoClone);
//employess.add(reco);
System.out.println(" employess "+ reco.toString());
}
}


for(Record e : employees){
e.locationName = thisLocation.locationName;
output.collect(new LongWritable(0), new Text(e.toString()));
}
System.out.println("+++++++++++++++");
}
}
在reducer端,我们先构造了一个地址对象,thisLocation用来保存地址信息.在reducer的迭代器values中,如果某个value是地址,就将其保存到thisLocation中.否则就将人员信息加入到List中以供后面打印.

由于在reduce端我们通过一个List数据结构保存了所有的某个外键的对应的所有人员信息,而List的最大值为Integer.MAX_VALUE,所以在数据量巨大的时候,会造成List越界的错误.所以对这个实现的优化显得很有必要.


结合第一种实现方式,我们看到第一种方式最需要改进的地方就是如果对于某个地址ID的迭代器values,如果values的第一个元素是地址信息的话,那么,我们就不需要缓存所有的人员信息了.如果第一个元素是地址信息,我们读出地址信息后,后来就全部是人员信息,那么就可以将人员的地址置为相应的地址.
现在我们回头看看mapreduce的partition和shuffle的过程,partitioner的主要功能是根据reduce的数量将map输出的结果进行分块,将数据送入到相应的reducer,所有的partitioner都必须实现Partitioner接口并实现getPartition方法,该方法的返回值为int类型,并且取值范围在0-numOfReducer-1,从而能够将map的输出输入到相应的reducer中,对于某个mapreduce过程,Hadoop框架定义了默认的partitioner为HashPartition,该Partitioner使用key的hashCode来决定将该key输送到哪个reducer;shuffle将每个partitioner输出的结果根据key进行group以及排序,将具有相同key的value构成一个valeus的迭代器,并根据key进行排序分别调用开发者定义的reduce方法进行归并.从shuffle的过程我们可以看出key之间需要进行比较,通过比较才能知道某两个key是否相等或者进行排序,因此mapduce的所有的key必须实现comparable接口的compareto()方法从而实现两个key对象之间的比较.
回到我们的问题,我们想要的是将地址信息在排序的过程中排到最前面,前面我们只通过locId进行比较的方法就不够用了,因为其无法标识出是地址表中的数据还是人员表中的数据.因此,我们需要实现自己定义的Key数据结构,完成在想共同locId的情况下地址表更小的需求.由于map的中间结果需要写到磁盘上,因此必须实现writable接口.具体实现如下:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.WritableComparable;


public class RecordKey implements WritableComparable<RecordKey>{

int keyId;
boolean isPrimary;


public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.keyId = in.readInt();
this.isPrimary = in.readBoolean();
}


public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(keyId);
out.writeBoolean(isPrimary);
}

public int compareTo(RecordKey k) {
// TODO Auto-generated method stub
if(this.keyId == k.keyId){
if(k.isPrimary == this.isPrimary)
return 0;
return this.isPrimary? -1:1;

}else
return this.keyId > k.keyId?1:-1;
}
@Override
public int hashCode() {
    return this.keyId;
}
}
这个key的数据结构中需要解释的方法就是compareTo方法,该方法完成了在keyId相同的情况下,确保地址数据比人员数据小.
有了这个数据结构,我们又发现了一个新的问题------就是shuffle的group过程,shuffle的group过程默认使用的是key的compareTo()方法.刚才我们添加的自定义Key没有办法将具有相同的locId的地址和人员放到同一个group中(因为从compareTo方法中可以看出他们是不相等的).不过hadoop框架提供了OutputValueGoupingComparator可以让使用者自定义key的group信息.我们需要的就是自己定义个groupingComparator就可以啦!看看这个比较器吧!
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


public class PkFkComparator extends WritableComparator {


public PkFkComparator(){
super(RecordKey.class);
}
@Override
 public int compare(WritableComparable a, WritableComparable b) {
RecordKey key1 = (RecordKey)a;
RecordKey key2 = (RecordKey)b;
 System.out.println("call compare");
 if(key1.keyId == key2.keyId){
return 0;
 }else
return key1.keyId > key2.keyId?1:-1;
 }
}
这里我们重写了compare方法,将两个具有相同的keyId的数据设为相等.
好了,有了这两个辅助工具,剩下的就比较简单了.写mapper,reducer,以及主程序. 


import java.io.IOException;


import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;


import org.apache.hadoop.io.*;


public class JoinMapper extends MapReduceBase 
implements Mapper<LongWritable, Text, RecordKey, Record> {


public void map(LongWritable key, Text value,
OutputCollector<RecordKey, Record> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] values = line.split(",");

if(values.length == 2){ //这里使用记录的长度来区别地址信息与人员信息,当然可以通过其他方式(如文件名等)来实现
Record reco = new Record();
reco.locId = values[0];
reco.type = 2;
reco.locationName = values[1];

RecordKey recoKey = new RecordKey();
recoKey.keyId = Integer.parseInt(values[0]);
recoKey.isPrimary = true;
output.collect(recoKey, reco);
}else{
Record reco = new Record();
reco.locId = values[2];
reco.empId = values[0];
reco.empName = values[1];
reco.type = 1;

RecordKey recoKey = new RecordKey();
recoKey.keyId = Integer.parseInt(values[2]);
recoKey.isPrimary = false;
output.collect(recoKey, reco);
}
}
}


import java.io.IOException;
import java.util.Iterator;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;


public class JoinReducer  extends MapReduceBase implements
Reducer<RecordKey, Record, LongWritable, Text> {
public void reduce(RecordKey key, Iterator<Record> values,
            OutputCollector<LongWritable, Text> output, 
            Reporter reporter) throws IOException {
Record thisLocation= new Record();
while (values.hasNext()){
Record reco = values.next();
if(reco.type ==  2){ //2 is the location
thisLocation = new Record(reco);
System.out.println("location is "+ thisLocation.locationName);
}else{  //1 is employee
reco.locationName =thisLocation.locationName;
System.out.println("emp is "+reco.toString());
output.collect(new LongWritable(0), new Text(reco.toString()));
}
}
}
}




import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;




public class Join {
/**
 * @param args
 */
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub


JobConf conf = new JobConf(Join.class);
conf.setJobName("Join");

FileSystem fstm = FileSystem.get(conf);
Path outDir = new Path("/Users/outputtest");
fstm.delete(outDir, true);


conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(RecordKey.class);
conf.setMapOutputValueClass(Record.class);

conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);

conf.setMapperClass(JoinMapper.class);
conf.setReducerClass(JoinReducer.class);


conf.setOutputValueGroupingComparator(PkFkComparator.class);


FileInputFormat.setInputPaths(conf, new Path(
"/user/input/join"));
FileOutputFormat.setOutputPath(conf, outDir);


JobClient.runJob(conf);


Path outPutFile = new Path(outDir, "part-00000");
SequenceFile.Reader reader = new SequenceFile.Reader(fstm, outPutFile,
conf);
org.apache.hadoop.io.Text numInside = new Text();
LongWritable numOutside = new LongWritable();
while (reader.next(numOutside, numInside)) {
System.out.println(numInside.toString() + " "
+ numOutside.toString());
}
reader.close();
}


}


作者:HEYUTAO007 发表于2011-11-3 22:12:03 原文链接
阅读:694 评论:0 查看评论

相关 [mapreduce hadoop join] 推荐:

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

mapreduce实例-Join连接 (reduce Side Join)

- - CSDN博客云计算推荐文章
//根据连接类型做不同处理. //设置不同map处理不同输入. 外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接. 作者:liuzhoulong 发表于2013-9-5 21:35:16 原文链接. 阅读:83 评论:0 查看评论.

MapReduce中的Join算法

- - CSDN博客推荐文章
  在关系型数据库中Join是非常常见的操作,各种优化手段已经到了极致. 在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要从不同的数据源中获取数据. 不同于传统的单机模式,在分布式存储下采用MapReduce编程模型,也有相应的处理措施和优化方法.   我们先简要地描述待解决的问题.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

"Hadoop/MapReduce/HBase"分享总结

- - ITeye博客
此分享是关于hadoop生态系统的简单介绍包括起源到相对应用. Hadoop和HBase.pdf (2.1 MB). 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Hadoop之MapReduce单元测试

- - ITeye博客
通常情况下,我们需要用小数据集来单元测试我们写好的map函数和reduce函数. 而一般我们可以使用Mockito框架来模拟OutputCollector对象(Hadoop版本号小于0.20.0)和Context对象(大于等于0.20.0). 下面是一个简单的WordCount例子:(使用的是新API).

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

Hadoop中两表JOIN的处理方法

- - 学着站在巨人的肩膀上
在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的. 而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 假设要进行join的数据分别来自File1和File2..

【Hadoop】MapReduce使用combiner优化性能

- - CSDN博客云计算推荐文章
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担. Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入.