Hadoop中的各种排序

标签: hadoop 排序 | 发表时间:2014-02-21 19:26 | 作者:haiyang_1987
出处:http://www.iteye.com
转载:
http://blog.csdn.net/kingjinzi_2008/article/details/7738188


1:shuffle阶段的排序(部分排序)

shuffle阶段的排序可以理解成两部分,一个是对spill进行分区时,由于一个分区包含多个key值,所以要对分区内的<key,value>按照key进行排序,即key值相同的一串<key,value>存放在一起,这样一个partition内按照key值整体有序了。

第二部分并不是排序,而是进行merge,merge有两次,一次是map端将多个spill 按照分区和分区内的key进行merge,形成一个大的文件。第二次merge是在reduce端,进入同一个reduce的多个map的输出 merge在一起,该merge理解起来有点复杂,最终不是形成一个大文件,而且期间数据在内存和磁盘上都有,关于这点金子准备日后单独整理一下。

所以shuffle阶段的merge并不是严格的排序意义,只是将多个整体有序的文件merge成一个大的文件,由于同的task执行,map的输出会有所不同,所以merge后的结果不是每次都相同,不过还是严格要求按照分区划分,同时每个分区内的具有相同key的<key,value>对挨在一起。



shuffle排序综述:如果只定义了map函数,没有定义reduce函数,那么输入数据经过shuffle的排序后,结果为key值相同的输出挨在一起,且key值小的一定在前面,这样整体来看key值有序(宏观意义的,不一定是按从大到小,因为如果采用默认的HashPartitioner,则key 的hash值相等的在一个分区,如果key为IntWritable的话,每个分区内的key会排序好的),而每个key对应的value不是有序的。

应用一:金子理解:shuffle的排序随不能满足全局排序,但是实际中还是帮助我们做了很多工作,比如我们只希望把<key,value>对按照key值,将相同key的<key,value>对输出到一起,这样shuffle排序就可以满足了,也就不需要reduce函数,只单独指定map函数就OK啦!

应用二:基于分区的MapFile查找技术。(我没仔细看)



2:全排序

对于全排序,金子深有体会,借助于hadoop的Terasort,我曾经写了整数和字符串的全排序,其中代码重叠率很高,只注意改改输入格式什么的就OK了。要进行全局排序,首先要理解分区的概念,并且要使用TotalOrderpartition(因为默认的partition是hashpartition,不适用于全局排序)。主要思路就是将数据按照区间进行分割,比如对整数排序,[0,10000]的在partiiton 0中,(10000,20000]在partition 1中。。。这样排序后面的partition中的数据肯定比排在前面的partition中的数要大,宏观上看是有序的,然后在对每个分区中的数据进行排序,由于这时分区中数据量已经比较小了,在进行排序就容易的多了。在数据分布均匀的情况下,每个分区内的数据量基本相同,这种就是比较理想的情况了,但是实际中数据往往分布不均匀,出现了数据倾斜的情况,这时按照之前的分区划分数据就不合适了,此时就需要一个东西的帮助——采样器。采样的核心思想是只查看一小部分键,获得键的近似分布,并由此键分区。关于采样器的一些使用细节,可以查看我的另一篇博客:Hadoop 中的采样器-不一样的视角 Hadoop中的采样器——不一样的视角

典型应用:TeraSort



3:二次排序

也称作辅助排序,MapReduce框架在把记录到达reducers之前会将记录按照键排序。对于任意一个特殊的键,然而,值是不排序的。甚至是,值在两次执行中的顺序是不一样的,原因是它们是从不同的map中来的,这些不同的map可能在不同的执行过程中结束的先后顺序不确定。通常情况下,大多数的MapReduce程序的reduce函数不会依赖于值的顺序。然而,我们也可通过以一种特殊的方式排序和分组键,来指定值的顺序。

