hadoop 二次排序

标签: hadoop 排序 | 发表时间:2014-03-09 23:06 | 作者:blackproof
出处:http://www.iteye.com

 

hadoop的工作流程:

http://blackproof.iteye.com/blog/2028640

 

hadoop 二次排序

是在key中,排序value的实现,思路是

1.把value中需要有序的部分value-part放入key中

2.sortCompare类或key的CompareTo方法中完成对key+value-part的比较

3.GroupingCompare中只对key进行比较,这样相同的key跌倒获取到reduce中

 

 

转: http://blog.csdn.net/heyutao007/article/details/5890103

mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变。

这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程) 

public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> 
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable> 

1 首先说一下工作原理:

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。
在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。 

2  二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。例如

输入文件
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
输出:(注意需要分割线)
------------------------------------------------
1       2
------------------------------------------------
3       4
------------------------------------------------
5       6
------------------------------------------------
7       8
7       82
------------------------------------------------
12      211
------------------------------------------------
20      21
20      53
20      522
------------------------------------------------
31      42
------------------------------------------------
40      511
------------------------------------------------
50      51
50      52
50      53
50      53
50      54
50      62
50      512
50      522
------------------------------------------------
60      51
60      52
60      53
60      56
60      56
60      57
60      57
60      61
------------------------------------------------
63      61
------------------------------------------------
70      54
70      55
70      56
70      57
70      58
70      58
------------------------------------------------
71      55
71      56
------------------------------------------------
73      57
------------------------------------------------
74      58
------------------------------------------------
203     21
------------------------------------------------
530     54
------------------------------------------------
730     54
------------------------------------------------
740     58 

3  具体步骤:
(1)自定义key

在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。
所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法:

[cpp]  view plain copy
 
  1. //反序列化,从流中的二进制转换成IntPair  
  2. public void readFields(DataInput in) throws IOException          
  3. //序列化,将IntPair转化成使用流传送的二进制  
  4. public void write(DataOutput out)  
  5. //key的比较  
  6. public int compareTo(IntPair o)          
  7. //另外新定义的类应该重写的两个方法  
  8. //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)  
  9. public int hashCode()   
  10. public boolean equals(Object right)  

(2)由于key是自定义的,所以还需要自定义一下类:
(2.1)分区函数类。这是key的第一次比较。

[cpp]  view plain copy
 
  1. public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>  

在job中使用setPartitionerClasss设置Partitioner。
(2.2)key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。

[cpp]  view plain copy
 
  1. public static class KeyComparator extends WritableComparator  

必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)

另一种方法是 实现接口RawComparator。
在job中使用setSortComparatorClass设置key比较函数类。
(2.3)分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。

[cpp]  view plain copy
 
  1. public static class GroupingComparator extends WritableComparator  

分组函数类也必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)
分组函数类的另一种方法是实现接口RawComparator。
在job中使用setGroupingComparatorClass设置分组函数类。

另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。 

3 代码。

这个例子中没有使用key比较函数类,而是使用key的实现的compareTo方法。 

 

