MapReduce编程模型
- - CSDN博客云计算推荐文章MapReduce是一个Google发明的编程模型,也是一个处理和生成超大规模数据集的算法模型的相关实现. 用户首先创建一个Map函数处理一个基于key/value对的数据集合,输出中间的基于key/value对的数据集合,然后再创建一个Reduce函数用来合并所有的具有相同中间Key值的中间Value值.
本篇介绍MapReduce的一些高级特性,如计数器、数据集的排序和连接。计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的“连接”(join)操作。
计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。计数器还可用于辅助诊断系统故障。对于大型分布式系统来说,获取计数器比分析日志文件容易得多。
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //统计最高气温的作业,也统计气温值缺少的记录,不规范的记录 public class MaxTemperatureWithCounters extends Configured implements Tool { enum Temperature { MiSSING, MALFORMED } static class MaxTemeratureMapperWithCounters extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { parser.parse(value); if (parser.isValidTemperature()) { int airTemperature = parser.getAirTemperature(); output.collect(new Text(parser.getYear()), new IntWritable( airTemperature)); } else if (parser.isMa1formedTemperature()) { reporter.incrCounter(Temperature.MALFORMED, 1); } else if (parser.IsMissingTemperature()) { reporter.incrCounter(Temperature.MALFORMED, 1); } } } static class MaxTemperatureReduceWithCounters extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } } @Override public int run(String[] args) throws Exception { args = new String[] { "/test/input/t", "/test/output/t" }; // 给定输入输出路径 JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf 
== null) { return -1; } conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MaxTemeratureMapperWithCounters.class); conf.setCombinerClass(MaxTemperatureReduceWithCounters.class); conf.setReducerClass(MaxTemperatureReduceWithCounters.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args); System.exit(exitCode); } }
import org.apache.hadoop.conf.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; //统计气温缺失记录所占比例 public class MissingTemperatureFields extends Configured implements Tool { @Override public int run(String[] args) throws Exception { String jobID = args[0]; JobClient jobClient = new JobClient(new JobConf(getConf())); RunningJob job = jobClient.getJob(JobID.forName(jobID)); if (job == null) { System.err.printf("No job with ID %s found.\n", jobID); return -1; } if (!job.isComplete()) { System.err.printf("Job %s is not complete.\n", jobID); return -1; } Counters counters = job.getCounters(); long missing = counters .getCounter(MaxTemperatureWithCounters.Temperature.MiSSING); long total = counters.findCounter( "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS") .getCounter(); System.out.printf("Records with missing temperature fields:%.2f%%\n", 100.0 * missing / total); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MissingTemperatureFields(), args); System.exit(exitCode); } }
排序是MapReduce的核心技术。尽管应用本身可能并不需要对数据排序,但仍可能使用MapReduce的排序功能来组织数据。下面将讨论几种不同的数据集排序方法,以及如何控制MapReduce的排序。
import java.io.IOException; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class SortDataPreprocessor extends Configured implements Tool { static class CleanerMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { parser.parse(value); if (parser.isValidTemperature()) { output.collect(new IntWritable(parser.getAirTemperature()), value); } } } @Override public int run(String[] args) throws Exception { args = new String[] { "/test/input/t", "/test/input/seq" }; JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setMapperClass(CleanerMapper.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputValueClass(Text.class); conf.setNumReduceTasks(0); conf.setOutputFormat(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(conf, true); SequenceFileOutputFormat .setOutputCompressorClass(conf, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortDataPreprocessor(), args); System.exit(exitCode); } }
import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class SortByTemperatureUsingHashPartitioner extends Configured implements Tool { @Override public int run(String[] args) throws Exception { args = new String[] { "/test/input/seq", "/test/output/t" }; JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setInputFormat(SequenceFileInputFormat.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputFormat(SequenceFileOutputFormat.class); conf.setNumReduceTasks(5);//设置5个reduce任务,输出5个文件 SequenceFileOutputFormat.setCompressOutput(conf, true); SequenceFileOutputFormat .setOutputCompressorClass(conf, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new SortByTemperatureUsingHashPartitioner(), args); System.exit(exitCode); } }
产生多个已经排好序的小文件。
MapReduce能够执行大型数据集间的“连接”(join)操作,但是从头编写相关代码来执行连接比较麻烦。也可以考虑使用一个更高级的框架,如Pig、Hive或Cascading等,它们都将连接操作视为整个实现的核心部分。
其他章节也可能用到:)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;

/** Helper for building a {@link JobConf} from an input path and an output path. */
public class JobBuilder {

    /**
     * Builds a job configuration whose input path is {@code args[0]} and whose
     * output path is {@code args[1]}.
     *
     * @param tool the tool being run; its class is passed to the {@link JobConf}
     *             constructor and its simple name appears in the usage message
     * @param conf base configuration to copy into the job
     * @param args expected to hold exactly two elements: input path, output path
     * @return the configured job, or {@code null} (after printing usage to
     *         standard error) when {@code args} does not hold exactly two elements
     */
    public static JobConf parseInputAndOutput(Tool tool, Configuration conf,
            String[] args) {
        // A null array is treated like a wrong argument count rather than
        // failing with a NullPointerException.
        if (args == null || args.length != 2) {
            printUsage(tool, "<input> <output>");
            return null;
        }
        JobConf jobConf = new JobConf(conf, tool.getClass());
        FileInputFormat.addInputPath(jobConf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
        return jobConf;
    }

    /**
     * Prints a one-line usage message to standard error.
     *
     * @param tool           the tool whose simple class name is shown
     * @param extraArgsUsage description of the tool-specific arguments
     */
    public static void printUsage(Tool tool, String extraArgsUsage) {
        System.err.printf("Usage: %s [genericOptions] %s\n\n",
                tool.getClass().getSimpleName(), extraArgsUsage);
    }
}
import org.apache.hadoop.io.Text; public class NcdcRecordParser { private static final int MISSING_TEMPERATURE = 9999; private String year; private int airTemperature; private String quality; public void parse(String record) { year = record.substring(15, 19); String airTemperatureString; // Remove leading plus sign as parseInt doesn't like them if (record.charAt(87) == '+') { airTemperatureString = record.substring(88, 92); } else { airTemperatureString = record.substring(87, 92); } airTemperature = Integer.parseInt(airTemperatureString); quality = record.substring(92, 93); } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]"); } public boolean isMa1formedTemperature() { return !quality.matches("[01459]"); } public boolean IsMissingTemperature() { return airTemperature == MISSING_TEMPERATURE; } public String getYear() { return year; } public int getAirTemperature() { return airTemperature; } }