如何使用Hadoop的Partitioner - 三劫散仙 - ITeye技术网站
Partitioner的作用:
对map端输出的数据key作一个散列,使数据能够均匀分布在各个reduce上进行后续操作,避免产生热点区。
Hadoop默认使用的分区函数是Hash Partitioner,源码如下:
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapreduce.lib.partition;
- import org.apache.hadoop.mapreduce.Partitioner;
- /** Partition keys by their {@link Object#hashCode()}. */
- public class HashPartitioner<K, V> extends Partitioner<K, V> {
- /** Use {@link Object#hashCode()} to partition. */
- public int getPartition(K key, V value,
- int numReduceTasks) {
- //默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况
- return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
- }
大部分情况下,我们都会使用默认的分区函数,但有时我们又有一些,特殊的需求,而需要定制Partition来完成我们的业务,案例如下:
对如下数据,按字符串的长度分区,长度为1的放在一个,2的一个,3的各一个。
- 河南省;1
- 河南;2
- 中国;3
- 中国人;4
- 大;1
- 小;3
- 中;11
这时候,我们使用默认的分区函数,就不行了,所以需要我们定制自己的Partition,首先分析下,我们需要3个分区输出,所以在设置reduce的个数时,一定要设置为3,其次在partition里,进行分区时,要根据长度具体分区,而不是根据字符串的hash码来分区。核心代码如下:
- /**
- * Partitioner
- *
- *
- * */
- public static class PPartition extends Partitioner<Text, Text>{
- @Override
- public int getPartition(Text arg0, Text arg1, int arg2) {
- /**
- * 自定义分区,实现长度不同的字符串,分到不同的reduce里面
- *
- * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3
- * 有几个分区,就设置为几
- * */
- String key=arg0.toString();
- if(key.length()==1){
- return 1%arg2;
- }else if(key.length()==2){
- return 2%arg2;
- }else if(key.length()==3){
- return 3%arg2;
- }
- return 0;
- }
- }
全部代码如下:
- package com.partition.test;
- import java.io.IOException;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Partitioner;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
- import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import com.qin.operadb.PersonRecoder;
- import com.qin.operadb.ReadMapDB;
- /**
- * @author qindongliang
- *
- * 大数据交流群:376932160
- *
- *
- * **/
- public class MyTestPartition {
- /**
- * map任务
- *
- * */
- public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
- @Override
- protected void map(LongWritable key, Text value,Context context)
- throws IOException, InterruptedException {
- // System.out.println("进map了");
- //mos.write(namedOutput, key, value);
- String ss[]=value.toString().split(";");
- context.write(new Text(ss[0]), new Text(ss[1]));
- }
- }
- /**
- * Partitioner
- *
- *
- * */
- public static class PPartition extends Partitioner<Text, Text>{
- @Override
- public int getPartition(Text arg0, Text arg1, int arg2) {
- /**
- * 自定义分区,实现长度不同的字符串,分到不同的reduce里面
- *
- * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3
- * 有几个分区,就设置为几
- * */
- String key=arg0.toString();
- if(key.length()==1){
- return 1%arg2;
- }else if(key.length()==2){
- return 2%arg2;
- }else if(key.length()==3){
- return 3%arg2;
- }
- return 0;
- }
- }
- /***
- * Reduce任务
- *
- * **/
- public static class PReduce extends Reducer<Text, Text, Text, Text>{
- @Override
- protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
- throws IOException, InterruptedException {
- String key=arg0.toString().split(",")[0];
- System.out.println("key==> "+key);
- for(Text t:arg1){
- //System.out.println("Reduce: "+arg0.toString()+" "+t.toString());
- arg2.write(arg0, t);
- }
- }
- }
- public static void main(String[] args) throws Exception{
- JobConf conf=new JobConf(ReadMapDB.class);
- //Configuration conf=new Configuration();
- conf.set("mapred.job.tracker","192.168.75.130:9001");
- //读取person中的数据字段
- conf.setJar("tt.jar");
- //注意这行代码放在最前面,进行初始化,否则会报
- /**Job任务**/
- Job job=new Job(conf, "testpartion");
- job.setJarByClass(MyTestPartition.class);
- System.out.println("模式: "+conf.get("mapred.job.tracker"));;
- // job.setCombinerClass(PCombine.class);
- job.setPartitionerClass(PPartition.class);
- job.setNumReduceTasks(3);//设置为3
- job.setMapperClass(PMapper.class);
- // MultipleOutputs.addNamedOutput(job, "hebei", TextOutputFormat.class, Text.class, Text.class);
- // MultipleOutputs.addNamedOutput(job, "henan", TextOutputFormat.class, Text.class, Text.class);
- job.setReducerClass(PReduce.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- String path="hdfs://192.168.75.130:9000/root/outputdb";
- FileSystem fs=FileSystem.get(conf);
- Path p=new Path(path);
- if(fs.exists(p)){
- fs.delete(p, true);
- System.out.println("输出路径存在,已删除!");
- }
- FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
- FileOutputFormat.setOutputPath(job,p );
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
MapReduce:详解Shuffle过程 - 每天一小步 - ITeye技术网站
Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方。要想理解MapReduce, Shuffle是必须要了解的。我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混。前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟。考虑到之前我在看相关资料而看不懂时很恼火,所以在这里我尽最大的可能试着把Shuffle说清楚,让每一位想了解它原理的朋友都能有所收获。如果你对这篇文章有任何疑问或建议请留言到后面,谢谢!
Shuffle的正常意思是洗牌或弄乱,可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序。如果你不知道MapReduce里Shuffle是什么,那么请看这张图:
这张是官方对Shuffle过程的描述。但我可以肯定的是,单从这张图你基本不可能明白Shuffle的过程,因为它与事实相差挺多,细节也是错乱的。后面我会具体描述Shuffle的事实情况,所以这里你只要清楚Shuffle的大致范围就成-怎样把map task的输出结果有效地传送到reduce端。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。
在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,我们对Shuffle过程的期望可以有:
OK,看到这里时,大家可以先停下来想想,如果是自己来设计这段Shuffle过程,那么你的设计目标是什么。我想能优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。
我的分析是基于Hadoop0.21.0的源码,如果与你所认识的Shuffle过程有差别,不吝指出。我会以WordCount为例,并假设它有8个map task和3个reduce task。从上图看出,Shuffle过程横跨map与reduce两端,所以下面我也会分两部分来展开。
先看看map端的情况,如下图:
上图可能是某个map task的运行情况。拿它与官方图的左半边比较,会发现很多不一致。官方图没有清楚地说明partition, sort与combiner到底作用在哪个阶段。我画了这张图,希望让大家清晰地了解从map数据输入到map端所有数据准备好的全过程。
整个流程我分了四步。简单些可以这样说,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
当然这里的每一步都可能包含着多个步骤与细节,下面我对细节来一一说明:
1. 在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系可能是多对一,默认是一对一。在WordCount例子里,假设map的输入数据都是像“aaa”这样的字符串。
2. 在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。前面我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce去做呢,是需要现在决定的。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
在我们的例子中,“aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
整个内存缓冲区就是一个字节数组,它的字节索引及key/value存储结构我没有研究过。如果有朋友对它有研究,那么请大致描述下它的细节吧。
3. 这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
在这里我们可以想想,因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。
在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于WordCount例子,就是简单地统计单词出现的次数,如果在同一个map task的结果中有很多个像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但MapReduce的术语中,reduce只指reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算做combine了。其实大家知道的,MapReduce中将Combiner等同于Reducer。
如果client设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
4. 每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。
至此,map端的所有工作都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。
简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。见下图:
如map 端的细节图,Shuffle在reduce端的过程也能用图上标明的三点来概括。当前reduce copy数据的前提是它要从JobTracker获得有哪些map task已执行结束,这段过程不表,有兴趣的朋友可以关注下。Reducer真正运行之前,所有的时间都是在拉取数据,做merge,且不断重复地在做。如前面的方式一样,下面我也分段地描述reduce 端的Shuffle细节:
1. Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
2. Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。这里需要强调的是,merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。
3. Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。至于怎样才能让这个文件出现在内存中,之后的性能优化篇我再说。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。
上面就是整个Shuffle的过程。细节很多,我很多都略过了,只试着把要点说明白。当然,我可能也有理解或表述上的很多问题,不吝指点。我希望不断地完善和修改这篇文章,能让它通俗、易懂,看完就能知道Shuffle的方方面面。至于具体的实现原理,各位有兴趣就自己去探索,如果不方便的话,留言给我,我再来研究并反馈。
hadoop中MapReduce多种join实现实例分析 - 蚂蚁 - 51CTO技术博客
对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性。本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。
二、实现原理
1、在Reudce端进行连接。
在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:
(1)自定义一个value返回类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package com.mr.reduceSizeJoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class CombineValues implements WritableComparable<CombineValues>{
//private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
private Text joinKey;//链接关键字
private Text flag;//文件来源标志
private Text secondPart;//除了链接键外的其他部分
public void setJoinKey(Text joinKey) {
this.joinKey = joinKey;
}
public void setFlag(Text flag) {
this.flag = flag;
}
public void setSecondPart(Text secondPart) {
this.secondPart = secondPart;
}
public Text getFlag() {
return flag;
}
public Text getSecondPart() {
return secondPart;
}
public Text getJoinKey() {
return joinKey;
}
public CombineValues() {
this.joinKey = new Text();
this.flag = new Text();
this.secondPart = new Text();
}
@Override
public void write(DataOutput out) throws IOException {
this.joinKey.write(out);
this.flag.write(out);
this.secondPart.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
this.joinKey.readFields(in);
this.flag.readFields(in);
this.secondPart.readFields(in);
}
@Override
public int compareTo(CombineValues o) {
return this.joinKey.compareTo(o.getJoinKey());
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
}
}
|
(2)map、reduce主体代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
package com.mr.reduceSizeJoin;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengzhaozheng
* 用途说明:
* reudce side join中的left outer join
* 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
* tb_dim_city.dat文件内容,分隔符为"|":
* id name orderid city_code is_show
* 0 其他 9999 9999 0
* 1 长春 1 901 1
* 2 吉林 2 902 1
* 3 四平 3 903 1
* 4 松原 4 904 1
* 5 通化 5 905 1
* 6 辽源 6 906 1
* 7 白城 7 907 1
* 8 白山 8 908 1
* 9 延吉 9 909 1
* -------------------------风骚的分割线-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件内容,分隔符为"|":
* userID network flow cityID
* 1 2G 123 1
* 2 3G 333 2
* 3 3G 555 1
* 4 2G 777 3
* 5 3G 666 4
*
* -------------------------风骚的分割线-------------------------------
* 结果:
* 1 长春 1 901 1 1 2G 123
* 1 长春 1 901 1 3 3G 555
* 2 吉林 2 902 1 2 3G 333
* 3 四平 3 903 1 4 2G 777
* 4 松原 4 904 1 5 3G 666
*/
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
private CombineValues combineValues = new CombineValues();
private Text flag = new Text();
private Text joinKey = new Text();
private Text secondPart = new Text();
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//获得文件输入路径
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
//数据来自tb_dim_city.dat文件,标志即为"0"
if(pathName.endsWith("tb_dim_city.dat")){
String[] valueItems = value.toString().split("\\|");
//过滤格式错误的记录
if(valueItems.length != 5){
return;
}
flag.set("0");
joinKey.set(valueItems[0]);
secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
combineValues.setFlag(flag);
combineValues.setJoinKey(joinKey);
combineValues.setSecondPart(secondPart);
context.write(combineValues.getJoinKey(), combineValues);
}//数据来自于tb_user_profiles.dat,标志即为"1"
else if(pathName.endsWith("tb_user_profiles.dat")){
String[] valueItems = value.toString().split("\\|");
//过滤格式错误的记录
if(valueItems.length != 4){
return;
}
flag.set("1");
joinKey.set(valueItems[3]);
secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
combineValues.setFlag(flag);
combineValues.setJoinKey(joinKey);
combineValues.setSecondPart(secondPart);
context.write(combineValues.getJoinKey(), combineValues);
}
}
}
public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
//存储一个分组中的左表信息
private ArrayList<Text> leftTable = new ArrayList<Text>();
//存储一个分组中的右表信息
private ArrayList<Text> rightTable = new ArrayList<Text>();
private Text secondPar = null;
private Text output = new Text();
/**
* 一个分组调用一次reduce函数
*/
@Override
protected void reduce(Text key, Iterable<CombineValues> value, Context context)
throws IOException, InterruptedException {
leftTable.clear();
rightTable.clear();
/**
* 将分组中的元素按照文件分别进行存放
* 这种方法要注意的问题:
* 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
* 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
* 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
*/
for(CombineValues cv : value){
secondPar = new Text(cv.getSecondPart().toString());
//左表tb_dim_city
if("0".equals(cv.getFlag().toString().trim())){
leftTable.add(secondPar);
}
//右表tb_user_profiles
else if("1".equals(cv.getFlag().toString().trim())){
rightTable.add(secondPar);
}
}
logger.info("tb_dim_city:"+leftTable.toString());
logger.info("tb_user_profiles:"+rightTable.toString());
for(Text leftPart : leftTable){
for(Text rightPart : rightTable){
output.set(leftPart+ "\t" + rightPart);
context.write(key, output);
}
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf(); //获得配置文件对象
Job job=new Job(conf,"LeftOutJoinMR");
job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
job.setMapperClass(LeftOutJoinMapper.class);
job.setReducerClass(LeftOutJoinReducer.class);
job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
//设置map的输出key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CombineValues.class);
//设置reduce的输出key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
try {
int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
System.exit(returnCode);
} catch (Exception e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
}
}
|
其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。
2、在Map端进行连接。
使用场景:一张表十分小、一张表很大。
用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
直接上代码,比较简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
package com.mr.mapSideJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengzhaozheng
*
* 用途说明:
* Map side join中的left outer join
* 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
* 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":
* id name orderid city_code is_show
* 0 其他 9999 9999 0
* 1 长春 1 901 1
* 2 吉林 2 902 1
* 3 四平 3 903 1
* 4 松原 4 904 1
* 5 通化 5 905 1
* 6 辽源 6 906 1
* 7 白城 7 907 1
* 8 白山 8 908 1
* 9 延吉 9 909 1
* -------------------------风骚的分割线-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件内容,分隔符为"|":
* userID network flow cityID
* 1 2G 123 1
* 2 3G 333 2
* 3 3G 555 1
* 4 2G 777 3
* 5 3G 666 4
* -------------------------风骚的分割线-------------------------------
* 结果:
* 1 长春 1 901 1 1 2G 123
* 1 长春 1 901 1 3 3G 555
* 2 吉林 2 902 1 2 3G 333
* 3 四平 3 903 1 4 2G 777
* 4 松原 4 904 1 5 3G 666
*/
public class MapSideJoinMain extends Configured implements Tool{
private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {
private HashMap<String,String> city_info = new HashMap<String, String>();
private Text outPutKey = new Text();
private Text outPutValue = new Text();
private String mapInputStr = null;
private String mapInputSpit[] = null;
private String city_secondPart = null;
/**
* 此方法在每个task开始之前执行,这里主要用作从DistributedCache
* 中取到tb_dim_city文件,并将里边记录取出放到内存中。
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
BufferedReader br = null;
//获得当前作业的DistributedCache相关文件
Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String cityInfo = null;
for(Path p : distributePaths){
if(p.toString().endsWith("tb_dim_city.dat")){
//读缓存文件,并放到mem中
br = new BufferedReader(new FileReader(p.toString()));
while(null!=(cityInfo=br.readLine())){
String[] cityPart = cityInfo.split("\\|",5);
if(cityPart.length ==5){
city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);
}
}
}
}
}
/**
* Map端的实现相当简单,直接判断tb_user_profiles.dat中的
* cityID是否存在我的map中就ok了,这样就可以实现Map Join了
*/
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//排掉空行
if(value == null || value.toString().equals("")){
return;
}
mapInputStr = value.toString();
mapInputSpit = mapInputStr.split("\\|",4);
//过滤非法记录
if(mapInputSpit.length != 4){
return;
}
//判断链接字段是否在map中存在
city_secondPart = city_info.get(mapInputSpit[3]);
if(city_secondPart != null){
this.outPutKey.set(mapInputSpit[3]);
this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);
context.write(outPutKey, outPutValue);
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf(); //获得配置文件对象
DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件
Job job=new Job(conf,"MapJoinMR");
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径
job.setJarByClass(MapSideJoinMain.class);
job.setMapperClass(LeftOutJoinMapper.class);
job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
//设置map的输出key和value类型
job.setMapOutputKeyClass(Text.class);
//设置reduce的输出key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
try {
int returnCode = ToolRunner.run(new MapSideJoinMain(),args);
System.exit(returnCode);
} catch (Exception e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
}
}
|
这里说说DistributedCache。DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。
另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。
3、SemiJoin。
SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
package com.mr.SemiJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengzhaozheng
*
* 用途说明:
* reudce side join中的left outer join
* 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
* tb_dim_city.dat文件内容,分隔符为"|":
* id name orderid city_code is_show
* 0 其他 9999 9999 0
* 1 长春 1 901 1
* 2 吉林 2 902 1
* 3 四平 3 903 1
* 4 松原 4 904 1
* 5 通化 5 905 1
* 6 辽源 6 906 1
* 7 白城 7 907 1
* 8 白山 8 908 1
* 9 延吉 9 909 1
* -------------------------风骚的分割线-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件内容,分隔符为"|":
* userID network flow cityID
* 1 2G 123 1
* 2 3G 333 2
* 3 3G 555 1
* 4 2G 777 3
* 5 3G 666 4
* -------------------------风骚的分割线-------------------------------
* joinKey.dat内容:
* city_code
* 1
* 2
* 3
* 4
* -------------------------风骚的分割线-------------------------------
* 结果:
* 1 长春 1 901 1 1 2G 123
* 1 长春 1 901 1 3 3G 555
* 2 吉林 2 902 1 2 3G 333
* 3 四平 3 903 1 4 2G 777
* 4 松原 4 904 1 5 3G 666
*/
public class SemiJoin extends Configured implements Tool{
private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);
public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
private CombineValues combineValues = new CombineValues();
private HashSet<String> joinKeySet = new HashSet<String>();
private Text flag = new Text();
private Text joinKey = new Text();
private Text secondPart = new Text();
/**
* 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
BufferedReader br = null;
//获得当前作业的DistributedCache相关文件
Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String joinKeyStr = null;
for(Path p : distributePaths){
if(p.toString().endsWith("joinKey.dat")){
//读缓存文件,并放到mem中
br = new BufferedReader(new FileReader(p.toString()));
while(null!=(joinKeyStr=br.readLine())){
joinKeySet.add(joinKeyStr);
}
}
}
}
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//获得文件输入路径
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
//数据来自tb_dim_city.dat文件,标志即为"0"
if(pathName.endsWith("tb_dim_city.dat")){
String[] valueItems = value.toString().split("\\|");
//过滤格式错误的记录
if(valueItems.length != 5){
return;
}
//过滤掉不需要参加join的记录
if(joinKeySet.contains(valueItems[0])){
flag.set("0");
joinKey.set(valueItems[0]);
secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
combineValues.setFlag(flag);
combineValues.setJoinKey(joinKey);
combineValues.setSecondPart(secondPart);
context.write(combineValues.getJoinKey(), combineValues);
}else{
return ;
}
}//数据来自于tb_user_profiles.dat,标志即为"1"
else if(pathName.endsWith("tb_user_profiles.dat")){
String[] valueItems = value.toString().split("\\|");
//过滤格式错误的记录
if(valueItems.length != 4){
return;
}
//过滤掉不需要参加join的记录
if(joinKeySet.contains(valueItems[3])){
flag.set("1");
joinKey.set(valueItems[3]);
secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
combineValues.setFlag(flag);
combineValues.setJoinKey(joinKey);
combineValues.setSecondPart(secondPart);
context.write(combineValues.getJoinKey(), combineValues);
}else{
return ;
}
}
}
}
public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
//存储一个分组中的左表信息
private ArrayList<Text> leftTable = new ArrayList<Text>();
//存储一个分组中的右表信息
private ArrayList<Text> rightTable = new ArrayList<Text>();
private Text secondPar = null;
private Text output = new Text();
/**
* 一个分组调用一次reduce函数
*/
@Override
protected void reduce(Text key, Iterable<CombineValues> value, Context context)
throws IOException, InterruptedException {
leftTable.clear();
rightTable.clear();
/**
* 将分组中的元素按照文件分别进行存放
* 这种方法要注意的问题:
* 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
* 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
* 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
*/
for(CombineValues cv : value){
secondPar = new Text(cv.getSecondPart().toString());
//左表tb_dim_city
if("0".equals(cv.getFlag().toString().trim())){
leftTable.add(secondPar);
}
//右表tb_user_profiles
else if("1".equals(cv.getFlag().toString().trim())){
rightTable.add(secondPar);
}
}
logger.info("tb_dim_city:"+leftTable.toString());
logger.info("tb_user_profiles:"+rightTable.toString());
for(Text leftPart : leftTable){
for(Text rightPart : rightTable){
output.set(leftPart+ "\t" + rightPart);
context.write(key, output);
}
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf(); //获得配置文件对象
DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
Job job=new Job(conf,"LeftOutJoinMR");
job.setJarByClass(SemiJoin.class);
FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
job.setMapperClass(SemiJoinMapper.class);
job.setReducerClass(SemiJoinReducer.class);
job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
//设置map的输出key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CombineValues.class);
//设置reduce的输出key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
try {
int returnCode = ToolRunner.run(new SemiJoin(),args);
System.exit(returnCode);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
|
这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。
三、总结
blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。
参考文献: