MapReduce Inverted Index
Let's look at the first version of the code:
package Inverse;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InverseIndex {
private static class IndexMapper extends Mapper<Object, Text, Text, Text>{
private Text word_filepath = new Text();// key: word###source-file-name
private Text one = new Text("1");// value: a count of one occurrence
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String str = value.toString().replaceAll("[^0-9a-zA-Z]", " ");
String[] ss = str.split("\\s+");
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
System.out.println("InverseIndex.IndexMapper.map()"+value.toString()+" "+fileName);
for(int i=0;i<ss.length;i++){
if(ss[i].isEmpty()) continue;// skip the empty token produced when the line starts with whitespace
word_filepath.set(ss[i]+"###"+fileName);
context.write(word_filepath, one);
}
}
}
private static class IndexCombiner extends Reducer<Text, Text, Text, Text>{
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
String[] str = key.toString().split("###");
int sum = 0;
for (Text val : value) {
sum += Integer.parseInt(val.toString());
}
context.write(new Text(str[0]), new Text(str[1]+"###"+sum));
}
}
public static class Mypartitioner extends Partitioner<Text, Text>{
@Override
public int getPartition(Text key, Text value, int numPartitions) {
// unfinished placeholder: it always returns partition 0 and is never registered
// via job.setPartitionerClass() in main(), so it has no effect on this job
return 0;
}
}
public static class IndexReducer extends Reducer<Text, Text, Text, Text>{
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Iterator<Text> iterator = value.iterator(); iterator.hasNext();) {
Text val = iterator.next();
String str = val.toString();
sb.append(str);
if(iterator.hasNext()){
sb.append(";");
}
}
context.write(key, new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new String[]{"/testinverse/","/inverseout"};
conf.set("mapred.job.tracker", "172.24.132.190:9001");
Job job = new Job(conf, "inverse index");
System.out.println(job.getJar());
job.setJarByClass(InverseIndex.class);
job.setMapperClass(IndexMapper.class);
job.setCombinerClass(IndexCombiner.class);// combiner, as a local optimization
job.setReducerClass(IndexReducer.class);
// job.setNumReduceTasks(2);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileSystem fs = FileSystem.get(conf);
Path temp = new Path(otherArgs[1]);
if (fs.exists(temp)) {
fs.delete(temp, true);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
The code above works when the whole input is handled by a single map task. If the mapper function runs as several map tasks on multiple machines, problems appear: each combiner only sees a partial count for a given word###file pair, and because the combiner also rewrites the key to just the word, the reducer can no longer merge those partial counts and simply concatenates them. Below is an illustration of the failure, followed by an inverted index that handles a multi-machine (cluster) environment correctly.
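As a purely hypothetical trace (the word, file and counts are invented for illustration), suppose "hadoop" appears 5 times in t1.html and the file is large enough to be read as two input splits handled by two map tasks:

    combiner of map task 1:   hadoop -> t1.html###2
    combiner of map task 2:   hadoop -> t1.html###3
    final reducer output:     hadoop  t1.html###2;t1.html###3   (expected: t1.html###5)

A further subtlety is that Hadoop may run a combiner zero, one or several times, so a combiner should not rewrite the key the way IndexCombiner does; if it were re-applied to its own output, the split on "###" would find no separator and fail. The second listing avoids both problems: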
package Inverse;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* Inverted index.
* Map stage output format: key=bjsxt#t1.html value=1
* A Partitioner is used so that keys sharing the same term (the part before
* the '#') are handled by the same reducer.
* @author Dingzhiwei
*
*/
public class InvertedIndex2 {
private static Text oldkey = null;
private static Vector<Text> vector = new Vector<Text>();
public static class InvertedIndexMapper extends
Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException{
//get the source file name, then strip non-alphanumeric characters from the line
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
String wordlineString = value.toString().replaceAll("[^0-9a-zA-Z]", " ");
StringTokenizer itr = new StringTokenizer(wordlineString.toLowerCase());
while(itr.hasMoreTokens()){
String tempKey = itr.nextToken();
String temp2 = tempKey + "#" + fileName;
context.write(new Text(temp2), one);
}
}
}
public static class InvertedIndexCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static class InvertedIndexPartioner extends HashPartitioner<Text, IntWritable>{
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
// partition by the term (the part before '#') so that every key for the
// same word is sent to the same reducer
String term = key.toString().split("#")[0];
return super.getPartition(new Text(term), value, numReduceTasks);
}
}
public static class InvertedIndexReducer extends Reducer<Text, IntWritable, Text, Text>{
//reduce stage output: bjsxt  t1.html###2;t2.html###4;
public void reduce(Text key, Iterable<IntWritable> values, Context context
)throws IOException, InterruptedException{
String[] temp = key.toString().split("#");
Text key1 = new Text(temp[0]);
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
Text valueText = new Text(String.valueOf(sum));
if(oldkey == null || !oldkey.equals(key1) ){
if(oldkey != null){
// emit the accumulated postings list for the previous word
StringBuffer tmpString = new StringBuffer();
for(Text t: vector){
tmpString.append(t.toString());
}
context.write(oldkey, new Text(tmpString.toString()));
}
vector.clear();
//vector1.add(new Text(new String("" + '\t')));
oldkey = key1;
}
Text addText = new Text(temp[1] + "###" + valueText + ';');
vector.add(addText);
}
//cleanup runs once at the end of the reduce phase: emit the last word's postings
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException{
if(oldkey == null){
return;// no input records were processed
}
StringBuffer tmpString = new StringBuffer();
for(Text t: vector){
tmpString.append(t.toString());
}
context.write(oldkey, new Text(tmpString.toString()));
}
}
public static void main(String[] args){
try{
Configuration conf = new Configuration();
String[] otherArgs = new String[] { "/testinverse", "/inverseout" };
// set the JobTracker address before constructing the Job, because Job copies the configuration
conf.set("mapred.job.tracker", "172.24.132.190:9001");
Job job = new Job(conf, "invert index");
job.setJarByClass(InvertedIndex2.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(InvertedIndexPartioner.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}catch(IOException e){
e.printStackTrace();
}catch(InterruptedException e){
e.printStackTrace();
}catch(ClassNotFoundException e){
e.printStackTrace();
}
}
}
The version above uses a partitioner to route the reducer input: if the first half of the key, i.e. the word, is the same, the records are sent to the same reducer (the same machine). Inside the reducer, the statistics for the previous word are written out each time a new word appears, so a cleanup function is added to write out the statistics for the last word. The two versions differ quite a bit; read them carefully and you will see that the underlying approach is also different.
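To see why hashing only the term keeps all of a word's postings together, here is a small standalone sketch. It is not part of the job: the class name, sample keys and reducer count are made up, and it uses String.hashCode() for simplicity, whereas Hadoop's HashPartitioner hashes the Text key; the routing property it demonstrates is the same.

public class PartitionDemo {
    // the same (hash & Integer.MAX_VALUE) % numReduceTasks formula that HashPartitioner uses
    static int partitionFor(String term, int numReduceTasks) {
        return (term.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
    public static void main(String[] args) {
        int reducers = 4; // assumed reducer count, only for the demo
        String[] keys = { "hadoop#t1.html", "hadoop#t2.html", "bjsxt#t1.html" };
        for (String k : keys) {
            String term = k.split("#")[0]; // hash only the word, not the file name
            System.out.println(k + " -> partition " + partitionFor(term, reducers));
        }
    }
}

Both hadoop#t1.html and hadoop#t2.html land in the same partition, so one reducer sees all postings for hadoop as consecutive sorted keys, which is exactly what the previous-word buffering and the cleanup() logic in InvertedIndexReducer rely on.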
An introduction to the Partitioner:
Once the map records are produced, which reducers should they be handed to? Hadoop's default is to dispatch records by hash value, but in practice this is not always efficient, nor does it always do what we want. For example, after partitioning, one node's reducer might receive 20 records while another receives 100,000; imagine how efficient that would be. Or we may want the output files to follow some rule: say there are two reducers, and we want part-00000 to hold the results for records starting with "h" and part-00001 to hold everything else. The default partitioner cannot do that, so we need to write our own partitioner that chooses the reducer for each record according to our own rules.
Writing a custom partitioner is simple: define a class that extends Partitioner, override its getPartition method, and register it by calling Job's setPartitionerClass, as in the sketch below.
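As a minimal sketch of the "h"-prefix example above (the class name PrefixPartitioner is made up; it assumes Text keys and values and two reduce tasks):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: with two reduce tasks, keys starting with "h" go to
// reducer 0 (part-00000) and every other key goes to reducer 1 (part-00001).
public class PrefixPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        if (numPartitions < 2) {
            return 0; // with a single reducer there is nothing to route
        }
        return key.toString().startsWith("h") ? 0 : 1;
    }
}

It is registered the same way as in the jobs above, with job.setPartitionerClass(PrefixPartitioner.class) together with job.setNumReduceTasks(2).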