<< MapReduce:详解Shuffle过程 - 每天一小步 - ITeye技术网站 | 首页 | HBase Coprocessor 剖析与编程实践 - 林场 - 博客园 >>

如何使用Hadoop的Partitioner - 三劫散仙 - ITeye技术网站

Partitioner的作用: 
对map端输出的数据key作一个散列,使数据能够均匀分布在各个reduce上进行后续操作,避免产生热点区。 
Hadoop默认使用的分区函数是Hash Partitioner,源码如下: 

Java代码  收藏代码
  1. /** 
  2.  * Licensed to the Apache Software Foundation (ASF) under one 
  3.  * or more contributor license agreements.  See the NOTICE file 
  4.  * distributed with this work for additional information 
  5.  * regarding copyright ownership.  The ASF licenses this file 
  6.  * to you under the Apache License, Version 2.0 (the 
  7.  * "License"); you may not use this file except in compliance 
  8.  * with the License.  You may obtain a copy of the License at 
  9.  * 
  10.  *     http://www.apache.org/licenses/LICENSE-2.0 
  11.  * 
  12.  * Unless required by applicable law or agreed to in writing, software 
  13.  * distributed under the License is distributed on an "AS IS" BASIS, 
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  15.  * See the License for the specific language governing permissions and 
  16.  * limitations under the License. 
  17.  */  
  18.   
  19. package org.apache.hadoop.mapreduce.lib.partition;  
  20.   
  21. import org.apache.hadoop.mapreduce.Partitioner;  
  22.   
  23. /** Partition keys by their {@link Object#hashCode()}. */  
  24. public class HashPartitioner<K, V> extends Partitioner<K, V> {  
  25.   
  26.   /** Use {@link Object#hashCode()} to partition. */  
  27.   public int getPartition(K key, V value,  
  28.                           int numReduceTasks) {  
  29.       //默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况  
  30.     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;  
  31.   }  
  32.   
  33. }  


大部分情况下,我们都会使用默认的分区函数,但有时我们又有一些,特殊的需求,而需要定制Partition来完成我们的业务,案例如下: 
对如下数据,按字符串的长度分区,长度为1的放在一个,2的一个,3的各一个。 

Java代码  收藏代码
  1. 河南省;1  
  2. 河南;2  
  3. 中国;3  
  4. 中国人;4  
  5. 大;1  
  6. 小;3  
  7. 中;11  


这时候,我们使用默认的分区函数,就不行了,所以需要我们定制自己的Partition,首先分析下,我们需要3个分区输出,所以在设置reduce的个数时,一定要设置为3,其次在partition里,进行分区时,要根据长度具体分区,而不是根据字符串的hash码来分区。核心代码如下: 

Java代码  收藏代码
  1. /** 
  2.  * Partitioner 
  3.  *  
  4.  *  
  5.  * */  
  6.  public static class PPartition extends Partitioner<Text, Text>{   
  7.     @Override  
  8.     public int getPartition(Text arg0, Text arg1, int arg2) {  
  9.          /** 
  10.           * 自定义分区,实现长度不同的字符串,分到不同的reduce里面 
  11.           *  
  12.           * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3 
  13.           * 有几个分区,就设置为几 
  14.           * */  
  15.           
  16.         String key=arg0.toString();  
  17.         if(key.length()==1){  
  18.             return 1%arg2;  
  19.         }else if(key.length()==2){  
  20.             return 2%arg2;  
  21.         }else if(key.length()==3){  
  22.             return 3%arg2;  
  23.         }  
  24.           
  25.             
  26.           
  27.         return  0;  
  28.     }  
  29.        
  30.        
  31.        
  32.        
  33.  }  



全部代码如下: 

