MapReduce编程实战之“高级特性”

标签: mapreduce 编程 | 发表时间:2014-05-15 06:20 | 作者:puma_dong
出处:http://blog.csdn.net


本篇介绍MapReduce的一些高级特性,如计数器、数据集的排序和连接。计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的“”连接(join)操作。


计数器


计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。计数器还可用于辅助诊断系统故障。对于大型分布式系统来说,获取计数器比分析日志文件容易的多。


示例一:气温缺失及不规则数据计数器


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//统计最高气温的作业,也统计气温值缺少的记录,不规范的记录
public class MaxTemperatureWithCounters extends Configured implements Tool {

	enum Temperature {
		MiSSING, MALFORMED
	}

	static class MaxTemeratureMapperWithCounters extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, IntWritable> {

		private NcdcRecordParser parser = new NcdcRecordParser();

		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			parser.parse(value);
			if (parser.isValidTemperature()) {
				int airTemperature = parser.getAirTemperature();
				output.collect(new Text(parser.getYear()), new IntWritable(
						airTemperature));
			} else if (parser.isMa1formedTemperature()) {
				reporter.incrCounter(Temperature.MALFORMED, 1);
			} else if (parser.IsMissingTemperature()) {
				reporter.incrCounter(Temperature.MALFORMED, 1);
			}

		}

	}

	static class MaxTemperatureReduceWithCounters extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int maxValue = Integer.MIN_VALUE;
			while (values.hasNext()) {
				maxValue = Math.max(maxValue, values.next().get());
			}
			output.collect(key, new IntWritable(maxValue));

		}
	}

	@Override
	public int run(String[] args) throws Exception {
		args = new String[] { "/test/input/t", "/test/output/t" }; // 给定输入输出路径
		JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (conf == null) {
			return -1;
		}
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setMapperClass(MaxTemeratureMapperWithCounters.class);
		conf.setCombinerClass(MaxTemperatureReduceWithCounters.class);
		conf.setReducerClass(MaxTemperatureReduceWithCounters.class);
		JobClient.runJob(conf);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
		System.exit(exitCode);
	}
}

示例二:统计气温信息缺失记录所占比例


import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
//统计气温缺失记录所占比例

public class MissingTemperatureFields extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		String jobID = args[0];
		JobClient jobClient = new JobClient(new JobConf(getConf()));
		RunningJob job = jobClient.getJob(JobID.forName(jobID));
		if (job == null) {
			System.err.printf("No job with ID %s found.\n", jobID);
			return -1;
		}
		if (!job.isComplete()) {
			System.err.printf("Job %s is not complete.\n", jobID);
			return -1;
		}
		Counters counters = job.getCounters();
		long missing = counters
				.getCounter(MaxTemperatureWithCounters.Temperature.MiSSING);
		long total = counters.findCounter(
				"org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS")
				.getCounter();
		System.out.printf("Records with missing temperature fields:%.2f%%\n",
				100.0 * missing / total);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new MissingTemperatureFields(), args);
		System.exit(exitCode);
	}
}

hadoop jar xx.jar MissingTemperatureFields job_1400072670556_0001


排序


排序是MapReduce的核心技术。尽管应用本身可能并不需要对数据排序,但仍可能使用MapReduce的排序功能来组织数据。下面将讨论几种不同的数据集排序方法,以及如何控制MapReduce的排序。


实例一、数据准备:将天气数据转成顺序文件格式


