Hadoop MapReduce编程入门案例

标签: hadoop mapreduce 编程 | 发表时间:2014-08-24 17:00 | 作者:zpcandzhj
出处:http://blog.csdn.net

Hadoop入门例程简析中

(下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829)

一、一些说明

(1)Hadoop新旧API的区别

新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展。
例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现)。
在新的API中,mapper和reducer现在都是虚类。
新的API 放在org.apache.hadoop.mapreduce 包(和子包)中。之前版本的API 依旧放在org.apache.hadoop.mapred中。
新的API充分使用上下文对象(Context),使用户代码能与MapReduce系统通信。例如,MapContext 基本具备了JobConf、OutputCollector和Reporter的功能。
新的API 同时支持"推"(push)和"拉"(pull)式的迭代。
这两类API,均可以将键/值对记录推给mapper,但除此之外,新的API 也允许把记录从map()方法中拉出。
对reducer来说是一样的。"拉"式处理数据的好处是可以实现数据的批量处理,而非逐条记录地处理。
新增的API实现了配置的统一。旧API 通过一个特殊的JobConf 对象配置作业,该对象是Hadoop配置对象的一个扩展。
在新的API 中,我们丢弃这种区分,所有作业的配置均通过Configuration 来完成。
新API中作业控制由Job类实现,而非JobClient类,新API中删除了JobClient类。
输出文件的命名方式稍有不同。map的输出文件名为part-m-nnnnn,而reduce的输出为part-r-nnnnn(其中nnnnn表示分块序号,为整数,且从0开始算。

(2)设置Hadoop回收站

有的时候我们会不小心误删除HDFS中的文件,Hadoop提供了回收站机制,但是默认是不开启的。设置步骤如下:
(a)修改conf/core-site.xml,增加
<property> 
<name>fs.trash.interval</name> 
<value>1440</value> 
<description>Number of minutes between trash checkpoints. 
If zero, the trash feature is disabled. 
</description> 
</property>

默认是0.单位分钟。这里我设置的是1天(60*24) 
删除数据rm后,会将数据move到当前文件夹下的.Trash目录(一般在HDFS的/user/root目录下)

(b)测试 
1)新建目录input
hadoop/bin/hadoop fs -mkdir input
2)上传文件
root@master:/data/soft# hadoop/bin/hadoop fs -copyFromLocal /data/soft/file0* input
3)删除目录input
[root@master data]# hadoop fs -rmr input 
Moved to trash: hdfs://master:9000/user/root/input
4)参看当前目录(回收目录在HDFS的/user/root目录下)
[root@master data]# hadoop fs -ls 
Found 2 items 
drwxr-xr-x - root supergroup 0 2014-08-12 13:21 /user/root/.Trash
发现input删除,多了一个目录.Trash
5)恢复刚刚删除的目录(注意设置源 和 目的地址)
[root@master data]# hadoop fs -mv /user/root/.Trash/Current/user/root/input  /user/root/input
6)检查恢复的数据
[root@master data]# hadoop fs -ls input 
Found 2 items 
-rw-r--r-- 3 root supergroup 22 2014-08-12 13:21 /user/root/input/file01 
-rw-r--r-- 3 root supergroup 28 2014-08-12 13:21 /user/root/input/file02
7)删除.Trash目录(清理垃圾)
[root@master data]# hadoop fs -rmr .Trash 
Deleted hdfs://master:9000/user/root/.Trash

(3)在Eclipse中右击点Run on Hadoop

在Eclipse中右击点Run on Hadoop运行MR作业但在web页面(http://hadoop:50070和http://hadoop:50030)看不到作业运行记录

(4)为了简化命令行方式运行作业,Hadoop自带了一些辅助类。

利用ToolRunner等辅助类
例如如下的程序
public class WordCount {
    // 略...
    public static void main(String[] args) throws Exception {
//新API就是通过Configuration对象进行作业的配置        
Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, 
                                            args).getRemainingArgs();
        // 略...
        Job job = new Job(conf, "word count");
        // 略...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
这段程序中使用到了GenericOptionsParser这个类,它的作用是将命令行中参数自动设置到变量conf中。
GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。
通常不直接使用GenericOptionsParser,更方便的方式是:实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser
修改后的代码变成了这样:
</pre><h3>(5)添加第三方jar包</h3><p>在用命令行运行MR作业时,如果出现ClassNotFoundException可能是因为缺少第三方jar包,可以把第三方jar包copy到hadoop安装目录下放置jar的那个目录。</p><h2>二、Mapreduce例程</h2><h3>1、WordCount</h3>程序:新API版<pre name="code" class="java">package inAction;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//基于新API的WordCount例子(用extends Configured implements Tool的方式便于管理作业)
public class MyWordCount extends Configured implements Tool {

public static class MyMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();


@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {


StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
context.write(word, one);
}
}
}

public static class MyReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
result.set(count);
context.write(key, result);
}
}