[java]  view plain copy
 
  1. package secondarySort;  
  2. import java.io.DataInput;  
  3. import java.io.DataOutput;  
  4. import java.io.IOException;  
  5. import java.util.StringTokenizer;  
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.IntWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.io.WritableComparable;  
  12. import org.apache.hadoop.io.WritableComparator;  
  13. import org.apache.hadoop.mapreduce.Job;  
  14. import org.apache.hadoop.mapreduce.Mapper;  
  15. import org.apache.hadoop.mapreduce.Partitioner;  
  16. import org.apache.hadoop.mapreduce.Reducer;  
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  21.   
  22. public class SecondarySort {  
  23.     //自己定义的key类应该实现WritableComparable接口  
  24.     public static class IntPair implements WritableComparable<IntPair> {  
  25.         int first;  
  26.         int second;  
  27.         /** 
  28.          * Set the left and right values. 
  29.          */  
  30.         public void set(int left, int right) {  
  31.             first = left;  
  32.             second = right;  
  33.         }  
  34.         public int getFirst() {  
  35.             return first;  
  36.         }  
  37.         public int getSecond() {  
  38.             return second;  
  39.         }  
  40.         @Override  
  41.         //反序列化,从流中的二进制转换成IntPair  
  42.         public void readFields(DataInput in) throws IOException {  
  43.             // TODO Auto-generated method stub  
  44.             first = in.readInt();  
  45.             second = in.readInt();  
  46.         }  
  47.         @Override  
  48.         //序列化,将IntPair转化成使用流传送的二进制  
  49.         public void write(DataOutput out) throws IOException {  
  50.             // TODO Auto-generated method stub  
  51.             out.writeInt(first);  
  52.             out.writeInt(second);  
  53.         }  
  54.         @Override  
  55.         //key的比较  
  56.         public int compareTo(IntPair o) {  
  57.             // TODO Auto-generated method stub  
  58.             if (first != o.first) {  
  59.                 return first < o.first ? -1 : 1;  
  60.             } else if (second != o.second) {  
  61.                 return second < o.second ? -1 : 1;  
  62.             } else {  
  63.                 return 0;  
  64.             }  
  65.         }  
  66.           
  67.         //新定义类应该重写的两个方法  
  68.         @Override  
  69.         //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)  
  70.         public int hashCode() {  
  71.             return first * 157 + second;  
  72.         }  
  73.         @Override  
  74.         public boolean equals(Object right) {  
  75.             if (right == null)  
  76.                 return false;  
  77.             if (this == right)  
  78.                 return true;  
  79.             if (right instanceof IntPair) {  
  80.                 IntPair r = (IntPair) right;  
  81.                 return r.first == first && r.second == second;  
  82.             } else {  
  83.                 return false;  
  84.             }  
  85.         }  
  86.     }  
  87.      /** 
  88.        * 分区函数类。根据first确定Partition。 
  89.        */  
  90.       public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{  
  91.         @Override  
  92.         public int getPartition(IntPair key, IntWritable value,   
  93.                                 int numPartitions) {  
  94.           return Math.abs(key.getFirst() * 127) % numPartitions;  
  95.         }  
  96.       }  
  97.         
  98.       /** 
  99.        * 分组函数类。只要first相同就属于同一个组。 
  100.        */  
  101.     /*//第一种方法,实现接口RawComparator 
  102.     public static class GroupingComparator implements RawComparator<IntPair> { 
  103.         @Override 
  104.         public int compare(IntPair o1, IntPair o2) { 
  105.             int l = o1.getFirst(); 
  106.             int r = o2.getFirst(); 
  107.             return l == r ? 0 : (l < r ? -1 : 1); 
  108.         } 
  109.         @Override 
  110.         //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。 
  111.         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ 
  112.             // TODO Auto-generated method stub 
  113.              return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,  
  114.                      b2, s2, Integer.SIZE/8); 
  115.         } 
  116.     }*/  
  117.     //第二种方法,继承WritableComparator  
  118.     public static class GroupingComparator extends WritableComparator {  
  119.           protected GroupingComparator() {  
  120.             super(IntPair.class, true);  
  121.           }  
  122.           @Override  
  123.           //Compare two WritableComparables.  
  124.           public int compare(WritableComparable w1, WritableComparable w2) {  
  125.             IntPair ip1 = (IntPair) w1;  
  126.             IntPair ip2 = (IntPair) w2;  
  127.             int l = ip1.getFirst();  
  128.             int r = ip2.getFirst();  
  129.             return l == r ? 0 : (l < r ? -1 : 1);  
  130.           }  
  131.         }  
  132.       
  133.           
  134.     // 自定义map  
  135.     public static class Map extends  
  136.             Mapper<LongWritable, Text, IntPair, IntWritable> {  
  137.         private final IntPair intkey = new IntPair();  
  138.         private final IntWritable intvalue = new IntWritable();  
  139.         public void map(LongWritable key, Text value, Context context)  
  140.                 throws IOException, InterruptedException {  
  141.             String line = value.toString();  
  142.             StringTokenizer tokenizer = new StringTokenizer(line);  
  143.             int left = 0;  
  144.             int right = 0;  
  145.             if (tokenizer.hasMoreTokens()) {  
  146.                 left = Integer.parseInt(tokenizer.nextToken());  
  147.                 if (tokenizer.hasMoreTokens())  
  148.                     right = Integer.parseInt(tokenizer.nextToken());  
  149.                 intkey.set(left, right);  
  150.                 intvalue.set(right);  
  151.                 context.write(intkey, intvalue);  
  152.             }  
  153.         }  
  154.     }  
  155.     // 自定义reduce  
  156.     //  
  157.     public static class Reduce extends  
  158.             Reducer<IntPair, IntWritable, Text, IntWritable> {  
  159.         private final Text left = new Text();  
  160.         private static final Text SEPARATOR =   
  161.               new Text("------------------------------------------------");  
  162.         public void reduce(IntPair key, Iterable<IntWritable> values,  
  163.                 Context context) throws IOException, InterruptedException {  
  164.             context.write(SEPARATOR, null);  
  165.             left.set(Integer.toString(key.getFirst()));  
  166.             for (IntWritable val : values) {  
  167.                 context.write(left, val);  
  168.             }  
  169.         }  
  170.     }  
  171.     /** 
  172.      * @param args 
  173.      */  
  174.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  
  175.         // TODO Auto-generated method stub  
  176.         // 读取hadoop配置  
  177.         Configuration conf = new Configuration();  
  178.         // 实例化一道作业  
  179.         Job job = new Job(conf, "secondarysort");  
  180.         job.setJarByClass(SecondarySort.class);  
  181.         // Mapper类型  
  182.         job.setMapperClass(Map.class);  
  183.         // 不再需要Combiner类型,因为Combiner的输出类型<Text, IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用  
  184.         //job.setCombinerClass(Reduce.class);  
  185.         // Reducer类型  
  186.         job.setReducerClass(Reduce.class);  
  187.         // 分区函数  
  188.         job.setPartitionerClass(FirstPartitioner.class);  
  189.         // 分组函数  
  190.         job.setGroupingComparatorClass(GroupingComparator.class);  
  191.           
  192.         // map 输出Key的类型  
  193.         job.setMapOutputKeyClass(IntPair.class);  
  194.         // map输出Value的类型  
  195.         job.setMapOutputValueClass(IntWritable.class);  
  196.         // rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat  
  197.         job.setOutputKeyClass(Text.class);  
  198.         // rduce输出Value的类型  
  199.         job.setOutputValueClass(IntWritable.class);  
  200.           
  201.         // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。  
  202.         job.setInputFormatClass(TextInputFormat.class);  
  203.         // 提供一个RecordWriter的实现,负责数据输出。  
  204.         job.setOutputFormatClass(TextOutputFormat.class);  
  205.           
  206.         // 输入hdfs路径  
  207.         FileInputFormat.setInputPaths(job, new Path(args[0]));  
  208.         // 输出hdfs路径  
  209.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  210.         // 提交job  
  211.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  212.     }  
  213. }  

 

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [hadoop 排序] 推荐:

