MapReduce执行流程

标签: mapreduce | 发表时间:2013-02-07 18:17 | 作者:JavaMan_chen
出处:http://blog.csdn.net
MapReduce的大体流程是这样的,如图所示:

由图片可以看到mapreduce执行下来主要包含这样几个步骤
1.首先对输入数据源进行切片
2.master调度worker执行map任务
3.worker读取输入源片段
4.worker执行map任务,将任务输出保存在本地
5.master调度worker执行reduce任务,reduce worker读取map任务的输出文件
6.执行reduce任务,将任务输出保存到HDFS

若对流程细节进行深究,可以得到这样一张流程图

角色描述:
JobClient:执行任务的客户端
JobTracker:任务调度器
TaskTracker:任务跟踪器
Task:具体的任务(Map OR Reduce)

从生命周期的角度来看,mapreduce流程大概经历这样几个阶段:初始化、分配、执行、反馈、成功与失败的后续处理

每个阶段所做的事情大致如下

任务初始化

1.JobClient对数据源进行切片
切片信息由InputSplit对象封装,接口定义如下:
public interface InputSplit extends Writable {
    long getLength() throws IOException;
    String[] getLocations() throws IOException;
}
可以看到split并不包含具体的数据信息,而只是包含数据的引用,map任务会根据引用地址去加载数据
InputSplit是由InputFormat来负责创建的
public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
}
JobClient通过getSplits方法来计算切片信息,切片默认大小和HDFS的块大小相同(64M),这样有利于map任务的本地化执行,无需通过网络传递数据
切片成功后,JobClient会将切片信息传送至JobTracker
2.通过jobTracker生成jobId
    JobTracker.getNewJobId()
3.检查输出目录和输入数据源是否存在
    输出目录已存在,系统抛出异常
    输入源目录不存在,系统抛出异常
4.拷贝任务资源到jobTracker机器上(封装任务的jar包、集群配置文件、输入源切片信息)

任务分配

JobTracker遍历每一个InputSplit,根据其记录的引用地址选择距离最近的TaskTracker去执行,理想情况下切片信息就在TaskTracker的本地,这样节省了网络数据传输的时间
JobTracker和TaskTracker之间是有心跳通信的逻辑的,通过彼此间不停的通信,JobTracker可以判断出哪些TaskTracker正在执行任务,哪些TaskTracker处于空闲状态,以此来合理分配任务

任务执行

TaskTracker接到任务后开始执行如下操作:
1.将任务jar包从HDFS拷贝到本地并进行解压
2.创建一个新的JVM来执行具体的任务,这样做的好处是即使所执行的任务出现了异常,也不会影响TaskTracker的运行使用


如果所执行的任务是map任务,则处理流程大致如下:
首先加载InputSplit记录的数据源切片,通过InputFormat的getRecordReader()方法
获取到Reader后,执行如下操作:
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {//遍历split中的每一条记录,执行map功能函数
    mapper.map(key, value, output, reporter);
}

执行反馈

mapreduce的执行是一个漫长的过程,执行期间会将任务的进度反馈给用户
任务结束后,控制台会打印Counter信息,方便用户以全局的视角来审查任务

执行成功

清理MapReduce本地存储(mapred.local.dir属性指定的目录)
清理map任务的输出文件

执行失败

1.如果task出现问题(map或者reduce)
错误可能原因:用户代码出现异常;任务超过mapred.task.timeout指定的时间依然没有返回
错误处理:
首先将错误信息写入日志
然后jobtracker会调度其他tasktracker来重新执行次任务,如果失败次数超过4次(通过mapred.map.max.attempts和mapred.reduce.max.attempts属性来设置,默认为4),则job以失败告终
如果系统不想以这种方式结束退出,而是想通过Task成功数的百分比来决定job是否通过,则可以指定如下两个属性
mapred.max.map.failures.percent            map任务最大失败率
mapred.max.reduce.failures.percent        reduce任务最大失败率
如果失败比率超过指定的值,则job以失败告终

2.如果是tasktracker出现问题
判断问题的依据:和jobtracker不再心跳通信
jobtracker将该tasktracker从资源池中移除,以后不在调度它

3.jobtracker出现问题
jobtracker作为系统的单点如果出现问题也是最为严重的问题,系统将处于瘫痪
作者:JavaMan_chen 发表于2013-2-7 18:17:27 原文链接
阅读:122 评论:0 查看评论

相关 [mapreduce] 推荐:

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

MapReduce原理

- - C++博客-牵着老婆满街逛
       MapReduce 是由Google公司的Jeffrey Dean 和 Sanjay Ghemawat 开发的一个针对大规模群组中的海量数据处理的分布式编程模型. MapReduce实现了两个功能. Map把一个函数应用于集合中的所有成员,然后返回一个基于这个处理的结果集. 而Reduce是把从两个或更多个Map中,通过多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳.

MapReduce优化

- - 行业应用 - ITeye博客
相信每个程序员在 编程时都会问自己两个问题“我如何完成这个任务”,以及“怎么能让程序运行得更快”. 同样,MapReduce计算模型的多次优化也是为了更好地解答这两个问题. MapReduce计算模型的优化涉及了方方面面的内容,但是主要集中在两个方面:一是计算性能方面的优化;二是I/O操作方面的优化.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Google Percolator替代MapReduce

- Hao - Solidot
Google在新一代内容索引系统中放弃了MapReduce,替代者是尚不为人知的分布式数据处理系统Percolator. The Register报道,Percolator是一种增量处理平台,它能持续更新索引系统,无需从头重新处理一遍整个系统. Google的工程师计划在下个月举行的年度USENIX Symposium 会议上公布Percolator相关论文.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

MapReduce执行流程

- - CSDN博客云计算推荐文章
MapReduce的大体流程是这样的,如图所示:. 由图片可以看到mapreduce执行下来主要包含这样几个步骤. 1.首先对输入数据源进行切片. 2.master调度worker执行map任务. 3.worker读取输入源片段. 4.worker执行map任务,将任务输出保存在本地. 5.master调度worker执行reduce任务,reduce worker读取map任务的输出文件.

MapReduce编程模型

- - CSDN博客云计算推荐文章
MapReduce是一个Google发明的编程模型,也是一个处理和生成超大规模数据集的算法模型的相关实现. 用户首先创建一个Map函数处理一个基于对的数据集合,输出的中间结果基于对的数据集合,然后再创建一个Reduce函数用来合并所有的具有相同中间Key值的中间Value值.

MapReduce - 性能调优

- - CSDN博客云计算推荐文章
        Hadoop为用户作业提供了多种可配置的参数,以允许用户根据作业特点调整这些参数值使作业运行效率达到最优.         对于一大批MapReduce程序,如果可以设置一个Combiner,那么对于提高作业性能是十分有帮助的. Combiner可减少Map Task中间输出的结果,从而减少各个Reduce Task的远程拷贝数据量,最终表现为Map Task和Reduce Task执行时间缩短.