Partitioner分区过程分析

标签: partitioner 分区 分析 | 发表时间:2014-11-16 22:48 | 作者:Androidlushangderen
出处:http://blog.csdn.net

            Partition的中文意思就是分区,分片的意思,这个阶段也是整个MapReduce过程的第三个阶段,就在Map任务的后面,他的作用就是使key分到通过一定的分区算法,分到固定的区域中,给不同的Reduce做处理,达到负载均衡的目的。他的执行过程其实就是发生在上篇文章提到的collect的过程阶段,当输入的key调用了用户的map函数时,中间结果就会被分区了。虽说这个过程看似不是很重要,但是也有值得学习的地方。Hadoop默认的算法是HashPartitioner,就是根据key的hashcode取摸运算,很简单的。

/** Partition keys by their {@link Object#hashCode()}. 
 */
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
但是这虽然能保证了key的随机分布,但不能保证全局有序的实现,因为有些需求需要不同的分区呈现出阶段性的分布,第一个区所有key小于第二区间,同样第二区间要小于第三区间,而你的随机算法只是局部有序,在区间内时有序的,但是存在第一区间的key会大于第二区间的,因此,这里出现了一个叫 TotalOrderPartitioner的类,这也是本次学习的重点。先看看关系Partition的相关类结构。


可见,TotalOrderPartitioner还是挺复杂的。

        TotalOrderPartitioner的作用就是保证全局有序,对于key的划分,他划分了几个key的抽样点,作为key的划分点,比【2,4,6,8】,4个key抽样点,把区间划成了5份,如果某个key的值为5,他的区间为4-6,所以在第三区间,也就是说,这个类的作用就是围绕给定的划分点,寻找他的区间号,就代表任务的完成,至于你中间用的是二分搜索,还是其他的什么算法,都由你说了算。

      好的,首先第一步,从配置文件中得到划分点,他其实是存在于一个叫partition.file的文件中,配置中只保留了路径,