hadoop 二次排序

- - 企业架构 - ITeye博客
hadoop的工作流程:. 是在key中,排序value的实现,思路是. 1.把value中需要有序的部分value-part放入key中. 2.sortCompare类或key的CompareTo方法中完成对key+value-part的比较. 3.GroupingCompare中只对key进行比较,这样相同的key跌倒获取到reduce中.

Hadoop中的各种排序

- - 互联网 - ITeye博客
1:shuffle阶段的排序(部分排序). shuffle阶段的排序可以理解成两部分,一个是对spill进行分区时,由于一个分区包含多个key值,所以要对分区内的按照key进行排序,即key值相同的一串存放在一起,这样一个partition内按照key值整体有序了.

hadoop复合键排序使用方法

- - CSDN博客云计算推荐文章
在hadoop中处理复杂业务时,需要用到复合键,复合不同于单纯的继承Writable接口,而是继承了WritableComparable接口,而实际上,WritableComparable接口继承了Writable和Comparable接口,如果只需要使用某一个类作为传值对象而不是作为key,继承Writable接口即可.

利用hadoop mapreduce 做数据排序

- - zzm
我们的需求是想统计一个文件中用IK分词后每个词出现的次数,然后按照出现的次数降序排列. 由于hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序. 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序.

Hadoop初体验――搭建hadoop简单实现文本数据全局排序

- - 学着站在巨人的肩膀上
      手头上有三台配置一样的电脑,就不去装虚拟机了,配置如下:.       三台电脑装有相同的操作系统――Ubuntu 11.04.       任选一台机器作为master,其他机器作为slaves,所有机器拥有相同的用户、相同的环境变量配置、相同的hadoop目录结构、相同的Java目录结构.

Hadoop中的排序器/组合器/合并器

- - 学着站在巨人的肩膀上
目前,海量数据处理主要存在二个问题:大规模计算(cpu+mem)、海量数据存储(disk),而Hadoop被专门设计用来针对海量数据的处理,它通过分布式文件系统解决海量数据的存储问题,组织成千上万个计算节点来共同完成一个任务解决了大规模计算问题. Hadoop的核心是MapReduce,而不是分布式文件系统HDFS,这是因为MapRduce所依赖的存储系统并不依赖于任何一个文件系统,甚至是分布式文件系统.

如何在Hadoop里面实现二次排序

- - 编程语言 - ITeye博客
在hadoop里面处理的数据,默认按输入内容的key进行排序的,大部分情况下,都可以满足的我们的业务需求,但有时候,可能出现类似以下的需求,输入内容:. 秦东亮;72 秦东亮;34 秦东亮;100 三劫;899 三劫;32 三劫;1 a;45 b;567 b;12. 注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求.

如何使用hadoop对海量数据进行统计并排序

- - ITeye博客
hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等. 散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子.

Hadoop二次排序关键点和出现时机(也叫辅助排序、Secondary Sort)

- - The Big Data Way,平凡但不乏味
    Hadoop二次排序在面试的时候出现频率还是比较高的. 今天花了点时间通过源码深入学习了一下. 后面内容以Hadoop自带实例——SecondarySort讲解.     它的作用是决定数据分区,说白了就是决定map输出key-value由哪个reduce处理,每个map task输出的key-value都会执行Partitioner的getPartition()方法,用于返回当前key-value由哪个reduce处理.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.