利用hadoop mapreduce 做数据排序

标签: 利用 hadoop mapreduce | 发表时间:2016-07-22 10:53 | 作者:
出处:http://m635674608.iteye.com
     我们的需求是想统计一个文件中用IK分词后每个词出现的次数,然后按照出现的次数降序排列。也就是高频词统计。

由于hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序。 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序。 假设第一个job的结果输出如下:

   part-r-0000文件内容:

a    5
b    4
c    74
d    78
e    1
r    64
f    4

要做的就是按照每个词出现的次数降序排列。

   <!-- lang: lua -->
**********************************分割线*****************************************

首先可能会出现这样的问题:

1.可能上一个job为多个reduce,也就是会产生多个结果文件,因为一个reduce就会生成一个结果文件,结果存放在上一个job输出目录下类似part-r-00的文件里。

2.需要排序的文件内容很大,所以需要考虑多个reduce的情况。

   <!-- lang: lua -->
*********************************分割线*******************************

怎么去设计mapreduce

1.在map阶段按照行读取文本,然后调用map方法时把上一个job的结果颠倒,也就是map后结果应该是这样的

   <!-- lang: java -->
5    a
4    b
74    c
................
.........................
4    f

2.然后map后,hadoop会对结果进行分组,这时结果就会变成 <!-- lang: lua -->

   (5:a)
(4:b,f)
(74:c)

3.因为hadoop对数据分组后默认是按照key升序排序的,所以需要自定义排序函数将分组数据降序排序。

4.然后按照reduce数目的大小自定义分区函数,让结果形成多个区间,比如我认为大于50的应该在一个区间,一共3个reduce,那么最后的 数据应该是三个区间,大于50的直接分到第一个分区0,25到50之间的分到第二个分区1,小于25的分到第三个分区2.因为分区数和reduce数是相 同的,所以不同的分区对应不同的reduce,因为分区是从0开始的,数据分区到分区0的会被分到第一个reduce处理,分区是1的会分到第2个 reduce处理,依次类推。并且reduce对应着输出文件,所以,第一个reduce生成的文件就会是part-r-0000,第二个reduce对 应的生成文件就会是part-r-0001,依次类推,所以reduce处理时只需要把key和value再倒过来直接输出。这样最后就会形成数目最大的 字符串就会在第一个生成文件里,排好序的数据就是按照文件命名的顺序存放的。

   **其实就是利用了hadoop分组的特点,会把key相同的字符串放到一个组里,然后我们把分组的数据用自己定义的排序函数按照key排序后,再按照分区函数分到不同的reduce,固然会是第一个reduce结果文件里面是最大数字的已排序集合,也就是说需要排好序的数据时只需要依次遍历reduce的结果文件part-r-0000,part-r0001,part-r-0002...。当然,如果只有一个reduce,那就正好是一个排好序的结果文件。**

代码如下:

   <!-- lang: lua -->
*******************************分割线*****************************************

map:

   <!-- lang: java -->
/**
 * 把上一个mapreduce的结果的key和value颠倒,调到后就可以按照key排序了。
 * 
 * @author zhangdonghao
 * 
 */
    public class SortIntValueMapper extends
    Mapper<LongWritable, Text, IntWritable, Text> {
private final static IntWritable wordCount = new IntWritable(1);

private Text word = new Text();

public SortIntValueMapper() {
    super();
}

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken().trim());
        wordCount.set(Integer.valueOf(tokenizer.nextToken().trim()));
        context.write(wordCount, word);
    }
}
}

reudce:

   <!-- lang: java -->
/**
 * 把key和value颠倒过来输出
 * @author zhangdonghao
 * 
 */
    public class SortIntValueReduce extends
    Reducer<IntWritable, Text, Text, IntWritable> {

private Text result = new Text();

@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text val : values) {
        result.set(val.toString());
        context.write(result, key);
    }

}
}

Partitioner:

   <!-- lang: java -->
/**
 * 按照key的大小来划分区间,当然,key 是 int值
 * 
 * @author zhangdonghao
 * 
 */
public class KeySectionPartitioner<K, V> extends Partitioner<K, V> {

public KeySectionPartitioner() {
}

@Override
public int getPartition(K key, V value, int numReduceTasks) {
    /**
     * int值的hashcode还是自己本身的数值
     */
    //这里我认为大于maxValue的就应该在第一个分区
    int maxValue = 50;
    int keySection = 0;
    // 只有传过来的key值大于maxValue 并且numReduceTasks比如大于1个才需要分区,否则直接返回0
    if (numReduceTasks > 1 && key.hashCode() < maxValue) {
        int sectionValue = maxValue / (numReduceTasks - 1);
        int count = 0;
        while ((key.hashCode() - sectionValue * count) > sectionValue) {
            count++;
        }
        keySection = numReduceTasks - 1 - count;
    }
    return keySection;
}

}

Comparator:

   <!-- lang: java -->
/**
 * int的key按照降序排列
 * 
 * @author zhangdonghao
 * 
 */
public class IntKeyDescComparator extends WritableComparator {

protected IntKeyDescComparator() {
    super(IntWritable.class, true);

}

@Override
public int compare(WritableComparable a, WritableComparable b) {
    return -super.compare(a, b);
}

}

job的关键设置:

   <!-- lang: java -->
            /**
     * 这里是map输出的key和value类型
     */
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(SortIntValueMapper.class);
    // job.setCombinerClass(WordCountReduce.class);
    job.setReducerClass(SortIntValueReduce.class);
    // key按照降序排列
    job.setSortComparatorClass(IntKeyAscComparator.class);

    job.setPartitionerClass(KeySectionPartitioner.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
            /**
              *这里可以放输入目录数组,也就是可以把上一个job所有的结果都放进去
             **/
            FileInputFormat.setInputPaths(job, inputPath);

    FileOutputFormat.setOutputPath(job,outputPath);



http://my.oschina.net/132722/blog/168022


已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [利用 hadoop mapreduce] 推荐:

利用hadoop mapreduce 做数据排序

- - zzm
我们的需求是想统计一个文件中用IK分词后每个词出现的次数,然后按照出现的次数降序排列. 由于hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序. 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

"Hadoop/MapReduce/HBase"分享总结

- - ITeye博客
此分享是关于hadoop生态系统的简单介绍包括起源到相对应用. Hadoop和HBase.pdf (2.1 MB). 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Hadoop之MapReduce单元测试

- - ITeye博客
通常情况下,我们需要用小数据集来单元测试我们写好的map函数和reduce函数. 而一般我们可以使用Mockito框架来模拟OutputCollector对象(Hadoop版本号小于0.20.0)和Context对象(大于等于0.20.0). 下面是一个简单的WordCount例子:(使用的是新API).

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

【Hadoop】MapReduce使用combiner优化性能

- - CSDN博客云计算推荐文章
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担. Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入.

hadoop的IO和MapReduce优化参数

- - CSDN博客系统运维推荐文章
           在MapReduce执行过程中,特别是Shuffle阶段,尽量使用内存缓冲区存储数据,减少磁盘溢写次数;同时在作业执行过程中增加并行度,都能够显著提高系统性能,这也是配置优化的一个重要依据.            下面分别介绍I/O属性和MapReduce属性这两个类的部分属性,并指明其优化方向.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).