@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf();//新APIConfiguration对象负责作业配置
//ToolRunner工具会自动调用隐藏的GenericOptionsParser将命令行参数设置到conf中
Job job=new Job(conf,"MyWordCount");
job.setJarByClass(MyWordCount.class);

job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);

job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//输入输出参数可以在命令行运行时设置,也可以在程序中直接设置,右击run on hadoop
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true)?0:1);
return 0;
}

public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MyWordCount(), args);
System.exit(res);
}
}

运行:hadoop jar /root/wordCount.jar /usr/input  /usr/output
(jar包一般放在本地磁盘上,而输入输出文件放在HDFS上。也可以在程序中直接给出输入输出路径,数据文件也可以放在本地硬盘。
本例中的数据文件为word1、word2,文件可以没有后缀名,全部放在HDFS中的/usr/input目录下)
程序:旧API版
package inAction;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

//基于旧API的WordCount实现
public class WordCount2 {


public static class MyMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text word = new Text();


@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
output.collect(word, one);
}
}
}

public static class MyReduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable res = new IntWritable();
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
res.set(sum);
output.collect(key, res);
}
}

public static void main(String[] args) throws IOException {
//旧API使用JobConf配置作业
JobConf conf=new JobConf(WordCount2.class);
conf.setJobName("OldAPIWordCount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MyMapper.class);
conf.setCombinerClass(MyReduce.class);
conf.setReducerClass(MyReduce.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path("hdfs://hadoop:9000/usr/wordsIn"));
FileOutputFormat.setOutputPath(conf, new Path("hdfs://hadoop:9000/usr/wordsOut2"));

JobClient.runJob(conf);//新API中JobClient已删除

}

}

关于wordCount的详细解释可以参考这篇文章:http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html

2、MaxTemperature《Hadoop权威指南》

程序:MaxTemperature.java
package inAction;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


//找出每一年的最高气温(本例只用了1901年和1902年的数据,网上下的)
public class MaxTemperature extends Configured implements Tool{


//Mapper的功能是提取每一行原始数据中的年份和温度值
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final int MISSING=9999;//如果一行的气温值是9999即表明该年气温缺失


@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line=value.toString();
String year=line.substring(15,19);//年份是15-19个字符
int temperature;
//气温有正负要区别对待,Integer.parseInt不能处理负数
if(line.charAt(87)=='+'){
temperature=Integer.parseInt(line.substring(88,92));
}else{
temperature=Integer.parseInt(line.substring(87,92));
}
String quantity=line.substring(92,93);//quantity.matches("[01459]")表明数量只有是01459时才是有效气温值
//只有有效气温值才输出
if(quantity.matches("[01459]")&&temperature!=MISSING){
context.write(new Text(year), new IntWritable(temperature));
}
}
} 

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{


@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue=Integer.MIN_VALUE;
for(IntWritable temp:values){
maxValue=Math.max(temp.get(), maxValue);
}
context.write(key, new IntWritable(maxValue));
}

}



@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf();
Job job=new Job(conf,"MaxTemperature");
job.setJarByClass(MaxTemperature.class);

job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);//设置Combiner减少传递给Reducer的数据量,提高性能
job.setReducerClass(MyReducer.class);

job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//输入输出参数可以在命令行运行时设置,也可以在程序中直接设置,右击run on hadoop
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true)?0:1);
return 0;
}


public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MaxTemperature(), args);
System.exit(res);
}
}
关于MaxTemperature的详细解释可以参考这篇文章:http://www.linuxidc.com/Linux/2012-05/61196.htm

3、专利数据集《Hadoop in action》

对于一个专利数据集统计每个专利及引用它的专利,即输出形如:专利号1  引用专利1的专利号,引用专利1的专利号...(本例来自《Hadoop in action》)
程序:MyJob2.java

package inAction;
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;


public class MyJob2 extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] citation = value.toString().split(",");
context.write(new Text(citation[1]), new Text(citation[0]));
}
}


public static class Reduce extends Reducer<Text, Text, Text, Text> {


protected void reduce(Text key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
String csv = "";
for (Text val : value) {
if (csv.length() > 0)
csv += ",";
csv += val.toString();
}


context.write(key, new Text(csv));
}
}


@Override
public int run(String[] arg0) throws Exception {
Configuration conf=getConf();

Job job=new Job(conf,"MyJob2");
job.setJarByClass(MyJob2.class);

Path in=new  Path("/root/cite75_99.txt");
Path out=new  Path("/root/inAction3");
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);

job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

System.exit(job.waitForCompletion(true)?0:1);
return 0;
}


public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MyJob2(), args);
System.exit(res);
}
}

注:专利数据集apat63_99.txt和cite75_99.txt可以从网上下载。


下面的几个案例请参考博客园虾皮工作室!程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829

4、数据去重

对数据文件中的数据进行去重。数据文件中的每行都是一个数据。
样例输入如下所示:
1)file1:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2)file2:
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
样例输出如下所示:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

程序:Dedup.java


5、数据排序

