MapReduce之二次排序

标签: mapreduce 排序 | 发表时间:2014-04-14 19:58 | 作者:WuyouJie2008
出处:http://blog.csdn.net

应用场景

     面试官问了一个MapReduce问题:“如何用MapReduce实现两个表的连接”。

     我说“用两个job实现,第一个。。。,第二个。。。”

     “还要用两个?二次排序会不会?”

     “不会”。于是这两天回来看了下MapReduce的二次排序。

     在某些情况下,需要对reduce中的value进行排序。而这时,可以利用二次排序。二次排序,可以将根据key聚合起来的valueList根据value进行排序。

例子

输入数据 输出数据
1 2
3 4
5 6
7 8
7 82
12 211
20 21
20 53
20 522
31 42
40 511
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
63 61
70 54
70 55
70 56
70 57
70 58
70 58
71 55
71 56
73 57
74 58
203 21
530 54
730 54
740 58
1 2
3 4
5 6
7 8
7 82
12 211
20 21
20 53
20 522
31 42
40 511
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
63 61
70 54
70 55
70 56
70 57
70 58
70 58
71 55
71 56
73 57
74 58
203 21
530 54
730 54
740 58

解决思路

     以前一直以为Reduce中reduce(key,valueList)中键值对的各个key是一样的,而从对MapReduce的深入学习中,知道了只要满足一定条件,这些key是可以不一样的。我先分析下mapreduce过程。
原理
    map任务快结束时,会依次调用以下几个函数:
    1.job.setPartitionerClass()对键值对列表根据<key,value>进行分区,每个分区映射到一个reducer。默认的分区函数是HashPartion。
    2.job.setSortComparatorClass()设置的key比较函数类排序,默认根据key的compareTo进行比较。
   reduce阶段:
    1.接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass()设置的key比较函数类对所有数据对排序。
    2.之后构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass()设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。
    3.之后,执行Reducer的reduce()函数,该函数的输入是所有在同一个分组内的(key,value)
思路
    这样如果要进行二次排序,可以这样设计:以前的键值对是<key,value>,新的键是键(key value)的组合,键值对是<键(key value),value>。
    setGroupingComparatorClass()根据键(key value)的key进行比较,这样就可以将具有相同key的<键(key value),value>分在一个reduce里面进行操作。键(key value)的compareTo()首先进行key的比较,再进行value的比较。完成之后reduce输入的<键(key value),valueList>中valueList就是根据value进行排序的。

示例代码

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files that must contain two integers per a line.
 * The output is sorted by the first and second number and grouped on the 
 * first number.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
 *            <i>in-dir</i> <i>out-dir</i> 
 */
public class SecondarySort {
 
  /**
   * Define a pair of integers that are writable.
   * They are serialized in a byte comparable format.
   */
  public static class IntPair 
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;
    
    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers. 
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    /** A Comparator that compares serialized IntPair. */ 
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static {                                        // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }
  
  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    @Override
    public int getPartition(IntPair key, IntWritable value, 
                            int numPartitions) {
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once
   * for each value of the first part.
   */
  public static class FirstGroupingComparator 
                implements RawComparator<IntPair> {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                             b2, s2, Integer.SIZE/8);
    }

    @Override
    public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }

  /**
   * Read two integers from each line and generate a key, value pair
   * as ((left, right), right).
   */
  public static class MapClass 
         extends Mapper<LongWritable, Text, IntPair, IntWritable> {
    
    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();
    
    @Override
    public void map(LongWritable inKey, Text inValue, 
                    Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }
   /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce 
         extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR = 
      new Text("------------------------------------------------");
    private final Text first = new Text();
    
    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for(IntWritable value: values) {
        context.write(first, value);
      }
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}


作者:WuyouJie2008 发表于2014-4-14 11:58:03 原文链接
阅读:18 评论:0 查看评论

相关 [mapreduce 排序] 推荐:

MapReduce之二次排序

- - CSDN博客云计算推荐文章
     面试官问了一个MapReduce问题:“如何用MapReduce实现两个表的连接”.      我说“用两个job实现,第一个. 于是这两天回来看了下MapReduce的二次排序.      在某些情况下,需要对reduce中的value进行排序. 二次排序,可以将根据key聚合起来的valueList根据value进行排序.

mapreduce倒排序索引

- - CSDN博客云计算推荐文章
  private Text word_filepath = new Text();//文件路径.   private  Text one  = new Text("1");//个数.      job.setCombinerClass(IndexCombiner.class);//优化. 上面这个代码可以处理单机节点的,加入是多台机器执行mapper函数,那么就会出现问题.

【原】MapReduce的排序和二次排序

- - ITeye博客
自己学习排序和二次排序的知识整理如下. 1.Hadoop的序列化格式介绍:Writable. 2.Hadoop的key排序逻辑. 4.如何自定义自己的Writable类型. 1.Hadoop的序列化格式介绍:Writable. 要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式.

利用hadoop mapreduce 做数据排序

- - zzm
我们的需求是想统计一个文件中用IK分词后每个词出现的次数,然后按照出现的次数降序排列. 由于hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序. 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序.

Spark 颠覆 MapReduce 保持的排序记录

- - 开源中国社区最新新闻
在过去几年,Apache Spark的采用以惊人的速度增加着,通常被作为MapReduce后继,可以支撑数千节点规模的集群部署. 在内存中数 据处理上,Apache Spark比MapReduce更加高效已经得到广泛认识;但是当数据量远超内存容量时,我们也听到了一些机构在Spark使用 上的困扰. 因此,我们与Spark社区一起,投入了大量的精力做Spark稳定性、扩展性、性能等方面的提升.

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

MapReduce原理

- - C++博客-牵着老婆满街逛
       MapReduce 是由Google公司的Jeffrey Dean 和 Sanjay Ghemawat 开发的一个针对大规模群组中的海量数据处理的分布式编程模型. MapReduce实现了两个功能. Map把一个函数应用于集合中的所有成员,然后返回一个基于这个处理的结果集. 而Reduce是把从两个或更多个Map中,通过多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳.

MapReduce优化

- - 行业应用 - ITeye博客
相信每个程序员在 编程时都会问自己两个问题“我如何完成这个任务”,以及“怎么能让程序运行得更快”. 同样,MapReduce计算模型的多次优化也是为了更好地解答这两个问题. MapReduce计算模型的优化涉及了方方面面的内容,但是主要集中在两个方面:一是计算性能方面的优化;二是I/O操作方面的优化.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.