MongoDB MapReduce 性能提升20倍的优化宝典

标签: mongodb mapreduce 性能 | 发表时间:2014-06-12 22:33 | 作者:这些年
出处:http://www.iteye.com

自从MongoDB被越来越多的大型关键项目采用后,数据分析也成为了越来越重要的话题。人们似乎已经厌倦了使用不同的软件来进行分析(这都利用到了Hadoop),因为这些方法往往需要大规模的数据传输,而这些成本相当昂贵。 

MongoDB提供了2种方式来对数据进行分析: Map Reduce(以下简称MR)和聚合框架(Aggregation Framework)。MR非常灵活且易于使用,它可以很好地与分片(sharding)结合使用,并允许大规模输出。尽管在MongoDB v2.4版本中,由于JavaScript引擎从Spider切换到了V8,使得MR的性能有了大幅改进,但是与Agg Framework(使用C++)相比,MR的速度还是显得比较慢。本文就来看看,有哪些方法可以让MR的速度有所提升。 

测试 

首先我们来做个测试,插入1000万文档,这些文档中包含了介于0和100万之间的单一整数值,这意味着,平均每10个文档具有相同的值。 

代码 
  1. > for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}  
  2. > db.uniques.findOne()  
  3. { "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }  
  4. > db.uniques.ensureIndex({dim0: 1})  
  5. > db.uniques.stats()  
  6. {  
  7.         "ns" : "test.uniques",  
  8.         "count" : 10000000,  
  9.         "size" : 360000052,  
  10.         "avgObjSize" : 36.0000052,  
  11.         "storageSize" : 582864896,  
  12.         "numExtents" : 18,  
  13.         "nindexes" : 2,  
  14.         "lastExtentSize" : 153874432,  
  15.         "paddingFactor" : 1,  
  16.         "systemFlags" : 1,  
  17.         "userFlags" : 0,  
  18.         "totalIndexSize" : 576040080,  
  19.         "indexSizes" : {  
  20.                 "_id_" : 324456384,  
  21.                 "dim0_1" : 251583696  
  22.         },  
  23.         "ok" : 1  
  24. }  



这里我们想要得到文档中唯一值的计数,可以通过下面的MR任务来轻松完成: 

代码 
  1. > db.runCommand(  
  2. { mapreduce: "uniques",  
  3. map: function () { emit(this.dim0, 1); },  
  4. reduce: function (key, values) { return Array.sum(values); },  
  5. out: "mrout" })  
  6. {  
  7.         "result" : "mrout",  
  8.         "timeMillis" : 1161960,  
  9.         "counts" : {  
  10.                 "input" : 10000000,  
  11.                 "emit" : 10000000,  
  12.                 "reduce" : 1059138,  
  13.                 "output" : 999961  
  14.         },  
  15.         "ok" : 1  
  16. }  



正如你看到的,输出结果大约需要1200秒(在EC2 M3实例上测试),共输出了1千万maps、100万reduces、999961个文档。结果类似于: 

代码 
  1. > db.mrout.find()  
  2. { "_id" : 1, "value" : 10 }  
  3. { "_id" : 2, "value" : 5 }  
  4. { "_id" : 3, "value" : 6 }  
  5. { "_id" : 4, "value" : 10 }  
  6. { "_id" : 5, "value" : 9 }  
  7. { "_id" : 6, "value" : 12 }  
  8. { "_id" : 7, "value" : 5 }  
  9. { "_id" : 8, "value" : 16 }  
  10. { "_id" : 9, "value" : 10 }  
  11. { "_id" : 10, "value" : 13 }  
  12. ...  



下面就来看看如何进行优化。 

使用排序 

我在之前的这篇 文章中简要说明了使用排序对于MR的好处,这是一个鲜为人知的特性。在这种情况下,如果处理未排序的输入,意味着MR引擎将得到随机排序的值,基本上没有机会在RAM中进行reduce,相反,它将不得不通过一个临时collection来将数据写回磁盘,然后按顺序读取并进行reduce。 

下面来看看如果使用排序,会有什么帮助: 

代码 
  1. > db.runCommand(  
  2. { mapreduce: "uniques",  
  3. map: function () { emit(this.dim0, 1); },  
  4. reduce: function (key, values) { return Array.sum(values); },  
  5. out: "mrout",  
  6. sort: {dim0: 1} })  
  7. {  
  8.         "result" : "mrout",  
  9.         "timeMillis" : 192589,  
  10.         "counts" : {  
  11.                 "input" : 10000000,  
  12.                 "emit" : 10000000,  
  13.                 "reduce" : 1000372,  
  14.                 "output" : 999961  
  15.         },  
  16.         "ok" : 1  
  17. }  



现在时间降到了192秒,速度提升了6倍。其实reduces的数量是差不多的,但是它们在被写入磁盘之前已经在RAM中完成了。 

使用多线程 

在MongoDB中,一个单一的MR任务并不能使用多线程——只有在多个任务中才能使用多线程。但是目前的多核CPU非常有利于在单一服务器上进行并行化工作,就像Hadoop。我们需要做的是,将输入数据分割成若干块,并为每个块分配一个MR任务。splitVector命令可以帮助你非常迅速地找到分割点,如果你有更简单的分割方法更好。 

代码 
  1. > db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})  
  2. {  
  3.     "timeMillis" : 6006,  
  4.     "splitKeys" : [  
  5.         {  
  6.             "dim0" : 18171  
  7.         },  
  8.         {  
  9.             "dim0" : 36378  
  10.         },  
  11.         {  
  12.             "dim0" : 54528  
  13.         },  
  14.         {  
  15.             "dim0" : 72717  
  16.         },  
  17. …  
  18.         {  
  19.             "dim0" : 963598  
  20.         },  
  21.         {  
  22.             "dim0" : 981805  
  23.         }  
  24.     ],  
  25.     "ok" : 1  
  26. }  



