mapreduce倒排序索引

标签: mapreduce 排序 索引 | 发表时间:2013-11-07 00:38 | 作者:y0908105023
出处:http://blog.csdn.net

看第一个代码:

 

package Inverse;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndex {

 private static class IndexMapper extends Mapper<Object, Text, Text, Text>{

  private Text word_filepath = new Text();//文件路径
  
  private  Text one  = new Text("1");//个数
  @Override
  protected void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String str = value.toString().replaceAll("[^0-9a-zA-Z]", " ");
   String[] ss = str.split(" \\s+");
   
   FileSplit fileSplit = (FileSplit) context.getInputSplit();
   String fileName = fileSplit.getPath().getName();
   
   System.out.println("InverseIndex.IndexMapper.map()"+value.toString()+"  "+fileName);
   
   for(int i=0;i<ss.length;i++){
    word_filepath.set(ss[i]+"###"+fileName);
    context.write(word_filepath, one);
   }
  }
  
 }
 
 private static class IndexCombiner extends Reducer<Text, Text, Text, Text>{

  protected void reduce(Text key, Iterable<Text> value,Context context)
    throws IOException, InterruptedException {
   String[] str = key.toString().split("###");
   int sum = 0;
   
   for (Iterator iterator = value.iterator(); iterator.hasNext();) {
    Text val = (Text) iterator.next();
    sum +=Integer.parseInt(val.toString());
   }
   context.write(new Text(str[0]), new Text(str[1]+"###"+sum));
  }
  
 }
 
 public static class Mypartitioner extends Partitioner<Text, Text>{

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
//   if(key.toString().)
   return 0;
  }
  
 }
 
 public static class IndexReducer extends Reducer<Text, Text, Text, Text>{

  protected void reduce(Text key, Iterable<Text> value,Context context)
    throws IOException, InterruptedException {
   StringBuffer sb = new  StringBuffer();
   for (Iterator iterator = value.iterator(); iterator.hasNext();) {
    Text val = (Text) iterator.next();
    String str = val.toString();
    
    
    
    sb.append(str);
    if(iterator.hasNext()){
     sb.append(";");
    }
   }
   context.write(key, new Text(sb.toString()));
  }
  
 }
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new String[]{"/testinverse/","/inverseout"};
  conf.set("mapred.job.tracker", "172.24.132.190:9001");
  Job job = new Job(conf, "word count");
     System.out.println(job.getJar());
    
     job.setJarByClass(InverseIndex.class);
    
     job.setMapperClass(IndexMapper.class);
     job.setCombinerClass(IndexCombiner.class);//优化
     job.setReducerClass(IndexReducer.class);
//     job.setNumReduceTasks(2);
    
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
    
     FileSystem fs = FileSystem.get(conf);
     Path temp = new Path(otherArgs[1]);
     if (fs.exists(temp)) {
      fs.delete(temp, true);
   
  }
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    
     System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

 

上面这个代码可以处理单机节点的,加入是多台机器执行mapper函数,那么就会出现问题。所以下面是处理机器环境的倒排序索引:

package Inverse;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 倒排索引
 * Map阶段输出格式key=bjsxt###t1.html value=1
 * 采用Partitioner将key相同的用同一个reduce处理(只得到key的#前边相同的用同一个reducec处理)
 * @author Dingzhiwei
 *
 */
public class InvertedIndex2 {
   
    private static Text oldkey = null;
    private static Vector<Text> vector = new Vector<Text>();
   
    public static class InvertedIndexMapper extends
            Mapper<LongWritable, Text, Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException{
            //获取文件名及预处理
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            String wordlineString = new String(value.toString().replaceAll("[^1-9a-zA-Z]", " "));
            StringTokenizer itr = new StringTokenizer(wordlineString.toLowerCase());
            while(itr.hasMoreTokens()){
                String tempKey = itr.nextToken();
                String temp2 = tempKey + "#" + fileName;
                context.write(new Text(temp2), one);
            }
        }
    }
   
    public static class InvertedIndexCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
   
    public static class InvertedIndexPartioner extends HashPartitioner<Text, IntWritable>{
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String term = key.toString().split("#")[0];
            super.getPartition(new Text(term), value, numReduceTasks);
            return 0;
        }       
    }
   
    public static class InvertedIndexReducer extends Reducer<Text, IntWritable, Text, Text>{
        //reduce阶段输出为  bjsxt t1.html###2;t2.html###4
        public void reduce(Text key, Iterable<IntWritable> values, Context context
                )throws IOException, InterruptedException{
            String[] temp = key.toString().split("#");
            Text key1 = new Text(temp[0]);
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Text valueText = new Text(String.valueOf(sum));
            if(oldkey == null || !oldkey.equals(key1) ){
                if(oldkey != null){
                    // 输出
                    StringBuffer tmpString = new StringBuffer();
                    for(Text t: vector){
                        tmpString.append(t.toString());
                    }
                    context.write(oldkey, new Text(tmpString.toString()));
                }
                vector.clear();
                //vector1.add(new Text(new String("" + '\t')));
                oldkey = key1;
            }
            Text addText = new Text(temp[1] + "###" + valueText + ';');
            vector.add(addText);
        }
       
        //reduce阶段的清理工作
        protected void cleanup(Context context)
            throws IOException, InterruptedException{
            StringBuffer tmpString = new StringBuffer();
            for(Text t: vector){
                tmpString.append(t.toString());
            }
            context.write(oldkey, new Text(tmpString.toString()));
        }
       
    }
           
    public static void main(String[] args){
        try{
            Configuration conf = new Configuration();
            String[] otherArgs = new String[] { "/testinverse", "/inverseout" };
//            conf
            Job job = new Job(conf, "invert index");
            conf.set("mapred.job.tracker", "172.24.132.190:9001");
            job.setJarByClass(InvertedIndex2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(InvertedIndexMapper.class);
            job.setReducerClass(InvertedIndexReducer.class);
            job.setCombinerClass(InvertedIndexCombiner.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setPartitionerClass(InvertedIndexPartioner.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }catch(IOException e){
            e.printStackTrace();
        }catch(InterruptedException e){
            e.printStackTrace();
        }catch(ClassNotFoundException e){
            e.printStackTrace();
        }
       
    }
}

 

 

上面使用了partitioner,把reducer的输入首先判断一下,加入key的前半部分即单词相同,就放在同一个机器上面执行reducer,在reducer中,每次打印出上一个单词统计结果,所以加入cleanup函数打印最后一个单词的统计结果。前后两个差别已经比较大,认真看,会发现思路也有所不同。

 

partitioner介绍:

partitioner: 

    得到map给的记录后,他们该分配给哪些reducer来处理呢?hadoop采用的默认的派发方式是根据散列值来派发的,但是实际中,这并不能很高效或者按照我们要求的去执行任务。例如,经过partition处理后,一个节点的reducer分配到了20条记录,另一个却分配道了10W万条,试想,这种情况效率如何。又或者,我们想要处理后得到的文件按照一定的规律进行输出,假设有两个reducer,我们想要最终结果中part-00000中存储的是"h"开头的记录的结果,part-00001中存储其他开头的结果,这些默认的partitioner是做不到的。所以需要我们自己定制partition来根据自己的要求,选择记录的reducer。

  自定义partitioner很简单,只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job的setPartitionerClass指定一下即可

 

 

 

 

 

 

 

 

 

作者:y0908105023 发表于2013-11-6 16:38:01 原文链接
阅读:95 评论:0 查看评论

相关 [mapreduce 排序 索引] 推荐:

mapreduce倒排序索引

- - CSDN博客云计算推荐文章
  private Text word_filepath = new Text();//文件路径.   private  Text one  = new Text("1");//个数.      job.setCombinerClass(IndexCombiner.class);//优化. 上面这个代码可以处理单机节点的,加入是多台机器执行mapper函数,那么就会出现问题.

MapReduce之二次排序

- - CSDN博客云计算推荐文章
     面试官问了一个MapReduce问题:“如何用MapReduce实现两个表的连接”.      我说“用两个job实现,第一个. 于是这两天回来看了下MapReduce的二次排序.      在某些情况下,需要对reduce中的value进行排序. 二次排序,可以将根据key聚合起来的valueList根据value进行排序.

【原】MapReduce的排序和二次排序

- - ITeye博客
自己学习排序和二次排序的知识整理如下. 1.Hadoop的序列化格式介绍:Writable. 2.Hadoop的key排序逻辑. 4.如何自定义自己的Writable类型. 1.Hadoop的序列化格式介绍:Writable. 要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式.

利用hadoop mapreduce 做数据排序

- - zzm
我们的需求是想统计一个文件中用IK分词后每个词出现的次数,然后按照出现的次数降序排列. 由于hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序. 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序.

MapReduce 编程之 倒排索引

- - CSDN博客云计算推荐文章
本文调试环境: ubuntu 10.04 , hadoop-1.0.2. hadoop装的是伪分布模式,就是只有一个节点,集namenode, datanode, jobtracker, tasktracker...于一体. 本文实现了简单的倒排索引,单词,文档路径,词频,重要的解释都会在代码注视中.

MapReduce案例之倒排索引

- - 行业应用 - ITeye博客
1       倒排索引. "倒排索引"是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎. 它主要是用来存储某个单词(或词组)在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式. 由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引(Inverted Index).

Spark 颠覆 MapReduce 保持的排序记录

- - 开源中国社区最新新闻
在过去几年,Apache Spark的采用以惊人的速度增加着,通常被作为MapReduce后继,可以支撑数千节点规模的集群部署. 在内存中数 据处理上,Apache Spark比MapReduce更加高效已经得到广泛认识;但是当数据量远超内存容量时,我们也听到了一些机构在Spark使用 上的困扰. 因此,我们与Spark社区一起,投入了大量的精力做Spark稳定性、扩展性、性能等方面的提升.

基于hadoop的mapreduce实现倒排索引

- - ITeye博客
基于 hadoop 的 mapreduce 实现倒排索引. 倒排索引(英语: Inverted index ),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射. 它是文档检索系统中最常用的数据结构.

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.