Hadoop secondary sort
How it works: secondary sort makes the values arriving at a reducer sorted, by moving the ordering work into the key. The idea:

1. Put the part of the value that needs to be ordered (the value-part) into a composite key.
2. Compare key + value-part in a sort-comparator class, or in the key's compareTo method.
3. In the grouping comparator, compare only the natural key, so all records with the same key are iterated together in a single reduce call.
Sample input (one record per line, key;value):

秦东亮;72
秦东亮;34
秦东亮;100
三劫;899
三劫;32
三劫;1
a;45
b;567
b;12
Expected output, style 1 (values concatenated per key):

a 45
b 12,567
三劫 1,32,899
秦东亮 34,72,100
Expected output, style 2 (one line per value):

a 45
b 12
b 567
三劫 1
三劫 32
三劫 899
秦东亮 34
秦东亮 72
秦东亮 100
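To see the two comparators' division of labor before reading the full listing, here is a tiny stand-alone trace (a hypothetical snippet, not from the original post; DescSort, TextIntComparator and TextComparator are the classes defined below):

DescSort k1 = new DescSort(); k1.setFirstKey("三劫"); k1.setSecondKey(1);
DescSort k2 = new DescSort(); k2.setFirstKey("三劫"); k2.setSecondKey(32);
// Sort phase: the sort comparator orders by (firstKey, secondKey),
// so 三劫;1 is placed before 三劫;32 in the map output.
System.out.println(new TextIntComparator().compare(k1, k2) < 0);  // true
// Group phase: the grouping comparator looks at firstKey only, so both
// keys are treated as equal and their values meet in one reduce() call.
System.out.println(new TextComparator().compare(k1, k2) == 0);    // true

The complete implementation: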
package com.qin.groupsort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Secondary sort (grouped sort) example.
 *
 * @author qindongliang
 */
public class GroupSort {
/**
 * Map task: parses "key;value" lines and emits a composite key.
 */
public static class GMapper extends Mapper<LongWritable, Text, DescSort, IntWritable>{

    private DescSort tx=new DescSort();
    private IntWritable second=new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("map() called");
        // Each input line looks like "秦东亮;72": natural key, then value.
        String ss[]=value.toString().split(";");
        String mkey=ss[0];
        int mvalue=Integer.parseInt(ss[1]);
        // Pack the value into the composite key so the framework sorts on it.
        tx.setFirstKey(mkey);
        tx.setSecondKey(mvalue);
        second.set(mvalue);
        context.write(tx, second);
    }
}
/**
 * Reduce task: the grouping comparator guarantees that all values of one
 * natural key arrive in a single, already-sorted Iterable.
 */
public static class GReduce extends Reducer<DescSort, IntWritable, Text, Text>{

    @Override
    protected void reduce(DescSort key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException {
        System.out.println("reduce() called");
        for(IntWritable t : values){
            // One output line per value; the values of a group arrive
            // already sorted, which yields output style 2:
            //   a      45
            //   b      12
            //   b      567
            //   三劫   1
            //   三劫   32
            //   三劫   899
            //   秦东亮 34
            //   秦东亮 72
            //   秦东亮 100
            ctx.write(new Text(key.getFirstKey()), new Text(t.toString()));
        }
        // For output style 1 (b 12,567 etc.), see the commented variant
        // after this class.
    }
}
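// A variant of reduce() that produces output style 1 (values concatenated
// per key) — this reconstructs the StringBuffer approach left commented out
// in the original post:
//     StringBuilder sb = new StringBuilder();
//     for (IntWritable t : values) {
//         sb.append(t).append(",");
//     }
//     if (sb.length() > 0) {
//         sb.deleteCharAt(sb.length() - 1);  // drop the trailing comma
//     }
//     ctx.write(new Text(key.getFirstKey()), new Text(sb.toString()));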
/**
 * Custom composite key: the natural key (firstKey) plus the value field
 * to sort by (secondKey).
 */
public static class DescSort implements WritableComparable<DescSort>{

    private String firstKey;
    private int secondKey;

    public DescSort() {
        // A no-arg constructor is required: Hadoop instantiates keys
        // reflectively during deserialization.
    }

    public String getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read the fields in exactly the order write() emits them.
        firstKey=in.readUTF();
        secondKey=in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }
    @Override
    public int compareTo(DescSort d) {
        // "this" first = ascending order. Only the natural key is compared
        // here; the full ordering is supplied by the sort comparator that
        // main() registers, which takes precedence over this method.
        return this.getFirstKey().compareTo(d.getFirstKey());
    }
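    // A hedged addition (not in the original post): if the custom
    // KeyPartition below is ever dropped in favor of the default
    // HashPartitioner, hash only the natural key so all records of one
    // group still reach the same partition.
    @Override
    public int hashCode() {
        return firstKey == null ? 0 : firstKey.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof DescSort)) return false;
        DescSort other = (DescSort) o;
        return firstKey.equals(other.firstKey) && secondKey == other.secondKey;
    }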
}
/**
 * Grouping comparator: decides which keys fall into the same reduce group
 * by comparing only the first field of the composite key.
 */
public static class TextComparator extends WritableComparator{

    public TextComparator() {
        // true: let WritableComparator create DescSort instances to compare.
        super(DescSort.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("grouping comparator TextComparator called");
        DescSort d1=(DescSort)a;
        DescSort d2=(DescSort)b;
        return d1.getFirstKey().compareTo(d2.getFirstKey());
    }
}
/**
 * Sort comparator: the full ordering of the composite key.
 * Orders by the first field, then by the second field within a group.
 */
public static class TextIntComparator extends WritableComparator{

    public TextIntComparator() {
        super(DescSort.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("sort comparator TextIntComparator called");
        DescSort d1=(DescSort)a;
        DescSort d2=(DescSort)b;
        if(!d1.getFirstKey().equals(d2.getFirstKey())){
            return d1.getFirstKey().compareTo(d2.getFirstKey());
        }else{
            // Integer.compare avoids the overflow risk of plain subtraction.
            return Integer.compare(d1.getSecondKey(), d2.getSecondKey());
        }
    }
}
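// To sort values in descending order within a group, flip the second
// comparison — this mirrors the negative-sign hint in the commented-out
// compare() methods of the original post:
//     return Integer.compare(d2.getSecondKey(), d1.getSecondKey());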
/**
 * Partitioner: routes records by the natural key only, so all records
 * of one group reach the same reducer.
 */
public static class KeyPartition extends Partitioner<DescSort, IntWritable>{

    @Override
    public int getPartition(DescSort key, IntWritable value, int numPartitions) {
        System.out.println("custom partitioner KeyPartition called");
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}
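// Why the (hashCode & Integer.MAX_VALUE) mask above: String.hashCode() can
// be negative, and a negative partition index makes the job fail during the
// shuffle. The mask clears the sign bit, so the modulo result always falls
// in [0, numPartitions).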
public static void main(String[] args) throws Exception{

    JobConf conf=new JobConf(GroupSort.class);
    // Run against the remote cluster.
    conf.set("mapred.job.tracker","192.168.75.130:9001");
    // Ship the job jar. Set this before the Job is constructed; otherwise
    // the tasks cannot find the user classes and the job fails.
    conf.setJar("tt.jar");

    /** Job setup */
    Job job=new Job(conf, "testpartion");
    job.setJarByClass(GroupSort.class);
    System.out.println("mode: "+conf.get("mapred.job.tracker"));

    job.setMapperClass(GMapper.class);
    job.setReducerClass(GReduce.class);

    /** Partitioner */
    job.setPartitionerClass(KeyPartition.class);
    // Grouping comparator: applied before reduce to form the groups.
    job.setGroupingComparatorClass(TextComparator.class);
    // Sort comparator: applied to the map output keys, including the
    // value part embedded in them.
    job.setSortComparatorClass(TextIntComparator.class);

    job.setMapOutputKeyClass(DescSort.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    String path="hdfs://192.168.75.130:9000/root/outputdb";
    FileSystem fs=FileSystem.get(conf);
    Path p=new Path(path);
    if(fs.exists(p)){
        fs.delete(p, true);
        System.out.println("output path existed and was deleted");
    }
    FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
    FileOutputFormat.setOutputPath(job, p);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
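The WARN line in the log below ("Applications should implement Tool") appears because the job is configured without GenericOptionsParser. A minimal sketch of the standard ToolRunner pattern that addresses it (hypothetical class name, with the same job wiring as main() above):

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GroupSortTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "testpartion");
        // ... identical mapper/reducer/comparator/partitioner wiring ...
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GroupSortTool(), args));
    }
}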
Run log:

mode: 192.168.75.130:9001
output path existed and was deleted
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404152114_0003
INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404152114_0003
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=7040
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9807
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=86
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=162
INFO - Counters.log(589) |     HDFS_BYTES_READ=205
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=111232
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=86
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=93
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=162
INFO - Counters.log(589) |     Map input records=9
INFO - Counters.log(589) |     Reduce shuffle bytes=162
INFO - Counters.log(589) |     Spilled Records=18
INFO - Counters.log(589) |     Map output bytes=138
INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792
INFO - Counters.log(589) |     CPU time spent (ms)=970
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=9
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=258830336
INFO - Counters.log(589) |     Reduce output records=9
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1461055488
INFO - Counters.log(589) |     Map output records=9
Final result on HDFS:

a 45
b 12
b 567
三劫 1
三劫 32
三劫 899
秦东亮 34
秦东亮 72
秦东亮 100