二次排序金子并没有使用过,不过在join连接操作中,输入到一个reduce中的value<list>是来自两个表的,如果进行排序,将第一个表的放在前面,第二个表的放在后面,这样就只需要将表1存放到ArrayList中,表二不需要,然后进行全连接就搞定啦,这样是很多关于并行数据库的论文中对join操作的一个明显优化。

看到一篇写的很好的分析二次排序的博客:http://blog.sina.com.cn/s/blog_70324d8d0100wa63.html,讲的是日志分析中二次排序是怎么应用的。要写二次排序,则需要非常熟悉Jobconf的几个函数,以及各自相关的类:


A:setOutputKeyComparatorClass  :参数为继承RawComparator的子类
public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass)
RawComparator接口:继承自Comparator ,就是一个比较器,直接在代表对象特征的字节上进行操作。经常使用的其实现的类有:DoubleWritable.Comparator, FloatWritable.Comparator, IntWritable.Comparator,  LongWritable.Comparator, NullWritable.Comparator, SecondarySort.FirstGroupingComparator, SecondarySort.IntPair.Comparator,Text.Comparator,WritableComparator
自定义的类要实现compare函数。以上这部分的代码就是对组合键中的key进行排序的意思。

很多的二次排序例子中就利用继承WritableComparator来实现根据组合键进行排序。即将compare中的两个参数转换为组合键,组合键中的自然键不同时按照自然键排序,自然键相同时按照自然值排序。可参见《权威指南》中p243的代码。



B:setPartitionerClass:用于指定用于分区的类,参数我继承Partitioner的类


public void setPartitionerClass(Class<? extends Partitioner> theClass)
Deprecated.
Set the Partitioner class used to partition Mapper-outputs to be sent to the Reducers.
设置分区的类目的就是用于将map的输出按照指定的规则进行分区,每个分区进入不同的reduce,每个分区中按照自己程序的要求,可以有多个key值,如果一般的分区如Hashpartitioner 和TotalOrderPartitioner不能满足需求时,需要自定义继承Partitioner的类。
二次排序中由于map的输出key为组合键IntPair,所以自定义的分区类要继承Partitioner,同时函数getPartition 的返回值要根据组合键中的自然键,即key.first进行判断,例如return Math.abs(key.getFirst()*127)% numpartitions; 返回值相同的就被分配到一个分区中了。这样一个分区中有多个不同的自然键,但是reduce的输入要满足对于非组合键,就是单纯的一个自然键时,输入是 <key, value<list>>的形式,而现在全部都是<IntPair ,value><IntPair,value>....的形式,我们要将自然键相同的value放到一起,形成一个list,要怎么办呢,就要进行分组啦!!!分组也同样要用comparator,见下面C哦~



C:setOutputValueGroupingComparator:指定用户自定义的comparator,用于将reduce的输入进行分组,二次排序中可以理解为将自然键key相同的放到一起,相同key的value放到一个value迭代器里。

public void setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)



