Hadoop MapReduce技巧

标签: hadoop mapreduce 技巧 | 发表时间:2013-03-19 13:19 | 作者:
出处:http://agiledon.github.com/

我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题。

自定义Writable

Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化。为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型。不过,这些Hadoop内建的类型并不足以支持真实遇到的业务。此时,就需要自定义Writable类,使得它既能够作为Job的Key或者Value,又能体现业务逻辑。

假设我已经从豆瓣抓取了书籍的数据,包括书籍的Title以及读者定义的Tag,并以Json格式存储在文本文件中。现在我希望提取这些数据中我感兴趣的内容,例如指定书籍的Tag列表,包括Tag被标记的次数。这些数据可以作为向量,为后面的数据分析提供基础数据。对于Map,我希望读取Json文件,然后得到每本书的Title,以及对应的单个Tag信息。作为Map的输出,我希望是我自己定义的类型BookTag。它只包括Tag的名称和标记次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
      public class BookTag implements Writable {
    private String name;
    private int count;

    public BookTag() {
        count = 0;
    }

    public BookTag(String name, int count) {
        this.name = name;
        this.count = count;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        if (dataOutput != null) {
            Text.writeString(dataOutput, name);
            dataOutput.writeInt(count);
        }
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        if (dataInput != null) {
            name = Text.readString(dataInput);
            count = dataInput.readInt();
        }
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "BookTag{" +
                "name='" + name + '\'' +
                ", count=" + count +
                '}';
    }
}

注意,在write()与readFields()方法中,对于String类型的处理完全不同于Int、Long等类型,它需要调用Text的相关静态方法。

针对每本书,Map出来的结果可能包含重复的BookTag信息(指Tag Name相同);而我需要得到每个Tag的标记总和,以作为数据分析的向量。因此,作为Reduce的输入,可以是<Text, Iterable>,但输出则应该是合并了相同Tag信息的结果。为此,我引入了BookTags类,在其内部维持了一个BookTag的Map,它同样需要实现Writable。由于BookTags包含了一个集合类型,因此它的实现会略有不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
      public class BookTags implements Writable {
    private Map<String, BookTag> tags = new HashMap<String, BookTag>();

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(tags.size());
        for (BookTag tag : tags.values()) {
            tag.write(dataOutput);
        }
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        int size = dataInput.readInt();
        for (int i = 0; i < size; i++) {
            BookTag tag = new BookTag();
            tag.readFields(dataInput);
            tags.put(tag.getName(), tag);
        }
    }

    public void add(BookTag tag) {
            String tagName = tag.getName();
            if (tags.containsKey(tagName)) {
                BookTag bookTag = tags.get(tagName);
                bookTag.setCount(bookTag.getCount() + tag.getCount());
            } else {
                tags.put(tagName, tag);
            }
    }

    @Override
    public String toString() {
        StringBuilder resultTags = new StringBuilder();
        for (BookTag tag : tags.values()) {
            resultTags.append(tag.toString());
            resultTags.append("|");
        }
        return resultTags.toString();
    }
}

其实,针对这种嵌套了集合的自定义Writable类型,由于嵌套的类型同样实现了Writable接口,因而同样可以调用嵌套类型的write()与readFields()方法,唯一的区别是需要将集合的Size写入到DataOutput中,以便于在读取时可以遍历集合。这实际上是一种Composite模式。

Iterable的奇怪行为

我需要在reduce()方法中,遍历传入的Iterable,以便于对重复的Tag进行累加操作。在遍历该对象时,我发现了一个奇怪现象,即最终得到的每本书的Tag信息,全部变成了一样的内容。通过对Reduce Job进行调试,发现每当遍历到Iterable的下一个元素时,这个最新的值就会覆盖之前得到的对象,使其变成同一个对象。通过Google,我发现这个问题是Hadoop的奇怪行为,即Iterable对象的next()方法永远会返回同一个对象。解决办法就是在遍历时,创建一个新对象放到我们要存储的集合中,如下第5行代码所示:

1
2
3
4
5
6
7
8
9
          public static class BookReduce extends Reducer<Text, BookTag, Text, BookTags> {
        public void reduce(Text key, Iterable<BookTag> values, Context context) throws IOException, InterruptedException {
            BookTags bookTags = new BookTags();
            for (BookTag tag : values) {
                bookTags.add(new BookTag(tag.getName(), tag.getCount()));
            }
            context.write(key, bookTags);
        }
    }

这里得到的一个经验是,在编写MapReduce程序时,通过调试可以帮助你快速地定位问题。调试时,可以在项目的根目录下建立input文件夹,将数据源文件放入到该文件夹中,然后在调试的参数中设置即可。

如何进行单元测试