从1千万文档中找出分割点,使用splitVector命令只需要大约5秒,这已经相当快了。所以,下面我们需要做的是找到一种方式来创建多个MR任务。从应用服务器方面来说,使用多线程和$gt / $lt查询命令会非常方便。从shell方面来说,可以使用ScopedThread对象,它的工作原理如下: 

代码 
  1. > var t = new ScopedThread(mapred, 963598, 981805)  
  2. > t.start()  
  3. > t.join()  



现在我们可以放入一些JS代码,这些代码可以产生4个线程,下面来等待结果显示: 

代码 
  1. > var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })  
  2. > var keys = res.splitKeys  
  3. > keys.length  
  4. 39  
  5. > var mapred = function(min, max) {  
  6. return db.runCommand({ mapreduce: "uniques",  
  7. map: function () { emit(this.dim0, 1); },  
  8. reduce: function (key, values) { return Array.sum(values); },  
  9. out: "mrout" + min,  
  10. sort: {dim0: 1},  
  11. query: { dim0: { $gte: min, $lt: max } } }) }  
  12. > var numThreads = 4  
  13. > var inc = Math.floor(keys.length / numThreads) + 1  
  14. > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }  
  15. min:0 max:274736  
  16. min:274736 max:524997  
  17. min:524997 max:775025  
  18. min:775025 max:{ "$maxKey" : 1 }  
  19. connecting to: test  
  20. connecting to: test  
  21. connecting to: test  
  22. connecting to: test  
  23. > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }  
  24. {  
  25.         "result" : "mrout0",  
  26.         "timeMillis" : 205790,  
  27.         "counts" : {  
  28.                 "input" : 2750002,  
  29.                 "emit" : 2750002,  
  30.                 "reduce" : 274828,  
  31.                 "output" : 274723  
  32.         },  
  33.         "ok" : 1  
  34. }  
  35. {  
  36.         "result" : "mrout274736",  
  37.         "timeMillis" : 189868,  
  38.         "counts" : {  
  39.                 "input" : 2500013,  
  40.                 "emit" : 2500013,  
  41.                 "reduce" : 250364,  
  42.                 "output" : 250255  
  43.         },  
  44.         "ok" : 1  
  45. }  
  46. {  
  47.         "result" : "mrout524997",  
  48.         "timeMillis" : 191449,  
  49.         "counts" : {  
  50.                 "input" : 2500014,  
  51.                 "emit" : 2500014,  
  52.                 "reduce" : 250120,  
  53.                 "output" : 250019  
  54.         },  
  55.         "ok" : 1  
  56. }  
  57. {  
  58.         "result" : "mrout775025",  
  59.         "timeMillis" : 184945,  
  60.         "counts" : {  
  61.                 "input" : 2249971,  
  62.                 "emit" : 2249971,  
  63.                 "reduce" : 225057,  
  64.                 "output" : 224964  
  65.         },  
  66.         "ok" : 1  
  67. }  



第1个线程所做的工作比其他的要多一点,但时间仍达到了190秒,这意味着多线程并没有比单线程快! 

使用多个数据库 

这里的问题是,线程之间存在太多锁争用。当锁时,MR不是非常无私(每1000次读取会进行yield)。由于MR任务做了大量写操作,线程之间结束时会等待彼此。由于MongoDB的每个数据库都有独立的锁,那么让我们来尝试为每个线程使用不同的输出数据库: 

代码 
  1. > var mapred = function(min, max) {  
  2. return db.runCommand({ mapreduce: "uniques",  
  3. map: function () { emit(this.dim0, 1); },  
  4. reduce: function (key, values) { return Array.sum(values); },  
  5. out: { replace: "mrout" + min, db: "mrdb" + min },  
  6. sort: {dim0: 1},  
  7. query: { dim0: { $gte: min, $lt: max } } }) }  
  8. > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }  
  9. min:0 max:274736  
  10. min:274736 max:524997  
  11. min:524997 max:775025  
  12. min:775025 max:{ "$maxKey" : 1 }  
  13. connecting to: test  
  14. connecting to: test  
  15. connecting to: test  
  16. connecting to: test  
  17. > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }  
  18. ...  
  19. {  
  20.         "result" : {  
  21.                 "db" : "mrdb274736",  
  22.                 "collection" : "mrout274736"  
  23.         },  
  24.         "timeMillis" : 105821,  
  25.         "counts" : {  
  26.                 "input" : 2500013,  
  27.                 "emit" : 2500013,  
  28.                 "reduce" : 250364,  
  29.                 "output" : 250255  
  30.         },  
  31.         "ok" : 1  
  32. }  
  33. ...  



所需时间减少到了100秒,这意味着与一个单独的线程相比,速度约提高2倍。尽管不如预期,但已经很不错了。在这里,我使用了4个核心,只提升了2倍,如果使用8核CPU,大约会提升4倍。 

使用纯JavaScript模式 

在线程之间分割输入数据时,有一些非常有趣的东西:每个线程只拥有约25万主键来输出,而不是100万。这意味着我们可以使用“纯JS模式”——通过jsMode:true来启用。开启后,MongoDB不会在JS和BSON之间反复转换,相反,它会从内部的一个50万主键的JS字典来reduces所有对象。下面来看看该操作是否对速度提升有帮助。

代码 
  1. > var mapred = function(min, max) {  
  2. return db.runCommand({ mapreduce: "uniques",  
  3. map: function () { emit(this.dim0, 1); },  
  4. reduce: function (key, values) { return Array.sum(values); },  
  5. out: { replace: "mrout" + min, db: "mrdb" + min },  
  6. sort: {dim0: 1},  
  7. query: { dim0: { $gte: min, $lt: max } },  
  8. jsMode: true }) }  
  9. > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }  
  10. min:0 max:274736  
  11. min:274736 max:524997  
  12. min:524997 max:775025  
  13. min:775025 max:{ "$maxKey" : 1 }  
  14. connecting to: test  
  15. connecting to: test  
  16. connecting to: test  
  17. connecting to: test  
  18. > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }  
  19. ...  
  20. {  
  21.         "result" : {  
  22.                 "db" : "mrdb274736",  
  23.                 "collection" : "mrout274736"  
  24.         },  
  25.         "timeMillis" : 70507,  
  26.         "counts" : {  
  27.                 "input" : 2500013,  
  28.                 "emit" : 2500013,  
  29.                 "reduce" : 250156,  
  30.                 "output" : 250255  
  31.         },  
  32.         "ok" : 1  
  33. }  
  34. ...  



现在时间降低到70秒。看来jsMode确实有帮助,尤其是当对象有很多字段时。该示例中是一个单一的数字字段,不过仍然提升了30%。 

MongoDB v2.6版本中的改进 

在MongoDB v2.6版本的开发中,移除了一段关于在JS函数调用时的一个可选“args”参数的代码。该参数是不标准的,也不建议使用,它由于历史原因遗留了下来(见 SERVER-4654)。让我们从Git库中pull最新的MongoDB并编译,然后再次运行测试用例: 

代码 
  1. ...  
  2. {  
  3.         "result" : {  
  4.                 "db" : "mrdb274736",  
  5.                 "collection" : "mrout274736"  
  6.         },  
  7.         "timeMillis" : 62785,  
  8.         "counts" : {  
  9.                 "input" : 2500013,  
  10.                 "emit" : 2500013,  
  11.                 "reduce" : 250156,  
  12.                 "output" : 250255  
  13.         },  
  14.         "ok" : 1  
  15. }  
  16. ...  



从结果来看,时间降低到了60秒,速度大约提升了10-15%。同时,这种更改也改善了JS引擎的整体堆消耗量。

结论 

回头来看,对于同样的MR任务,与最开始时的1200秒相比,速度已经提升了20倍。这种优化应该适用于大多数情况,即使一些技巧效果不那么理想(比如使用多个输出dbs /集合)。但是这些技巧可以帮助人们来提升MR任务的速度,未来这些特性也许会更加易用——比如,这个 ticket 将会使splitVector命令更加可用,这个 ticket将会改进同一数据库中的多个MR任务。 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [mongodb mapreduce 性能] 推荐:

MongoDB MapReduce 性能提升20倍的优化宝典

