Hive JOIN使用详解

标签: hive join | 发表时间:2015-06-11 18:27 | 作者:王书兴
出处:http://www.iteye.com

Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL。有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job。
对于最基本的HQL查询我们不再累述,这里主要说明Hive中进行统计分析时使用到的JOIN操作。在说明Hive JOIN之前,我们先简单说明一下,Hadoop执行MR Job的基本过程(运行机制),能更好的帮助我们理解HQL转换到底层的MR Job后是如何执行的。我们重点说明MapReduce执行过程中,从Map端到Reduce端这个过程(Shuffle)的执行情况,如图所示(来自《Hadoop: The Definitive Guide》):
hadoop-mr-shuffle
基本执行过程,描述如下:

  1. 一个InputSplit输入到map,会运行我们实现的Mapper的处理逻辑,对数据进行映射操作。
  2. map输出时,会首先将输出中间结果写入到map自带的buffer中(buffer默认大小为100M,可以通过io.sort.mb配置)。
  3. map自带的buffer使用容量达到一定门限(默认0.80或80%,可以通过io.sort.spill.percent配置),一个后台线程会准备将buffer中的数据写入到磁盘。
  4. 这个后台线程在将buffer中数据写入磁盘之前,会首先将buffer中的数据进行partition(分区,partition数为Reducer的个数),对于每个的数据会基于Key进行一个in-memory排序。
  5. 排序后,会检查是否配置了Combiner,如果配置了则直接作用到已排序的每个partition的数据上,对map输出进行化简压缩(这样写入磁盘的数据量就会减少,降低I/O操作开销)。
  6. 现在可以将经过处理的buffer中的数据写入磁盘,生成一个文件(每次buffer容量达到设置的门限,都会对应着一个写入到磁盘的文件)。
  7. map任务结束之前,会对输出的多个文件进行合并操作,合并成一个文件(若map输出至少3个文件,在多个文件合并后写入之前,如果配置了Combiner,则会运行来化简压缩输出的数据,文件个数可以通过min.num.splits.for.combine配置;如果指定了压缩map输出,这里会根据配置对数据进行压缩写入磁盘),这个文件仍然保持partition和排序的状态。
  8. reduce阶段,每个reduce任务开始从多个map上拷贝属于自己partition(map阶段已经做好partition,而且每个reduce任务知道应该拷贝哪个partition;拷贝过程是在不同节点之间,Reducer上拷贝线程基于HTTP来通过网络传输数据)。
  9. 每个reduce任务拷贝的map任务结果的指定partition,也是先将数据放入到自带的一个buffer中(buffer默认大小为Heap内存的70%,可以通过mapred.job.shuffle.input.buffer.percent配置),如果配置了map结果进行压缩,则这时要先将数据解压缩后放入buffer中。
  10. reduce自带的buffer使用容量达到一定门限(默认0.66或66%,可以通过mapred.job.shuffle.merge.percent配置),或者buffer中存放的map的输出的数量达到一定门限(默认1000,可以通过mapred.inmem.merge.threshold配置),buffer中的数据将会被写入到磁盘中。
  11. 在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。
  12. 当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件,这时开始执行合并操作,并保持每个map输出数据中Key的有序性,将多个文件合并成一个文件(在reduce端可能存在buffer和磁盘上都有数据的情况,这样在buffer中的数据可以减少一定量的I/O写入操作开销)。
  13. 最后,执行reduce阶段,运行我们实现的Reducer中化简逻辑,最终将结果直接输出到HDFS中(因为Reducer运行在DataNode上,输出结果的第一个replica直接在存储在本地节点上)。

通过上面的描述我们看到,在MR执行过程中,存在Shuffle过程的MR需要在网络中的节点之间(Mapper节点和Reducer节点)拷贝数据,如果传输的数据量很大会造成一定的网络开销。而且,Map端和Reduce端都会通过一个特定的buffer来在内存中临时缓存数据,如果无法根据实际应用场景中数据的规模来使用Hive,尤其是执行表的JOIN操作,有可能很浪费资源,降低了系统处理任务的效率,还可能因为内存不足造成OOME问题,导致计算任务失败。
下面,我们说明Hive中的JOIN操作,针对不同的JOIN方式,应该如何来实现和优化:

生成一个MR Job

多表连接,如果多个表中每个表都使用同一个列进行连接(出现在JOIN子句中),则只会生成一个MR Job,例如:

1 SELECT  a.val, b.val, c.val  FROM  JOIN  ON  (a. key  = b.key1)  JOIN  ON  (c. key  = b.key1)

三个表a、b、c都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个JOIN子句中,从而只生成一个MR Job。

生成多个MR Job

多表连接,如果多表中,其中存在一个表使用了至少2个字段进行连接(同一个表的至少2个列出现在JOIN子句中),则会至少生成2个MR Job,例如:

1 SELECT  a.val, b.val, c.val  FROM  JOIN  ON  (a. key  = b.key1)  JOIN  ON  (c. key  = b.key2)

