hadoop编程:分析CSDN注册邮箱分布情况

标签: hadoop 编程 分析 | 发表时间:2014-07-08 15:48 | 作者:jdh99
出处:http://blog.csdn.net

hadoop编程:分析CSDN注册邮箱分布情况


本文博客链接: http://blog.csdn.net/jdh99,作者:jdh,转载请注明.


环境:

主机:Ubuntu10.04

hadoop版本:1.2.1

开发工具:eclipse4.4.0


说明:

要求:原始数据共6428632条,分析不同邮箱的注册情况,并按使用人数从大到小排序。

分析:hadoop自带一个排序,是按key值来进行排序的。要按值(value)进行排序,需要二次排序。

步骤:

1.job1:统计不同注册邮箱的使用人数,用默认的key值排序,保存在HDFS系统中

2.job2:对job1的输出进行二次排序,按值从大到小排序


结果输出:

使用人数在1W以上的邮箱共有24个:

qq.com    1976196
163.com    1766927
126.com    807895
sina.com    351596
yahoo.com.cn    205491
hotmail.com    202948
gmail.com    186843
sohu.com    104736
yahoo.cn    87048
tom.com    72365
yeah.net    53295
21cn.com    50710
vip.qq.com    35119
139.com    29207
263.net    24779
sina.com.cn    19156
live.cn    18920
sina.cn    18601
yahoo.com    18454
foxmail.com    16432
163.net    15176
msn.com    14211
eyou.com    13372
yahoo.com.tw    10810


源代码:


JOB1:统计不同注册邮箱的人数

