Hadoop MapReduce技巧
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题。
自定义Writable
Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化。为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型。不过,这些Hadoop内建的类型并不足以支持真实遇到的业务。此时,就需要自定义Writable类,使得它既能够作为Job的Key或者Value,又能体现业务逻辑。
假设我已经从豆瓣抓取了书籍的数据,包括书籍的Title以及读者定义的Tag,并以Json格式存储在文本文件中。现在我希望提取这些数据中我感兴趣的内容,例如指定书籍的Tag列表,包括Tag被标记的次数。这些数据可以作为向量,为后面的数据分析提供基础数据。对于Map,我希望读取Json文件,然后得到每本书的Title,以及对应的单个Tag信息。作为Map的输出,我希望是我自己定义的类型BookTag。它只包括Tag的名称和标记次数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | |
注意,在write()与readFields()方法中,对于String类型的处理完全不同于Int、Long等类型,它需要调用Text的相关静态方法。
针对每本书,Map出来的结果可能包含重复的BookTag信息(指Tag Name相同);而我需要得到每个Tag的标记总和,以作为数据分析的向量。因此,作为Reduce的输入,可以是<Text, Iterable>,但输出则应该是合并了相同Tag信息的结果。为此,我引入了BookTags类,在其内部维持了一个BookTag的Map,它同样需要实现Writable。由于BookTags包含了一个集合类型,因此它的实现会略有不同:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | |
其实,针对这种嵌套了集合的自定义Writable类型,由于嵌套的类型同样实现了Writable接口,因而同样可以调用嵌套类型的write()与readFields()方法,唯一的区别是需要将集合的Size写入到DataOutput中,以便于在读取时可以遍历集合。这实际上是一种Composite模式。
Iterable的奇怪行为
我需要在reduce()方法中,遍历传入的Iterable,以便于对重复的Tag进行累加操作。在遍历该对象时,我发现了一个奇怪现象,即最终得到的每本书的Tag信息,全部变成了一样的内容。通过对Reduce Job进行调试,发现每当遍历到Iterable的下一个元素时,这个最新的值就会覆盖之前得到的对象,使其变成同一个对象。通过Google,我发现这个问题是Hadoop的奇怪行为,即Iterable对象的next()方法永远会返回同一个对象。解决办法就是在遍历时,创建一个新对象放到我们要存储的集合中,如下第5行代码所示:
1 2 3 4 5 6 7 8 9 | |
这里得到的一个经验是,在编写MapReduce程序时,通过调试可以帮助你快速地定位问题。调试时,可以在项目的根目录下建立input文件夹,将数据源文件放入到该文件夹中,然后在调试的参数中设置即可。
如何进行单元测试
我们同样可以给MapReduce Job编写单元测试。除了可以使用Mockito进行Mock之外,我认为MRUnit可以更好地完成对MapReduce任务的验证。MRUnit为Map与Reduce提供了对应的Driver,即MapDriver与ReduceDriver。在编写测试用例时,我们只需要为Driver指定Input与Output,然后执行Driver的runTest()方法,即可测试任务的执行是否符合预期。这种预期是针对output输出的结果而言。以WordCounter为例,编写的单元测试如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | |
Chaining Job
通过利用Hadoop提供的ChainMapper与ChainReducer,可以较为容易地实现多个Map Job或Reduce Job的链接。例如,我们可以将WordCounter分解为Tokenizer与Upper Case两个Map任务,最后执行Reduce。遗憾的是,ChainMapper与ChainReducer似乎不支持新版本的API,它要链接的Map与Reduce必须派生自MapReduceBase,并实现对应的Mapper或Reducer接口(说明,下面的代码基本上来自于StackOverFlow的 一个帖子)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | |
不知道什么时候这种机制能够很好地支持新版的API。