三个表基于2个字段进行连接,这两个字段b.key1和b.key2同时出现在b表中。连接的过程是这样的:首先a和b表基于a.key和b.key1进行连接,对应着第一个MR Job;表a和b连接的结果,再和c进行连接,对应着第二个MR Job。

表连接顺序优化

多表连接,会转换成多个MR Job,每一个MR Job在Hive中称为JOIN阶段(Stage)。在每一个Stage,按照JOIN顺序中的最后一个表应该尽量是大表,因为JOIN前一阶段生成的数据会存在于Reducer的buffer中,通过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间结果数据(这个中间结果数据可能是JOIN顺序中,前面表连接的结果的Key,数据量相对较小,内存开销就小),这样,与后面的大表进行连接时,只需要从buffer中读取缓存的Key,与大表中的指定Key进行连接,速度会更快,也可能避免内存缓冲区溢出。例如:

1 SELECT  a.val, b.val, c.val  FROM  JOIN  ON  (a. key  = b.key1)  JOIN  ON  (c. key  = b.key1)

这个JOIN语句,会生成一个MR Job,在选择JOIN顺序的时候,数据量相比应该是b < c,表a和b基于a.key = b.key1进行连接,得到的结果(基于a和b进行连接的Key)会在Reducer上缓存在buffer中,在与c进行连接时,从buffer中读取Key(a.key=b.key1)来与表c的c.key进行连接。
另外,也可以通过给出一些Hint信息来启发JOIN操作,这指定了将哪个表作为大表,从而得到优化。例如:

1 SELECT  /*+ STREAMTABLE(a) */ a.val, b.val, c.val  FROM  JOIN  ON  (a. key  = b.key1)  JOIN  c ON  (c. key  = b.key1)

上述JOIN语句中,a表被视为大表,则首先会对表b和c进行JOIN,然后再将得到的结果与表a进行JOIN。

基于条件的LEFT OUTER JOIN优化

左连接时,左表中出现的JOIN字段都保留,右表没有连接上的都为空。对于带WHERE条件的JOIN语句,例如:

1 SELECT  a.val, b.val  FROM  LEFT  OUTER  JOIN  ON  (a. key =b. key )
2 WHERE  a.ds= '2009-07-07'  AND  b.ds= '2009-07-07'

执行顺序是,首先完成2表JOIN,然后再通过WHERE条件进行过滤,这样在JOIN过程中可能会输出大量结果,再对这些结果进行过滤,比较耗时。可以进行优化,将WHERE条件放在ON后,例如:

1 SELECT  a.val, b.val  FROM  LEFT  OUTER  JOIN  b
2 ON  (a. key =b. key  AND  b.ds= '2009-07-07'  AND  a.ds= '2009-07-07' )

这样,在JOIN的过程中,就对不满足条件的记录进行了预先过滤,可能会有更好的表现。

左半连接(LEFT SEMI JOIN)

左半连接实现了类似IN/EXISTS的查询语义,使用关系数据库子查询的方式实现查询SQL,例如:

1 SELECT  a. key , a.value  FROM  WHERE  a. key  IN  ( SELECT  b. key  FROM  b);

使用Hive对应于如下语句:

1 SELECT  a. key , a.val  FROM  LEFT  SEMI  JOIN  ON  (a. key  = b. key )

需要注意的是,在LEFT SEMI JOIN中,表b只能出现在ON子句后面,不能够出现在SELECT和WHERE子句中。
关于子查询,这里提一下,Hive支持情况如下:

  • 在0.12版本,只支持FROM子句中的子查询;
  • 在0.13版本,也支持WHERE子句中的子查询。

Map Side JOIN

Map Side JOIN优化的出发点是,Map任务输出后,不需要将数据拷贝到Reducer节点,降低的数据在网络节点之间传输的开销。
多表连接,如果只有一个表比较大,其他表都很小,则JOIN操作会转换成一个只包含Map的Job,例如:

1 SELECT  /*+ MAPJOIN(b) */ a. key , a.value  FROM  JOIN  ON  a. key  = b. key

对于表a数据的每一个Map,都能够完全读取表b的数据。这里,表a与b不允许执行FULL OUTER JOIN、RIGHT OUTER JOIN。

BUCKET Map Side JOIN

我们先看两个表a和b的DDL,表a为:

