Hadoop优化 第一篇 : HDFS/MapReduce - leocook

标签: hadoop 优化 hdfs | 发表时间:2015-06-27 19:45 | 作者:leocook
出处:

比较惭愧,博客很久(半年)没更新了。最近也自己搭了个博客,wordpress玩的还不是很熟,感兴趣的朋友可以多多交流哈!地址是:http://www.leocook.org/

另外,我建了个QQ群:305994766,希望对大数据、算法研发、系统架构感兴趣的朋友能够加入进来,大家一起学习,共同进步(进群请说明自己的公司-职业-昵称)。

 

1.应用程序角度进行优化

1.1.减少不必要的reduce任务
若对于同一份数据需要多次处理,可以尝试先排序、分区,然后自定义InputSplit将某一个分区作为一个Map的输入,在Map中处理数据,将Reduce的个数设置为空。

1.2.外部文件引用
如字典、配置文件等需要在Task之间共享的数据,可使用分布式缓存DistributedCache或者使用-files

1.3.使用Combiner
combiner是发生在map端的,作用是归并Map端输出的文件,这样Map端输出的数据量就小了,减少了Map端和reduce端间的数据传输。需要注意的是,Combiner不能影响作业的结果;不是每个MR都可以使用Combiner的,需要根据具体业务来定;Combiner是发生在Map端的,不能垮Map来执行(只有Reduce可以接收多个Map任务的输出数据)

1.4.使用合适的Writable类型
尽可能使用二进制的Writable类型,例如:IntWritable, FloatWritable等,而不是Text。因为在一个批处理系统中将数值转换为文本时低效率的。使用二进制的Writable类型可以降低cpu资源的消耗,也可以减少Map端中间数据、结果数据占用的空间。

1.5.尽可能的少创建新的Java对象
a)需要注意的Writable对象,例如下面的写法:

public void map(...) {

for (String word : words) {
output.collect(new Text(word), new IntWritable(1));
}
}

这样会冲去创建对象new Text(word)和new IntWritable(1)),这样可能会产生海量的短周期对象。更高效的写法见下:

class MyMapper … {
Text wordText = new Text();
IntWritable one = new IntWritable(1);
public void map(...) {
for (String word: words) {
wordText.set(word);
output.collect(wordText, one);
}
}
}

b)对于可变字符串,使用StringBuffer而不是String

String类是经过final修饰的,那么每次对它的修改都会产生临时对象,而SB则不会。

 

2. Linux系统层面上的配置调优
2.1. 文件系统的配置
a) 关闭文件在被操作时会记下时间戳:noatime和nodiratime
b) 选择I/O性能较好的文件系统(Hadoop比较依赖本地的文件系统)

2.2. Linux文件系统预读缓冲区大小
命令blockdev

2.3. 去除RAID和LVM

2.4. 增大同时打开的文件数和网络连接数
ulimit
net.core.somaxconn

2.5. 关闭swap分区
在Hadoop中,对于每个作业处理的数据量和每个Task中用到的各种缓冲,用户都是完全可控的。
/etc/sysctl.conf

2.6. I/O调度器选择
详情见AMD的白皮书

 

3. Hadoop平台内参数调优
Hadoop相关可配置参数共有几百个,但是其中只有三十个左右会对其性能产生显著影响。
3.1. 计算资源优化
a) 设置合理的slot(资源槽位)
mapred.tasktracker.map.tasks.maximum / mapred.tasktracker.reduce.tasks.maximum
参数说明:每个TaskTracker上可并发执行的Map Task和Reduce Task数目
默认值:都是2
推荐值:根据具体的节点资源来看,推荐值是(core_per_node)/2~2*(cores_per_node)
单位:无

3.2. 节点间的通信优化
a) TaskTracker和JobTracker之间的心跳间隔
这个值太小的话,在一个大集群中会造成JobTracker需要处理高并发心跳,可能会有很大的压力。
建议集群规模小于300时,使用默认值3秒,在此基础上,集群规模每增加100台,会加1秒。
b) 启用带外心跳(out-of-band heartbeat)
mapreduce.tasktracker.outofband.heartbeat
参数说明:主要是为了减少任务分配延迟。它与常规心跳不同,一般的心跳是一定时间间隔发送的,而带外心跳是在任务运行结束或是失败时发送,这样就能在TaskTracker节点出现空闲资源的时候能第一时间通知JobTracker。

