【Hadoop】MapReduce使用combiner优化性能

标签: hadoop mapreduce combiner | 发表时间:2013-11-13 05:20 | 作者:moxiaomomo
出处:http://blog.csdn.net
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担。
Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入。
很多时候combine的逻辑和reduce的逻辑是相同的,因此两者可以共用Reducer体;这个时候只需要在客户端中设置Map类之后,Reduce类之前加入一行代码: job.setCombinerClass(MyReducer.class);。如果需要自定义combiner类,可以类似这样(示例为从HBase表读取数据,计算,然后写入另一个HBase表中):
public class Test {		
	public static class MyMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
		@Override
		public void map(ImmutableBytesWritable ibw, Result result, Context context) throws IOException, InterruptedException {
			// 从HBase中读取数据后,进行map相关操作
			// ...
			// 将map结果输出到缓冲区
			context.write(new ImmutableBytesWritable(myMapKey, new ImmutableBytesWritable(myMapValue));
		}
	}
	
	public static class MyCombiner extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable>{
		@Override
		public void reduce(ImmutableBytesWritable mapKey, Iterable<ImmutableBytesWritable> mapValues, Context context) throws IOException, InterruptedException {
			// 一些操作,如distinct, sum, max, distinct 
			// ...
			// 将combine结果输出到缓冲区
			context.write(mapKey, new ImmutableBytesWritable(combineValue));	
		}
	}	

	public static class MyReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
		@Override
		public void reduce(ImmutableBytesWritable reduceKeyIBW, Iterable<ImmutableBytesWritable> reduceValues, Context context) throws IOException, InterruptedException {
			// 一些操作,如distinct, sum, max, distinct 
			// ...
			// 将reduce结果(类型Put)输出到缓冲区,用于插入到指定HBase表中
			context.write(null, put);		
		}
	}	

	public static void main(String[] args) throws Exception {
		Configuration configuration = HBaseConfiguration.create();
		Job job = new Job(configuration, "test_mr");
		job.setJarByClass(Test.class);			
		job.setNumReduceTasks(4);  // reduce任务数量默认为1
		
		Scan scan = new Scan();
		scan.setCacheBlocks(false); 
		scan.setCaching(1000); //每次从服务器端读取的行数,默认为配置文件中设置的值
		TableMapReduceUtil.initTableMapperJob("table_for_read", scan, MyMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
		job.setCombinerClass(MyCombiner.class);   // 设置combiner
		TableMapReduceUtil.initTableReducerJob("table_to_write", MyReducer.class, job);
		
		job.waitForCompletion(true);
	}
}


当combine操作适用而且Map的输出结果数量很大时,combine的作用是很明显的。
以下是某次测试环境中使用combine前后的对比效果,可以看到Combine output records接近Combine input records的三分之一, 而reduce的输入规模接近原来的十分之一。
两次实际的测试用时分别为50mins16sec, 30mins6sec,耗时节省了接近一半。如图所示:







作者:moxiaomomo 发表于2013-11-12 21:20:54 原文链接
阅读:79 评论:0 查看评论

相关 [hadoop mapreduce combiner] 推荐:

【Hadoop】MapReduce使用combiner优化性能

- - CSDN博客云计算推荐文章
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担. Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

Hadoop 高级程序设计(三)---自定义Partition和Combiner

- - CSDN博客云计算推荐文章
Hadoop提供了缺省的Partition来完成map的输出向reduce分发处理. 有时也需要自定义partition来将相同key值的数据分发到同一个reduce处理,为了减少map过程输出的中间结果键值对的数量,降低网络数据通信开销,用户也可以自定制combiner过程. 自定制Partition过程:.

"Hadoop/MapReduce/HBase"分享总结

- - ITeye博客
此分享是关于hadoop生态系统的简单介绍包括起源到相对应用. Hadoop和HBase.pdf (2.1 MB). 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Hadoop之MapReduce单元测试

- - ITeye博客
通常情况下,我们需要用小数据集来单元测试我们写好的map函数和reduce函数. 而一般我们可以使用Mockito框架来模拟OutputCollector对象(Hadoop版本号小于0.20.0)和Context对象(大于等于0.20.0). 下面是一个简单的WordCount例子:(使用的是新API).

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

hadoop的IO和MapReduce优化参数

- - CSDN博客系统运维推荐文章
           在MapReduce执行过程中,特别是Shuffle阶段,尽量使用内存缓冲区存储数据,减少磁盘溢写次数;同时在作业执行过程中增加并行度,都能够显著提高系统性能,这也是配置优化的一个重要依据.            下面分别介绍I/O属性和MapReduce属性这两个类的部分属性,并指明其优化方向.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).