1 CREATE  TABLE  a( key  INT , othera STRING)
2 CLUSTERED  BY ( key INTO  4 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED  BY  '\001'
5 COLLECTION ITEMS TERMINATED  BY  '\002'
6 MAP KEYS TERMINATED  BY  '\003'
7 STORED  AS  SEQUENCEFILE;

表b为:

1 CREATE  TABLE  b( key  INT , otherb STRING)
2 CLUSTERED  BY ( key INTO  32 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED  BY  '\001'
5 COLLECTION ITEMS TERMINATED  BY  '\002'
6 MAP KEYS TERMINATED  BY  '\003'
7 STORED  AS  SEQUENCEFILE;

现在要基于a.key和b.key进行JOIN操作,此时JOIN列同时也是BUCKET列,JOIN语句如下:

1 SELECT  /*+ MAPJOIN(b) */ a. key , a.value  FROM  JOIN  ON  a. key  = b. key

并且表a有4个BUCKET,表b有32个BUCKET,默认情况下,对于表a的每一个BUCKET,都会去获取表b中的每一个BUCKET来进行JOIN,这回造成一定的开销,因为只有表b中满足JOIN条件的BUCKET才会真正与表a的BUCKET进行连接。
这种默认行为可以进行优化,通过改变默认JOIN行为,只需要设置变量:

1 set  hive.optimize.bucketmapjoin =  true

这样,JOIN的过程是,表a的BUCKET 1只会与表b中的BUCKET 1进行JOIN,而不再考虑表b中的其他BUCKET 2~32。
如果上述表具有相同的BUCKET,如都是32个,而且还是排序的,亦即,在表定义中在CLUSTERED BY(key)后面增加如下约束:

1 SORTED  BY ( key )

则上述JOIN语句会执行一个Sort-Merge-Bucket (SMB) JOIN,同样需要设置如下参数来改变默认行为,优化JOIN时只遍历相关的BUCKET即可:

1 set  hive.input. format =org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
2 set  hive.optimize.bucketmapjoin =  true ;
3 set  hive.optimize.bucketmapjoin.sortedmerge =  true ;

关于更多的有关JOIN优化,可以参考后面的链接。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [hive join] 推荐:

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hive join 优化 --小表join大表

- - CSDN博客云计算推荐文章
在小表和大表进行join时,将 小表放在前边,效率会高,hive会将小表进行缓存. 使用mapjoin将小表放入内存,在map端和大表逐一匹配,从而省去reduce. 在0.7版本后,也可以用配置来自动优化. 作者:smile0198 发表于2014-10-25 21:49:25 原文链接. 阅读:62 评论:0 查看评论.

Hive Join 优化 翻译

- - 数据库 - ITeye博客
翻译自  https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization#LanguageManualJoinOptimization-AutoConversiontoSMBMapJoin. Join Optimization ----Join 调优.

Hive JOIN使用详解

- - 数据库 - ITeye博客
Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL. 有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job.

Hive高级查询(group by、 order by、 join等)

- - CSDN博客推荐文章
所有值不全为NULL时,加1操作 count(1). 不管有没有值,只要有这条记录,值就加1 count(col) col列里面的值为null,值不会加1,这个列里面的值不为NULL,才加1. sum(可转成数字的值) 返回bigint. avg(可转成数字的值)返回double. distinct不同值个数.

Hive中Join的原理和机制

- - 编程语言 - ITeye博客
Hive中Join的原理和机制. 笼统的说,Hive中的Join可分为Common Join(Reduce阶段完成join)和Map Join(Map阶段完成join). 本文简单介绍一下两种join的原理和机制. 如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join.

hive 配置文件以及join中null值的处理

- - CSDN博客云计算推荐文章
1.  三种设定方式:配置文件. ·   用户自定义配置文件:$HIVE_CONF_DIR/hive-site.xml. ·   默认配置文件:$HIVE_CONF_DIR/hive-default.xml. 用户自定义配置会覆盖默认配置. 另外,Hive也会读入Hadoop的配置,因为Hive是作为Hadoop的客户端启动的,Hadoop的配置文件包括.

Hive中小表与大表关联(join)的性能分析

- - 数据库 - ITeye博客
经常看到一些Hive优化的建议中说当小表与大表做关联时,把小表写在前面,这样可以使Hive的关联速度更快,提到的原因都是说因为小表可以先放到内存中,然后大表的每条记录再去内存中检测,最终完成关联查询. 这样的原因看似合理,但是仔细推敲,又站不住脚跟. 如果所谓的小表在内存中放不下怎么办. 我用2个只有几条记录的表做关联查询,这应该算是小表了,在查看reduce的执行日志时依然是有写磁盘的操作的.

转:hive表Join的倾斜问题以及解决方法

- - SQL - 编程语言 - ITeye博客
写HQL语句的时候常常会遇到表Join的情况,一个简单的Join会被Hive解释成一个MapReduce任务,Map端分别读取两个表的数据,Reduce做真正的Join操作. 如果执行的过程中,如果发现有些Reduce任务比其他的Reduce任务慢很多,往往是发生了倾斜问题.  Join会被Hive解释成一个MapReduce任务时,Map端输出的记录是以Join的条件为Key的,即这些Map生成的Key都是 cat_id.

hive中与hbase外部表join时内存溢出(hive处理mapjoin的优化器机制)

- - CSDN博客云计算推荐文章
与hbase外部表(wizad_mdm_main)进行join出现问题:. 最后在进行到0.83时,内存溢出失败. 默认情况下,Hive会自动将小表加到DistributeCache中,然后在Map扫描大表的时候,去和DistributeCache中的小表做join,这称为Mapjoin. 这里wizad_mdm_main是基于HBase的外部表,而这张表在HDFS上的源路径为 /hivedata/warehouse/wizad.db/wizad_mdm_main,实际这个目录为空,.