我们同样可以给MapReduce Job编写单元测试。除了可以使用Mockito进行Mock之外,我认为MRUnit可以更好地完成对MapReduce任务的验证。MRUnit为Map与Reduce提供了对应的Driver,即MapDriver与ReduceDriver。在编写测试用例时,我们只需要为Driver指定Input与Output,然后执行Driver的runTest()方法,即可测试任务的执行是否符合预期。这种预期是针对output输出的结果而言。以WordCounter为例,编写的单元测试如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
      public class WordCounterTest {
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;

    @Before
    public void setUp() {
        WordCounter.Map tokenizerMapper = new WordCounter.Map();
        WordCounter.Reduce reducer = new WordCounter.Reduce();
        mapDriver = MapDriver.newMapDriver(tokenizerMapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
    }

    @Test
    public void should_execute_tokenizer_map_job() throws IOException {
        mapDriver.withInput(new LongWritable(12), new Text("I am Bruce Bruce"));
        mapDriver.withOutput(new Text("I"), new IntWritable(1));
        mapDriver.withOutput(new Text("am"), new IntWritable(1));
        mapDriver.withOutput(new Text("Bruce"), new IntWritable(1));
        mapDriver.withOutput(new Text("Bruce"), new IntWritable(1));
        mapDriver.runTest();
    }

    @Test
    public void should_execute_reduce_job() {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(3));

        reduceDriver.withInput(new Text("Bruce"), values);
        reduceDriver.withOutput(new Text("Bruce"), new IntWritable(4));
        reduceDriver.runTest();
    }
}

Chaining Job

通过利用Hadoop提供的ChainMapper与ChainReducer,可以较为容易地实现多个Map Job或Reduce Job的链接。例如,我们可以将WordCounter分解为Tokenizer与Upper Case两个Map任务,最后执行Reduce。遗憾的是,ChainMapper与ChainReducer似乎不支持新版本的API,它要链接的Map与Reduce必须派生自MapReduceBase,并实现对应的Mapper或Reducer接口(说明,下面的代码基本上来自于StackOverFlow的 一个帖子)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
      public class ChainWordCounter extends Configured implements Tool {
    public static class Tokenizer extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class UpperCaser extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
        public void map(Text key, IntWritable count, OutputCollector<Text, IntWritable> collector, Reporter reporter) throws IOException {
            collector.collect(new Text(key.toString().toUpperCase()), count);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            result.set(sum);
            collector.collect(key, result);
        }
    }

    public int run(String[] args) throws Exception {
        JobConf jobConf = new JobConf(getConf(), ChainWordCounter.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));

        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(jobConf, outputDir);
        outputDir.getFileSystem(getConf()).delete(outputDir, true);

        JobConf tokenizerMapConf = new JobConf(false);
        ChainMapper.addMapper(jobConf, Tokenizer.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, tokenizerMapConf);

        JobConf upperCaserMapConf = new JobConf(false);
        ChainMapper.addMapper(jobConf, UpperCaser.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, upperCaserMapConf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(jobConf, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

        JobClient.runJob(jobConf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Configuration(), new ChainWordCounter(), args);
        System.exit(ret);
    }
}

不知道什么时候这种机制能够很好地支持新版的API。

相关 [hadoop mapreduce 技巧] 推荐:

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

"Hadoop/MapReduce/HBase"分享总结

- - ITeye博客
此分享是关于hadoop生态系统的简单介绍包括起源到相对应用. Hadoop和HBase.pdf (2.1 MB). 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Hadoop之MapReduce单元测试

- - ITeye博客
通常情况下,我们需要用小数据集来单元测试我们写好的map函数和reduce函数. 而一般我们可以使用Mockito框架来模拟OutputCollector对象(Hadoop版本号小于0.20.0)和Context对象(大于等于0.20.0). 下面是一个简单的WordCount例子:(使用的是新API).

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

【Hadoop】MapReduce使用combiner优化性能

- - CSDN博客云计算推荐文章
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担. Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入.

hadoop的IO和MapReduce优化参数

- - CSDN博客系统运维推荐文章
           在MapReduce执行过程中,特别是Shuffle阶段,尽量使用内存缓冲区存储数据,减少磁盘溢写次数;同时在作业执行过程中增加并行度,都能够显著提高系统性能,这也是配置优化的一个重要依据.            下面分别介绍I/O属性和MapReduce属性这两个类的部分属性,并指明其优化方向.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).

提高hadoop的mapreduce job效率

- - 数据库 - ITeye博客
hadoop 的mapreduce 的作业在运行过程中常常碰到一些这样的情 况:. 每一个map或者reduce只有30-40秒钟就结束. 超 大规模的job 时,通常会需要大量的map和reduce的slots 支持,但是job运行起来后,running的map和reduce并没有沾满集群的可用slots.