第一个完整的Map/Reduce小程序

标签: 完整 map reduce | 发表时间:2014-06-29 02:19 | 作者:
出处:http://www.iteye.com

          从在自己的win7下面装好虚拟机,然后在虚拟机上面安装hadoop,然后再安装hadoop-eclipse插件,过去好像有一个星期了,之前装虚拟机和hadoop都没成功,上个星期解除了封印,一口气把hadoop学习前期的所有的东西都搞定了,接下来就是遥遥无期的hadoop之路。希望自己能坚持下去。

         今天按着别人的思路在win7下面的eclipse里面敲了算是处女作的Map/Reduce程序,虽然很简单,但是自己还是一步一步的走通了,因为hadoop是安装在虚拟机上的,但是eclipse是在win7下面,所以在中间运行的时候会有一系列的错误,昨天晚上把遇到的问题百度的百度,问神的问神,烧香的烧香,基本上都解决了,现在能把程序跑起来,感觉自己的熬夜什么的都没有白费

          下面把一个完整的Map/Reduce程序贴出来,算是一个开始,也是一个纪念嘛!

     

        问题描述:

        先上数据:

         13599999999 10086

         13899999999      120

         13944444444 110

         13722222222 110

         18800000000 120

         13722222222 10086

         18944444444 10086

        

         要求是把拨打过同一个电话的电话输出来比如:

          110 13944444444,13722222222

         

         接下来就是Map/Reduce函数, Map函数将每一行数据读入,然后进行分割,分割成 <key,value>的格式         上面的 key相当于110value相当于 13944444444,也就是数据前面的是value,后面的是key,代码如下:

     

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString(); // 读取源数据

			try {
				// 数据处理
				String[] lineSplit = line.split(" "); // 将数据分割
				String anum = lineSplit[0];
				String bnum = lineSplit[1];

				context.write(new Text(bnum), new Text(anum)); // 输出

			} catch (java.lang.ArrayIndexOutOfBoundsException e) {
				context.getCounter(Counter.LINESKIT).increment(1); // 出错令计数器加1
				return;
			}

		}

	}

 

 

    接下就是Reduce类, Reduce类要做的就是把从 Map传入来的数据进行整合, 将Map中具有相同key值的value    进行迭代,代码如下(需要知道的是, Map函数的输出也就是Reduce函数的输入,所以在写里面的reduce函     数的时候要注意 参数的类型要和上面 Map的输出值的类型相对应):

 

public static class Reduce extends Reducer<Text, Text, Text, Text> {
		
		//Iterable<> 使用一个迭代器将数据保存起来
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
			String valueString;
			String out = "";
			
			for(Text value : values) {
				valueString = value.toString();
				out += valueString+",";
			}
			
			context.write(key, new Text(out));
		}
	}

 

 

     接下来就是贴出整个程序,里面有个枚举是用来记录数据错误的时候进行一个计数,还有一个run方法是重写     了主类实现的接口的方法,每个Map/Reduce程序里面run方法里面书写的套路基本上都是一样的,main方       法里面书写的套路基本上也都是一样的,只要改一下主类的名字就可以了

  

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Test_2 extends Configured implements Tool {
	enum Counter {
		LINESKIT, // 出错的行
	}

	public static class Map extends Mapper<LongWritable, Text, Text, Text> {

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString(); // 读取源数据

			try {
				// 数据处理
				String[] lineSplit = line.split(" "); // 将数据分割
				String anum = lineSplit[0];
				String bnum = lineSplit[1];

				context.write(new Text(bnum), new Text(anum)); // 输出

			} catch (java.lang.ArrayIndexOutOfBoundsException e) {
				context.getCounter(Counter.LINESKIT).increment(1); // 出错令计数器加1
				return;
			}

		}

	}
	
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		
		//Iterable<> 使用一个迭代器将数据保存起来
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
			String valueString;
			String out = "";
			
			for(Text value : values) {
				valueString = value.toString();
				out += valueString+",";
			}
			
			context.write(key, new Text(out));
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf =getConf();
		
		Job job = new Job(conf,"Test_2");    //任务名:主类的名字
		job.setJarByClass(Test_2.class);        //指定class
		
		FileInputFormat.addInputPath(job, new Path(args[0]));   //指定输入路径
		FileOutputFormat.setOutputPath(job, new Path(args[1]));  //指定输出路径
		
		job.setMapperClass(Map.class);    //调用上面的Map类作为Map任务代码
		job.setReducerClass(Reduce.class);   //调用上面的Reduce类做为Reduce任务代码
		
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.setOutputKeyClass(Text.class);    //指定输出的key的格式
		job.setOutputValueClass(Text.class);    //指定输出的value的格式
		
		job.waitForCompletion(true);
		
		return job.isSuccessful()?1:0;
	}
	
	public static void main(String[] args) throws Exception {
		//main 方法运行
		int res = ToolRunner.run(new Configuration(),new Test_2(),args);
		System.exit(res);
	}
	
}

  

 

    传好数据,

   

 

做好配置:在项目上右键,选择Run as 下面的 Run connfigruations,前面的Main填上项目的名字,然后填上主类的名字,接下来的Arguments写上linux下面hdfs中存放文件的路径和输出结果的路径,最后点击Apply 然后点击run



 

运行结果 :刷新右边DFS Locations 下面的root目录,下面就出现了你在上一步Arguments处填写的输出目录,需要注意的是运行前要保证 root目录下面的没有和你在Argments填写时同名的输出结果的目录,



 
 双击我们的输出目录下面的part-r-00000,就可以看到我们想要的结果了:



 

 

