如何使用hadoop对海量数据进行统计并排序

标签: hadoop 量数 统计 | 发表时间:2013-11-13 17:54 | 作者:
出处:http://www.iteye.com
不得不说,Hadoop确实是处理海量离线数据的利器,当然,凡是一个东西有优点必定也有缺点,hadoop的缺点也很多,比如对流式计算,实时计算,DAG具有依赖关系的计算,支持都不友好,所以,由此诞生了很多新的分布式计算框架,Storm,Spark,Tez,impala,drill,等等,他们都是针对特定问题提出一种解决方案,新框架的的兴起,并不意味者他们就可以替代hadoop,一手独大,HDFS和MapReduce依旧是很优秀的,特别是对离线海量数据的处理。

hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。


下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。


对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:



map里面的核心代码如下:
  /**
     * 统计词频的map端
     * 代码
     * 
     * **/
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
   
    	String [] data=value.toString().split(";");//按每行的分号拆分短语
    	for(String s:data){
    		if(s.trim().length()>0){//忽略空字符
    		word.set(s);
    		context.write(word, one);
    		}
    	}
 
    }


reduce端的核心代码如下:

/**
     * reduce端的
     * 代码
     * **/
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();//累加词频
      }
      result.set(sum);
      context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔
    }
  }

main函数里面的代码如下:
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

运行结果,如下所示:
a good student::	1
good student::	3
patient::	2
patient a::	1


下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:
/**
		 * 排序作业
		 * map的实现
		 * 
		 * **/
		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String s[]=value.toString().split("::");//按两个冒号拆分每行数据
			word.set(s[0]);//
			one.set(Integer.parseInt(s[1].trim()));//
			context.write(one, word);//注意,此部分,需要反转K,V顺序
		}

reduce端代码如下:
/***
  * 
  * 排序作业的
  * reduce代码
  * **/		
		@Override
		protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)
				throws IOException, InterruptedException {
			for(Text t:arg1){
				result.set(t.toString());
				 arg2.write(result, arg0);
			}
		}



下面,我们再来看下排序组件的代码:

/***
 * 按词频降序排序
 * 的类
 * 
 * **/
	public static class DescSort extends  WritableComparator{

		 public DescSort() {
			 super(IntWritable.class,true);//注册排序组件
		}
		 @Override
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序
		}
		 
		 @Override
		public int compare(Object a, Object b) {
	 
			return   -super.compare(a, b);//注意使用负号来完成降序
		}
		
	}


main方法里面的实现代码如下所示:

public static void main(String[] args) throws Exception{
		
		
		
		  Configuration conf = new Configuration();
		    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    if (otherArgs.length != 2) {
		        System.err.println("Usage: wordcount <in> <out>");
		        System.exit(2);
		      }
		   Job job=new Job(conf, "sort");
		   job.setOutputKeyClass(IntWritable.class);
		   job.setOutputValueClass(Text.class);
		   job.setMapperClass(SortIntValueMapper.class);
		   job.setReducerClass(SortIntValueReducer.class)	;
		   job.setSortComparatorClass(DescSort.class);//加入排序组件
		   job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
		   job.setOutputFormatClass(TextOutputFormat.class);
		   FileInputFormat.setInputPaths(job, new Path(args[0]));
		   FileOutputFormat.setOutputPath(job, new Path(args[1]));
		 
		   System.exit(job.waitForCompletion(true) ? 0 : 1);
	}


输出结果,如下所示:
good student	3
patient	2
a good student	1
patient a	1


至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。

最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。

已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [hadoop 量数 统计] 推荐:

如何使用hadoop对海量数据进行统计并排序

- - ITeye博客
hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等. 散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Hadoop使用(一)

- Pei - 博客园-首页原创精华区
Hadoop使用主/从(Master/Slave)架构,主要角色有NameNode,DataNode,secondary NameNode,JobTracker,TaskTracker组成. 其中NameNode,secondary NameNode,JobTracker运行在Master节点上,DataNode和TaskTracker运行在Slave节点上.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

Hadoop TaskScheduler浅析

- - kouu&#39;s home
TaskScheduler,顾名思义,就是MapReduce中的任务调度器. 在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务. 然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务.

HADOOP安装

- - OracleDBA Blog---三少个人自留地
最近有时间看看hadoop的一些东西,而且在测试的环境上做了一些搭建的工作. 首先,安装前需要做一些准备工作. 使用一台pcserver作为测试服务器,同时使用Oracle VM VirtualBox来作为虚拟机的服务器. 新建了三个虚拟机以后,安装linux,我安装的linux的版本是redhat linux 5.4 x64版本.

Hadoop Corona介绍

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/hadoop-corona/hadoop-corona/. Hadoop Corona是facebook开源的下一代MapReduce框架. 其基本设计动机和Apache的YARN一致,在此不再重复,读者可参考我的这篇文章 “下一代Apache Hadoop MapReduce框架的架构”.

Hadoop RPC机制

- - 企业架构 - ITeye博客
RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议. Hadoop底层的交互都是通过 rpc进行的. 例如:datanode和namenode 、tasktracker和jobtracker、secondary namenode和namenode之间的通信都是通过rpc实现的.

Hadoop Rumen介绍

- - 董的博客
Dong | 新浪微博: 西成懂 | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce/hadoop-rumen-introduction/. 什么是Hadoop Rumen?. Hadoop Rumen是为Hadoop MapReduce设计的日志解析和分析工具,它能够将JobHistory 日志解析成有意义的数据并格式化存储.

Hadoop contrib介绍

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce/hadoop-contrib/. Hadoop Contrib是Hadoop代码中第三方公司贡献的工具包,一般作为Hadoop kernel的扩展功能,它包含多个非常有用的扩展包,本文以Hadoop 1.0为例对Hadoop Contrib中的各个工具包进行介绍.