NativeTask:利用本地执行引擎加速Hadoop
文 / 常冰琳,杨栋
NativeTask是Hadoop MapReduce的高效执行引擎实现。与MapReduce相比,NativeTask获得了不错的性能提升,主要包括更好的排序实现、关键路径避免序列化、避免复杂抽象、更好的利用压缩等。
简介
NativeTask是一个高性能MapReduce执行单元,支持C++接口。顾名思义,NativeTask是一个本地数据处理引擎,专注于数据处理本身,在MapReduce的环境下,它仅替换Task模块功能。换句话说,NativeTask并不关心资源管理、作业调度和容错,这些功能仍旧由原有的模块完成,而实际的数据处理由这个高性能处理引擎完成。任务级别的数据处理占用Hadoop集群绝大部分资源,而利用NativeTask的高效性能可以显著提高数据分析速度、降低成本。
NativeTask目前还是一个正在开发中的开源项目,虽然部分功能还没有完成,但已能体现明显的性能优势。该项目的另一个目的是提供一个本地MapReduce开发接口,以便能在此基础上构建更加高效的大数据处理工具,例如下面两种情形。
数据仓库工具。开源实现,如Hive,相比商业分析型数据库的性能还有很大劣势。目前最先进的查询执行技术,例如轻量级压缩、向量化执行、LLVM动态编译等,使用本地语言更容易实现。
数据挖掘和机器学习算法库。这类应用通常都是计算密集型,需要多次迭代计算。另外有大量的数值分析和机器学习库是本地库,C++环境可以更容易利用到这些算法库。
对已经熟悉Hadoop的用户来说,NativeTask的接口和使用方法与Hadoop Pipes类似,用户通过NativeTask提供的头文件和动态库,编译生成自己的应用库,提交作业到Hadoop集群执行。NativeTask有以下特点。
- 高性能,加速作业执行并节约硬件资源。
- C++接口,应用可以更方便地使用各种优化技术,例如SSE/AVX指令优化、LLVM动态编译、GPU计算等。
- 纯二进制接口,避免序列化和反序列化开销。
- 支持不排序的数据流,经典的MapReduce数据流是需要对记录排序的,但很多应用并不需要排序,通过移除排序,可以进一步提升性能。
- 新加一种与MapReduce不同的编程模型接口Map-Foldl,能够更高效地执行聚合类的应用。
背景
大多数人感兴趣的问题是,为什么NativeTask能够这么快?在我看来,这从另一个问题展开更加合适:MapReduce模型是不是已经足够高效,以致于没有太大的优化空间了?
显然不是。比如,一个高质量的C++程序可以很轻易地在几秒钟内完成1GB数据的简单处理,但一个MapReduce任务(不是整个作业)通常需要几分钟来处理同样的数据。另外,最近学术界也有很多研究表明,MapReduce相比并行数据库性能差距很大,并提出一些改进方案,在特定的应用场景下获得了数十倍的性能提升,比如HadoopDB(目前已经成立创业公司Hadapt),以及最近提出的Clydesdale。这些研究都证明Hadoop还有很大的优化空间。
虽然Hadoop在可扩展性和容错性上有明显的优势,并且在处理非结构化和半结构化数据上易用性更好,但这些优势和性能并不是一种折中关系。MapReduce完全可以在保持原有计算模型不变的前提下更加高效,而且目前没有任何技术方面的局限去实现更高的性能。下面,我们来分析一下MapReduce能做到多快,假设现在有一个Hadoop节点,如表1所示。
假设每个核运行一个任务,这个节点可以并行运行12个任务,平均每个任务使用4GB内存、1个SATA硬盘。一个典型的Map任务各个流程的极限性能应该能够达到如表2所示的理想速度。
从表2可以看出,轻量级压缩可以起到放大I/O吞吐率的作用,这对Hadoop非常关键。目前最快的开源压缩算法可以做到表2中的压缩和解压速度,比如Snappy和LZ4,它们已经整合进了最新的Hadoop主干版本中。假设输入输出数据都拥有4倍的压缩比,通过简单的计算可以得到,一个Map任务可以在8秒钟之内处理1GB(压缩后250MB)数据,吞吐率约为125MB/s。更进一步,对于绝大多数的分析型作业来说,其输出规模比输入规模小得多。另外大部分作业是不需要排序的,这能够避免大部分的排序和压缩开销。最后,还可以利用多线程来加速排序和压缩,因为整个节点有很多CPU资源缝隙可以利用。综合这些因素,对于简单聚合类应用来说,在3秒钟内处理1GB数据完全可能,吞吐率是333MB/s。这个节点有12个并行任务,总的峰值吞吐能够达到333MB/s*12=4GB/s。
这个数字可能不够直观,我们把数字放大到集群。假设有一个25节点的Hadoop集群,使用10GbE网络,在资源调度、数据本地化都完美的情况下,这个集群应该能够:
- 在1分钟内完成1TB的数据排序;
- 在10秒钟内完成对1TB数据的简单聚合类查询。
在具备如此高性能之后,Hadoop已经具备以很低的成本来搭建数据仓库应用的能力, 并且其查询速度相比商业数据库已经没有太大的劣势。上文中提到的服务器大概为1-2万美元,其容量为12核、24TB存储。假设Hadoop的3个副本,实际压缩后容量8TB左右,平均价格为每个核1-2千美元,每TB存储1-2千美元。随着硬件成本持续降低,这个数字还会持续下降,并且在软件方面没有任何成本。在拥有与商业并行数据库可比较的性能之后,基于Hadoop开源技术的大数据解决方案的优势会更加突出。
虽然前景非常美好,但还有很长的路要走。目前在同等规模的集群上,使用1GbE网络,处理TB规模的作业一般需要十分钟到一个小时。目前一个任务处理数据的吞吐率大概为10-20MB/s,与目标100-300MB/s还有很大差距。
MapReduce性能分析
虽然前面讲到的计算包含太多的假设,但其中的各个环节都没有任何技术局限,剩下的只是工程上的问题,即一个高效的设计和实现。要设计和实现一个高效的MapReduce执行引擎,首先需要了解影响Hadoop性能的因素,主要有以下几点。
- I/O瓶颈:绝大多数的MapReduce作业都属于数据密集型,在不使用轻量压缩的情况下,磁盘和网络I/O很容易成为瓶颈。幸运的是,最近一年内有两个非常优秀的开源压缩算法出现,Snappy和LZ4,并且都与Apache许可证兼容,目前它们已经被整合进Hadoop主干版本。对于Hadoop上的常见应用来说,其数据的压缩比是很高的,一般能达到5倍以上,排序或使用列存储后可能会更高,压缩速度约为400M/s, 解压约为1.5G/s, 基本能够将I/O吞吐能力放大5倍。通过与列式存储配合,更高的压缩比也是可能的。在开启轻量压缩之后,CPU会成为Hadoop集群的瓶颈,并且在短期内不会改变,即使目前CPU处理能力的发展要比I/O带宽的发展要快一些。
- Map阶段排序:在简单Map任务中,排序约消耗整个任务CPU资源的一半。目前Map阶段的排序实现很低效,其原因是它使用一个大缓冲区对所有记录一起排序。另外目前的排序没有针对CPU缓存特性进行优化,一个更好的排序实现大概可以将排序性能提升10倍,目前社区已经发现这个问题,也提出一些改进方案。
- 序列化/反序列化:序列化会造成大量的对象创建和小数组拷贝,另外也会引入过于复杂的I/O流抽象和低效的对象比较实现,而这些操作几乎贯穿整个数据处理流。这个问题很早之前也在社区提出过,后来不了了之。个人认为在MapReduce框架层没有必要引入序列化机制,二进制接口是足够的,数据的描述和存储格式应该在更上层,比如在Hive上解决。
- Shuffle:Shuffle一直以来都被认为是一个瓶颈,使用压缩和高速网络可以部分改善这个问题。百度内部和开源社区都对Shuffle进行过很多优化,社区0.23版本MRv2使用Netty代替Jetty实现shuffle的服务器端,加入了批量传输,效率提升了30%。但还是有很大的优化空间,未来单节点的计算能力和网络带宽会越来越大,10GbE甚至40GbE会成为主流。Java和Netty能否支撑如此大的吞吐率,是否需要进一步调优,也是值得调研的。
- 数据局部性:这应该是分布式数据库性能优于MapReduce的主要原因之一,分布式数据库的数据切分、索引优化和查询优化要先进得多,这是数十年技术积累的结果。通常同样的查询,分布式数据库会尽可能减少数据读取和节点之间数据的传输,而MapReduce模型通常需要把整个数据集处理一遍或者多遍,大量的数据扫描和传输会严重影响作业的执行时间和资源消耗。这个是有办法改进的,比如引入一些更加灵活的数据处理流和计算模型、引入不排序的数据流、引入哈希Join的算法、连接多个作业的Map和Reduce任务防止中间数据落地,等等。社区的下一代资源管理框架和MapReduce框架会更加容易扩展,并欢迎尝试各种创新。
- 调度和启动开销:对于数据规模很小或有实时需求的应用,作业的调度和启动开销可能会比较大。
设计
对于前面描述的各种问题,NativeTask仅关注其中的一方面,即在MapReduce任务级别能够影响到方面,具体包括压缩、排序、序列化以及部分shuffle。此外,包括部分Shuffle、数据局部性以及调度,是Hadoop的其余模块和构建在Hadoop之上的应用需要关注的。这两方面上都需要持久的优化与创新,才能对整体性能产生数足够大的影响。虽然NativeTask仅关注在其中一个方面,但为上层的优化提供了更多的可能。
NativeTask选择C++实现,并不是因为Java效率不高。根据我的经验,Java对一般的数据处理非常高效,非常适合作为Hadoop的主要开发语言,但对于特殊的优化和任务,使用C++更加方便或者合适,原因如下。
- 轻量级压缩。目前最快的轻量压缩实现都是C/C++实现, 虽然可以使用JNI包装,但是JNI有额外的开销,并且只能以批量的形式使用。使用C++可以保证轻量级压缩在数据处理的各个环节以多种形式到使用。
- SSE/AVX优化。与压缩类似,Java只能使用JNI以批量的方式使用特殊指令集,从而影响其应用范围,例如Vectorwise中使用到的向量化优化可能就不便在Java中实现。
- LLVM。前文提到过,该项目的主要目的是在此项目之上实现高效的查询执行引擎,以此构建数据仓库工具,而这个执行引擎需要很可能会用到LLVM,所以C++是个合适的选择。
NativeTask避免序列化和数据拷贝。如前文所述,序列化会引入大量开销,为了达到最大的吞吐率,在主要数据流中不使用序列化,基本接口为传递内存数组引用的形式,尽量避免数据拷贝。相比Java,纯二进制接口并不会为C++开发造成额外使用难度。NativeTask的主要功能为纯数据处理,不会涉及到复杂的编程元素例如多线程同步和复杂设计模式,所以尽量保持简单的设计和实现以便于最大化实施优化。
NativeTask主要由Java模块和本地模块两部分构成,它们之间使用JNI进行消息控制和数据传输。这与Hadoop Pipes和Streaming不同,其区别如表3所示。
NativeTask通过为Hadoop添加一个任务代理执行接口来支持NativeTask任务的运行。目前支持兼容模式和本地模式两种数据流(如图1所示)。兼容模式类似Pipes和Streaming,仍旧可以使用原生的RecordReader/Writer, 键值对序列化之后在Java与本地模块之间传输。纯本地模式下RecordReader/Writer也由C++实现,几乎不需要在Java与本地模块之间传输数据,就可以达到最高性能。目前Reduce阶段的Shuffle和Merge还没有完成,依然使用Java的实现,所以Reduce阶段的性能提高还不明显,并且成为瓶颈影响到作业整体的加速比。Reduce阶段的Shuffle和Merge完成之后,应该能够得到与Map阶段类似的性能提升,NativeTask也将成为纯本地的运行环境。
除了保持原有的接口和功能外,NativeTask也增加了一些新功能,比如,支持不排序的数据流。在原有的MapReduce模型中,Map到Reduce之间键值对的排序是整个计算模型的一部分,通过排序可以把键相同的键值对分组在一起,从而便于上层应用处理。但排序会引入以下两大开销。
- Map到Reduce阶段的全局同步,也就是说Reducer要等到所有Map任务全部完成之后才能够开始执行,造成空等待,影响作业整体完成时间。
- Map阶段的排序,Reduce阶段Shuffle和Merge需要缓存所有的键值对,占用大量内存,在缓冲区不足时也会写磁盘,引入大量磁盘读写。
不排序的数据流能够避免这两大开销。聚合类的应用占据大部分的数据分析应用场景,一般在聚合类的数据分析应用中,是不需要排序的,应用仅需要在自己的Mapper和Reducer中加入保存聚合状态的哈希表或者数组,其中仅保存聚合后的状态,相比保存所有键值对并排序来说,其开销要小得多。
另一个新功能是支持foldl形式的接口,它也是为聚合类应用设计的,配合非排序的数据流使用。与传统接口相比,用户不需要再关心聚合操作所需的哈希表的实现和内存管理。
新增这两个功能主要目的是进一步加速聚合类的作业,因为这类作业在简单数据分析领域是最普遍的。目前这两个功能还在开发中,其他待开发的功能包括Reduce端Shuffle和Merge、并行的排序和压缩等,具体的实现细节这里不再赘述。
测试
在一个16节点(1个master节点、15个slave节点)的集群上,测试两个简单MapReduce程序的各项性能指标。这里使用Hadoop版本1.0.1。这两个程序的特点如表4所示。
选择这两个测试,是因为这两个测试都包含在Hadoop官方例子中,简单并容易理解,包含聚合和非聚合类作业,测试结果如表5和表6所示。
需要补充的是,测试环境的编译环境较老因此并不能编译生成较好的本地代码。我在自己的电脑上单独测试单个任务,NativeTask的性能还要再快40-60%左右。不过这里还是给出原始的测试结果。
可以看到,如果考虑到编译因素,NativeTask的Map任务处理速度可以达到原Hadoop的5-12倍,但由于Shuffle和Merge还没有实现,Reduce任务速度差别不会太大。从作业整体完成时间来看,NativeTask比Hadoop快1.5-6倍。不过相信随着NativeTask的持续开发与优化,以及现有Hadoop的持续优化,这个数字也会不断变化。
小结与展望
NativeTask是Hadoop MapReduce的高效执行引擎实现,并提供了C++编程接口。通过分析Hadoop目前的性能问题根源,并优化其中的任务执行单元涉及到的部分,NativeTask获得了不错的性能提升,性能提升的主要因素包括更好的排序实现、关键路径避免序列化、避免复杂抽象,更好地利用压缩等。NativeTask目前没有完全完成,所以期待未来还可以有更大的性能提升。从前文的计算结果可以看出,在目前的主流硬件条件下,理想的任务吞吐应该能够达到100-300MB/s, 而目前仅做到了50-100MB/s。
NativeTask仅关注Hadoop MapReduce性能因素的最底层方面,更高层次的方面也是同样重要的,并且在很多应用类型中经常占据最决定的因素。这些方面最好在更高的应用层次去优化解决,比如数据仓库工具Hive。这也是NativeTask最早计划的发展方向之一。对Hive进行改写,将其查询计划直接翻译为LLVM中间码,编译并在NativeTask上运行。Google在Tenzing的论文中描述了类似的优化,其数据处理吞吐率能够提升6-12倍。
由于目前大多数脚本语言还是C实现的,凭借C良好的接口兼容性,NativeTask很容易与其他脚本语言集成或交互,从而为更多的脚本语言提供使用Hadoop MapReduce的能力,例如Python和R语言。考虑到R语言在数据分析领域的流行程度,Hadoop对R语言的支持是一个趋势,目前已经有些开源项目在进行类似的工作。
还有一个有趣的方向是Hadoop在单一胖节点,或者胖节点组成的小集群(一个机架内)下的优化。对小型企业应用来说,其数据规模一般都在TB级别。只有少数大公司才真正需要PB级别的大数据分析。在不久的未来,随着众核技术流行,利用高密度存储,单个服务器的处理能力可以与当今的小型集群相当。在胖节点内运行的Hadoop作业可以有更多的优化机会,例如没有网络瓶颈,任务之间直接的数据共享,更小的启动开销和更精细的资源调度,结合NativeTask的性能优势,绝大部分的简单数据分析任务在单节点上就可以快速完成。同时节约大量的硬件资源。目前业界和社区更关注水平可扩展性,而对此类垂直扩展优化却似乎很少关注。
可以设想,在不久的未来,数据分析人员在自己的工作站上就可以使用Hadoop甚至R来分析TB级别的数据。如果他需要更大的计算能力,只需要将同样的程序提交到云上运行即可。目前在亚马逊云平台上可以不到1300美元每小时的价格轻松获得30000核的计算能力。但在软件层面,还没有系统可以配合做到这种无缝的可扩展能力和易用性。
声明:本文为作者原文,选登在《程序员》杂志2012年03期时进行了编辑和删减。