How to Count and Sort Massive Data with Hadoop
Hadoop is very widely used in scenarios such as: 1) building search-engine indexes; 2) top-K hot-keyword statistics; 3) analysis of massive logs. In today's example, 散仙's task is to count several hundred million words or phrases and sort them by frequency. Requirements like these are all variations of the WordCount problem: once you understand Hadoop's built-in WordCount example, statistics and analysis of IPs or hot keywords become straightforward. WordCount really is a very representative example.
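The snippets below come without their import section. A plausible reconstruction, assuming only the standard Hadoop MapReduce (new API) classes actually used in this post, looks like this:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;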
/**
 * Map side of the word-count job.
 **/
public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] data = value.toString().split(";");  // each line holds phrases separated by semicolons
        for (String s : data) {
            if (s.trim().length() > 0) {              // skip empty tokens
                word.set(s);
                context.write(word, one);
            }
        }
    }
}
/**
 * Reduce side of the word-count job.
 **/
public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();  // accumulate the term frequency
        }
        result.set(sum);
        // append "::" as the separator so the later sort job can split phrase and
        // count unambiguously (the phrases themselves may contain spaces)
        context.write(new Text(key.toString() + "::"), result);
    }
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // note: the reducer rewrites the key (appends "::"), so it is not reused
    // as a combiner here; running it map-side would corrupt the keys (see below)
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
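The original post also registered IntSumReducer as the combiner, which is unsafe here precisely because that reducer rewrites the key. If map-side aggregation is still wanted, a separate combiner that sums counts without touching the key works fine. A minimal sketch (the class name SumCombiner is my own, not from the original):

/** Sums partial counts without rewriting the key, so it is safe as a combiner. */
public static class SumCombiner
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable sum = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        sum.set(total);
        context.write(key, sum);  // the key passes through unchanged
    }
}

It would be registered with job.setCombinerClass(SumCombiner.class); in the driver above.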
The counting job produces output like this (phrase, then the "::" marker, then a tab and the count):

a good student::	1
good student::	3
patient::	2
patient a::	1
/**
 * Map side of the sort job.
 **/
public static class SortIntValueMapper
        extends Mapper<LongWritable, Text, IntWritable, Text> {

    private final IntWritable one = new IntWritable();
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] s = value.toString().split("::");  // split each line on the double colon
        word.set(s[0]);                             // the phrase
        one.set(Integer.parseInt(s[1].trim()));     // its frequency
        context.write(one, word);                   // swap K and V so the framework sorts by count
    }
}
/***
 * Reduce side of the sort job.
 **/
public static class SortIntValueReducer
        extends Reducer<IntWritable, Text, Text, IntWritable> {

    private final Text result = new Text();

    @Override
    protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)
            throws IOException, InterruptedException {
        for (Text t : arg1) {
            result.set(t.toString());
            arg2.write(result, arg0);  // swap K and V back: phrase first, count second
        }
    }
}
/***
 * Comparator that sorts the word counts in descending order.
 **/
public static class DescSort extends WritableComparator {

    public DescSort() {
        super(IntWritable.class, true);  // register the comparator for IntWritable keys
    }

    @Override
    public int compare(byte[] arg0, int arg1, int arg2,
                       byte[] arg3, int arg4, int arg5) {
        // negate the default ascending comparison to get descending order
        return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);
    }

    @Override
    public int compare(Object a, Object b) {
        return -super.compare(a, b);     // negate here as well for descending order
    }
}
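To see the effect of the negation concretely, here is a small hypothetical check (the quickCheck method is mine, not from the original post). For 3 versus 1, super.compare returns a positive number, so the negated result is negative and 3 sorts ahead of 1:

// hypothetical sanity check for DescSort (not part of the original post)
public static void quickCheck() {
    DescSort cmp = new DescSort();
    IntWritable three = new IntWritable(3);
    IntWritable one = new IntWritable(1);
    // the default order would put 1 before 3; the negation reverses that
    System.out.println(cmp.compare(three, one));  // prints a negative number
}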
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: sort <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "sort");
    job.setMapperClass(SortIntValueMapper.class);
    job.setReducerClass(SortIntValueReducer.class);
    job.setSortComparatorClass(DescSort.class);  // plug in the descending comparator
    // the map emits (IntWritable, Text) while the reduce emits (Text, IntWritable),
    // so the two pairs of types must be declared separately
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
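One caveat the driver above glosses over: MapReduce only guarantees key ordering within each reduce task, so a globally sorted result needs a single reducer (or a TotalOrderPartitioner for data too large for one). A minimal sketch, assuming the Job object from the sort driver:

// force one reduce task so the output is a single, globally sorted file;
// for very large data, consider TotalOrderPartitioner instead
job.setNumReduceTasks(1);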
After the sort job runs, the final result comes out in descending order of frequency:

good student	3
patient	2
a good student	1
patient a	1
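As a closing sketch, the two jobs can also be chained in a single driver so the counting output feeds the sort job directly. This is an assumption-laden outline, not code from the original post: the method name runPipeline and the intermediate path /tmp/wc-intermediate are illustrative, and the layout assumes everything lives in the WordCount class used above:

public static void runPipeline(String in, String out) throws Exception {
    Configuration conf = new Configuration();
    Path tmp = new Path("/tmp/wc-intermediate");  // illustrative intermediate directory

    // pass 1: count phrase frequencies
    Job count = new Job(conf, "word count");
    count.setJarByClass(WordCount.class);
    count.setMapperClass(TokenizerMapper.class);
    count.setReducerClass(IntSumReducer.class);
    count.setOutputKeyClass(Text.class);
    count.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(count, new Path(in));
    FileOutputFormat.setOutputPath(count, tmp);
    if (!count.waitForCompletion(true)) {
        System.exit(1);  // stop if the counting pass fails
    }

    // pass 2: sort by frequency, descending
    Job sort = new Job(conf, "sort");
    sort.setJarByClass(WordCount.class);
    sort.setMapperClass(SortIntValueMapper.class);
    sort.setReducerClass(SortIntValueReducer.class);
    sort.setSortComparatorClass(DescSort.class);
    sort.setMapOutputKeyClass(IntWritable.class);
    sort.setMapOutputValueClass(Text.class);
    sort.setOutputKeyClass(Text.class);
    sort.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(sort, tmp);
    FileOutputFormat.setOutputPath(sort, new Path(out));
    System.exit(sort.waitForCompletion(true) ? 0 : 1);
}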