Hadoop中两表JOIN的处理方法

标签: Hadoop | 发表时间:2012-05-14 21:34 | 作者:jrckkyy
出处:http://hi.baidu.com/jrckkyy

1. 概述

在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。

本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法。

2. 常见的join方法介绍

假设要进行join的数据分别来自File1和File2.

2.1 reduce side join

reduce side join是一种最简单的join方式,其主要思想如下:

在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。

在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。

2.2 map side join

之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

2.3 SemiJoin

SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。

实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。

更多关于半连接的介绍,可参考:半连接介绍: http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

2.4 reduce side join + BloomFilter

在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。

BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素一定可能在集合中。

因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。

更多关于BloomFilter的介绍,可参考: http://blog.csdn.net/jiaomeng/article/details/1495500

3. 二次排序

在Hadoop中,默认情况下是按照key进行排序,如果要按照value进行排序怎么办?即:对于同一个key,reduce函数接收到的value list是按照value排序的。这种应用需求在join操作中很常见,比如,希望相同的key中,小表对应的value排在前面。

有两种方法进行二次排序,分别为:buffer and in memory sort和 value-to-key conversion。

对于buffer and in memory sort,主要思想是:在reduce()函数中,将某个key对应的所有value保存下来,然后进行排序。 这种方法最大的缺点是:可能会造成out of memory。

对于value-to-key conversion,主要思想是:将key和部分value拼接成一个组合key(实现WritableComparable接口或者调用setSortComparatorClass函数),这样reduce获取的结果便是先按key排序,后按value排序的结果,需要注意的是,用户需要自己实现Paritioner,以便只按照key进行数据划分。Hadoop显式的支持二次排序,在Configuration类中有个setGroupingComparatorClass()方法,可用于设置排序group的key值,具体参考: http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

4. 后记

最近一直在找工作,由于简历上写了熟悉Hadoop,所以几乎每个面试官都会问一些Hadoop相关的东西,而 Hadoop上Join的实现就成了一道必问的问题,而极个别公司还会涉及到DistributedCache原理以及怎样利用DistributedCache进行Join操作。为了更好地应对这些面试官,特整理此文章。

O(∩_∩)O哈哈~

5. 参考资料

(1) 书籍《Data-Intensive Text Processing with MapReduce》 page 60~67 Jimmy Lin and Chris Dyer,University of Maryland, College Park

(2) 书籍《Hadoop In Action》page 107~131

(3) mapreduce的二次排序 SecondarySort: http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

(4) 半连接介绍: http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

(5) BloomFilter介绍: http://blog.csdn.net/jiaomeng/article/details/1495500

原创文章,转载请注明: 转载自 董的博客

本文链接地址: http://dongxicheng.org/mapreduce/hadoop-join-two-tables/

阅读全文
类别: Hadoop  查看评论

相关 [hadoop join 方法] 推荐:

Hadoop中两表JOIN的处理方法

- - 学着站在巨人的肩膀上
在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的. 而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 假设要进行join的数据分别来自File1和File2..

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

基于Hadoop datajoin包开发Reduce join及针对MRV2优化

- - 开源软件 - ITeye博客
编写不易,转载请注明(http://shihlei.iteye.com/blog/2263757).         最近项目,需要对两个文件进行连接查询,从文件2中提取在文件1中选线id的记录. 主要问题:两个文件都很大【 文件1:1亿记录 ; 文件2:8亿记录 】 . 方案1:Map启动将文件1表示读取bloomfilter,map处理文件2,发现存在即输出.

Java多线程中join方法的理解

- - Java - 编程语言 - ITeye博客
thread.Join把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程. 比如在线程B中调用了线程A的Join()方法,直到线程A执行完毕后,才会继续执行线程B. t.join();      //使调用线程 t 在此之前执行完毕. t.join(1000);  //等待 t 线程,等待时间是1000毫秒.

转:hive表Join的倾斜问题以及解决方法

- - SQL - 编程语言 - ITeye博客
写HQL语句的时候常常会遇到表Join的情况,一个简单的Join会被Hive解释成一个MapReduce任务,Map端分别读取两个表的数据,Reduce做真正的Join操作. 如果执行的过程中,如果发现有些Reduce任务比其他的Reduce任务慢很多,往往是发生了倾斜问题.  Join会被Hive解释成一个MapReduce任务时,Map端输出的记录是以Join的条件为Key的,即这些Map生成的Key都是 cat_id.

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hadoop复合键排序使用方法

- - CSDN博客云计算推荐文章
在hadoop中处理复杂业务时,需要用到复合键,复合不同于单纯的继承Writable接口,而是继承了WritableComparable接口,而实际上,WritableComparable接口继承了Writable和Comparable接口,如果只需要使用某一个类作为传值对象而不是作为key,继承Writable接口即可.

hadoop源码调试跟踪方法

- - 开源软件 - ITeye博客
最近经常有问我hadoop源码怎么看的问题,做以下讲解希望有用(hdfs调试为例). 1.在eclipse中建立java工程,然后把hadoop的src包中的内容拷贝到src资源下(仅org的即可). 2.导入hadoop中lib目录下的jar直到代码不再报错. 3.搭建一个hadoop的集群,我的测试环境是1个nn和3的dn.

mapreduce实例-Join连接 (reduce Side Join)

- - CSDN博客云计算推荐文章
//根据连接类型做不同处理. //设置不同map处理不同输入. 外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接. 作者:liuzhoulong 发表于2013-9-5 21:35:16 原文链接. 阅读:83 评论:0 查看评论.

hive join 优化 --小表join大表

- - CSDN博客云计算推荐文章
在小表和大表进行join时,将 小表放在前边,效率会高,hive会将小表进行缓存. 使用mapjoin将小表放入内存,在map端和大表逐一匹配,从而省去reduce. 在0.7版本后,也可以用配置来自动优化. 作者:smile0198 发表于2014-10-25 21:49:25 原文链接. 阅读:62 评论:0 查看评论.