public void configure(JobConf job) {
    try {
      //获得partition file
      String parts = getPartitionFile(job);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(job)     // assume in DistributedCache
        : partFile.getFileSystem(job);

      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      //从partition中读出Spilts分区点
      K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
      ....
spiltPoints在后面会起着关键的作用。

       然后开始关键的操作了,如果你的key值类型不是BinaryComparable二进制比较类型的话,比如能直接比较值的数字类型,就直接用二分算法,创建二分搜索节点,传入自己的比较器实现:

....
      RawComparator<K> comparator =
        (RawComparator<K>) job.getOutputKeyComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        job.getBoolean("total.order.partitioner.natural.order", true);
      //判断是否为BinaryComparable类型,如果是,建立Trie树
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            job.getInt("total.order.partitioner.max.trie.depth", 2));
      } else {
    	//如果是不是则建立构建BinarySearchNode,用二分查找,用自己构建的比较器
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
继续往里点,里面的获取分区号的算法,直接用的是二分搜索查找:
/**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * where disabled by <tt>total.order.partitioner.natural.order</tt>,
   * search the partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
	//比较的内容节点
    private final K[] splitPoints;
    //比较器
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    
   /**
    * 通过自己传入的比较器方法进行二分查找
    */
    public int findPartition(K key) {
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }
       但是如果key的类型如果是BinaryComparable二进制比较类型的呢(你可以就理解为字符串类型),则要依赖TrieTree的创建了。里面分为2种节点,InnerTrieNode和LeafTrieNode,都继承了TrieNode , LeafTrieNode为叶子节点,最底层保存的是分区点刚刚说过的splitPoints。InnerTrieNode就是在叶子节点上面的节点。这个TrieTree的原理就是从上往下扫描节点,最后到叶子节点,返回分区号
。有种二分搜索树的感觉。每个inner节点保留255个字节点,代表着255个字符

/**
   * An inner trie node that contains 256 children based on the next
   * character.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    ...
所以最后的图线类似下面这样,这里只显示出了A-Z 26个字母,其实应该有255个:



可以想象这个树完全展开还是非常大的,所以这是标准的空间换时间的算法实现,所以创建TrieTree的过程应该是递归的过程,直到到达最深的深度,此时应该创建的Leaf叶子节点,至此,树创建完毕,看代码实现:

private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    final int depth = prefix.length;
    if (depth >= maxDepth || lower == upper) {
      //深度抵达最大的时候,应创建叶子节点了
      return new LeafTrieNode(depth, splits, lower, upper);
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    // append an extra byte on to the prefix
    int currentBound = lower;
    //每个父节点拥有着255个子节点
    for(int ch = 0; ch < 255; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      //result.child为首节点,递归创建子节点
      result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
                                   maxDepth);
    }
    // pick up the rest
    trial[depth] = 127;
    result.child[255] = buildTrie(splits, currentBound, upper, trial,
                                  maxDepth);
    return result;
  }
以上的步骤还只是初始化的过程,并非key查找获取partition分区的操作,构建过程的的流程图如下:


    接下来的步骤就是关键的输入key,进而查找分区的过程了,非二进制比较类型的情况很简单,直接通过自己的插入的比较器,二分搜索即可知道结果。我们看看TrieTree实现的字符串类型的查找分区如何实现,从以上构建的过程,我们知道,他是一层层的逐层查找过程,比如你要找,aad这个字符,你当然首先得第一个节点找a,然后再往这个节点的第一个子节点就是字符a在查找,最后找到叶子节点,在叶子节点的查找,Hadoop还是用了二分查找,这时因为本身的划分数据不是很多,不需要排序直接查找即可。

下面看看代码的实现,首先是innner节点,但字符的查找:

....
    /**
     * 非叶子的节点的查询
     */
    public int findPartition(BinaryComparable key) {
      //获取当前的深度
      int level = getLevel();
      
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      
      //从key在此位置对应的字符child开始继续搜寻下一个,key.getBytes()[level]为第level位置的字符
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
如果抵达了最后一层的LeafTrieNode,调用的是他自己的方法:

....
    //在叶子节点,进行二分查找分区号
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
最终返回的也是分区号。也就完成了这个分区算法的最终实现了。标准的空间换时间算法,但是要保证此算法的高效性,对于划分点的采集就显得非常重要了,得要保证有一定的代表性。才能保证分区间的有序。在Hadoop中提供了3个采集的类:

SplitSampler:对前n个记录进行采样
RandomSampler:遍历所有数据,随机采样
IntervalSampler:固定间隔采样


小小的partition算法也蕴藏着很多奇妙的算法,MapReduce的代码真的是一份不可多得的好资料啊。

作者:Androidlushangderen 发表于2014-11-16 14:48:59 原文链接
阅读:26 评论:0 查看评论

相关 [partitioner 分区 分析] 推荐:

Partitioner分区过程分析

- - CSDN博客架构设计推荐文章
            Partition的中文意思就是分区,分片的意思,这个阶段也是整个MapReduce过程的第三个阶段,就在Map任务的后面,他的作用就是使key分到通过一定的分区算法,分到固定的区域中,给不同的Reduce做处理,达到负载均衡的目的. 他的执行过程其实就是发生在上篇文章提到的collect的过程阶段,当输入的key调用了用户的map函数时,中间结果就会被分区了.

hiveQL分区表

- - CSDN博客云计算推荐文章
2、加载数据时显示指定分区值. 4、show partitions 可以查看表的分区. 注意:partitioned by 子句中定义的列,数据文件中并不包含这些列值. 5、select 使用分区列查询,hive会对输入做修剪;. 作者:u011984824 发表于2013-11-7 13:25:38 原文链接.

mysql分区举例---子分区

- - ITeye博客
mysql允许RANGE和LIST分区上再进行HASH和KEY的子分区. 建立ts3表和ts2一模一样. 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

mysql分区举例---HASH分区

- - ITeye博客
    hash分区的目的是将数据均匀的分布到预先定义的各个分区中,保证各分区的数据数量大致一致. 在RANGE和LIST分区中,必须明确指定一个给定的列值或列值集合应该保存在哪个分区中;而在HASH分区中,MYSQL自动完成这些工作,用户所要做的只是基于将要被散列的列值指定一个列值或者表达式,以及指定呗分区的表将要被分割成的分区数量.

Oracle分区表及分区索引

- - Oracle - 数据库 - ITeye博客
分区表的几种分类:. 1、Range(范围)分区. 是应用范围比较广的表分区方式,它是以列的值的范围来做为分区的划分条件,将记录存放到列值所在的. 比如按照时间划分,2012 年1 季度的数据放到a 分区,12年2 季度的数据放到b分区,. 因此在创建的时候呢,需要你指定基于的列,以及分区的范围值,如果某些记录暂无法预测范围,.

oracle分区详解

- - Oracle - 数据库 - ITeye博客
此文从以下几个方面来整理关于分区表的概念及操作:.         1.表空间及分区表的概念.         2.表分区的具体作用.         3.表分区的优缺点.         4.表分区的几种类型及操作方法.         5.对表分区的维护性操作. (1.) 表空间及分区表的概念.

MySQL分区技术

- - 数据库 - ITeye博客
mysql分区技术是mysql5.1以后出现的新技术,能替代分库分表技术,它的优势在于只在物理层面来降低数据库压力. 常用的MySQL分区类型:. 1.RANGE分区:基于属于一个给定的连续区间的列值,把多行分配给分区(基于列). 2.LIST分区:类似于按RANGE分区,区别在于LIST分区是基于列值匹配一个离散值集合的某个值来进行选择(基于列值是固定值的).

竞品分析

- 章明 - 互联网的那点事
关于竞品分析,之前天行(@天行Aeros)有篇文章《设计公式:简单有效的竞品分析》已经进行了介绍,本文在该文章的基础之上再进行一些分享,希望对大家有用. 竞品分析(Competitive Analysis)一词最早源于经济学领域. 市场营销和战略管理方面的竞品分析是指对现有的或潜在的竞争产品的优势和劣势进行评价.