mapreduce实例-Join连接 (reduce Side Join)

标签: mapreduce 实例 join | 发表时间:2013-09-06 05:35 | 作者:liuzhoulong
出处:http://blog.csdn.net
public class ReduceSideJoin extends Configured implements Tool {

    public static class UserJoinMapper extends Mapper<Object, Text, Text, Text> {

        private Text outkey = new Text();
        private Text outvalue = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String[] sp = value.toString().split(",");
                String userid = sp[0];
                outkey.set(userid);
                outvalue.set("A" + value.toString());
                context.write(outkey, outkey);
            } catch (Exception e) {
                context.getCounter("UserJoinMapper", "errorlog").increment(1);
            }
        }
    }

    public static class CommentJoinMapper extends Mapper<Object, Text, Text, Text> {

        private Text outkey = new Text();
        private Text outvalue = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String[] sp = value.toString().split(",");
                String userid = sp[0];
                outkey.set(userid);
                outvalue.set("B" + value.toString());
                context.write(outkey, outkey);
            } catch (Exception e) {
                context.getCounter("UserJoinMapper", "errorlog").increment(1);
            }
        }
    }

    public static class UserJoinReducer extends Reducer<Text, Text, Text, Text> {

        private static final Text EMPTY_TEXT = new Text("");
        private ArrayList<Text> listA = new ArrayList<Text>();
        private ArrayList<Text> listB = new ArrayList<Text>();
        private String joinType = null;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            joinType = context.getConfiguration().get("join.type");
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            listA.clear();
            listB.clear();
            for (Text value : values) {
                if (value.charAt(0) == 'A') {
                    listA.add(new Text(value.toString().substring(1)));
                } else if (value.charAt(0) == 'B') {
                    listB.add(new Text(value.toString().substring(1)));
                }
            }

            executeJoinLogic(context);
        }

        //根据连接类型做不同处理
        private void executeJoinLogic(Context context) throws IOException, InterruptedException {
            if (joinType.equalsIgnoreCase("inner")) {
                if (listA.isEmpty() && listB.isEmpty()) {
                    for (Text A : listA) {
                        for (Text B : listB) {
                            context.write(A, B);
                        }
                    }
                }
            } else if (joinType.equalsIgnoreCase("leftouter")) {
                for (Text A : listA) {
                    if (!listB.isEmpty()) {
                        for (Text B : listB) {
                            context.write(A, B);
                        }
                    } else {
                        context.write(A, EMPTY_TEXT);
                    }
                }
            } else if (joinType.equalsIgnoreCase("rightouter")) {
                for (Text B : listB) {
                    if (!listA.isEmpty()) {
                        for (Text A : listA) {
                            context.write(A, B);
                        }
                    } else {
                        context.write(EMPTY_TEXT, B);
                    }
                }
            } else if (joinType.equalsIgnoreCase("fullouter")) {
                if (!listA.isEmpty()) { 
                    for (Text A : listA) {
                        if (!listB.isEmpty()) {
                            for (Text B : listB) {
                                context.write(A, B);
                            }
                        }else{
                            context.write(A, EMPTY_TEXT);
                        }
                    }
                } else {
                    for (Text B : listB) {
                       context.write(EMPTY_TEXT, B);
                    }
                }
            } else if (joinType.equalsIgnoreCase("anti")) {
                if (listA.isEmpty() ^ listB.isEmpty()) {
                    for (Text A : listA) {
                        context.write(A, EMPTY_TEXT);
                    }
                    for (Text B : listB) {
                        context.write(EMPTY_TEXT, B);
                    }
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("join.type", args[2]);
        Job job = new Job(conf, "ReduceSideJoin");
        job.setJarByClass(ReduceSideJoin.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        //设置不同map处理不同输入
        
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserJoinMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CommentJoinMapper.class);
        
        FileOutputFormat.setOutputPath(job, new Path(args[3]));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setReducerClass(UserJoinReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        try {
            if (args.length < 4) {
                System.err.println("ERROR: Parameter format length ");
                System.exit(0);
            }
            int ret = ToolRunner.run(new ReduceSideJoin(), args);
            System.exit(ret);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接。
作者:liuzhoulong 发表于2013-9-5 21:35:16 原文链接
阅读:83 评论:0 查看评论

相关 [mapreduce 实例 join] 推荐:

mapreduce实例-Join连接 (reduce Side Join)

- - CSDN博客云计算推荐文章
//根据连接类型做不同处理. //设置不同map处理不同输入. 外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接. 作者:liuzhoulong 发表于2013-9-5 21:35:16 原文链接. 阅读:83 评论:0 查看评论.

MapReduce中的Join算法

- - CSDN博客推荐文章
  在关系型数据库中Join是非常常见的操作,各种优化手段已经到了极致. 在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要从不同的数据源中获取数据. 不同于传统的单机模式,在分布式存储下采用MapReduce编程模型,也有相应的处理措施和优化方法.   我们先简要地描述待解决的问题.

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

Mapreduce实例-分组排重(group by distinct)

- - CSDN博客云计算推荐文章
需要实现以下几个类,代码太多,列了下主要代码,可根据排重数据的特征判读是否需要添加combiner来提速. job.setPartitionerClass(MyPartitioner.class); map略. combiner(根据需要添加) reduce中的实现:. 作者:liuzhoulong 发表于2013-9-5 22:17:26 原文链接.

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hive join 优化 --小表join大表

- - CSDN博客云计算推荐文章
在小表和大表进行join时,将 小表放在前边,效率会高,hive会将小表进行缓存. 使用mapjoin将小表放入内存,在map端和大表逐一匹配,从而省去reduce. 在0.7版本后,也可以用配置来自动优化. 作者:smile0198 发表于2014-10-25 21:49:25 原文链接. 阅读:62 评论:0 查看评论.

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

MapReduce原理

- - C++博客-牵着老婆满街逛
       MapReduce 是由Google公司的Jeffrey Dean 和 Sanjay Ghemawat 开发的一个针对大规模群组中的海量数据处理的分布式编程模型. MapReduce实现了两个功能. Map把一个函数应用于集合中的所有成员,然后返回一个基于这个处理的结果集. 而Reduce是把从两个或更多个Map中,通过多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳.

MapReduce优化

- - 行业应用 - ITeye博客
相信每个程序员在 编程时都会问自己两个问题“我如何完成这个任务”,以及“怎么能让程序运行得更快”. 同样,MapReduce计算模型的多次优化也是为了更好地解答这两个问题. MapReduce计算模型的优化涉及了方方面面的内容,但是主要集中在两个方面:一是计算性能方面的优化;二是I/O操作方面的优化.