import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class SortDataPreprocessor extends Configured implements Tool {
	static class CleanerMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, IntWritable, Text> {

		private NcdcRecordParser parser = new NcdcRecordParser();

		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<IntWritable, Text> output, Reporter reporter)
				throws IOException {
			parser.parse(value);
			if (parser.isValidTemperature()) {
				output.collect(new IntWritable(parser.getAirTemperature()),
						value);
			}
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		args = new String[] { "/test/input/t", "/test/input/seq" };
		JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (conf == null) {
			return -1;
		}
		conf.setMapperClass(CleanerMapper.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputValueClass(Text.class);
		conf.setNumReduceTasks(0);
		conf.setOutputFormat(SequenceFileOutputFormat.class);
		SequenceFileOutputFormat.setCompressOutput(conf, true);
		SequenceFileOutputFormat
				.setOutputCompressorClass(conf, GzipCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(conf,
				CompressionType.BLOCK);
		JobClient.runJob(conf);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
		System.exit(exitCode);
	}
}

示例二、部分排序


import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class SortByTemperatureUsingHashPartitioner extends Configured implements
		Tool {

	@Override
	public int run(String[] args) throws Exception {
		args = new String[] { "/test/input/seq", "/test/output/t" };
		JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (conf == null) {
			return -1;
		}
		conf.setInputFormat(SequenceFileInputFormat.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputFormat(SequenceFileOutputFormat.class);
		conf.setNumReduceTasks(5);//设置5个reduce任务,输出5个文件
		SequenceFileOutputFormat.setCompressOutput(conf, true);
		SequenceFileOutputFormat
				.setOutputCompressorClass(conf, GzipCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(conf,
				CompressionType.BLOCK);
		JobClient.runJob(conf);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(
				new SortByTemperatureUsingHashPartitioner(), args);
		System.exit(exitCode);
	}

}

hadoop jar test.jar SortByTemperatureUsingHashPartitioner -D mapred.reduce.tasks=30

产生多个已经排好序的小文件。


连接


MapReduce能够执行大型数据集间的“”连接(join)操作,但是从头编写相关代码来执行连接比较麻烦。也可以考虑使用一个更高级的框架,如Pig、Hive或Casading等,它们都将连接操作视为整个实现的核心部分。


本章的代码用到的基础工具类


其他章节也可能用到:)


JobBuilder


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;

public class JobBuilder {

	public static JobConf parseInputAndOutput(Tool tool, Configuration conf,
			String[] args) {
		if (args.length != 2) {
			printUsage(tool, "<input><output>");
			return null;
		}
		JobConf jobConf = new JobConf(conf, tool.getClass());
		FileInputFormat.addInputPath(jobConf, new Path(args[0]));
		FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
		return jobConf;
	}

	public static void printUsage(Tool tool, String extraArgsUsage) {
		System.err.printf("Usage:%s [genericOptions] %s\n\n", tool.getClass()
				.getSimpleName(), extraArgsUsage);
	}
}

NcdcRecordParser

 

import org.apache.hadoop.io.Text;

public class NcdcRecordParser {
	private static final int MISSING_TEMPERATURE = 9999;

	private String year;
	private int airTemperature;
	private String quality;

	public void parse(String record) {
		year = record.substring(15, 19);
		String airTemperatureString;
		// Remove leading plus sign as parseInt doesn't like them
		if (record.charAt(87) == '+') {
			airTemperatureString = record.substring(88, 92);
		} else {
			airTemperatureString = record.substring(87, 92);
		}
		airTemperature = Integer.parseInt(airTemperatureString);
		quality = record.substring(92, 93);
	}

	public void parse(Text record) {
		parse(record.toString());
	}

	public boolean isValidTemperature() {
		return airTemperature != MISSING_TEMPERATURE
				&& quality.matches("[01459]");
	}

	public boolean isMa1formedTemperature() {
		return !quality.matches("[01459]");
	}

	public boolean IsMissingTemperature() {
		return airTemperature == MISSING_TEMPERATURE;
	}

	public String getYear() {
		return year;
	}

	public int getAirTemperature() {
		return airTemperature;
	}
}

这一篇是《Hadoop权威指南》第八章的学习笔记,好久没看Hadoop,工作中也没使用,前不久学习的东西,忘记了很多。学以致用是非常重要的,没用应用的学习,最终会忘记大部分,感兴趣的就需要多多温习了。

作者:puma_dong 发表于2014-5-14 22:20:53 原文链接
阅读:78 评论:0 查看评论

相关 [mapreduce 编程] 推荐:

MapReduce编程模型

- - CSDN博客云计算推荐文章
MapReduce是一个Google发明的编程模型,也是一个处理和生成超大规模数据集的算法模型的相关实现. 用户首先创建一个Map函数处理一个基于对的数据集合,输出的中间结果基于对的数据集合,然后再创建一个Reduce函数用来合并所有的具有相同中间Key值的中间Value值.

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

MapReduce编程实战之“高级特性”

- - CSDN博客云计算推荐文章
本篇介绍MapReduce的一些高级特性,如计数器、数据集的排序和连接. 计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的“”连接(join)操作. 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计. 计数器还可用于辅助诊断系统故障.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).

MapReuce 编程总结-多MapReduce执行

- - CSDN博客云计算推荐文章
学习hadoop,必不可少的就是写MapReduce程序,当然,对于简单的分析程序,我们只需一个MapReduce就能搞定,这里就不提单MapReuce的情况了,网上例子很多,大家可以百度Google一下. 对于比较复杂的分析程序,我们可能需要多个Job或者多个Map或者Reduce进行分析计算.         多Job或者多MapReduce的编程形式有以下几种:.

MapReduce 编程之 倒排索引

- - CSDN博客云计算推荐文章
本文调试环境: ubuntu 10.04 , hadoop-1.0.2. hadoop装的是伪分布模式,就是只有一个节点,集namenode, datanode, jobtracker, tasktracker...于一体. 本文实现了简单的倒排索引,单词,文档路径,词频,重要的解释都会在代码注视中.

文章: Apache Crunch:用于简化MapReduce编程的Java库

- - InfoQ cn
Apache Crunch(孵化器项目)是基于Google的 FlumeJava库编写的Java库,用于创建MapReduce流水线. 与其他用来创建MapReduce作业的高层工具(如Apache Hive、Apache Pig和Cascading等)类似,Crunch提供了用于实现如连接数据、执行聚合和排序记录等常见任务的模式库.

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

MapReduce原理

- - C++博客-牵着老婆满街逛
       MapReduce 是由Google公司的Jeffrey Dean 和 Sanjay Ghemawat 开发的一个针对大规模群组中的海量数据处理的分布式编程模型. MapReduce实现了两个功能. Map把一个函数应用于集合中的所有成员,然后返回一个基于这个处理的结果集. 而Reduce是把从两个或更多个Map中,通过多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳.