基于Hadoop datajoin包开发Reduce join及针对MRV2优化
编写不易,转载请注明(http://shihlei.iteye.com/blog/2263757)!
最近项目,需要对两个文件进行连接查询,从文件2中提取在文件1中选线id的记录。
主要问题:两个文件都很大【 文件1:1亿记录 ; 文件2:8亿记录 】
方案:
-
方案1:Map启动将文件1表示读取bloomfilter,map处理文件2,发现存在即输出。
问题:文件1过大,读取时间长,task直接timeout被kill。 -
方案2:使用Reduce端join,使用Hadoop data-join包的api进行连接
一 Hadoop Reduce Join
1思想
根据输入标记数据源,根据提供的Group key 分组,在Reduce侧处理组内容,完成连接。
2 实现
(1)定义可标记的输出类型
/** * * 1 根据文件名作为tag,区分数据来源 2 将数据封装成TaggedMapOutput 对象,并打上必要的tag 3 生成group * by的分组key,作为依据 * * */ public static class JoinMapper extends DataJoinMapperBase { /** * 读取输入的文件路径 * * **/ protected Text generateInputTag(String inputFile) { // 取文件名的A和B作为来源标记 String datasource = StringUtils.splitByWholeSeparatorPreserveAllTokens(inputFile, ".", 2)[0]; return new Text(datasource); } /*** * 分组的Key * * **/ protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); if (StringUtils.isBlank(line)) { return null; } // 去每个文件的第一个字段作为连接key String groupKey = StringUtils.splitByWholeSeparatorPreserveAllTokens(line, ",", 2)[0]; return new Text(groupKey); } /** * 对文件打上标记 */ protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } // 自定义输出类型===================================================== public static class TaggedWritable extends TaggedMapOutput { private Writable data; // 需要定义默认构造函数,否则报错 public TaggedWritable() { this.tag = new Text(); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } public Writable getData() { return data; } public void setData(Writable data) { this.data = data; } public void write(DataOutput out) throws IOException { this.tag.write(out); // 由于定义类型为WriteAble 所以不好使 out.writeUTF(this.data.getClass().getName()); this.data.write(out); } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); this.data.readFields(in); String dataClz = in.readUTF(); try { if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null); } this.data.readFields(in); } catch (ClassNotFoundException cnfe) { System.out.println("Problem in TaggedWritable class, method readFields."); } } }
遇到的坑:——跟Hadoop实战的区别
a)没有默认构造函数
报错:
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.<init>() at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529) Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.<init>() at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131) at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66) at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42) at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1421) at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1361) at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:220) at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:216) at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106) at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:129) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NoSuchMethodException: x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.<init>() at java.lang.Class.getConstructor0(Class.java:3082) at java.lang.Class.getDeclaredConstructor(Class.java:2178) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
解决:
添加默认构造器。
b)反序列化readFields空指针
报错:
java.lang.Exception: java.lang.NullPointerException at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529) Caused by: java.lang.NullPointerException at x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.readFields(MR1ReduceJoinJob.java:160) at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71) at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42) at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1421) at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1361) at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:220) at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:216) at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106) at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:129) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
原因:
在反序列化时有如下(《Hadoop实战》中):
public void readFields(DataInput in) throws IOException { this.tag.readFields(in); this.data.readFields(in); }这时两个成员变量还没有值
解决:
1 在构造是创建Text 类型 Tag对象。
2 由于Data 对象无法是Writeable类型,无法创建,所以只能在序列化时多记录类型,在readRields时反射出来。
详细见上代码。
(2)继承DataJoinMapperBase 实现记录标记
主要实现三个方法:
/** * 根据文件名实现找打标记 */ protected abstract Text generateInputTag(String inputFile); /** * 对本行记录打上标记,生成TaggedMapOutput */ protected abstract TaggedMapOutput generateTaggedMapOutput(Object value); /** * 生成分组key,其实就是Reduce的key */ protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
代码:
/** * * 1 根据文件名作为tag,区分数据来源 2 将数据封装成TaggedMapOutput 对象,并打上必要的tag 3 生成group * by的分组key,作为依据 * * */ public static class JoinMapper extends DataJoinMapperBase { /** * 读取输入的文件路径 * * **/ protected Text generateInputTag(String inputFile) { // 取文件名的A和B作为来源标记 String datasource = StringUtils.splitByWholeSeparatorPreserveAllTokens(inputFile, ".", 2)[0]; return new Text(datasource); } /*** * 分组的Key * * **/ protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); if (StringUtils.isBlank(line)) { return null; } // 去每个文件的第一个字段作为连接key String groupKey = StringUtils.splitByWholeSeparatorPreserveAllTokens(line, ",", 2)[0]; return new Text(groupKey); } /** * 对文件打上标记 */ protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } }
(3)继承DataJoinReducerBase 根据条件数据数据
主要实现combin()方法,完成连接操作
public static class JoinReducer extends DataJoinReducerBase { /** * tags,标签集合,且有顺序通常按照文件读取顺序 values,标签值, * * 本方法被调一次,会传递一组要连接的记录,文件1的一条,文件2的一条 */ protected TaggedMapOutput combine(Object[] tags, Object[] values) { // 按照需求,非left join 或 right join所以要求 if (tags.length < 2) return null; String joinedStr = ""; for (int i = 0; i < values.length; i++) { // 设置拼接符 if (i > 0) joinedStr += ","; TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(",", 2); joinedStr += tokens[1]; } // 写出 TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } }
(4)整体调用代码
public class MR1ReduceJoinJob { public static void main(String[] args) throws Exception { String in = "/Test/demo/in"; String out = "/Test/demo/out"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); JobConf job = buildJob(new Path(in), new Path(out), fs, conf); RunningJob runningJob = JobClient.runJob(job); // 等待结束 runningJob.waitForCompletion(); if (runningJob.isSuccessful()) { System.out.println("success ! "); } else { System.out.println(runningJob.getFailureInfo()); } } public static JobConf buildJob(Path in, Path out, FileSystem fs, Configuration conf) throws IOException { fs.delete(out, true); JobConf job = new JobConf(new Configuration(), MR1ReduceJoinJob.class); job.setJobName("MR1ReduceJoinJob"); job.setJarByClass(MR1ReduceJoinJob.class); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setNumReduceTasks(1); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", ","); // 解决死循环问题 job.setLong("datajoin.maxNumOfValuesPerGroup", Long.MAX_VALUE); FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job, out); return job; } }
输入文件:
A:
1,a 2,b 3,c
B:
1,11 1,111 2,22 2,222 4,44
输出文件:
1,a,111 1,a,11 2,b,222 2,b,22
其他的坑:
大文件执行是一直出这个:
2-13 17:54:45 -2924 [localfetcher#5] INFO - closeInMemoryFile -> map-output of size: 215770, inMemoryMapOutputs.size() -> 2, commitMemory -> 829095, usedMemory ->1044865 2015-12-13 17:54:45 -2925 [localfetcher#5] INFO - localfetcher#5 about to shuffle output of map attempt_local647573184_0001_m_000009_0 decomp: 205864 len: 205868 to MEMORY 2015-12-13 17:54:45 -2925 [localfetcher#5] INFO - Read 205864 bytes from map-output for attempt_local647573184_0001_m_000009_0 2015-12-13 17:54:45 -2925 [localfetcher#5] INFO - closeInMemoryFile -> map-output of size: 205864, inMemoryMapOutputs.size() -> 3, commitMemory -> 1044865, usedMemory ->1250729 2015-12-13 17:54:45 -2926 [localfetcher#5] INFO - localfetcher#5 about to shuffle output of map attempt_local647573184_0001_m_000007_0 decomp: 211843 len: 211847 to MEMORY 2015-12-13 17:54:45 -2926 [localfetcher#5] INFO - Read 211843 bytes from map-output for attempt_local647573184_0001_m_000007_0 2015-12-13 17:54:45 -2926 [localfetcher#5] INFO - closeInMemoryFile -> map-output of size: 211843, inMemoryMapOutputs.size() -> 4, commitMemory -> 1250729, usedMemory ->1462572 2015-12-13 17:54:45 -2927 [localfetcher#5] INFO - localfetcher#5 about to shuffle output of map attempt_local647573184_0001_m_000000_0 decomp: 851861 len: 851865 to MEMORY 2015-12-13 17:54:45 -2929 [localfetcher#5] INFO - Read 851861 bytes from map-output for attempt_local647573184_0001_m_000000_0 2015-12-1
根据源码及别人方案,要改成
job.setLong("datajoin.maxNumOfValuesPerGroup", Long.MAX_VALUE);
后来问题又不出现了。
4 不足
- 基于mrv1 实现, API 交旧
-
Map的问题:
(1)只能基于文件名分组,要求连接的文件名必须能区分
(2)generateTaggedMapOutput()还需要手动绑定Tag -
Reduce 问题:
(1)迭代输出时不能灵活控制,框架给分组,输出单个文件的字段会重,需要自己处理。 -
排序相同Group key的记录,也不适合处理超大的记录,可以通过二次排序改进。
- 我这种需要查找exist的需求不好实现。
二 基于MR V2 重写并改进
1 TaggedValue
package x.bd.hadoop.join.base; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.util.ReflectionUtils; /** * 可标记的结果 * * @author shilei * */ public class TaggedValue implements Writable { private Text tag; private Writable data; public TaggedValue() { tag = new Text(); } public TaggedValue(Writable data) { tag = new Text(); this.data = data; } @Override public void write(DataOutput out) throws IOException { // 写出内容 this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); String dataClz = in.readUTF(); try { if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null); } this.data.readFields(in); } catch (ClassNotFoundException cnfe) { System.out.println("Problem in TaggedWritable class, method readFields."); } } public Text getTag() { return tag; } public void setTag(Text tag) { this.tag = tag; } public Writable getData() { return data; } public void setData(Writable data) { this.data = data; } /** * clone克隆 一个 对象数据 * * @param conf * @return */ public TaggedValue clone(Configuration conf) { return (TaggedValue) WritableUtils.clone(this, conf); } public static void main(String[] args) { System.out.println(TaggedValue.class.getName()); } }
2 DataJoinMapBase
package x.bd.hadoop.join.base; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** * 连接查询 Mapper * * @author shilei * * @param <KEYIN> * @param <VALUEIN> */ public abstract class DataJoinMapperBase<KEYIN, VALUEIN> extends Mapper<KEYIN, VALUEIN, Text, TaggedValue> { // 类型标记 protected Text inputTag; // 输入文件路径,文件名 protected String inputFilePath, inputFileName; /** * 根据数据的文件名确定输入标签 * */ protected abstract Text generateInputTagByFile(String inputFilePath, String inputFileName); /** * 根据行内容处理Tag * * @param inputFilePath * @param inputFileName * @return */ protected Text generateInputTagByLine(Text tag, KEYIN key, VALUEIN value, Context context) { return inputTag; } /** * 封装待标签的输出 */ protected abstract TaggedValue generateTaggedMapValue(VALUEIN value); /** * 生成group by的列 */ protected abstract Text generateGroupKey(TaggedValue tagValue); @Override protected void setup(Mapper<KEYIN, VALUEIN, Text, TaggedValue>.Context context) throws IOException, InterruptedException { FileSplit inputSplit = (FileSplit)context.getInputSplit(); this.inputFilePath = inputSplit.getPath().getParent().getName(); this.inputFileName = inputSplit.getPath().getName(); this.inputTag = generateInputTagByFile(this.inputFilePath, this.inputFileName); } @Override public void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { // 根据本行情况,处理Tag this.inputTag = generateInputTagByLine(this.inputTag, key, value, context); // 生成带标签的value TaggedValue taggedValue = generateTaggedMapValue(value); if (taggedValue == null) { context.getCounter("DataJoinMapper", "discardedCount").increment(1); return; } // 生成分组健 Text groupKey = generateGroupKey(taggedValue); if (groupKey == null) { context.getCounter("DataJoinMapper", "nullGroupKeyCount").increment(1); return; } // 输出内容绑定标签 taggedValue.setTag(this.inputTag); // key : group key , value : taggedValue context.write(groupKey, taggedValue); context.getCounter("DataJoinMapper", "outCount").increment(1); } }
3 DataJoinReduceJoin
package x.bd.hadoop.join.base; import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 连接查询Reduce * * @author shilei * * @param <KEYOUT> * @param <VALUEOUT> */ public abstract class DataJoinReducerBase<KEYOUT, VALUEOUT> extends Reducer<Text, TaggedValue, KEYOUT, VALUEOUT> { @Override protected void setup(Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { super.setup(context); } /** * 合并结果 */ protected abstract void combine(SortedMap<Text, List<TaggedValue>> valueGroups, Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException; @Override protected void reduce(Text key, Iterable<TaggedValue> values, Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { // 获得一个sort map ,key 是tab ,value 是结果的集合 SortedMap<Text, List<TaggedValue>> groups = regroup(key, values, context); combine(groups, context); context.getCounter("DataJoinReucer", "groupCount").increment(1); } /** * 按照Tag 对value 进行充分组 * * @param key * @param arg1 * @param reporter * @return * @throws IOException */ private SortedMap<Text, List<TaggedValue>> regroup(Text key, Iterable<TaggedValue> values, Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException { /* * key: tag; value : TaggedValue */ SortedMap<Text, List<TaggedValue>> valueGroup = new TreeMap<Text, List<TaggedValue>>(); // 遍历Value Iterator<TaggedValue> iter = values.iterator(); while (iter.hasNext()) { // TODO 为什么需要克隆? TaggedValue taggedValue = ((TaggedValue) iter.next()).clone(context.getConfiguration()); // 获得记录的 tag Text tag = taggedValue.getTag(); // 从map 中获取一个iterator,如果已经创建,就做一个情况 List<TaggedValue> datas = valueGroup.get(tag); if (datas == null) { datas = new LinkedList<TaggedValue>(); valueGroup.put(tag, datas); } datas.add(taggedValue); // System.out.println("reduce : " + taggedValue + "|" + // tag.toString() + "|" + taggedValue.getData().toString()); taggedValue = null; } return valueGroup; } }
4 整体调用
package x.bd.hadoop.join; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.SortedMap; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import x.bd.hadoop.join.base.DataJoinMapperBase; import x.bd.hadoop.join.base.DataJoinReducerBase; import x.bd.hadoop.join.base.TaggedValue; /** * 使用Hadoop API 对数据进行 Reduce 连接</br> * * 文件1:A.txt 1,a</br> 2,b </br> 3,c * * 文件2:B.txt 1,11</br> 1,111</br> 2,22</br> 2,222</br>4,44 * * 关联查询(要求inner join): 1,a,11</br> 1,a,111</br> 2,b,22 </br> 2,b,222</br> * * * @author shilei * */ public class MR2SelfReduceJoinJob extends Configured implements Tool { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int res = ToolRunner.run(conf, new MR2SelfReduceJoinJob(), args); if (res == 0) { System.out.println("MR2SelfReduceJoinJob success !"); } else { System.out.println("MR2SelfReduceJoinJob error ! "); } System.exit(res); } @Override public int run(String[] args) throws Exception { String in = "/Test/demo/in"; String out = "/Test/demo/out"; Path inPath = new Path(in); Path outPath = new Path(out); Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); fs.delete(outPath, true); Job job = Job.getInstance(conf, "MR2SelfReduceJoinJob"); job.setJarByClass(MR2SelfReduceJoinJob.class); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setNumReduceTasks(1); // 处理map的输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TaggedValue.class); job.setOutputKeyClass(Writable.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, inPath); FileOutputFormat.setOutputPath(job, outPath); if (job.waitForCompletion(true)) { return 0; } else { return 1; } } // Map======================================= /** * * 1 根据文件名作为tag,区分数据来源 2 将数据封装成TaggedMapOutput 对象,并打上必要的tag 3 生成group * by的分组key,作为依据 * * */ public static class JoinMapper extends DataJoinMapperBase<Object, Text> { /** * 读取输入的文件路径 * */ @Override protected Text generateInputTagByFile(String inputFilePath, String inputFileName) { // 取文件名的A和B作为来源标记 String datasource = StringUtils.splitByWholeSeparatorPreserveAllTokens(inputFileName, ".", 2)[0]; return new Text(datasource); } /** * 按需峰值要处理的记录,这里只需要原样输出 */ @Override protected TaggedValue generateTaggedMapValue(Text value) { return new TaggedValue(value); } /** * 数据的第一个字段作为分组key */ @Override protected Text generateGroupKey(TaggedValue tagValue) { String line = ((Text) tagValue.getData()).toString(); if (StringUtils.isBlank(line)) { return null; } // 去每个文件的第一个字段作为连接key String groupKey = StringUtils.splitByWholeSeparatorPreserveAllTokens(line, ",", 2)[0]; return new Text(groupKey); } } // Reduce============================================ public static class JoinReducer extends DataJoinReducerBase<Writable, NullWritable> { private Text key = new Text("B"); @Override protected void combine(SortedMap<Text, List<TaggedValue>> valueGroups, Reducer<Text, TaggedValue, Writable, NullWritable>.Context context) throws IOException, InterruptedException { // 必须能连接上 if (valueGroups.size() < 2) { return; } // 这里只输出文件2的字段 // 写出 List<TaggedValue> cookieValues = valueGroups.get(key); Iterator<TaggedValue> iter = cookieValues.iterator(); while (iter.hasNext()) { TaggedValue value = iter.next(); if (value == null) { continue; } context.write(value.getData(), NullWritable.get()); } } } }
三 源码包
-
本文附件下载:
- hadoop-join.zip (18.4 KB)
已有 0 人发表留言,猛击->> 这里<<-参与讨论
ITeye推荐