Hive性能优化小结

标签: | 发表时间:2018-05-07 22:25 | 作者:
出处:https://mp.weixin.qq.com

在工作中使用hive比较多,也写了很多HiveQL,对于那些执行比较慢的语句,看着那些执行慢的任务显示的进度真的是欲哭无泪。
是真的数据量比较大,计算比较复杂,还是还没将相关参数设置最优呢。这里对Hive常用的一些性能优化进行了总结。

列裁剪

Hive在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销,中间表存储开销和数据整合开销。

    sethive.optimize.cp=true; // 默认为true      

分区裁剪

在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。

    sethive.optimize.pruner=true; // 默认为true      

表连接优化

小表前置

Hive假定查询中最后的一个表时大表。它会将其他表缓存起来,然后扫描最后那个表。
因此通常需要将小表放在前面,或者标记那张表是大表: /*streamtable(table_name)*/

使用相同的连接键

在hive中,当对3个或更多张表进行join时,如果on条件使用相同字段,那么它们会合并为一个MapReduce Job,利用这种特性,可以将相同的join on的放入一个job来节省执行时间。

优先过滤数据

尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大限度的减少参与join的数据量。

启用mapjoin

mapjoin是将join双方比较小的表直接分发到各个map进程的内存中,在map进程中进行join操作,这样就可以不用进行reduce步骤,从而提高了速度。只有join操作才能启用mapjoin。

    sethive.auto.convert.join =true; // 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。      
sethive.mapjoin.smalltable.filesize =2500000; // 刷入内存表的大小(字节)
sethive.mapjoin.maxsize=1000000;  // Map Join所处理的最大的行数。超过此行数,Map Join进程会异常退出

尽量原子操作

尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。

并行执行

hive会将一个查询任务转化为一个或多个阶段。默认情况下,一次只执行一个阶段。如果某些阶段不是互相依赖的,是可以并行执行的,这样可以缩短整个job执行时间。

    sethive.exec.parallel=true;   // 可以开启并发执行。      
sethive.exec.parallel.thread.number=16; // 同一个sql允许最大并行度,默认为8。

压缩数据

对数据进行压缩不仅可以减少数据的大小,还可以节省磁盘的读写时间。在Hive查询中,可以对中间数据和最终结果数据进行压缩。

中间数据压缩

中间数据压缩就是对hive查询的多个job之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。可以采用 snappy压缩算法,该算法的压缩和解压效率都非常高。

    sethive.exec.compress.intermediate=true;      
sethive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
sethive.intermediate.compression.type=BLOCK;

结果数据压缩

最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间;
注:常用的gzip,snappy压缩算法是不支持并行处理的,如果数据源是gzip/snappy压缩文件大文件,这样只会有有个mapper来处理这个文件,会严重影响查询效率。
所以如果结果数据需要作为其他查询任务的数据源,可以选择支持splitable的 LZO算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高job执行的速度了。关于如何给Hadoop集群安装LZO压缩库可以查看 这篇文章

    sethive.exec.compress.output=true;      
setmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
setmapred.output.compression.type=BLOCK:

Hadoop集群支持一下算法:

  • org.apache.hadoop.io.compress.GzipCodec

  • org.apache.hadoop.io.compress.SnappyCodec

  • com.hadoop.compression.lzo.LzopCodec

  • org.apache.hadoop.io.compress.Lz4Codec

本地化执行

对于小数据集,可以通过本地模式,在单台机器上处理所有任务,执行时间明显被缩短

    setmapred.job.tracker=local;      
sethive.exec.mode.local.auto=true;

当一个job满足下面条件才能真正使用本地模式:

  • job的输入数据大小必须小于参数 hive.exec.mode.local.inputbytes.max(默认128M)

  • job的map数必须小于参数 hive.exec.mode.local.auto.tasks.max(默认4)

  • job的reduce数必须为0或者1

Map端聚合优化

很多聚合操作可以先在Map端进行部分聚合,然后在Reduce端得出最终结果。

    hive.map.aggr=true; // 用于设定是否在 map 端进行聚合,默认值为真      
hive.groupby.mapaggr.checkinterval=100000; // 用于设定 map 端进行聚合操作的条目数

合并小文件

Map输入合并

在执行MapReduce程序的时候,一般情况是一个文件需要一个mapper来处理。但是如果数据源是大量的小文件,这样岂不是会启动大量的mapper任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少mapper任务数量。 详细分析

    sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; // Map端输入、合并文件之后按照block的大小分割(默认)      
sethive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; // Map端输入,不合并 一个文件起一个Map

Map/Reduce输出合并

大量的小文件会给HDFS带来压力,影响处理效率。可以通过合并Map和Reduce的结果文件来消除影响。

    sethive.merge.mapfiles=true;  // 是否合并Map输出文件, 默认值为真      
sethive.merge.mapredfiles=true; /// 是否合并Reduce 端输出文件,默认值为假
sethive.merge.size.per.task=25610001000; // 合并文件的大小,默认值为 256000000

控制map/reduce任务数量

控制mapper数量

减少mapper数可以通过合并小文件来实现
增加mapper数可以通过控制上一个reduce

默认的mapper个数计算方式

输入文件总大小:total_size        
hdfs设置的数据量大小:dfs_block_size        
default_mapper_num=total_size/dfs_block_size

    setmapred.map.tasks=10;      

从字面上看,貌似是可以直接设置mapper个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num的时候,才会生效。

那如果我们需要减少mapper数量,但是文件大小是固定的,那该怎么办呢?

可以通过 mapred.min.split.size设置每个任务处理的文件的大小,这个大小只有在大于 dfs_block_size的时候才会生效

split_size=max(mapred.min.split.size, dfs_block_size)          
split_num=total_size/split_size                  
compute_map_num = min(split_num,  max(default_mapper_num, mapred.map.tasks))

这样就可以减少mapper数量了。

总结一下控制mapper个数的方法:

  • 如果想增加mapper个数,可以设置 mapred.map.tasks为一个较大的值

  • 如果想减少mapper个数,可以设置 maperd.min.split.size为一个较大的值

  • 如果输入是大量小文件,想减少mapper个数,可以通过设置 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;合并小文件

如果想要调整mapper个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。

控制reducer数量

如果reducer数量过多,一个reducer会产生一个结数量果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个job的输入,则会出现小文件需要进行合并的问题,而且启动和初始化reducer需要耗费和资源。

如果reducer数量过少,这样一个reducer就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。
默认情况下,hive分配的reducer个数由下列参数决定:

  • 参数1:hive.exec.reducers.bytes.per.reducer(默认1G)

  • 参数2:hive.exec.reducers.max(默认为999)

reducer的计算公式为:

N=min(参数2, 总输入数据量/参数1)

可以通过改变上述两个参数的值来控制reducer的数量。
也可以通过

    setmapred.map.tasks=10;      

直接控制reducer个数,如果设置了该参数,上面两个参数就会忽略。

group by数据倾斜优化

在实际业务中,通常是数据集中在某些点上,这样在进行数据分组的时候,某一些组上数据量非常大,而其他的分组上数据量很小,在MapReduce程序中,同一个分组的数据会分配到同一个reduce上进行操作,这样会导致某一些reduce压力很大,一些reduce压力很小,这就是数据倾斜,整个job执行时间取决于那个执行最慢的那个reduce。

    sethive.groupby.skewindata=false;//决定 group by 操作是否支持倾斜的数据。注意:只能对单个字段聚合      

当上面选项设置为 true的时候,生成的查询任务会生成两个MapReduce Job。第一个Job,map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的group by key有可能被分发到不同的reduce中,从而达到负载均衡的目的;第二个Job再根据预处理的数据结果按照group by key分布到reduce中,这个过程可以保证相同的group by key被分布到同一个reduce中,最后完成最终的聚合操作。

JVM重用

JVM重用对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。hadoop默认配置是使用派生JVM来执行Map和Reduce任务的,JVM的启动过程会造成相当大的开销。尤其是执行的job包含成千上万个task任务的情况。

JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在hadoop的配置文件 mapred-site.xml文件中进行配置。

    set mapred.job.reuse.jvm.num.tasks=20;      

当然,JVM也是有缺点的,开启JVM重用会一直占用使用到的task的插槽,以便进行重用,知道任务完成后才会释放。如果某个 不平衡的job中有几个reduce task执行的时间要比其他的reduce task消耗的时间要多得多的话,那么保留的插槽就会一直空闲却无法被其他的job使用,直到所有的task都结束了才会释放。

使用列式存储

对于字段比较多,并且在大部分查询的时候只会使用部分列的时候,创建表的时候,可以设置成orc/parquet列式存储格式。因为列式存储的表,每一列的数据在物理上是存储在一起的,Hive在查询的时候只会遍历需要的列数据,从而可以大大减少处理的数据量。


喜欢本文的朋友们,欢迎长按下图关注公众号DigNew,收看更多精彩内容


相关 [hive 性能优化] 推荐:

Hive性能优化小结

- -
在工作中使用hive比较多,也写了很多HiveQL,对于那些执行比较慢的语句,看着那些执行慢的任务显示的进度真的是欲哭无泪. 是真的数据量比较大,计算比较复杂,还是还没将相关参数设置最优呢. 这里对Hive常用的一些性能优化进行了总结. Hive在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列.

Hive性能优化 - 哥不是小萝莉

- - 博客园_首页
  继续《 那些年使用Hive踩过的坑》一文中的剩余部分,本篇博客赘述了在工作中总结Hive的常用优化手段和在工作中使用Hive出现的问题.   首先,我们来看看Hadoop的计算框架特性,在此特性下会衍生哪些问题. 数据量大不是问题,数据倾斜是个问题. jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,耗时很长.

数据仓库中的SQL性能优化(Hive篇)

- - 奔跑的兔子
一个Hive查询生成多个map reduce job,一个map reduce job又有map,reduce,spill,shuffle,sort等多个阶段,所以针对hive查询的优化可以大致分为针对MR中单个步骤的优化(其中又会有细分),针对MR全局的优化,和针对整个查询(多MR job)的优化,下文会分别阐述.

深入浅出数据仓库中SQL性能优化之Hive篇

- - 极客521 | 极客521
一个Hive查询生成多个Map Reduce Job,一个Map Reduce Job又有Map,Reduce,Spill,Shuffle,Sort等多个阶段,所以针对Hive查询的优化可以大致分为针对MR中单个步骤的优化(其中又会有细分),针对MR全局的优化,和针对整个查询(多MRJob)的优化,下文会分别阐述.

hive调优

- - 互联网 - ITeye博客
一、    控制hive任务中的map数: . 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);.

hive 优化 tips

- - CSDN博客推荐文章
一、     Hive join优化. 也可以显示声明进行map join:特别适用于小表join大表的时候,SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key. 2.     注意带表分区的join, 如:.

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hive优化(2)

- - 开源软件 - ITeye博客
Hive是将符合SQL语法的字符串解析生成可以在Hadoop上执行的MapReduce的工具. 使用Hive尽量按照分布式计算的一些特点来设计sql,和传统关系型数据库有区别,. 所以需要去掉原有关系型数据库下开发的一些固有思维. 1:尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段.

hive优化

- - 开源软件 - ITeye博客
hive.optimize.cp=true:列裁剪. hive.optimize.prunner:分区裁剪. hive.limit.optimize.enable=true:优化LIMIT n语句. hive.limit.optimize.limit.file=10:最大文件数.   1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB).

Hive优化

- - 互联网 - ITeye博客
     使用Hive有一段时间了,目前发现需要进行优化的较多出现在出现join、distinct的情况下,而且一般都是reduce过程较慢.      Reduce过程比较慢的现象又可以分为两类:. 情形一:map已经达到100%,而reduce阶段一直是99%,属于数据倾斜. 情形二:使用了count(distinct)或者group by的操作,现象是reduce有进度但是进度缓慢,31%-32%-34%...一个附带的提示是使用reduce个数很可能是1.