CsdnData.java

  package com.bazhangkeji.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CsdnData 
{
	public static void main(String[] args) throws Exception 
	{
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) 
		{
			System.err.println("Usage: csdndata <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "csdndata");
		
		job.setJarByClass(CsdnData.class);
		job.setMapperClass(MapData.class);
		
		job.setReducerClass(ReducerData.class); 
		job.setOutputKeyClass(Text.class);
		
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
  	}
}


MapData.java

package com.bazhangkeji.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MapData extends Mapper<Object, Text, Text, IntWritable>
{
	IntWritable one = new IntWritable(1);
  	Text word = new Text();

  	public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
	{
  		StringBuffer str_in = new StringBuffer();
		StringBuffer str_out = new StringBuffer();
		int index = 0;
		
		//初始化字符串
		str_in.setLength(0);
		str_out.setLength(0);
		str_in.append(value.toString());
		
		//获得邮箱的起始位置
		index = str_in.toString().lastIndexOf('@');
		if (index != -1)
		{
			word.set(str_in.toString().substring(index + 1).trim().toLowerCase());
  			context.write(word, one);
		}
  	}
}


ReducerData.java

package com.bazhangkeji.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class ReducerData extends Reducer<Text,IntWritable,Text,IntWritable> 
{
	IntWritable result = new IntWritable();

  	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
	{
    	int sum = 0;
    	for (IntWritable val : values) 
		{
      		sum += val.get();
    	}
    	result.set(sum);
    	context.write(key, result);
  	}
}


JOB2:对job1的输出进行二次排序,按值从大到小排序

SortSecond.java

  package com.bazhangkeji.hadoop2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortSecond 
{
	public static void main(String[] args) throws Exception 
	{
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) 
		{
			System.err.println("Usage: csdndata <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "sortsecond");
		job.setJarByClass(SortSecond.class);
		
		job.setMapperClass(MapSecond.class);
		job.setReducerClass(ReduceSecond.class); 
		
		job.setSortComparatorClass(SortMy.class); //设置自定义二次排序策略
		
		job.setOutputKeyClass(KeyMy.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
  	}
}


MapSecond.java

  package com.bazhangkeji.hadoop2;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MapSecond extends Mapper<LongWritable, Text, KeyMy, IntWritable>
{
	IntWritable one = new IntWritable(1);
  	Text word = new Text();
  	KeyMy keymy = new KeyMy();

  	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
	{
  		String str_in = value.toString();
  		int index = 0;
  		
  		index = str_in.indexOf('\t');
  		if (value.toString().length() > 3 && index != -1)
  		{
	  		String str1 = str_in.substring(0, index);
	  		String str2 = str_in.substring(index + 1);
	  		
	  		if (str1.length() != 0 && str2.length() != 0)
	  		{
	  			one.set(Integer.parseInt(str2));
	  			word.set(str1);
				keymy.setFirstKey(word);
				keymy.setSecondKey(one);
	  			context.write(keymy, one);
	  		}
  		}
  	}
}


ReduceSecond.java

  package com.bazhangkeji.hadoop2;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class ReduceSecond extends Reducer<KeyMy,IntWritable,Text,IntWritable> 
{
	IntWritable result = new IntWritable();

  	public void reduce(KeyMy key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
	{
    	context.write(key.getFirstKey(), key.getSecondKey());
  	}
}


KeyMy.java

  package com.bazhangkeji.hadoop2;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义组合键
 */
public class KeyMy implements WritableComparable<KeyMy>{
    private static final Logger logger = LoggerFactory.getLogger(KeyMy.class);
    private Text firstKey;
    private IntWritable secondKey;
    public KeyMy() {
        this.firstKey = new Text();
        this.secondKey = new IntWritable();
    }
    public Text getFirstKey() {
        return this.firstKey;
    }
    public void setFirstKey(Text firstKey) {
        this.firstKey = firstKey;
    }
    public IntWritable getSecondKey() {
        return this.secondKey;
    }
    public void setSecondKey(IntWritable secondKey) {
        this.secondKey = secondKey;
    }
    @Override
    public void readFields(DataInput dateInput) throws IOException {
        // TODO Auto-generated method stub
        this.firstKey.readFields(dateInput);
        this.secondKey.readFields(dateInput);
    }
    @Override
    public void write(DataOutput outPut) throws IOException {
        this.firstKey.write(outPut);
        this.secondKey.write(outPut);
    }
    /**
     * 自定义比较策略
     * 注意:该比较策略用于 mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,
     * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整)
     */
    @Override
    public int compareTo(KeyMy KeyMy) {
        logger.info("-------KeyMy flag-------");
        return this.firstKey.compareTo(KeyMy.getFirstKey());
    }
}


SortMy.java

  package com.bazhangkeji.hadoop2;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义二次排序策略
 */
public class SortMy extends WritableComparator {
    private static final Logger logger = LoggerFactory.getLogger(SortMy.class);
    public SortMy() {
        super(KeyMy.class,true);
    }
    @Override
    public int compare(WritableComparable KeyMyOne,
            WritableComparable KeyMyOther) 
    {
        logger.info("---------enter SortMy flag---------");
                                                                                                                                                                                             
        KeyMy c1 = (KeyMy) KeyMyOne;
        KeyMy c2 = (KeyMy) KeyMyOther;
        
        return c2.getSecondKey().get()-c1.getSecondKey().get();//0,负数,正数
    }
}

参考资料:

1.《hadoop权威指南》

2.   http://zengzhaozheng.blog.51cto.com/8219051/1379271



作者:jdh99 发表于2014/7/8 15:48:05 原文链接
阅读:64 评论:0 查看评论

相关 [hadoop 编程 分析] 推荐:

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

hadoop编程:分析CSDN注册邮箱分布情况

- - CSDN博客云计算推荐文章
hadoop编程:分析CSDN注册邮箱分布情况. 本文博客链接: http://blog.csdn.net/jdh99,作者:jdh,转载请注明. 主机:Ubuntu10.04. hadoop版本:1.2.1. 开发工具:eclipse4.4.0. 要求:原始数据共6428632条,分析不同邮箱的注册情况,并按使用人数从大到小排序.

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

对Hadoop的SWOT分析

- - ITeye博客
在当前大数据研究与应用如火如荼的时代,Hadoop毅然成为业界的翘楚. 我想本着客观的态度,从旁观者的角度分析当前Hadoop所面对的机遇与挑战. 在我的认识中,SWOT分析还算全面,所以就选择这样的分析思路. 当然,我分析Hadoop的初衷是我认同Hadoop所代表的方向,但不具体只是Hadoop这个产品.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).

Hadoop Metrics体系架构分析

- - 非技术 - ITeye博客
原文: http://blog.csdn.net/chenpingbupt/article/details/7957396. 本文基于Hadoop 0.20.XX版本分析,和现在的Metrics2稍有不同. Hadoop Metrics用来统计集群运行数据,比如接口调用次数,响应时间,队列长度等等,现阶段(0.19版本)支持为数不多的几个层级的数据,分别是dfs,jvm,rpc,mepred等.

Hadoop Metrics体系分析之三:构建自己的Metrics

- - Taobao QA Team
大型分布式系统中需要metrics来了解系统状态已成为系统必需的功能之一. 其实测试系统甚至测试用例中也同样需要metrics. 通过这些指标我们可以了解测试的进度、状况、以及一些过程情况,比如性能指标和一些无法用是否判断数据. 下面我们就用一个简单的例子来看看如何使用hadoop metrics.

Kylin:基于Hadoop的开源数据仓库OLAP分析引擎

- - 标点符
Kylin是一个开源、分布式的OLAP分析引擎,它由eBay公司开发,并且基于Hadoop提供了SQL接口和OLAP接口,能够支持TB到PB级别的数据量. OLAP即联机分析处理,它能够帮助分析人员、管理人员或执行人员从多角度快速、一致、交互地存取信息和更加深入的了解信息. OLAP的目标是满足决策支持或者满足在多维环境下特定的查询和报表需求.

(转)深度分析如何在Hadoop中控制Map的数量

- - 互联网 - ITeye博客
很 多文档中描述,Mapper的数量在默认情况下不可直接控制干预,因为Mapper的数量由输入的大小和个数决定. 在默认情况下,最终input占据了多 少block,就应该启动多少个Mapper. 如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成启动的 Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导致崩溃.

Hadoop集群硬盘故障分析与自动化修复

- - CSDN博客推荐文章
作者:Zhang, Haohao. 硬盘在服务器中起着至关重要的作用,因为硬盘里面存储的是数据,随着制造业技术的提高,硬盘的类型也在逐渐的改变. 对于硬盘的管理是IAAS部门的责任,但作为业务运维也需要懂得相关的技术. 有的公司采用LVM来管理硬盘,这样做方便扩缩容,也有的公司直接用裸盘来存数据,这样做的好处是不会因LVM而损失掉一部分硬盘I/O速度.