MapReduce 实例:Join 连接(Reduce-Side Join)
- CSDN博客 · 云计算推荐文章。思路:外键作为 map 输出的 key,相同的外键值必然落在同一个 reduce 中,在 reduce 端根据需要做不同形式的连接;代码要点:(1)根据连接类型做不同处理;(2)设置不同 map 处理不同输入。作者:liuzhoulong,发表于 2013-9-5 21:35:16。原文链接。阅读:83,评论:0,查看评论。
public class ReduceSideJoin extends Configured implements Tool { public static class UserJoinMapper extends Mapper<Object, Text, Text, Text> { private Text outkey = new Text(); private Text outvalue = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { try { String[] sp = value.toString().split(","); String userid = sp[0]; outkey.set(userid); outvalue.set("A" + value.toString()); context.write(outkey, outkey); } catch (Exception e) { context.getCounter("UserJoinMapper", "errorlog").increment(1); } } } public static class CommentJoinMapper extends Mapper<Object, Text, Text, Text> { private Text outkey = new Text(); private Text outvalue = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { try { String[] sp = value.toString().split(","); String userid = sp[0]; outkey.set(userid); outvalue.set("B" + value.toString()); context.write(outkey, outkey); } catch (Exception e) { context.getCounter("UserJoinMapper", "errorlog").increment(1); } } } public static class UserJoinReducer extends Reducer<Text, Text, Text, Text> { private static final Text EMPTY_TEXT = new Text(""); private ArrayList<Text> listA = new ArrayList<Text>(); private ArrayList<Text> listB = new ArrayList<Text>(); private String joinType = null; @Override protected void setup(Context context) throws IOException, InterruptedException { joinType = context.getConfiguration().get("join.type"); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { listA.clear(); listB.clear(); for (Text value : values) { if (value.charAt(0) == 'A') { listA.add(new Text(value.toString().substring(1))); } else if (value.charAt(0) == 'B') { listB.add(new Text(value.toString().substring(1))); } } executeJoinLogic(context); } //根据连接类型做不同处理 private void executeJoinLogic(Context context) throws IOException, 
InterruptedException { if (joinType.equalsIgnoreCase("inner")) { if (listA.isEmpty() && listB.isEmpty()) { for (Text A : listA) { for (Text B : listB) { context.write(A, B); } } } } else if (joinType.equalsIgnoreCase("leftouter")) { for (Text A : listA) { if (!listB.isEmpty()) { for (Text B : listB) { context.write(A, B); } } else { context.write(A, EMPTY_TEXT); } } } else if (joinType.equalsIgnoreCase("rightouter")) { for (Text B : listB) { if (!listA.isEmpty()) { for (Text A : listA) { context.write(A, B); } } else { context.write(EMPTY_TEXT, B); } } } else if (joinType.equalsIgnoreCase("fullouter")) { if (!listA.isEmpty()) { for (Text A : listA) { if (!listB.isEmpty()) { for (Text B : listB) { context.write(A, B); } }else{ context.write(A, EMPTY_TEXT); } } } else { for (Text B : listB) { context.write(EMPTY_TEXT, B); } } } else if (joinType.equalsIgnoreCase("anti")) { if (listA.isEmpty() ^ listB.isEmpty()) { for (Text A : listA) { context.write(A, EMPTY_TEXT); } for (Text B : listB) { context.write(EMPTY_TEXT, B); } } } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("join.type", args[2]); Job job = new Job(conf, "ReduceSideJoin"); job.setJarByClass(ReduceSideJoin.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置不同map处理不同输入 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserJoinMapper.class); MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CommentJoinMapper.class); FileOutputFormat.setOutputPath(job, new Path(args[3])); job.setOutputFormatClass(TextOutputFormat.class); job.setReducerClass(UserJoinReducer.class); return job.waitForCompletion(true) ? 
0 : 1; } public static void main(String[] args) throws IOException, InterruptedException { try { if (args.length < 4) { System.err.println("ERROR: Parameter format length "); System.exit(0); } int ret = ToolRunner.run(new ReduceSideJoin(), args); System.exit(ret); } catch (Exception e) { e.printStackTrace(); } } }