3.3. 磁盘块的配置优化
a) 作业相关的磁盘配置:mapred.local.dir
参数说明:map本地计算时所用到的目录,建议配置在多块硬盘上
b) 存储相关的磁盘配置(HDFS数据存储):dfs.data.dir
参数说明:HDFS的数据存储目录,建议配置在多块硬盘上,可提高整体IO性能
例如:

<property>
<name>dfs.name.dir</name>
<value>/data1/hadoopdata/mapred/jt/,/data2/hadoopdata/mapred/jt/</value>
</property>

c) 存储相关的磁盘配置(HDFS元数据存储):dfs.name.dir

参数说明:HDFS的元数据存储目录,建议设置多目录,每个多目录都可保存元数据的一个备份
注:要想提升hadoop整体IO性能,对于hadoop中用到的所有文件目录,都需要评估它磁盘IO的负载,对于IO负载可能会高的目录,最好都配置到多个磁盘上,以提示IO性能

3.4. RPC Handler个数和Http线程数优化
a) RPC Handler个数(mapred.job.tracker.handler.count)
参数说明:JobTracker需要并发的处理来自各个TaskTracker的RPC请求,可根据集群规模和并发数来调整RPC Handler的个数。
默认值:10
推荐值:60-70,最少要是TaskTracker个数的4%
单位:无
b) Http线程数(tasktracker.http.threads)
在Shuffle阶段,Reduce Task会通过Http请求从各个TaskTracker上读取Map Task的结果,TaskTracker是使用Jetty Server来提供服务的,这里可适量调整Jetty Server的工作线程以提高它的并发处理能力。
默认值:40
推荐值:50-80+

3.5. 选择合适的压缩算法
mapred.compress.map.output / Mapred.output.compress
map输出的中间结果时需要进行压缩的,指定压缩方式(Mapred.compress.map.output.codec/ Mapred.output.compress.codec)。推荐使用LZO压缩。

3.6. 启用批量任务调度(现在新版本都默认支持了)
a) Fair Scheduler
mapred.fairscheduler.assignmultiple
b) Capacity Scheduler

3.7. 启用预读机制(Apache暂时没有)
Hadoop是顺序读,所以预读机制可以很明显的提高HDFS的读性能。
HDFS预读:
dfs.datanode.readahead :true
dfs.datanode.readahead.bytes :4MB
shuffle预读:
mapred.tasktracker.shuffle.fadvise : true
mapred.tasktracker.shuffle.readahead.bytes : 4MB

3.8.HDFS相关参数优化
1) dfs.replication
参数说明:hdfs文件副本数
默认值:3
推荐值:3-5(对于IO较为密集的场景可适量增大)
单位:无
2) dfs.blocksize
参数说明:
默认值:67108864(64MB)
推荐值:稍大型集群建议设为128MB(134217728)或256MB(268435456)
单位:无
3) dfs.datanode.handler.count
参数说明:DateNode上的服务线程数
默认值:10
推荐值:
单位:无
4) fs.trash.interval
参数说明:HDFS文件删除后会移动到垃圾箱,该参数时清理垃圾箱的时间
默认值:0
推荐值:1440(1day)
单位:无
5) io.sort.factor
参数说明:当一个map task执行完之后,本地磁盘上(mapred.local.dir)有若干个spill文件,map task最后做的一件事就是执行merge sort,把这些spill文件合成一个文件(partition)。执行merge sort的时候,每次同时打开多少个spill文件由该参数决定。打开的文件越多,不一定merge sort就越快,所以要根据数据情况适当的调整。
默认值:10
推荐值:
单位:无
6) mapred.child.java.opts
参数说明:JVM堆的最大可用内存
默认值:-Xmx200m
推荐值:-Xmx1G | -Xmx4G | -Xmx8G
单位:-Xmx8589934592也行,单位不固定
7) io.sort.mb
参数说明:Map Task的输出结果和元数据在内存中占的buffer总大小,当buffer达到一定阀值时,会启动一个后台进程来对buffer里的内容进行排序,然后写入本地磁盘,形成一个split小文件
默认值:100
推荐值:200 | 800
单位:兆
8) io.sort.spill.percent
参数说明:即io.sort.mb中所说的阀值
默认值:0.8
推荐值:0.8
单位:无
9) io.sort.record
参数说明:io.sort.mb中分类给元数据的空间占比
默认值:0.05
推荐值:0.05
单位:无
10) Mapred.reduce.parallel
参数说明:Reduce shuffle阶段copier线程数。默认是5,对于较大集群,可调整为16~25
默认值:5
推荐值:16~25
单位:无