对输入文件中数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。
要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。

样例输入:
1)file1:
 
2
32
654
32
15
756
65223
2)file2:
5956
22
650
92
3)file3:
26
54
6
样例输出:
1    2
2    6
3    15
4    22
5    26
6    32
7    32
8    54
9    92
10    650
11    654
12    756
13    5956
14    65223
程序:Sort.java


6、平均成绩

对输入文件中数据进行就算学生平均成绩。
输入文件中的每行内容均为一个学生的姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件。
要求在输出中每行有两个间隔的数据,其中,第一个代表学生的姓名,第二个代表其平均成绩。
样本输入:
1)math:
张三    88
李四    99
王五    66
赵六    77
2)china:
张三    78
李四    89
王五    96
赵六    67
3)english:
张三    80
李四    82
王五    84
赵六    86


样本输出:
张三    82
李四    90
王五    82
赵六    76

程序:AverageScore.java

7、单表关联

实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。
样例输入如下所示。
file:
child        parent
Tom        Lucy
Tom        Jack
Jone        Lucy
Jone        Jack
Lucy        Mary
Lucy        Ben
Jack        Alice
Jack        Jesse
Terry        Alice
Terry        Jesse
Philip        Terry
Philip        Alma
Mark        Terry
Mark        Alma

样例输出如下所示
file:
grandchild        grandparent
Tom              Alice
Tom              Jesse
Jone              Alice
Jone              Jesse
Tom              Mary
Tom              Ben
Jone              Mary
Jone              Ben
Philip              Alice
Philip              Jesse
Mark              Alice
Mark              Jesse

程序:STJoin.java

8、多表关联

输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;
另一个代表地址表,包含地址名列和地址编号列。
要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。
样例输入如下所示。
1)factory:
factoryname                  addressed
Beijing Red Star                  1
Shenzhen Thunder                3
Guangzhou Honda                 2
Beijing Rising                     1
Guangzhou Development Bank          2
Tencent                        3
Bank of Beijing                   1
2)address:
addressID    addressname
1            Beijing
2            Guangzhou
3            Shenzhen
4            Xian
样例输出如下所示
factoryname                      addressname
Bank of Beijing                   Beijing
Beijing Red Star                  Beijing
Beijing Rising                    Beijing
Guangzhou Development Bank        Guangzhou
Guangzhou Honda                  Guangzhou
Shenzhen Thunder                  Shenzhen
Tencent                        Shenzhen
多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。
所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,
另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。

程序:MTjoin.java

9、建立倒排索引

 "倒排索引"是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。
它主要是用来存储某个单词(或词组)在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。
由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引(Inverted Index)。
样例输入如下所示。
1)file1:
MapReduce is simple
2)file2:
MapReduce is powerful is simple
3)file3:
Hello MapReduce bye MapReduce
样例输出如下所示(每个单词在每个文件中的权重也计算出来了)
MapReduce         file1.txt:1;file2.txt:1;file3.txt:2;
is            file1.txt:1;file2.txt:2;
simple           file1.txt:1;file2.txt:1;
powerful        file2.txt:1;
Hello           file3.txt:1;
bye             file3.txt:1;
程序:InvertedIndex.java


鸟鹏学习笔记,转载注明出处!
http://blog.csdn.net/hellozpc
















































































































作者:zpcandzhj 发表于2014-8-24 17:00:49 原文链接
阅读:113 评论:0 查看评论

相关 [hadoop mapreduce 编程] 推荐:

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

"Hadoop/MapReduce/HBase"分享总结

- - ITeye博客
此分享是关于hadoop生态系统的简单介绍包括起源到相对应用. Hadoop和HBase.pdf (2.1 MB). 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Hadoop之MapReduce单元测试

- - ITeye博客
通常情况下,我们需要用小数据集来单元测试我们写好的map函数和reduce函数. 而一般我们可以使用Mockito框架来模拟OutputCollector对象(Hadoop版本号小于0.20.0)和Context对象(大于等于0.20.0). 下面是一个简单的WordCount例子:(使用的是新API).

MapReduce编程模型

- - CSDN博客云计算推荐文章
MapReduce是一个Google发明的编程模型,也是一个处理和生成超大规模数据集的算法模型的相关实现. 用户首先创建一个Map函数处理一个基于对的数据集合,输出的中间结果基于对的数据集合,然后再创建一个Reduce函数用来合并所有的具有相同中间Key值的中间Value值.

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

【Hadoop】MapReduce使用combiner优化性能

- - CSDN博客云计算推荐文章
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担. Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入.

hadoop的IO和MapReduce优化参数

- - CSDN博客系统运维推荐文章
           在MapReduce执行过程中,特别是Shuffle阶段,尽量使用内存缓冲区存储数据,减少磁盘溢写次数;同时在作业执行过程中增加并行度,都能够显著提高系统性能,这也是配置优化的一个重要依据.            下面分别介绍I/O属性和MapReduce属性这两个类的部分属性,并指明其优化方向.