hadoop输入分片计算(Map Task个数的确定) - 有无之中

标签: hadoop 计算 map | 发表时间:2014-11-21 22:11 | 作者:有无之中
出处:

  作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split。默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit)。这里要注意,split只是逻辑上的概念,并不对文件做实际的切分。一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等。每个split由一个Map Task来处理,所以有多少split,就有多少Map Task。下面着重分析这个方法:

1 public List<InputSplit> getSplits(JobContext job
2 ) throws IOException {
3 //getFormatMinSplitSize():始终返回1
4 //getMinSplitSize(job):获取” mapred.min.split.size”的值,默认为1
5 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
6
7 //getMaxSplitSize(job):获取"mapred.max.split.size"的值,
8 //默认配置文件中并没有这一项,所以其默认值为” Long.MAX_VALUE”,即2^63 – 1
9 long maxSize = getMaxSplitSize(job);
10
11 // generate splits
12 List<InputSplit> splits = new ArrayList<InputSplit>();
13 List<FileStatus>files = listStatus(job);
14 for (FileStatus file: files) {
15 Path path = file.getPath();
16 FileSystem fs = path.getFileSystem(job.getConfiguration());
17 long length = file.getLen();
18 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
19 if ((length != 0) && isSplitable(job, path)) {
20 long blockSize = file.getBlockSize();
21 //计算split大小
22 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
23
24 //计算split个数
25 long bytesRemaining = length; //bytesRemaining表示剩余字节数
26 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1
27 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
28 splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
29 blkLocations[blkIndex].getHosts()));
30 bytesRemaining -= splitSize;
31 }
32
33 if (bytesRemaining != 0) {
34 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
35 blkLocations[blkLocations.length-1].getHosts()));
36 }
37 } else if (length != 0) {
38 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
39 } else {
40 //Create empty hosts array for zero length files
41 splits.add(new FileSplit(path, 0, length, new String[0]));
42 }
43 }
44
45 // Save the number of input files in the job-conf
46 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
47
48 LOG.debug("Total # of splits: " + splits.size());
49 return splits;
50 }

  首先计算分片的下限和上限:minSize和maxSize,具体的过程在注释中已经说清楚了。接下来用这两个值再加上blockSize来计算实际的split大小,过程也很简单,具体代码如下:

1 protected long computeSplitSize(long blockSize, long minSize,
2 long maxSize) {
3 return Math.max(minSize, Math.min(maxSize, blockSize));
4 }

 

  接下来就是计算实际的分片个数了。针对每个输入文件,计算input split的个数。while循环的含义如下:

  a)  文件剩余字节数/splitSize>1.1,创建一个split,这个split的字节数=splitSize,文件剩余字节数=文件大小 - splitSize

  b)  文件剩余字节数/splitSize<1.1,剩余的部分全都作为一个split(这主要是考虑到,不用为剩余的很少的字节数一些启动一个Map Task)

  

  我们发现,在默认配置下,split大小和block大小是相同的。这是不是为了防止这种情况:

一个split如果对应的多个block,若这些block大多不在本地,则会降低Map Task的本地性,降低效率。

 

  到这里split的划分就介绍完了,但是有两个问题需要考虑:

1、如果一个record跨越了两个block该怎么办?

  这个可以看到,在Map Task读取block的时候,每次是读取一行的,如果发现块的开头不是上一个文件的结束,那么抛弃第一条record,因为这个record会被上一个block对应的Map Task来处理。那么,第二个问题来了:

2、上一个block对应的Map Task并没有最后一条完整的record,它又该怎么办?

  一般来说,Map Task在读block的时候都会多读后续的几个block,以处理上面的这种情况。不过这部分的代码我还没有看到,等看到了再补充吧。

 

  本文基于hadoop1.2.1

  如有错误,还请指正

  参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

  转载请注明出处: http://www.cnblogs.com/gwgyk/p/4113929.html

 


本文链接: hadoop输入分片计算(Map Task个数的确定),转载请注明。

相关 [hadoop 计算 map] 推荐:

hadoop输入分片计算(Map Task个数的确定) - 有无之中

