Implementing an Inverted Index with MapReduce on Hadoop
An inverted index (also called a postings file or inverted file) is an index method that stores a mapping from words to their locations in a document or set of documents, for use in full-text search. It is the most commonly used data structure in document retrieval systems. With an inverted index, the list of documents containing a given word can be retrieved quickly. An inverted index consists of two main parts: the word dictionary and the inverted (postings) file.
There are two main forms of inverted index. A record-level inverted index (or inverted file index) contains, for each word, a list of the documents referencing it. A word-level inverted index (or full inverted index) additionally records the positions of each word within a document. The latter form supports more functionality (such as phrase search) but takes more time and space to build. The indexes of modern search engines are all based on inverted indexes; compared with index structures such as signature files and suffix trees, the inverted index is the most effective way to implement a word-to-document mapping.
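To make the two forms concrete, here is a minimal sketch in plain Java; the structures and sample data are illustrative only, borrowing the documents from the example below:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexForms {
    public static void main(String[] args) {
        // Record-level inverted index: word -> documents containing it.
        Map<String, List<String>> recordLevel = new HashMap<>();
        recordLevel.put("lin", Arrays.asList("file1", "file2", "file3"));

        // Word-level (full) inverted index: word -> document -> positions of
        // the word in that document. Enables phrase search, costs more space.
        Map<String, Map<String, List<Integer>>> wordLevel = new HashMap<>();
        Map<String, List<Integer>> positions = new HashMap<>();
        positions.put("file1", Arrays.asList(1, 2)); // "xiao lin lin": lin at 1 and 2 (0-based)
        wordLevel.put("lin", positions);

        System.out.println(recordLevel.get("lin")); // [file1, file2, file3]
        System.out.println(wordLevel.get("lin"));   // {file1=[1, 2]}
    }
}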
Input: a set of documents:
file1 | xiao lin lin |
file2 | yu yu xiao lin |
file3 | xiao lin xiao lin |
Expected output:
lin | file1:2;file2:1;file3:2 |
xiao | file3:2;file2:1;file1:1 |
yu | file2:2 |
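Before turning to MapReduce, the same result can be produced on a single machine. This small sketch (class name illustrative) builds the word → file:count mapping directly and can be used to sanity-check the expected output above:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class LocalInvertedIndex {
    public static void main(String[] args) {
        // The three example documents, keyed by file name.
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("file1", "xiao lin lin");
        docs.put("file2", "yu yu xiao lin");
        docs.put("file3", "xiao lin xiao lin");

        // word -> (file -> occurrence count), words kept sorted.
        Map<String, Map<String, Integer>> index = new TreeMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String word : doc.getValue().split("\\s+")) {
                index.computeIfAbsent(word, w -> new LinkedHashMap<>())
                     .merge(doc.getKey(), 1, Integer::sum);
            }
        }

        // Prints: lin | {file1=2, file2=1, file3=2}, xiao | ..., yu | {file2=2}
        index.forEach((word, postings) ->
                System.out.println(word + " | " + postings));
    }
}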
Implementation:
package com.ds.demo;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class InvertedIndex {

    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {

        private Text keyInfo = new Text();
        private Text valueInfo = new Text();
        private FileSplit split;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the split this record came from, so each word can be
            // tagged with the name of the file that contains it.
            split = (FileSplit) context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // Emit (word:fileName, 1). getName() keeps just "file1"
                // rather than the full HDFS URI that toString() would give.
                keyInfo.set(itr.nextToken() + ":" + split.getPath().getName());
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }

    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

        private Text info = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum the occurrences of this word in this file.
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            // Rewrite the key: ("word:file", sum) becomes ("word", "file:sum").
            int splitIndex = key.toString().indexOf(":");
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }

    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Join all "file:count" entries for this word with semicolons.
            StringBuilder fileList = new StringBuilder();
            for (Text value : values) {
                if (fileList.length() > 0) {
                    fileList.append(";");
                }
                fileList.append(value.toString());
            }
            result.set(fileList.toString());
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Address of the author's JobTracker (Hadoop 1.x style).
        conf.set("mapred.job.tracker", "192.168.9.201:9001");
        // Input and output paths are hardcoded here; command-line
        // arguments are effectively ignored as written.
        String[] ars = new String[] { "B", "InvertedOut" };
        String[] otherArgs = new GenericOptionsParser(conf, ars).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: invertedindex <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
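A note on running it: assuming the class is packaged into a jar named invertedindex.jar (the jar name is illustrative), the job could be submitted with hadoop jar invertedindex.jar com.ds.demo.InvertedIndex, with an input directory B containing file1 to file3 already uploaded to HDFS and the output directory InvertedOut not yet existing. As written, the two paths are hardcoded in the ars array, so any paths passed on the command line are ignored.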
After the map phase:
key | value |
xiao:file1 | 1 |
lin:file1 | 1 |
lin:file1 | 1 |
... |
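To see where the duplicate lin:file1 rows come from, here is a standalone rendering of what the mapper does for file1's line (the file name is passed in directly rather than taken from a FileSplit):

import java.util.StringTokenizer;

public class MapStep {
    public static void main(String[] args) {
        String fileName = "file1";
        String line = "xiao lin lin";
        // One (word:file, 1) pair per token, so "lin" is emitted twice.
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            System.out.println(itr.nextToken() + ":" + fileName + " | 1");
        }
    }
}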
After the map-side sort and grouping (the combiner's input):
xiao:file1 | [1] |
lin:file1 | [1,1] |
yu:file2 | [1,1] |
... |
After the combiner (which sums the counts and rewrites each key from word:file to just the word):
xiao | file1:1 |
lin | file1:2 |
yu | file2:2 |
xiao | file2:1 |
lin | file2:1 |
... |
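The step above hinges on the combiner rewriting the key: it splits the map-output key word:file at the colon, moves the file name into the value next to the summed count, and keeps only the word as the key. A standalone sketch of that transformation (the method name is illustrative):

public class KeyRewrite {
    // ("lin:file1", 2) -> key "lin", value "file1:2",
    // mirroring InvertedIndexCombiner.
    static String[] rewrite(String mapKey, int sum) {
        int splitIndex = mapKey.indexOf(":");
        String word = mapKey.substring(0, splitIndex);
        String fileAndCount = mapKey.substring(splitIndex + 1) + ":" + sum;
        return new String[] { word, fileAndCount };
    }

    public static void main(String[] args) {
        String[] kv = rewrite("lin:file1", 2);
        System.out.println(kv[0] + " | " + kv[1]); // lin | file1:2
    }
}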
After the shuffle, values are grouped by word:
xiao | [file1:1,file2:1,file3:2] |
lin | [file1:2,file2:1,file3:2] |
yu | [file2:2] |
A final reduce pass (InvertedIndexReducer) then joins each word's file:count entries with semicolons, producing the output shown at the top.
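That last step is just string concatenation; a minimal standalone equivalent (class and method names illustrative):

import java.util.Arrays;
import java.util.List;

public class JoinPostings {
    // Equivalent of InvertedIndexReducer's loop for one word, e.g. "lin".
    static String joinPostings(List<String> values) {
        return String.join(";", values);
    }

    public static void main(String[] args) {
        // ["file1:2", "file2:1", "file3:2"] -> "file1:2;file2:1;file3:2"
        System.out.println(joinPostings(Arrays.asList("file1:2", "file2:1", "file3:2")));
    }
}

One caveat: because the combiner rewrites keys, the map output was partitioned on word:file rather than on word. With more than one reducer, postings for the same word could end up split across reducers, so this example only produces a single consolidated list per word when run with one reducer, which is the default here.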