Hadoop MapReduce Advanced Programming
•The combine function merges the multiple <key, value> pairs produced by one map task into a new pair, which is then fed to the reduce function as input; the combine function has the same form as the reduce function.
•This effectively reduces the amount of intermediate data and lowers the network transfer load.
•When can a Combiner be used?
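As a rule of thumb, a Combiner can be used only when the reduce operation is commutative and associative (sums, counts, maxima), so that applying it to partial map output does not change the final result; that is why the word-count jobs below simply reuse their reducer as the combiner. A minimal sketch of the wiring (a fragment: SumReducer stands in for any such reducer, and job, Text, IntWritable are as in the listings below):
// the combiner consumes map output, so its key/value types must match the map
// output types, and its own output must be acceptable input for the reducer
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setCombinerClass(SumReducer.class); // safe only for commutative, associative operations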
(The programs below can be downloaded from: http://download.csdn.net/detail/zpcandzhj/7810829)
<property>
  <name>fs.trash.interval</name>
  <value>1440</value>
  <description>Number of minutes between trash checkpoints. If zero, the trash feature is disabled.</description>
</property>
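With the trash enabled as above (fs.trash.interval set in core-site.xml), hadoop fs -rm moves data into the user's .Trash directory instead of deleting it immediately, so a directory removed by mistake can simply be moved back before the next checkpoint. A minimal sketch (the trashed path below is hypothetical):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RestoreFromTrash {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // trashed data lives under /user/<user>/.Trash/Current/<original path>
        // until the next checkpoint (fs.trash.interval minutes)
        Path trashed = new Path("/user/root/.Trash/Current/usr/output"); // hypothetical path
        Path restored = new Path("/usr/output");
        fs.rename(trashed, restored); // move the directory back out of the trash
        fs.close();
    }
}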
Running an MR job from Eclipse via right-click Run on Hadoop, but the job does not appear on the web pages (http://hadoop:50070 and http://hadoop:50030)
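A common cause is that the Configuration built inside Eclipse does not point at the cluster, so the job runs in the local LocalJobRunner and never reaches the NameNode/JobTracker web pages. Setting the cluster addresses explicitly makes the job submit to the cluster; a sketch (the NameNode URI matches the hdfs://hadoop:9000 used later in this article, the JobTracker port is an assumption):
import org.apache.hadoop.conf.Configuration;
// ...
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop:9000"); // NameNode address
conf.set("mapred.job.tracker", "hadoop:9001");     // JobTracker address (port assumed)
// pass this conf to the Job (or to ToolRunner.run) so the job is submitted to the cluster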
public class WordCount {
// ...omitted
public static void main(String[] args) throws Exception {
// with the new API, the job is configured through a Configuration object
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,
args).getRemainingArgs();
// ...omitted
Job job = new Job(conf, "word count");
// ...omitted
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
This program uses the GenericOptionsParser class, which automatically copies the command-line arguments into the conf variable (a generic-options example is given after the listing below).
(5) Adding third-party jars
If an MR job run from the command line throws a ClassNotFoundException, a third-party jar may be missing; copy the third-party jar into the directory under the Hadoop installation where its jars are kept.
II. MapReduce Examples
1. WordCount
Program: new API version
package inAction;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// WordCount example based on the new API (extending Configured and implementing Tool makes the job easier to manage)
public class MyWordCount extends Configured implements Tool {
public static class MyMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
context.write(word, one);
}
}
}
public static class MyReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
result.set(count);
context.write(key, result);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf(); // with the new API, the Configuration object carries the job configuration
// ToolRunner invokes GenericOptionsParser behind the scenes to copy command-line arguments into conf
Job job=new Job(conf,"MyWordCount");
job.setJarByClass(MyWordCount.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// the input and output paths can be given on the command line, or set directly in the program (e.g. when using right-click Run on Hadoop)
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// return the exit code to ToolRunner instead of calling System.exit inside run()
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MyWordCount(), args);
System.exit(res);
}
}
Run: hadoop jar /root/wordCount.jar /usr/input /usr/output
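Because MyWordCount runs through ToolRunner, GenericOptionsParser also accepts generic options placed before the program arguments, e.g. -D to set a job property and -libjars to ship a third-party jar with the job (the third-party jar path below is only a placeholder):
Run: hadoop jar /root/wordCount.jar -D mapred.reduce.tasks=2 -libjars /root/third-party.jar /usr/input /usr/output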
(The jar is usually kept on the local disk, while the input and output files live on HDFS. The input and output paths can also be given directly in the program, and the data files may also be kept on the local disk. In this example the data files are word1 and word2; they need no extension, and both are placed under /usr/input on HDFS.)
Program: old API version
package inAction;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
// WordCount implementation based on the old API
public class WordCount2 {
public static class MyMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
output.collect(word, one);
}
}
}
public static class MyReduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable res = new IntWritable();
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
res.set(sum);
output.collect(key, res);
}
}
public static void main(String[] args) throws IOException {
// the old API uses JobConf to configure the job
JobConf conf=new JobConf(WordCount2.class);
conf.setJobName("OldAPIWordCount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapper.class);
conf.setCombinerClass(MyReduce.class);
conf.setReducerClass(MyReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("hdfs://hadoop:9000/usr/wordsIn"));
FileOutputFormat.setOutputPath(conf, new Path("hdfs://hadoop:9000/usr/wordsOut2"));
JobClient.runJob(conf); // in the new API this is superseded by Job.waitForCompletion()
}
}
Program: MaxTemperature.java
package inAction;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// find the highest temperature of each year (this example uses only the 1901 and 1902 data, downloaded from the Internet)
public class MaxTemperature extends Configured implements Tool{
// the Mapper extracts the year and the temperature from each line of raw data
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final int MISSING=9999; // a temperature value of 9999 means the reading is missing
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line=value.toString();
String year=line.substring(15,19); // the year is at character positions 15-18 (0-based)
int temperature;
// temperatures are signed; Integer.parseInt cannot parse a leading '+' sign, so handle it separately
if(line.charAt(87)=='+'){
temperature=Integer.parseInt(line.substring(88,92));
}else{
temperature=Integer.parseInt(line.substring(87,92));
}
String quality=line.substring(92,93); // the quality code; the reading is valid only when it is 0, 1, 4, 5 or 9
// emit only valid temperature readings
if(quality.matches("[01459]")&&temperature!=MISSING){
context.write(new Text(year), new IntWritable(temperature));
}
}
}
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue=Integer.MIN_VALUE;
for(IntWritable temp:values){
maxValue=Math.max(temp.get(), maxValue);
}
context.write(key, new IntWritable(maxValue));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf();
Job job=new Job(conf,"MaxTemperature");
job.setJarByClass(MaxTemperature.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class); // the Combiner cuts the amount of data shipped to the Reducer and improves performance
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// the input and output paths can be given on the command line, or set directly in the program (e.g. when using right-click Run on Hadoop)
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// return the exit code to ToolRunner instead of calling System.exit inside run()
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MaxTemperature(), args);
System.exit(res);
}
}
For a detailed explanation of MaxTemperature, see: http://www.linuxidc.com/Linux/2012-05/61196.htm
Next, for a patent dataset, list each patent together with the patents that cite it, i.e. output lines of the form: patent 1  citing patent, citing patent, ... (this example comes from "Hadoop in Action").
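The input cite75_99.txt is a CSV of citation pairs, one citing,cited pair per line; the mapper inverts each pair and the reducer concatenates every citing patent for a given cited patent. A few hypothetical lines for illustration (output key and value are tab-separated; the order of values within a line is not guaranteed):
input:
3858241,956203
3858242,956203
3858242,1324234
output:
956203  3858241,3858242
1324234 3858242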
Program: MyJob2.java
package inAction;
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;
public class MyJob2 extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] citation = value.toString().split(",");
// invert each pair: key = the cited patent, value = the patent that cites it
context.write(new Text(citation[1]), new Text(citation[0]));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
String csv = "";
for (Text val : value) {
if (csv.length() > 0)
csv += ",";
csv += val.toString();
}
context.write(key, new Text(csv));
}
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf=getConf();
Job job=new Job(conf,"MyJob2");
job.setJarByClass(MyJob2.class);
Path in=new Path("/root/cite75_99.txt");
Path out=new Path("/root/inAction3");
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// return the exit code to ToolRunner instead of calling System.exit inside run()
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MyJob2(), args);
System.exit(res);
}
}
Note: the patent datasets apat63_99.txt and cite75_99.txt can be downloaded from the Internet.