4.系统实现角度调优
https://www.xiaohui.org/archives/944.html
主要针对HDFS进行优化,HDFS性能低下的两个原因:调度延迟和可移植性


4.1. 调度延迟
关于调度延迟主要是发生在两个阶段:
a) tasktracker上出现空余的slot到该tasktracker接收到新的task;
b) tasktracker获取到了新的Task后,到连接上了datanode,并且可以读写数据。
之所以说这两个阶段不够高效,因为一个分布式计算系统需要解决的是计算问题,如果把过多的时间花费在其它上,就显得很不合适,例如线程等待、高负荷的数据传输。
下面解释下会经历上边两个阶段发生的过程:
a) 当tasktracker上出现slot时,他会调用heartbeat方法向jobtracker发送心跳包(默认时间间隔是3秒,集群很大时可适量调整)来告知它,假设此时有准备需要执行的task,那么jobtracker会采用某种调度机制(调度机制很重要,是一个可以深度研究的东东)选择一个Task,然后通过调用heartbeat方法发送心跳包告知tasktracker。在该过程中,HDFS一直处于等待状态,这就使得资源利用率不高。
b) 这个过程中所发生的操作都是串行化的:tasktracker会连接到namenode上获取到自己需要的数据在datanode上的存储情况,然后再从datanode上读数据,在该过程中,HDFS一直处于等待状态,这就使得资源利用率不高。
若能减短hdfs的等待时间;在执行task之前就开始把数据读到将要执行该task的tasktracker上,减少数据传输时间,那么将会显得高效很多。未解决此类问题,有这样几种解决方案:重叠I/O和CPU阶段(pipelining),task预取(task prefetching),数据预取(data prefetching)等。

4.2. 可移植性
Hadoop是Java写的,所以可移植性相对较高。由于它屏蔽了底层文件系统,所以无法使用底层api来优化数据的读写。在活跃度较高的集群里(例如共享集群),大量并发读写会增加磁盘的随机寻道时间,这会降低读写效率;在大并发写的场景下,还会增加大量的磁盘碎片,这样将会大大的增加了读数据的成本,hdfs更适合文件顺序读取。
对于上述问题,可以尝试使用下面的解决方案:
tasktracker现在的线程模型是:one thread per client,即每个client连接都是由一个线程处理的(包括接受请求、处理请求,返回结果)。那么这一块一个拆分成两个部分来做,一组线程来处理和client的通信(Client Threads),一组用于数据的读写(Disk Threads)。
想要解决上述两个问题,暂时没有十全十美的办法,只能尽可能的权衡保证调度延迟相对较低+可移植性相对较高。


4.3. 优化策略:Prefetching与preshuffling
a) Prefetching包括Block-intra prefetching和Block-inter prefetching:
Block-intra prefetching:对block内部数据处理方式进行了优化,即一边进行计算,一边预读将要用到的数据。这种方式需要解决两个难题:一个是计算和预取同步,另一个是确定合适的预取率。前者可以使用进度条(processing bar)的概念,进度条主要是记录计算数据和预读数据的进度,当同步被打破时发出同步失效的通知。后者是要根据实际情况来设定,可采用重复试验的方法来确定。
Block-inter prefetching:在block层面上预读数据,在某个Task正在处理数据块A1的时候,预测器能预测接下来将要读取的数据块A2、A3、A4,然后把数据块A2、A3、A4预读到Task所在的rack上。

b) preshuffling
数据被map task处理之前,由预测器判断每条记录将要被哪个reduce task处理,将这些数据交给靠近reduce task的map task来处理。

 

参考资料:
cloudera官方文档
http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-performance/
AMD白皮书(较为实用)
http://www.admin-magazine.com/HPC/content/download/9408/73372/file/Hadoop_Tuning_Guide-Version5.pdf

国内博客(大部分内容都是AMD白皮书上的翻译):
http://dongxicheng.org/mapreduce/hadoop-optimization-0/
http://dongxicheng.org/mapreduce/hadoop-optimization-1/

 


本文链接: Hadoop优化 第一篇 : HDFS/MapReduce,转载请注明。

相关 [hadoop 优化 hdfs] 推荐:

Hadoop优化 第一篇 : HDFS/MapReduce - leocook

- - 博客园_首页
比较惭愧,博客很久(半年)没更新了. 最近也自己搭了个博客,wordpress玩的还不是很熟,感兴趣的朋友可以多多交流哈. 地址是:http://www.leocook.org/. 另外,我建了个QQ群:305994766,希望对大数据、算法研发、系统架构感兴趣的朋友能够加入进来,大家一起学习,共同进步(进群请说明自己的公司-职业-昵称).

Hadoop剖析之HDFS

- - CSDN博客数据库推荐文章
Hadoop的分布式文件系统(HDFS)是Hadoop的很重要的一部分,本文先简单介绍HDFS的几个特点,然后再分析背后的原理,即怎样实现这种特点的. 这是HDFS最核心的特性了,把大量数据部署在便宜的硬件上,即使其中某些磁盘出现故障,HDFS也能很快恢复丢失的数据. 这个的意思是HDFS适合一次写入,多次读取的程序,文件写入后,就不需要修改了.

Hadoop之HDFS子框架

- - CSDN博客云计算推荐文章
由图片可以看到HDFS主要包含这样几个功能组件. Namenode:存储文档的元数据信息,还有整个文件系统的目录结构. DataNode:存储文档块信息,并且文档块之间是有冗余备份的. 这里面提到了文档块的概念,同本地文件系统一样,HDFS也是按块存储的,只不过块的大小设置的相对大一些,默认为64M.

hadoop深入研究:(一)——hdfs介绍

- - CSDN博客云计算推荐文章
转载请注明出处: http://blog.csdn.net/lastsweetop/article/details/8992505. 这里的非常大是指几百MB,GB,TB.雅虎的hadoop集群已经可以存储PB级别的数据.  hdfs的高可用是用软件来解决,因此不需要昂贵的硬件来保障高可用性,各个生产商售卖的pc或者虚拟机即可.

flume写入hadoop hdfs报错 Too many open files

- - CSDN博客云计算推荐文章
网络搜索,怀疑linux nofile超过最大限制,当前设置大小1024,默认值. 而查看flume进程打开的文件数量为2932(这个比较奇怪,怎么超过1024了呢. 1.修改nfile配置文件,手工增加nofile的大小. 2.重启flume进程,也就是进程29828,问题解决. 作者:hijk139 发表于2013-2-17 16:37:34 原文链接.

Apache Hadoop 1.0.0支持Kerberos验证,支持Apache HBase,提供针对HDFS的RESTful API

- - InfoQ中文站
海量数据框架Apache Hadoop怀胎六年终于瓜熟蒂落发布1.0.0版本. 本次发布的核心特性包括支持Kerberos身份验证,支持Apache HBase,以及针对HDFS的RESTful API. InfoQ就此次发布请Apache Hadoop项目的VP——Arun Murthy回答了几个问题.

Hadoop集群与Hadoop性能优化

- - 学着站在巨人的肩膀上
本文讲解一下Hadoop集群、Hadoop性能优化、Hadoop机架感知实现、Hadoop配置等,下面是有关这些命令的具体介绍. Hadoop性能优化:Hadoop机架感知实现及配置:分布式的集群通常包含非常多的机器,由于受到机架槽位和交换机网口的限制,通常大型的分布式集群都会跨好几个机架,由多个机架上的机器共同组成一个分布式集群.

Hadoop分布式文件系统HDFS和OpenStack对象存储系统Swift有何不同?

- - ITeye博客
HDFS使用 集中式单一节点架构(NameNode)来维护文件系统元数据,而在Swift中,元数据 分布在整个集群中并拥有多个副本. 注意:集中式元数据存储使HDFS存在单点故障和扩展性问题,因此规模越大就性能越低,就越难扩展甚至不能扩展,所幸的是HDFS2使用NameNode HA和HDFS Federation解决了这两个问题.

Hadoop 优化总结(一)

- - 开源软件 - ITeye博客
自带的Text很好用,但是字符串转换开销较大,故根据实际需要自定义Writable,注意作为Key时要实现WritableCompareable接口. 避免output.collect(new Text( ),new Text()). 提倡key.set( ) value.set( ) output.collect(key,value).

HADOOP OS部分优化

- - 数据库 - ITeye博客
文件描述符是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表. 当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符. 在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开,文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统. 在Linux系列的操作系统上,由于Linux的设计思想便是把一切设备都视作文件.