细心的数一下,我们的数据是不是少了一条,原因是我们给的数据上面的格式不对,仔细看上面的的第二条数据,可以知道中间不是只空了一个空格,所以这一条数据就被跳过了。

 

 

心得:写完第一个Map/Reduce函数,之前也看过几个,感觉 模式基本上是一样的, 需要记得的就是Map函数的输出结果是Reduce函数的输入,因此在写Map方法和Reduce方法的时候就要注意里面的参数的类型要保持一致,否则会出错,其他的 就基本上按照套路来就可以了,按照不同的需求写 出不同的方法,模板就感觉只有一个。

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [完整 map reduce] 推荐:

第一个完整的Map/Reduce小程序

- - ITeye博客
          从在自己的win7下面装好虚拟机,然后在虚拟机上面安装hadoop,然后再安装hadoop-eclipse插件,过去好像有一个星期了,之前装虚拟机和hadoop都没成功,上个星期解除了封印,一口气把hadoop学习前期的所有的东西都搞定了,接下来就是遥遥无期的hadoop之路.          今天按着别人的思路在win7下面的eclipse里面敲了算是处女作的Map/Reduce程序,虽然很简单,但是自己还是一步一步的走通了,因为hadoop是安装在虚拟机上的,但是eclipse是在win7下面,所以在中间运行的时候会有一系列的错误,昨天晚上把遇到的问题百度的百度,问神的问神,烧香的烧香,基本上都解决了,现在能把程序跑起来,感觉自己的熬夜什么的都没有白费.

基于的Map/Reduce的ItemCF

- - M.J.
ItemCF为基于邻域的方法使用用户共同行为来对Item之间的相似度进行计算,从而利用k-近邻算法使用用户曾经有个行为的Item进行推荐. 好处是系统只需要存储Item x Item的相似度矩阵,对于Item数量远小于用户数量的应用来说,具有很高的性价比. ItemCF最核心的计算为item之间相似度矩阵的计算,同时还需要能够在短时间内响应Item变化情况(用户有行为之后就会造成相似度矩阵的重新计算,实际中不会全部重新计算而会使用增量计算的方式.

hadoop学习(七)WordCount+Block+Split+Shuffle+Map+Reduce技术详解

- - CSDN博客数据库推荐文章
纯干活:通过WourdCount程序示例:详细讲解MapReduce之Block+Split+Shuffle+Map+Reduce的区别及数据处理流程.        Shuffle过程是MapReduce的核心,集中了MR过程最关键的部分. 要想了解MR,Shuffle是必须要理解的. 了解Shuffle的过程,更有利于我们在对MapReduce job性能调优的工作,以及对MR内部机理有更深一步的了解.

map-reduce自定义分组自定义排序

- - 行业应用 - ITeye博客
1 * @author zm * * 当第一列相同时,求出第二列的最小值---> 由要求分析如下: * 1 必然以 row1来进行分组. * 2 必然也是以 row1,row2作为一个整体来进行比较才能有 当第一列相同时,在比较第二列的状态发生 * 3 mr中,执行流程是 -->-->--> *.

map和reduce 个数的设定 (Hive优化)经典

- - 研发管理 - ITeye博客
一、    控制hive任务中的map数: . 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);.

JavaScript Source Map 详解

- - 阮一峰的网络日志
上周, jQuery 1.9发布. 这是2.0版之前的最后一个新版本,有很多新功能,其中一个就是支持Source Map. 访问 http://ajax.googleapis.com/ajax/libs/jquery/1.9.0/jquery.min.js,打开压缩后的版本,滚动到底部,你可以看到最后一行是这样的:.

mapreduce实例-Join连接 (reduce Side Join)

- - CSDN博客云计算推荐文章
//根据连接类型做不同处理. //设置不同map处理不同输入. 外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接. 作者:liuzhoulong 发表于2013-9-5 21:35:16 原文链接. 阅读:83 评论:0 查看评论.

Hibernate调优之select new map()

- - CSDN博客架构设计推荐文章
        Hibernate调优不只是设置一下lazy,调整一下由谁来维护这个字段而已.         这次要说的是对查询语句进行优化——select new map().         select new map语句结果说明.         语句一:.         结果list中,每条记录对应一个object数组,object[]中每个元素为hql语句中列的序号(从0开始).

【转载】在Google Map上玩LEGO

- - HTML5研究小组
Google又放出了很帅又充满了Google式小清新风格的HTML5在线游戏,这次和LEGO合作——在Google Map上砌LEGO积木:. 这个游戏不知道是哪个和我一样买不起房的屌丝想出来的,不知道梦见几次在地球上某个有待开荒的土地上占一个山头盖属于自己的房子之后用满腔的热血把它做出来了. 不过貌似只能选择在大洋洲范围内的土地,估计开发者是那边的穷矮矬.

开发基于 Google Map 的 Android 应用

- - 博客 - 伯乐在线
简介: 随着移动互联网应用的迅速发展,利用智能手机提供的实时地理位置信息服务功能扩展出众多 LBS(Location Based Service) 应用,将实时地理位置信息与手机的便捷、移动特性结合,为人们提供多种多样的应用场景,比如实时定位、导航、搜索周围好友、基于地理位置的信息推荐等. 本文通过实例介绍如何开发基于 Google Map 的 Android 应用.