加速MapReduce2
背景
原文链接: Getting MapReduce 2 Up to Speed因为文章中采用的改善措施,CDH 5 让MapReduce2中作业的速度和MapReduce1的速度一样(甚至更快)。
一旦你知道问题所在,性能改进的地方会很微小、简单,甚至乏味。困难的是找到问题所在:缩小问题的范围、进一步专研、实验、实验,不断的 实验。
这套规则也适用于Apache Hadoop。最近,Cloudera的工程师努力让Hadoop2(MR2/YARN)上MapReduce的性能赶上或者超过Hadoop1(MR1)上MapReduce的性能。在架构上,相比MR1,MR2有很多优势:
- 通过将JobTracker分成ResourceManager和Application Master,MR2有更好的拓展性
- 因为采用了更细粒度的资源调度,MR2有更好的资源利用率以及更大的吞吐量
- 由于采用了更加智能的排序缓冲管理技术,为避免溢出导致的通信量更少。
- 由于Application Master会将小作业的所有任务都运行在同一个JVM上,MR2上小作业的运行时间会更短
尽管这些改进都是很重要的,但是没有一点对中等集群上的中等作业有用。一旦Hadoop的代码库发生改变,集群性能很可能会降低。
如果运行出错,很容易找到问题所在;但是性能降低了,没有经过严格的测试,是很难找到问题所在的。去年在MR2上进行了性能测试,我发现在每一个基准测试上,MR1的性能都优于MR2。从那开始,我们做了很多努力,包括调整Cloudera Manager上的参数,以及修改MapReduce本身的运行环境。我们现在可以骄傲的说,在所有基准测试上,CDH5上MR2的性能等于或者优于MR1上的性能。
这篇文章中,我将分享一些经历,以作为分析复杂的(Java)分布式系统上性能倒退时的研究案例。
确保比较的客观性
必须保证MR1和MR2上性能的比较是客观的。作为使用最广泛的测试用例,TeraSort上存在陷阱。这是因为MR1和MR2上自带的TeraSort存在差异。TeraSort产生的数据可供GraySort使用。GraySort可以作为MR1和MR2间性能比较的基准测试。但是MR2上自带的TeraSort产生的数据的压缩率比较低。一个公平的比较应该保证MR1和MR2上使用的TeraSort的版本一样;否则,MR1将得到不公平的待遇。
另外一个重要的地方是资源配置。MR1中,节点的资源被分成运行map任务的map slots和运行reduce 任务的reduce slots。而MR2中,节点上的资源可供map任务和reduce任务同时使用。如果节点给MR1集群分配8个map slots、8个reduce slots,相同的节点给MR2集群分配16个slots所拥有的内存。在MR1的map阶段,资源将得不到充分的利用。MR2将能运行16个map任务而MR1仅能运行8个任务。如果节点只给MR2集群分配8个slots的内存,当map任务和reduce任务重叠时,MR2的性能会降低。此时MR2只能同时运行8个任务,而MR1上能运行更多任务。
为了避免这个问题,我们给MR1上的map slots和reduce slots都分配全部的节点资源。为了让map阶段和reduce阶段没有重叠,我们将参数 mapred.reduce.slowstart.completedmaps设置成0.99。这样可以保证MR1和MR2的两个阶段均能使用集群的所有资源。
案例1:Map输出排序时的CPU缓存本地性
进行性能分析时,我们发现了一个可以改进的地方。此案例中,我们发现WordCount上性能的降低:某个作业在MR1上只需运行375秒,在MR2集群上需要运行475秒,这比MR1上多运行了25%。
我们首先要分析的是MapReduce的那个阶段出现了性能降低。此案例中,Web UI反映MR2上map阶段的时间明显长于MR1上map阶段的时间。接着,我们寻找计数器上数字的不同。MR1和MR2上WordCount上计数器的值几乎一样,这没有提供任何帮助。
由于从计数器上没有发现问题所在,需要更进一步找到问题所在——排除其他因素干扰,重现这个实验。用LocalJobRunner运行只有一个map任务和一个reduce任务的WordCount作业,我们发现了map阶段的运行时间存在巨大差异。然而不运行reduce任务时,时间差异消失了。由于没有reduce任务运行时,排序是不会进行的。因此可能是在map阶段的排序过程中,MR2的性能出现了倒退。
Map阶段的排序
下一步需要知道map阶段排序的运行过程。
Map阶段输出的数据被存放在内存中的buffer里。当buffer填满时,框架对其进行排序,并将结果写入磁盘(“spill”)。一个独立的线程会将各个排序好了的磁盘文件写入一个大的排序文件。buffer由两个部分组成:连续存放原始数据的区段、用于存放各个记录在原始数据中位置的元数据区段。MR1中这些区段的大小是固定的,通过参数 io.sort.record.percent进行设置,这个参数也可以在每一个作业中单独配置。这意味着,当参数设置不合理的时候,这时作业碰巧又有很多小记录,相比于原始数据区段,元数据区段将会很快被填满。在还没有被完全使用时,buffer将很快被spill到磁盘上。
MAPREDUCE-64解决了MR2中的这个问题。MR2中这两个区段可以共享buffer空间,区段的大小可以动态调整,这意味着在最小化spill数目的时候,不再需要对参数 io.sort.record.percent进行设置。
考虑所有这些情况,我意识到我们还没有为作业调整参数 io.sort.record.percent,这种情况下,MR1 map任务spill的次数是MR2 map任务spill次数的10倍。当我调整MR1中这个参数使得作业spill的次数和MR2一样多的时候,MR1的性能降低了——“splli次数更少、buffer存放的记录更多”意味着更慢的map任务。
我推测可能是CPU缓存延迟照成的。更小的块可能更加适合CPU缓存,也就是说,当排序的时候,所有的内存加速都非常快,因此排序过程也非常快。更大的块可能不适合CPU缓存加速,这意味内存加速会在更高级别的缓存中进行,或者直接在内存中进行。由于在不同级别的缓存上内存操作的速度存在数量级的差别,因此这可能是问题的原因所在。
幸运的是,Linux上有一个强大的性能测试工具:perf。通过他,可以方便地测试上面的数据。通过命令“ perf stat -e cache-misses”可以查看CPU缓存错失数目。调整参数后的MR1和MR2拥有相似的CPU缓存错失数目,而没调整参数的MR1作业的错失数只有差不多一半。
CPU缓存延迟的改善
因此如何改善CPU缓存延迟呢?在Todd Lipcon的建议下,我看了 MAPREDUCE-3235。这是一年前提出的现在尚未解决的JIRA,针对前面的情况, MAPREDUCE-3235提出了很多建议。这些建议值得尝试。不对meta数组的索引进行排序,而是直接对数组本身进行排序。以前的版本中,为了加速map输出记录,我发现首先会存储第n个记录在meta数组中的索引,接着存放记录key的位置,接着存放value在原始数据的位置。现在,我们首先存放第n个元素key的位置,接着存放value在原始数据中的位置。
这种方法减少了重新定位这一层,意味着减少了内存定位的过程。缺点是,当进行swap操作时,将会对元数据的一个实体(大约20个字节)进行操作,而不是对索引(4个字节)进行操作。而缓存外内存加速的代价大于缓存内额外的内存移动的代价,因此这种方法值得一试。
为什么会加速呢?这是因为排序时,我们将可以操作一段连续的内存空间,而操作其中一块时,可以进行CPU缓存加速。以前方法在进行寻址或者移动时,因为缓存区的元数据区段比较大,可能没法利用CPU缓存加速。
这个小的改变就像魔术一样。这种方法减少了一半CPU缓存丢失的次数,使得MR2作业的运行时间比MR1作业的运行时间更快。太棒了!
案例2:Shuffle阶段的过度读写
从Cloudera合作伙伴的一份报告中,我知道了MR2中作业的shuffle阶段的磁盘读写次数比MR1的更多。为了重现这个问题,我使用了基准测试用例ShuffleText,通过它,我们可以分析shuffle阶段的性能。Map slots会产生很多数据,然后进行shuffle,之后将数据传给reduce slots。
在伪分布模式下,我没有重现这个问题;但当我在集群上运行作业时,问题就显现出来了。MR2上作业运行的时间比MR1上运行的时间长30%。更加戏剧性的是,MR2中reduce取回map数据的时间是60秒,而MR1中只要27秒。
在很多情况下MapReduce计数器都非常有用,但是它不会提供节点的硬件度量,如磁盘上对硬件进行读操作的次数。Cloudera Manager能测量对节点的硬件操作,也能帮助我们找到问题所在。当作业运行时,我查到了集群磁盘读的字节数:MR2上共有31.2GB,而MR1上只有6.4GB。
在MapReduce作业中,只有几个地方进行了磁盘读。1)从磁盘上读入输入文件;2)当有很多spills时,对map输出的数据进行融合;3)将map阶段的数据移动到reduce端的shuffle阶段;4)reduce阶段对数据进行融合。
MR1将中间数据移动到reduce任务的进程是TaskTracker,而MR2的是NodeManager。从Clouder Manager上观察到的这些进程中,将数据移动到reduce端时,MR2进行了更多的磁盘读。通过读源码,我发现以前shuffle阶段用Jetty web server来处理数据,而MR2用Netty async IO来处理数据。MR2中,会用单独的线程来响应每一个请求,但是这没有反映为什么会产生这些问题。将shuffle阶段的磁盘读的次数打印出来,也没有得到任何有用的帮助。
我得到了一些信息:
- 在MR1和MR2中,shuffle阶段磁盘读的字节数,都比总的map输出的字节数少。
- MR1中,shuffle阶段的磁盘读主要发生在两个节点上,而其余节点的磁盘读几乎可以忽略。
- 这两个节点的物理内存比集群上其他节点的少。
能同时满足这几点的推测是,在MR1中,大多数的map输出都能放在OS缓存区的内存中。这两个节点的磁盘读操作比较多,是因为他们的物理内存比较少,因此不能将map输出的数据放在缓存区中。因为某个原因,MR2没有采用MR1中的这个特性。
fadvise
从源码中,我发现在和OS缓存进行交互时,MR1和MR2都非常小心。Linux中fadvise系统调用提供了OS内存子系统用于判断是否需要将文件放到缓存区。当shuffle服务接收到map输出数据的请求时,fadvise将数据存在内存中,可以通过FADV_WILLNEED操作说明数据在内存中,以后可以直接从内存中将数据读出来。当处理完一个文件时,fadvise会用FADV_DONTEED操作将文件从内存中释放出来以节省空间,因为内存中的文件不会再被使用了。
代码中没有明显的逻辑错误,下一步是找到更加直观的问题。命令“strace”可以跟踪进程所有的系统调用。我用它监听了fadvises、文件打开、文件关闭、文件读等进程。这些数据没有得到有用的信息。如果只记录WILLNEEDS和DONTNEED操作的次数呢?
对于WILLNEEDS,MR1和MR2进行的次数差不多。而对于DONTNEEDs,MR1的每个节点上进行了768次,近似等于那个节点上的map任务数目乘以所有reduce任务的数目。但是MR2上有更多的次数,且每次运行时,次数都不一样。
所有的map输出都到了相应的地方。普通情况下,reducers会对map的输出发出请求,当不需要这些输出时,将链接中断。这是因为reducer读map输出的内存是有限制的,而reducer可以从NodeManager/TaskTracker反馈的信息中得到map输出将占用的空间。(换句话说,在请求之前,reducer是不知道map输出所占用的空间的,因此当知道这一信息之后,reducer需要中断链接,当空间足够后再进行链接。)
如果reducer中断了链接,shuffle服务将不会从OS缓存中移除文件(不会进行DONTNEED)。这是因为reducer还会再次访问这些数据。MR1是按照这种方式进行的。而MR2不是,这意味着即使map输出时会将文件放入内存中,当reducer中断第一次请求时,shuffle会将文件移除OS缓存区。Reducer再次访问数据时,必须直接从磁盘上读。
因此在修订这个性能退化时,只需保证在reducer还没有完全取走map输出的数据前,shuffle不执行fadvise的DONTNEED操作。这个改变将reducer取中间数据的时间从60秒缩小到了27秒,和MR1时间一样。作业的平均运行时间也降低了30%,这和MR1的时间是一样的。
聪明的读者可能想到了一种更好的解决方案。在reducer取走map输出的数据前,有进程告知reducer map输出的数据量。这避免了reducer无法将map输出移到reducer缓存时,shuffle阶段reducer向map发出的多余服务请求。(我们可能在将来的工作中实现这一点)。
总结
我希望在进行性能退后的分析中,读者能从这些案例中得到帮助。当你用Eclipse编程时,疑惑着是否可以让一切都顺利进行,这些经历也许能给你一些小小的安慰。
感谢这些改进,感谢所有来自Cloudera的工程师和社区的开发者(MR2中这些性能的改进将会在Hadoop2.3中出现),我们骄傲的宣布CDH 5中出现的MR2,将会和MR1的性能一样,甚至更加出色。