Hadoop MapReduce Advanced Programming
• The combine function merges the many (key, value) pairs produced by a single map task into new pairs, which are then fed to the reduce function; its interface is identical to that of the reduce function.
• This effectively reduces the volume of intermediate results and the network transfer load.
• When can a Combiner be used? Only when the reduce operation is commutative and associative (such as sum or max), so that partial aggregation on the map side cannot change the final result; see the sketch below.
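The following plain-Java sketch (no Hadoop, illustrative numbers only) shows the rule of thumb: taking a maximum is safe to pre-aggregate in a Combiner, while averaging is not, because a mean of partial means differs from the global mean.

public class CombinerSuitability {
    public static void main(String[] args) {
        int[] mapper1 = {3, 7};   // values seen by one map task (illustrative)
        int[] mapper2 = {10};     // values seen by another map task

        // max: combining partial maxima equals the global maximum -> Reducer can serve as Combiner
        int partial1 = Math.max(mapper1[0], mapper1[1]);
        int partial2 = mapper2[0];
        int combinedMax = Math.max(partial1, partial2);
        int globalMax = Math.max(Math.max(3, 7), 10);
        System.out.println(combinedMax == globalMax);        // true

        // mean: averaging partial means does NOT equal the global mean -> Reducer is not a valid Combiner
        double globalMean = (3 + 7 + 10) / 3.0;               // ~6.67
        double meanOfMeans = ((3 + 7) / 2.0 + 10 / 1.0) / 2;  // 7.5
        System.out.println(globalMean == meanOfMeans);        // false
    }
}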
(The programs below can be downloaded from: http://download.csdn.net/detail/zpcandzhj/7810829)
To enable the HDFS trash (recycle bin) feature, set fs.trash.interval in core-site.xml:

<property>
  <name>fs.trash.interval</name>
  <value>1440</value>
  <description>Number of minutes between trash checkpoints. If zero, the trash feature is disabled.</description>
</property>
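If you want to check programmatically whether the trash feature is on, the interval can simply be read back from the Configuration. A minimal sketch, assuming the cluster's core-site.xml is on the classpath:

import org.apache.hadoop.conf.Configuration;

public class TrashCheck {
    public static void main(String[] args) {
        // Picks up core-site.xml etc. from the classpath
        Configuration conf = new Configuration();
        // fs.trash.interval is in minutes; 0 means the trash feature is disabled
        long interval = conf.getLong("fs.trash.interval", 0);
        System.out.println(interval > 0
                ? "Trash enabled, checkpoint every " + interval + " minutes"
                : "Trash disabled");
    }
}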
Problem: when you right-click a program in Eclipse and choose Run on Hadoop, the MR job runs, but no job record shows up on the web UIs (http://hadoop:50070 and http://hadoop:50030). This usually means the job was executed in-process by the LocalJobRunner instead of being submitted to the cluster, because the Configuration does not point at the cluster's NameNode and JobTracker; see the sketch below.
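A sketch of one way to force submission to the cluster from Eclipse. The hostnames and ports here are hypothetical (taken from the URLs above plus common Hadoop 1.x defaults) and must match your core-site.xml / mapred-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitToCluster {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical cluster addresses; verify the ports against your configuration files
        conf.set("fs.default.name", "hdfs://hadoop:9000");
        conf.set("mapred.job.tracker", "hadoop:9001");
        // If mapred.job.tracker stays at its default value "local", the job runs inside Eclipse
        // via the LocalJobRunner and never appears on http://hadoop:50030.
        Job job = new Job(conf, "word count");
        // ... configure mapper/reducer/paths as in the examples below, then submit:
        // System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}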
public class WordCount {
    // ... omitted ...
    public static void main(String[] args) throws Exception {
        // In the new API, job configuration is done through a Configuration object
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // ... omitted ...
        Job job = new Job(conf, "word count");
        // ... omitted ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

This program uses the GenericOptionsParser class, which parses the generic Hadoop options on the command line and sets them into the conf object automatically; a sketch of what that means in practice follows below.
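To make that concrete, here is a small stand-alone sketch. The invocation and the property name are illustrative; any -D option is absorbed into conf, and only the application-specific arguments remain:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParseDemo {
    public static void main(String[] args) throws Exception {
        // Suppose the program is started as:
        //   hadoop jar wordCount.jar WordCount -D mapred.reduce.tasks=2 /usr/input /usr/output
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // The generic option has been set into conf ...
        System.out.println(conf.get("mapred.reduce.tasks"));   // prints 2 for the invocation above
        // ... and only the application-specific arguments are returned:
        for (String a : otherArgs) {
            System.out.println(a);                              // /usr/input, /usr/output
        }
    }
}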
(5) Adding third-party jar packages

When running an MR job from the command line, a ClassNotFoundException usually means a third-party jar is missing; copy the third-party jar into the directory of the Hadoop installation that holds its jars.

II. MapReduce Example Programs

1. WordCount

Program: new API version

package inAction;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// WordCount example based on the new API (extending Configured and implementing Tool makes the job easier to manage)
public class MyWordCount extends Configured implements Tool {

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            result.set(count);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // in the new API the Configuration object carries the job configuration
        // ToolRunner invokes GenericOptionsParser behind the scenes to set command-line options into conf
        Job job = new Job(conf, "MyWordCount");
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input/output paths can be passed on the command line or hard-coded here (for Run on Hadoop in Eclipse)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Return the job status instead of calling System.exit() inside run(); main() handles the exit code
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyWordCount(), args);
        System.exit(res);
    }
}

Run it with: hadoop jar /root/wordCount.jar /usr/input /usr/output
(The jar usually sits on the local disk, while the input and output paths are on HDFS. The paths can also be hard-coded in the program, and the data files can live on the local disk instead. In this example the data files are word1 and word2; they need no file extension and are all placed under /usr/input on HDFS.)
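For readers who want to see what the Mapper/Reducer pair computes without starting a cluster, the following plain-Java sketch reproduces the same tokenize-and-count logic on an in-memory string (the input text is made up for illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class LocalWordCount {
    public static void main(String[] args) {
        String text = "hello world hello hadoop";   // illustrative input
        // "map" phase: emit (word, 1); "reduce" phase: sum the 1s per word
        Map<String, Integer> counts = new HashMap<String, Integer>();
        StringTokenizer st = new StringTokenizer(text);
        while (st.hasMoreTokens()) {
            String word = st.nextToken();
            Integer old = counts.get(word);
            counts.put(word, old == null ? 1 : old + 1);
        }
        System.out.println(counts);   // e.g. {hello=2, world=1, hadoop=1} (HashMap order is unspecified)
    }
}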
Program: old API version

package inAction;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

// WordCount implementation based on the old API
public class WordCount2 {

    public static class MyMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class MyReduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable res = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            res.set(sum);
            output.collect(key, res);
        }
    }

    public static void main(String[] args) throws IOException {
        // The old API uses JobConf to configure the job
        JobConf conf = new JobConf(WordCount2.class);
        conf.setJobName("OldAPIWordCount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(MyMapper.class);
        conf.setCombinerClass(MyReduce.class);
        conf.setReducerClass(MyReduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path("hdfs://hadoop:9000/usr/wordsIn"));
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://hadoop:9000/usr/wordsOut2"));
        JobClient.runJob(conf); // in the new API, job submission goes through the Job class rather than JobClient
    }
}
Program: MaxTemperature.java

package inAction;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Find the highest temperature of each year (this example uses only the 1901 and 1902 data, downloaded from the web)
public class MaxTemperature extends Configured implements Tool {

    // The Mapper extracts the year and the temperature value from each raw record
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final int MISSING = 9999; // a temperature value of 9999 marks a missing reading

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19); // the year occupies characters 15-18
            int temperature;
            // The temperature is signed; older Integer.parseInt rejects a leading '+', so strip it first
            if (line.charAt(87) == '+') {
                temperature = Integer.parseInt(line.substring(88, 92));
            } else {
                temperature = Integer.parseInt(line.substring(87, 92));
            }
            // quality code: only the codes 0, 1, 4, 5 and 9 mark a valid reading
            String quantity = line.substring(92, 93);
            // emit only valid, non-missing readings
            if (quantity.matches("[01459]") && temperature != MISSING) {
                context.write(new Text(year), new IntWritable(temperature));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable temp : values) {
                maxValue = Math.max(temp.get(), maxValue);
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MaxTemperature");
        job.setJarByClass(MaxTemperature.class);
        job.setCombinerClass(MyReducer.class); // max is associative, so the Reducer doubles as a Combiner and cuts the data sent to the reduce phase
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input/output paths can be passed on the command line or hard-coded here (for Run on Hadoop in Eclipse)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Return the job status instead of calling System.exit() inside run()
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MaxTemperature(), args);
        System.exit(res);
    }
}

For a detailed explanation of MaxTemperature, see: http://www.linuxidc.com/Linux/2012-05/61196.htm
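The sign handling in the Mapper trips people up, so here is a tiny stand-alone illustration (the strings are made-up fragments, not real NCDC records):

public class SignParsing {
    public static void main(String[] args) {
        String positive = "+0123";
        String negative = "-0123";
        // Older JDKs' Integer.parseInt rejects a leading '+', hence the substring(88, 92) branch above
        int t1 = Integer.parseInt(positive.substring(1));
        // A leading '-' is handled fine, hence substring(87, 92) keeps the sign
        int t2 = Integer.parseInt(negative);
        System.out.println(t1 + " " + t2);   // 123 -123
    }
}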
For a patent dataset, list for each patent the patents that cite it, i.e. output lines of the form: patent-1 <tab> citing-patent, citing-patent, ... (this example is from "Hadoop in Action").
Program: MyJob2.java
package inAction;

import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

public class MyJob2 extends Configured implements Tool {

    // Each input line is "citing,cited"; emit (cited, citing) so records are grouped by the cited patent
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] citation = value.toString().split(",");
            context.write(new Text(citation[1]), new Text(citation[0]));
        }
    }

    // Concatenate all citing patents of one cited patent into a comma-separated list
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            for (Text val : value) {
                if (csv.length() > 0)
                    csv += ",";
                csv += val.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MyJob2");
        job.setJarByClass(MyJob2.class);
        Path in = new Path("/root/cite75_99.txt");
        Path out = new Path("/root/inAction3");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Return the job status instead of calling System.exit() inside run()
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob2(), args);
        System.exit(res);
    }
}

Note: the patent datasets apat63_99.txt and cite75_99.txt can be downloaded from the web.
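To make the key inversion in MapClass concrete, here is a minimal stand-alone sketch of the map-side logic on two hypothetical citation records (the patent numbers are illustrative, not taken from the real dataset):

public class CitationInvertDemo {
    public static void main(String[] args) {
        // Each input line is "citing,cited"; the map step emits (cited, citing),
        // so the reducer later collects every patent that cites a given patent.
        String[] lines = {"3858245,1515701", "3858247,1515701"};
        for (String line : lines) {
            String[] citation = line.split(",");
            System.out.println(citation[1] + "\t" + citation[0]);
        }
        // After grouping by key, the reducer would emit: 1515701   3858245,3858247
    }
}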