Hadoop HelloWord Examples- 求平均数

标签: hadoop helloword examples | 发表时间:2013-08-23 21:51 | 作者:qiul12345
出处:http://blog.csdn.net

  另外一个hadoop的入门demo,求平均数。是对WordCount这个demo的一个小小的修改。输入一堆成绩单(人名,成绩),然后求每个人成绩平均数,比如:

//  subject1.txt

  a 90
  b 80
  c 70


 // subject2.txt

  a 100
  b 90
  c 80


  求a,b,c这三个人的平均分。解决思路很简单,在map阶段key是名字,value是成绩,直接output。reduce阶段得到了map输出的key名字,values是该名字对应的一系列的成绩,那么对其求平均数即可。

  这里我们实现了两个版本的代码,分别用TextInputFormat和 KeyValueTextInputFormat来作为输入格式。

  TextInputFormat版本:

 

import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class AveScore {
	
	public static class AveMapper extends Mapper<Object, Text, Text, IntWritable>
	{
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			String line = value.toString();
			String[] strs = line.split(" ");
			String name = strs[0];
			int score = Integer.parseInt(strs[1]);
			context.write(new Text(name), new IntWritable(score));
		}
	}
	
	public static class AveReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			int count = 0;
			
			for(IntWritable val : values)
			{
				sum += val.get();
				count++;
			}
			
			int aveScore = sum / count;
			
			context.write(key, new IntWritable(aveScore));
		}
	}
	
	public static void main(String[] args) throws Exception
	{
		Configuration conf = new Configuration();
		
		Job job = new Job(conf,"AverageScore");
		job.setJarByClass(AveScore.class);
		
		job.setMapperClass(AveMapper.class);
		job.setReducerClass(AveReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit( job.waitForCompletion(true) ? 0 : 1);
	}
}

KeyValueTextInputFormat版本;

import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class AveScore_KeyValue {
	
	public static class AveMapper extends Mapper<Text, Text, Text, IntWritable>
	{
		@Override
		public void map(Text key, Text value, Context context) throws IOException, InterruptedException
		{
		    int score = Integer.parseInt(value.toString());
			context.write(key, new IntWritable(score) );
		}
	}
	
	public static class AveReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			int count = 0;
			
			for(IntWritable val : values)
			{
				sum += val.get();
				count++;
			}
			
			int aveScore = sum / count;
			
			context.write(key, new IntWritable(aveScore));
		}
	}
	
	public static void main(String[] args) throws Exception
	{
		Configuration conf = new Configuration();
		conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
		
		Job job = new Job(conf,"AverageScore");
		job.setJarByClass(AveScore_KeyValue.class);
		
		job.setMapperClass(AveMapper.class);
		job.setReducerClass(AveReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
  		job.setInputFormatClass(KeyValueTextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class)  ; 

		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit( job.waitForCompletion(true) ? 0 : 1);
	}
}


输出结果为:

  a 95
  b 85
  c 75

 

作者:qiul12345 发表于2013-8-23 21:51:03 原文链接
阅读:113 评论:0 查看评论

相关 [hadoop helloword examples] 推荐:

Hadoop HelloWord Examples- 求平均数

- - CSDN博客云计算推荐文章
  另外一个hadoop的入门demo,求平均数. 是对WordCount这个demo的一个小小的修改. 输入一堆成绩单(人名,成绩),然后求每个人成绩平均数,比如:.   求a,b,c这三个人的平均分. 解决思路很简单,在map阶段key是名字,value是成绩,直接output. reduce阶段得到了map输出的key名字,values是该名字对应的一系列的成绩,那么对其求平均数即可.

Hadoop HelloWorld Examples - 单表连接

- - CSDN博客云计算推荐文章
  应该是那本"Hadoop 实战"的第4个demo了,单表连接. 给出一对对的children和parents的名字,然后输出所有的grandchildren和grandparents对.   输入数据(第一列child,第二列 parent).   输出数据(第一列grandchild,第二列grandparents).

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Hadoop使用(一)

- Pei - 博客园-首页原创精华区
Hadoop使用主/从(Master/Slave)架构,主要角色有NameNode,DataNode,secondary NameNode,JobTracker,TaskTracker组成. 其中NameNode,secondary NameNode,JobTracker运行在Master节点上,DataNode和TaskTracker运行在Slave节点上.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

Hadoop TaskScheduler浅析

- - kouu&#39;s home
TaskScheduler,顾名思义,就是MapReduce中的任务调度器. 在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务. 然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务.

HADOOP安装

- - OracleDBA Blog---三少个人自留地
最近有时间看看hadoop的一些东西,而且在测试的环境上做了一些搭建的工作. 首先,安装前需要做一些准备工作. 使用一台pcserver作为测试服务器,同时使用Oracle VM VirtualBox来作为虚拟机的服务器. 新建了三个虚拟机以后,安装linux,我安装的linux的版本是redhat linux 5.4 x64版本.

Hadoop Corona介绍

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/hadoop-corona/hadoop-corona/. Hadoop Corona是facebook开源的下一代MapReduce框架. 其基本设计动机和Apache的YARN一致,在此不再重复,读者可参考我的这篇文章 “下一代Apache Hadoop MapReduce框架的架构”.

Hadoop RPC机制

- - 企业架构 - ITeye博客
RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议. Hadoop底层的交互都是通过 rpc进行的. 例如:datanode和namenode 、tasktracker和jobtracker、secondary namenode和namenode之间的通信都是通过rpc实现的.

Hadoop Rumen介绍

- - 董的博客
Dong | 新浪微博: 西成懂 | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce/hadoop-rumen-introduction/. 什么是Hadoop Rumen?. Hadoop Rumen是为Hadoop MapReduce设计的日志解析和分析工具,它能够将JobHistory 日志解析成有意义的数据并格式化存储.