<<上篇 | 首页 | 下篇>>

MapReduce里面的二次排序、组排序和Partitioner - FacingTheSunCN的专栏 - 博客频道 - CSDN.NET

在MapReduce程序中,我们常常需要对属于同一个key的value进行排序,即“二次排序”,将key和value进行组合,合并成一个新的key,给map去排序。在Hadoop 1.0.4中,利用setSortComparatorClass()对二次排序进行设定,但是sort comparator需要自己实现一个comparator,下面是一个自己实现的comparator的例子。

 

[java] view plaincopy
 
  1. public static class SortComparator extends WritableComparator {  
  2.   
  3.     protected SortComparator() {  
  4.         super(Text.classtrue);  
  5.           
  6.         // TODO Auto-generated constructor stub  
  7.     }  
  8.   
  9.     @Override  
  10.     public int compare(WritableComparable a, WritableComparable b) {  
  11.         // TODO Auto-generated method stub  
  12.         String[] strs_a = ((Text) a).toString().split(":");  
  13.         String[] strs_b = ((Text) b).toString().split(":");  
  14.   
  15.         if ((strs_a.length != 3) || (strs_b.length != 3)) {  
  16.             log.error("Error: dimension error 1 in SortComparator!");  
  17.             System.exit(1);  
  18.         }  
  19.   
  20.         if (Integer.parseInt(strs_a[0]) > Integer.parseInt(strs_b[0])) {  
  21.             return 1;  
  22.         } else if (Integer.parseInt(strs_a[0]) < Integer  
  23.                 .parseInt(strs_b[0])) {  
  24.             return -1;  
  25.         } else {  
  26.             if (Double.parseDouble(strs_a[1]) > Double  
  27.                     .parseDouble(strs_b[1])) {  
  28.                 return 1;  
  29.             } else {  
  30.                 return -1;  
  31.             }  
  32.         }  
  33.     }  
  34. }  

 

然后,在job中设置

 

[java] view plaincopy
 
  1. job.setSortComparatorClass(SortComparator)  

 

由于我们使用了“二次排序”,因此现在的key是被合并过的key(上面说过,是将key与value合并成新的key),所以我们需要定义组比较器(grouping comparator),它的功能是在reducer中为我们需要的相同的key(即合并之前的key)送入到同一个reduce中(官方文档中的描述是“Define the comparator that controls which keys are grouped together for a single call to Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)”)。下面是一个grouping comparator的例子。

 

[java] view plaincopy
 
  1. public static class GroupComparator extends WritableComparator {  
  2.   
  3.     protected GroupComparator() {  
  4.         super(Text.classtrue);  
  5.         // TODO Auto-generated constructor stub  
  6.     }  
  7.   
  8.     @Override  
  9.     public int compare(WritableComparable a, WritableComparable b) {  
  10.         // TODO Auto-generated method stub  
  11.         String[] strs_a = ((Text) a).toString().split(":");  
  12.         String[] strs_b = ((Text) b).toString().split(":");  
  13.   
  14.         if ((strs_a.length != 3) || (strs_b.length != 3)) {  
  15.             log.error("Error: dimension error 1 in GroupComparator!");  
  16.             System.exit(1);  
  17.         }  
  18.   
  19.         String new_key_a = strs_a[0] + strs_a[2];  
  20.         String new_key_b = strs_b[0] + strs_b[2];  
  21.   
  22.         if (new_key_a.compareTo(new_key_b) == 0) {  
  23.             return 0;  
  24.         } else if (new_key_a.compareTo(new_key_b) > 0) {  
  25.             return 1;  
  26.         } else {  
  27.             return -1;  
  28.         }  
  29.   
  30.     }  
  31. }  

然后,在job中设置

 

 

[java] view plaincopy
 
  1. job.setGroupingComparatorClass(GroupComparator.class);  


此外,由于我们实际的key与我们所需要的key是不一样的,因此我们需要自己定义一个partitioner,以“欺骗”reducer,将我们所需的相同的key传到同一个reducer,下面是一个partitioner的例子。

 

 

[java] view plaincopy
 
  1. public static class Patitioner extends  
  2.         HashPartitioner<Text, IntWritable> {  
  3.     @Override  
  4.     public int getPartition(Text key, IntWritable value, int numReduceTasks) {  
  5.         // TODO Auto-generated method stub  
  6.         String[] new_key = key.toString().split(":");  
  7.         if (new_key.length != 3) {  
  8.             log.error("Error: dimension error in partitioner!");  
  9.             System.exit(1);  
  10.         }  
  11.         return super.getPartition(new Text(new_key[0]), value,  
  12.                 numReduceTasks);  
  13.     }  
  14. }  

然后,在job中设置

 

 

[java] view plaincopy
 
  1. job.setPartitionerClass(Patitioner.class);  



Partitioner和GroupingComparator有点饶人,功能好像重复了。

 

 

  1. Partitioner是将相同的key(用户虚拟的key)传到同一个reducer(到了reducer中,reducer只认map中实际输出的key,实际key中哪一部分作为key用一个单独的reduce来处理就是GroupingComparator的功能)
  2. GroupingComparator是让reducer用一个单独的reduce来处理同一个key
  3. Partitioner中的key和GroupingComparator中的key是可以不一样的(例如我的例子中)

阅读全文……

标签 : , ,

mapreduce编程(二)- 大象书中求每一年的最高温度 - - 博客频道 - CSDN.NET

1 通过设置了partitioner来进行分区。因为分区是按照年份来进行,所以同年的数据就可以分区到一个reducer中。

2 自定义key比较器,按照年份升序,温度值降序。这样map输出的所有kv对就是按照年份升序,温度值降序排列的。

3 自定义分组比较器,所有同一年的数据属于同一个组,那么在reduce输出的时候,只需要取第一个value就能达到输出一年最高气温的目的。 

阅读全文……

标签 : , ,

Hadoop 中的两表join | Alex的个人Blog

Common Join

最为普通的join策略,不受数据量的大小影响,也可以叫做reduce side join ,最没效率的一种join 方式. 它由一个mapreduce job 完成.

首先将大表和小表分别进行map 操作, 在map shuffle 的阶段每一个map output key 变成了table_name_tag_prefix + join_column_value , 但是在进行partition 的时候它仍然只使用join_column_value 进行hash.

每一个reduce 接受所有的map 传过来的split , 在reducce 的shuffle 阶段,它将map output key 前面的table_name_tag_prefix 给舍弃掉进行比较. 因为reduce 的个数可以由小表的大小进行决定,所以对于每一个节点的reduce 一定可以将小表的split 放入内存变成hashtable. 然后将大表的每一条记录进行一条一条的比较.

 

Map Join

Map Join 的计算步骤分两步,将小表的数据变成hashtable广播到所有的map 端,将大表的数据进行合理的切分,然后在map 阶段的时候用大表的数据一行一行的去探测(probe) 小表的hashtable. 如果join key 相等,就写入HDFS.

map join 之所以叫做map join 是因为它所有的工作都在map 端进行计算.

阅读全文……

标签 : ,