Hive性能优化小结
在工作中使用hive比较多,也写了很多HiveQL,对于那些执行比较慢的语句,看着那些执行慢的任务显示的进度真的是欲哭无泪。
是真的数据量比较大,计算比较复杂,还是还没将相关参数设置最优呢。这里对Hive常用的一些性能优化进行了总结。
列裁剪
Hive在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销,中间表存储开销和数据整合开销。
sethive.optimize.cp=true; // 默认为true
分区裁剪
在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。
sethive.optimize.pruner=true; // 默认为true
表连接优化
小表前置
Hive假定查询中最后的一个表时大表。它会将其他表缓存起来,然后扫描最后那个表。
因此通常需要将小表放在前面,或者标记那张表是大表: /*streamtable(table_name)*/
使用相同的连接键
在hive中,当对3个或更多张表进行join时,如果on条件使用相同字段,那么它们会合并为一个MapReduce Job,利用这种特性,可以将相同的join on的放入一个job来节省执行时间。
优先过滤数据
尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大限度的减少参与join的数据量。
启用mapjoin
mapjoin是将join双方比较小的表直接分发到各个map进程的内存中,在map进程中进行join操作,这样就可以不用进行reduce步骤,从而提高了速度。只有join操作才能启用mapjoin。
sethive.auto.convert.join =true; // 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
sethive.mapjoin.smalltable.filesize =2500000; // 刷入内存表的大小(字节)
sethive.mapjoin.maxsize=1000000; // Map Join所处理的最大的行数。超过此行数,Map Join进程会异常退出
尽量原子操作
尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。
并行执行
hive会将一个查询任务转化为一个或多个阶段。默认情况下,一次只执行一个阶段。如果某些阶段不是互相依赖的,是可以并行执行的,这样可以缩短整个job执行时间。
sethive.exec.parallel=true; // 可以开启并发执行。
sethive.exec.parallel.thread.number=16; // 同一个sql允许最大并行度,默认为8。
压缩数据
对数据进行压缩不仅可以减少数据的大小,还可以节省磁盘的读写时间。在Hive查询中,可以对中间数据和最终结果数据进行压缩。
中间数据压缩
中间数据压缩就是对hive查询的多个job之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。可以采用 snappy
压缩算法,该算法的压缩和解压效率都非常高。
sethive.exec.compress.intermediate=true;
sethive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
sethive.intermediate.compression.type=BLOCK;
结果数据压缩
最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间;
注:常用的gzip,snappy压缩算法是不支持并行处理的,如果数据源是gzip/snappy压缩文件大文件,这样只会有有个mapper来处理这个文件,会严重影响查询效率。
所以如果结果数据需要作为其他查询任务的数据源,可以选择支持splitable的 LZO
算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高job执行的速度了。关于如何给Hadoop集群安装LZO压缩库可以查看 这篇文章。
sethive.exec.compress.output=true;
setmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
setmapred.output.compression.type=BLOCK:
Hadoop集群支持一下算法:
-
org.apache.hadoop.io.compress.GzipCodec
-
org.apache.hadoop.io.compress.SnappyCodec
-
com.hadoop.compression.lzo.LzopCodec
-
org.apache.hadoop.io.compress.Lz4Codec
本地化执行
对于小数据集,可以通过本地模式,在单台机器上处理所有任务,执行时间明显被缩短
setmapred.job.tracker=local;
sethive.exec.mode.local.auto=true;
当一个job满足下面条件才能真正使用本地模式:
-
job的输入数据大小必须小于参数
hive.exec.mode.local.inputbytes.max
(默认128M) -
job的map数必须小于参数
hive.exec.mode.local.auto.tasks.max
(默认4) -
job的reduce数必须为0或者1
Map端聚合优化
很多聚合操作可以先在Map端进行部分聚合,然后在Reduce端得出最终结果。
hive.map.aggr=true; // 用于设定是否在 map 端进行聚合,默认值为真
hive.groupby.mapaggr.checkinterval=100000; // 用于设定 map 端进行聚合操作的条目数
合并小文件
Map输入合并
在执行MapReduce程序的时候,一般情况是一个文件需要一个mapper来处理。但是如果数据源是大量的小文件,这样岂不是会启动大量的mapper任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少mapper任务数量。 详细分析
sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; // Map端输入、合并文件之后按照block的大小分割(默认)
sethive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; // Map端输入,不合并 一个文件起一个Map
Map/Reduce输出合并
大量的小文件会给HDFS带来压力,影响处理效率。可以通过合并Map和Reduce的结果文件来消除影响。
sethive.merge.mapfiles=true; // 是否合并Map输出文件, 默认值为真
sethive.merge.mapredfiles=true; /// 是否合并Reduce 端输出文件,默认值为假
sethive.merge.size.per.task=25610001000; // 合并文件的大小,默认值为 256000000
控制map/reduce任务数量
控制mapper数量
减少mapper数可以通过合并小文件来实现
增加mapper数可以通过控制上一个reduce
默认的mapper个数计算方式
输入文件总大小:total_size
hdfs设置的数据量大小:dfs_block_size
default_mapper_num=total_size/dfs_block_size
setmapred.map.tasks=10;
从字面上看,貌似是可以直接设置mapper个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num
的时候,才会生效。
那如果我们需要减少mapper数量,但是文件大小是固定的,那该怎么办呢?
可以通过 mapred.min.split.size
设置每个任务处理的文件的大小,这个大小只有在大于 dfs_block_size
的时候才会生效
split_size=max(mapred.min.split.size, dfs_block_size)
split_num=total_size/split_size
compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks))
这样就可以减少mapper数量了。
总结一下控制mapper个数的方法:
-
如果想增加mapper个数,可以设置
mapred.map.tasks
为一个较大的值 -
如果想减少mapper个数,可以设置
maperd.min.split.size
为一个较大的值 -
如果输入是大量小文件,想减少mapper个数,可以通过设置
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
合并小文件
如果想要调整mapper个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。
控制reducer数量
如果reducer数量过多,一个reducer会产生一个结数量果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个job的输入,则会出现小文件需要进行合并的问题,而且启动和初始化reducer需要耗费和资源。
如果reducer数量过少,这样一个reducer就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。
默认情况下,hive分配的reducer个数由下列参数决定:
-
参数1:hive.exec.reducers.bytes.per.reducer(默认1G)
-
参数2:hive.exec.reducers.max(默认为999)
reducer的计算公式为:
N=min(参数2, 总输入数据量/参数1)
可以通过改变上述两个参数的值来控制reducer的数量。
也可以通过
setmapred.map.tasks=10;
直接控制reducer个数,如果设置了该参数,上面两个参数就会忽略。
group by数据倾斜优化
在实际业务中,通常是数据集中在某些点上,这样在进行数据分组的时候,某一些组上数据量非常大,而其他的分组上数据量很小,在MapReduce程序中,同一个分组的数据会分配到同一个reduce上进行操作,这样会导致某一些reduce压力很大,一些reduce压力很小,这就是数据倾斜,整个job执行时间取决于那个执行最慢的那个reduce。
sethive.groupby.skewindata=false;//决定 group by 操作是否支持倾斜的数据。注意:只能对单个字段聚合
当上面选项设置为 true
的时候,生成的查询任务会生成两个MapReduce Job。第一个Job,map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的group by key有可能被分发到不同的reduce中,从而达到负载均衡的目的;第二个Job再根据预处理的数据结果按照group by key分布到reduce中,这个过程可以保证相同的group by key被分布到同一个reduce中,最后完成最终的聚合操作。
JVM重用
JVM重用对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。hadoop默认配置是使用派生JVM来执行Map和Reduce任务的,JVM的启动过程会造成相当大的开销。尤其是执行的job包含成千上万个task任务的情况。
JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在hadoop的配置文件 mapred-site.xml
文件中进行配置。
set mapred.job.reuse.jvm.num.tasks=20;
当然,JVM也是有缺点的,开启JVM重用会一直占用使用到的task的插槽,以便进行重用,知道任务完成后才会释放。如果某个 不平衡的job
中有几个reduce task执行的时间要比其他的reduce task消耗的时间要多得多的话,那么保留的插槽就会一直空闲却无法被其他的job使用,直到所有的task都结束了才会释放。
使用列式存储
对于字段比较多,并且在大部分查询的时候只会使用部分列的时候,创建表的时候,可以设置成orc/parquet列式存储格式。因为列式存储的表,每一列的数据在物理上是存储在一起的,Hive在查询的时候只会遍历需要的列数据,从而可以大大减少处理的数据量。
喜欢本文的朋友们,欢迎长按下图关注公众号DigNew,收看更多精彩内容