- - 博客园_首页
  作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split. 默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit).

hadoop学习(七)WordCount+Block+Split+Shuffle+Map+Reduce技术详解

- - CSDN博客数据库推荐文章
纯干活:通过WourdCount程序示例:详细讲解MapReduce之Block+Split+Shuffle+Map+Reduce的区别及数据处理流程.        Shuffle过程是MapReduce的核心,集中了MR过程最关键的部分. 要想了解MR,Shuffle是必须要理解的. 了解Shuffle的过程,更有利于我们在对MapReduce job性能调优的工作,以及对MR内部机理有更深一步的了解.

(转)深度分析如何在Hadoop中控制Map的数量

- - 互联网 - ITeye博客
很 多文档中描述,Mapper的数量在默认情况下不可直接控制干预,因为Mapper的数量由输入的大小和个数决定. 在默认情况下,最终input占据了多 少block,就应该启动多少个Mapper. 如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成启动的 Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导致崩溃.

Hadoop与分布式计算

- 透明 - 丕子
写本文由leftnoteasy发布于http://leftnoteasy.cnblogs.com 本文可以被全部或者部分的使用,但请注明出处,如果有问题,可以联系wheeleast (at) gmail.com, 也可以加作者的新浪微博:http://weibo.com/leftnoteasy. 很久没有写写博客了,之前主要是换工作,耽误了很多的时间,让人也变得懒散,不想花大时间来写东西.

JavaScript Source Map 详解

- - 阮一峰的网络日志
上周, jQuery 1.9发布. 这是2.0版之前的最后一个新版本,有很多新功能,其中一个就是支持Source Map. 访问 http://ajax.googleapis.com/ajax/libs/jquery/1.9.0/jquery.min.js,打开压缩后的版本,滚动到底部,你可以看到最后一行是这样的:.

Hibernate调优之select new map()

- - CSDN博客架构设计推荐文章
        Hibernate调优不只是设置一下lazy,调整一下由谁来维护这个字段而已.         这次要说的是对查询语句进行优化——select new map().         select new map语句结果说明.         语句一:.         结果list中,每条记录对应一个object数组,object[]中每个元素为hql语句中列的序号(从0开始).

基于的Map/Reduce的ItemCF

- - M.J.
ItemCF为基于邻域的方法使用用户共同行为来对Item之间的相似度进行计算,从而利用k-近邻算法使用用户曾经有个行为的Item进行推荐. 好处是系统只需要存储Item x Item的相似度矩阵,对于Item数量远小于用户数量的应用来说,具有很高的性价比. ItemCF最核心的计算为item之间相似度矩阵的计算,同时还需要能够在短时间内响应Item变化情况(用户有行为之后就会造成相似度矩阵的重新计算,实际中不会全部重新计算而会使用增量计算的方式.

Hadoop计算能力调度器算法解析

- Roger - 董的博客
本文描述了hadoop中的计算能力调度器(Capacity Scheduler)的实现算法,计算能力调度器是由Yahoo贡献的,主要是解决HADOOP-3421中提出的,在调度器上完成HOD(Hadoop On Demand)功能,克服已有HOD的性能低效的缺点. 它适合于多用户共享集群的环境的调度器.

IT企业利用云计算平台Hadoop的10种方式

- - 博客园_新闻
如果你是世界上广大 Hadoop 用户的一员,你肯定知道 Google 曾经靠着分布式计算技术(Hadoop),在搜索引擎和广告方面取得了举世瞩目的成就. 现在的 Hadoop 不仅是当年的老二 Yahoo 的专用产品了,从 Hadoop 长长的用户名单中,可以看到 Facebook, 可以看到 Linkedin,可以看到 Amazon,可以看到 EMC, eBay,Tweeter,IBM, Microsoft, Apple, HP….

分布式计算开源框架Hadoop入门实践

- - ITeye博客
一、分布式计算开源框架Hadoop实践. 在 SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与实现》中有所提到. 但是由于统计的内容暂时还是十分简单,所以就采用Memcache作为计数器,结合MySQL就完成了访问 控制以及统计的工作.