- - 数据库 - ITeye博客
自从MongoDB被越来越多的大型关键项目采用后,数据分析也成为了越来越重要的话题. 人们似乎已经厌倦了使用不同的软件来进行分析(这都利用到了Hadoop),因为这些方法往往需要大规模的数据传输,而这些成本相当昂贵. MongoDB提供了2种方式来对数据进行分析: Map Reduce(以下简称MR)和聚合框架(Aggregation Framework).

MapReduce - 性能调优

- - CSDN博客云计算推荐文章
        Hadoop为用户作业提供了多种可配置的参数,以允许用户根据作业特点调整这些参数值使作业运行效率达到最优.         对于一大批MapReduce程序,如果可以设置一个Combiner,那么对于提高作业性能是十分有帮助的. Combiner可减少Map Task中间输出的结果,从而减少各个Reduce Task的远程拷贝数据量,最终表现为Map Task和Reduce Task执行时间缩短.

mongodb性能测试

- - 数据库 - ITeye博客
1) Mongodb的非安全插入方式,在一开始插入性能是非常高的,但是在达到了两千万条数据之后性能骤减,这个时候恰巧是服务器24G内存基本占满的时候(随着测试的进行mongodb不断占据内存,一直到操作系统的内存全部占满),也就是说Mongodb的内存映射方式,使得数据全部在内存中的时候速度飞快,当部分数据需要换出到磁盘上之后,性能下降很厉害.

【Hadoop】MapReduce使用combiner优化性能

- - CSDN博客云计算推荐文章
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担. Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入.

记一次MongoDB性能问题

- Fstone - 火丁笔记
最近忙着把一个项目从MySQL迁移到MongoDB,在导入旧数据的过程中,遇到了些许波折,犯了不少错误,但同时也学到了不少知识,遂记录下来. 公司为这个项目专门配备了几台高性能务器,清一色的双路四核超线程CPU,外加32G内存,运维人员安装好MongoDB后,就交我手里了,我习惯于在使用新服务器前先看看相关日志,了解一下基本情况,当我浏览MongoDB日志时,发现一些警告信息:.

mongodb索引讲解与性能调优

- - haohtml's blog
mongodb索引规则基本上与传统的关系库一样,大部分优化MySQL/Oracle/SQLite索引的技巧也适用于mongodb. 当查询中用到某些条件时,可以对该键建立索引,以提高查询速度. 如果数据量很多且查询多于更新时,可以用索引提高查询的速度. a)         查询索引:. 查询索引很简单,比如说需要查询mailaccess数据库中的Mail collection上的索引时:.

Cassandra HBase和MongoDb性能比较

- - 数据库 - ITeye博客
这是一篇基于亚马逊云平台上对三个主流的. NoSQL数据库性能比较,在读写两个操作不同的组合情况下性能表现不同. 横坐标是吞吐量,纵坐标是延迟,这是一对矛盾,吞吐量越大,延迟越低,代表越好. 纯粹插入,Cassandra领先,见下图:. 2.WorkloadA: 读修改操作各占一半情况下的修改性能:MongoDB明显延迟增加,落败:.

[Cacti] mongodb性能监控实战

- - CSDN博客数据库推荐文章
          为了更好的使用mongodb,需要监控出mongodb的一些基础使用情况,比如Flush数、连接数、内存使用率、Index操作,Slave延迟等等,这些可以通过配置cacti监控mongodb的模板来完成. 1,在cacti界面导入模板 在计算机本地,下载此tgz包:http://mysql-cacti-templates.googlecode.com/files/better-cacti-templates-1.1.8.tar.gz.

开发高性能的MongoDB应用—浅谈MongoDB性能优化 - 吴纹羽

- - 博客园_首页
大数据时代的数据存储,非关系型数据库MongoDB(一).   “如何能让软件拥有更高的性能. ”,我想这是一个大部分开发者都思考过的问题. 性能往往决定了一个软件的质量,如果你开发的是一个互联网产品,那么你的产品性能将更加受到考验,因为你面对的是广大的 互联网用户,他们可不是那么有耐心的. 严重点说,页面的加载速度每增加一秒也许都会使你失去一部分用户,也就是说, 加载速度和用户量是成反比的.

记一次MongoDB性能问题,附原理解析

- zffl - NoSQLFan
下面文章转载自火丁笔记,原作者描述了一次MongoDB数据迁移过程中遇到的性能问题及其解决方案,中间追查问题的方法和工具值得我们学习. 另外NoSQLFan还对作者略讲的问题产生原理进行了分析,希望对您有用. 最近忙着把一个项目从MySQL迁移到MongoDB,在导入旧数据的过程中,遇到了些许波折,犯了不少错误,但同时也学到了不少知识,遂记录下来.