Java代码  收藏代码
  1. package com.partition.test;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.fs.FileSystem;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.LongWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapred.JobConf;  
  10. import org.apache.hadoop.mapreduce.Job;  
  11. import org.apache.hadoop.mapreduce.Mapper;  
  12. import org.apache.hadoop.mapreduce.Partitioner;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
  15. import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  18. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  20.   
  21. import com.qin.operadb.PersonRecoder;  
  22. import com.qin.operadb.ReadMapDB;  
  23.    
  24.   
  25. /** 
  26.  * @author qindongliang 
  27.  *  
  28.  * 大数据交流群:376932160 
  29.  *  
  30.  *  
  31.  * **/  
  32. public class MyTestPartition {  
  33.       
  34.     /** 
  35.      * map任务 
  36.      *  
  37.      * */  
  38.     public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{  
  39.               
  40.         @Override  
  41.         protected void map(LongWritable key, Text value,Context context)  
  42.                 throws IOException, InterruptedException {  
  43.             // System.out.println("进map了");  
  44.             //mos.write(namedOutput, key, value);  
  45.             String ss[]=value.toString().split(";");  
  46.               
  47.             context.write(new Text(ss[0]), new Text(ss[1]));  
  48.               
  49.               
  50.               
  51.         }  
  52.           
  53.           
  54.     }  
  55.       
  56.     /** 
  57.      * Partitioner 
  58.      *  
  59.      *  
  60.      * */  
  61.      public static class PPartition extends Partitioner<Text, Text>{   
  62.         @Override  
  63.         public int getPartition(Text arg0, Text arg1, int arg2) {  
  64.              /** 
  65.               * 自定义分区,实现长度不同的字符串,分到不同的reduce里面 
  66.               *  
  67.               * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3 
  68.               * 有几个分区,就设置为几 
  69.               * */  
  70.               
  71.             String key=arg0.toString();  
  72.             if(key.length()==1){  
  73.                 return 1%arg2;  
  74.             }else if(key.length()==2){  
  75.                 return 2%arg2;  
  76.             }else if(key.length()==3){  
  77.                 return 3%arg2;  
  78.             }  
  79.               
  80.                 
  81.               
  82.             return  0;  
  83.         }  
  84.            
  85.            
  86.            
  87.            
  88.      }  
  89.        
  90.    
  91.      /*** 
  92.       * Reduce任务 
  93.       *  
  94.       * **/  
  95.      public static class PReduce extends Reducer<Text, Text, Text, Text>{  
  96.          @Override  
  97.         protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)  
  98.                 throws IOException, InterruptedException {  
  99.                
  100.               String key=arg0.toString().split(",")[0];  
  101.              System.out.println("key==> "+key);  
  102.              for(Text t:arg1){  
  103.                  //System.out.println("Reduce:  "+arg0.toString()+"   "+t.toString());  
  104.                  arg2.write(arg0, t);  
  105.              }  
  106.                  
  107.                
  108.         }  
  109.            
  110.        
  111.            
  112.      }  
  113.        
  114.        
  115.      public static void main(String[] args) throws Exception{  
  116.          JobConf conf=new JobConf(ReadMapDB.class);  
  117.          //Configuration conf=new Configuration();  
  118.          conf.set("mapred.job.tracker","192.168.75.130:9001");  
  119.         //读取person中的数据字段  
  120.          conf.setJar("tt.jar");  
  121.         //注意这行代码放在最前面,进行初始化,否则会报  
  122.        
  123.        
  124.         /**Job任务**/  
  125.         Job job=new Job(conf, "testpartion");  
  126.         job.setJarByClass(MyTestPartition.class);  
  127.         System.out.println("模式:  "+conf.get("mapred.job.tracker"));;  
  128.         // job.setCombinerClass(PCombine.class);  
  129.          job.setPartitionerClass(PPartition.class);  
  130.            
  131.          job.setNumReduceTasks(3);//设置为3  
  132.          job.setMapperClass(PMapper.class);  
  133.         // MultipleOutputs.addNamedOutput(job, "hebei", TextOutputFormat.class, Text.class, Text.class);  
  134.         // MultipleOutputs.addNamedOutput(job, "henan", TextOutputFormat.class, Text.class, Text.class);  
  135.          job.setReducerClass(PReduce.class);  
  136.          job.setOutputKeyClass(Text.class);  
  137.          job.setOutputValueClass(Text.class);  
  138.           
  139.         String path="hdfs://192.168.75.130:9000/root/outputdb";  
  140.         FileSystem fs=FileSystem.get(conf);  
  141.         Path p=new Path(path);  
  142.         if(fs.exists(p)){  
  143.             fs.delete(p, true);  
  144.             System.out.println("输出路径存在,已删除!");  
  145.         }  
  146.         FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");  
  147.         FileOutputFormat.setOutputPath(job,p );  
  148.         System.exit(job.waitForCompletion(true) ? 0 : 1);    
  149.            
  150.            
  151.     }  
  152.       
  153.       
  154.   
  155. }  
  156.  

阅读全文……




发表评论 发送引用通报