二次排序工作原理综述:(转自:http://p-x1984.iteye.com/blog/800269)

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。Hadoop自带的例子SecondrySort使用TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

二次排序应用: 日志分析(转自:)
在计算visits的时候是一个很常见的需求。比如
原始日志为
用户标识为11111的 1点00 访问页面page1
用户标识为11111的 1点01 访问页面page2
用户标识为11111的 1点05 访问页面page3
用户标识为22222的 1点00 访问页面page1
用户标识为22222的 1点05 访问页面page3,


你需要统计结果为
用户11111 1:00的入口是 page1,出口是page3,访问3个页面,停留时间为5分钟
用户22222 1点00的入口是page1,出口是page3,访问2个页面,停留时间为5分钟
3 实步


实现简要步骤为
1. 构造(用户标识,时间)作为key, 时间和其他信息(比如访问页面)作为value,然后进入map流程
2. 在缺省的reduce的,传入参数为 单个key和value的集合,这会导致相同的用户标识和相同的时间被分在同一组,比如用户标识为11111的 1点00一个reduce, 用户标识为11111的 1点01另外一组,这不符合要求.所以需要更改缺省分组,需要由原来的按(用户标识,时间)改成按(用户标识)分组就行了。这样reduce是传入参数变为
户标识为11111 的value集合为(1点00 访问页面page1, 1点01 访问页面page2, 1点05 访问页面page3),然后在reduce方法里写自己的统计逻辑就行了。
3. 当然1和2步之间,有2个重要细节要处理:确定key的排序规则和确定分区规则(分区规则保证map后分配数据到reduce按照用户标识来散列,而不是按缺省的用户标识+时间来散列)


已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [hadoop 排序] 推荐:

hadoop 二次排序

- - 企业架构 - ITeye博客
hadoop的工作流程:. 是在key中,排序value的实现,思路是. 1.把value中需要有序的部分value-part放入key中. 2.sortCompare类或key的CompareTo方法中完成对key+value-part的比较. 3.GroupingCompare中只对key进行比较,这样相同的key跌倒获取到reduce中.

Hadoop中的各种排序

- - 互联网 - ITeye博客
1:shuffle阶段的排序(部分排序). shuffle阶段的排序可以理解成两部分,一个是对spill进行分区时,由于一个分区包含多个key值,所以要对分区内的按照key进行排序,即key值相同的一串存放在一起,这样一个partition内按照key值整体有序了.

hadoop复合键排序使用方法

- - CSDN博客云计算推荐文章
在hadoop中处理复杂业务时,需要用到复合键,复合不同于单纯的继承Writable接口,而是继承了WritableComparable接口,而实际上,WritableComparable接口继承了Writable和Comparable接口,如果只需要使用某一个类作为传值对象而不是作为key,继承Writable接口即可.

利用hadoop mapreduce 做数据排序

- - zzm
我们的需求是想统计一个文件中用IK分词后每个词出现的次数,然后按照出现的次数降序排列. 由于hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序. 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序.

Hadoop初体验――搭建hadoop简单实现文本数据全局排序

- - 学着站在巨人的肩膀上
      手头上有三台配置一样的电脑,就不去装虚拟机了,配置如下:.       三台电脑装有相同的操作系统――Ubuntu 11.04.       任选一台机器作为master,其他机器作为slaves,所有机器拥有相同的用户、相同的环境变量配置、相同的hadoop目录结构、相同的Java目录结构.

Hadoop中的排序器/组合器/合并器

- - 学着站在巨人的肩膀上
目前,海量数据处理主要存在二个问题:大规模计算(cpu+mem)、海量数据存储(disk),而Hadoop被专门设计用来针对海量数据的处理,它通过分布式文件系统解决海量数据的存储问题,组织成千上万个计算节点来共同完成一个任务解决了大规模计算问题. Hadoop的核心是MapReduce,而不是分布式文件系统HDFS,这是因为MapRduce所依赖的存储系统并不依赖于任何一个文件系统,甚至是分布式文件系统.

如何在Hadoop里面实现二次排序

- - 编程语言 - ITeye博客
在hadoop里面处理的数据,默认按输入内容的key进行排序的,大部分情况下,都可以满足的我们的业务需求,但有时候,可能出现类似以下的需求,输入内容:. 秦东亮;72 秦东亮;34 秦东亮;100 三劫;899 三劫;32 三劫;1 a;45 b;567 b;12. 注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求.

如何使用hadoop对海量数据进行统计并排序

- - ITeye博客
hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等. 散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子.

Hadoop二次排序关键点和出现时机(也叫辅助排序、Secondary Sort)

- - The Big Data Way,平凡但不乏味
    Hadoop二次排序在面试的时候出现频率还是比较高的. 今天花了点时间通过源码深入学习了一下. 后面内容以Hadoop自带实例——SecondarySort讲解.     它的作用是决定数据分区,说白了就是决定map输出key-value由哪个reduce处理,每个map task输出的key-value都会执行Partitioner的getPartition()方法,用于返回当前key-value由哪个reduce处理.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.