MapReduce之二次排序
- - CSDN博客云计算推荐文章 面试官问了一个MapReduce问题:“如何用MapReduce实现两个表的连接”. 我说“用两个job实现,第一个. 于是这两天回来看了下MapReduce的二次排序. 在某些情况下,需要对reduce中的value进行排序. 二次排序,可以将根据key聚合起来的valueList根据value进行排序.
面试官问了一个MapReduce问题:“如何用MapReduce实现两个表的连接”。
我说“用两个job实现,第一个。。。,第二个。。。”
“还要用两个?二次排序会不会?”
“不会”。于是这两天回来看了下MapReduce的二次排序。
在某些情况下,需要对reduce中的value进行排序。而这时,可以利用二次排序。二次排序,可以将根据key聚合起来的valueList根据value进行排序。
输入数据 | 输出数据 |
1 2
3 4
5 6
7 8
7 82
12 211
20 21
20 53
20 522
31 42
40 511
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
63 61
70 54
70 55
70 56
70 57
70 58
70 58
71 55
71 56
73 57
74 58
203 21
530 54
730 54
740 58
|
1 2
3 4
5 6
7 8
7 82
12 211
20 21
20 53
20 522
31 42
40 511
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
63 61
70 54
70 55
70 56
70 57
70 58
70 58
71 55
71 56
73 57
74 58
203 21
530 54
730 54
740 58
|
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.examples; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; /** * This is an example Hadoop Map/Reduce application. * It reads the text input files that must contain two integers per a line. * The output is sorted by the first and second number and grouped on the * first number. * * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort * <i>in-dir</i> <i>out-dir</i> */ public class SecondarySort { /** * Define a pair of integers that are writable. * They are serialized in a byte comparable format. */ public static class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; /** * Set the left and right values. */ public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } /** * Read the two integers. * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1 */ @Override public void readFields(DataInput in) throws IOException { first = in.readInt() + Integer.MIN_VALUE; second = in.readInt() + Integer.MIN_VALUE; } @Override public void write(DataOutput out) throws IOException { out.writeInt(first - Integer.MIN_VALUE); out.writeInt(second - Integer.MIN_VALUE); } @Override public int hashCode() { return first * 157 + second; } @Override public boolean equals(Object right) { if (right instanceof IntPair) { IntPair r = (IntPair) right; return r.first == first && r.second == second; } else { return false; } } /** A Comparator that compares serialized IntPair. */ public static class Comparator extends WritableComparator { public Comparator() { super(IntPair.class); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return compareBytes(b1, s1, l1, b2, s2, l2); } } static { // register this comparator WritableComparator.define(IntPair.class, new Comparator()); } @Override public int compareTo(IntPair o) { if (first != o.first) { return first < o.first ? -1 : 1; } else if (second != o.second) { return second < o.second ? -1 : 1; } else { return 0; } } } /** * Partition based on the first part of the pair. */ public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{ @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; } } /** * Compare only the first part of the pair, so that reduce is called once * for each value of the first part. */ public static class FirstGroupingComparator implements RawComparator<IntPair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } } /** * Read two integers from each line and generate a key, value pair * as ((left, right), right). */ public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> { private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasMoreTokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasMoreTokens()) { right = Integer.parseInt(itr.nextToken()); } key.set(left, right); value.set(right); context.write(key, value); } } } /** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> { private static final Text SEPARATOR = new Text("------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(IntPair key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { context.write(SEPARATOR, null); first.set(Integer.toString(key.getFirst())); for(IntWritable value: values) { context.write(first, value); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: secondarysort <in> <out>"); System.exit(2); } Job job = new Job(conf, "secondary sort"); job.setJarByClass(SecondarySort.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); // group and partition by the first int in the pair job.setPartitionerClass(FirstPartitioner.class); job.setGroupingComparatorClass(FirstGroupingComparator.class); // the map output is IntPair, IntWritable job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); // the reduce output is Text, IntWritable job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }