Hadoop 实现共同出现的单词(Word Co-occurrence)统计
共同出现的单词(Word co-occurrence)是指在同一行(句子)中相邻的两个单词。每一对相邻的单词构成一个 Co-Occurrence 对;单词对不区分先后顺序和大小写(例如 "I Love" 与 "love i" 都记为 i:love),输出为每个单词对出现的次数。
Sample Input:
a b cc, c d d c
I Love U.
dd ee f g s sa dew ad da
So shaken as we are, so wan with care.
Find we a time for frighted peace to pant.
And breathe short-winded accents of new broil.
To be commenced in strands afar remote.
I Love U U love i.
i i i i
Sample Output:
a:b 1
a:time 1
a:we 1
accents:of 1
accents:short-winded 1
ad:da 1
ad:dew 1
afar:remote 1
afar:strands 1
and:breathe 1
are:so 1
are:we 1
as:shaken 1
as:we 1
b:cc 1
be:commenced 1
be:to 1
breathe:short-winded 1
broil:new 1
c:cc 1
c:d 2
care:with 1
commenced:in 1
d:d 1
dd:ee 1
dew:sa 1
ee:f 1
f:g 1
find:we 1
for:frighted 1
for:time 1
frighted:peace 1
g:s 1
i:i 3
i:love 3
in:strands 1
love:u 3
new:of 1
pant:to 1
peace:to 1
s:sa 1
shaken:so 1
so:wan 1
u:u 1
wan:with 1
Code:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; public class CoOccurrence { public static class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair(){ set(new Text(), new Text()); } public TextPair(String left, String right) { set(new Text(left), new Text(right)); } public TextPair(Text left, Text right) { set(left, right); } public void set(Text left, Text right){ String l = left.toString(); String r = right.toString(); int cmp = l.compareTo(r); if(cmp <= 0){ this.first = left; this.second = right; }else{ this.first = right; this.second = left; } } public Text getFirst() { return first; } public Text getSecond() { return second; } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode();//May be some trouble here. why 163? 
sometimes 157 } @Override public boolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString(){ return first + ":" + second; } @Override public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if(cmp != 0) return cmp; return second.compareTo(tp.second); } // A Comparator that com.pares serialized StringPair. public static class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ try { int firstl1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstl2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2, firstl2); if(cmp != 0) return cmp; return TEXT_COMPARATOR.compare(b1, s1 + firstl1, l1 - firstl1, b2, s2 + firstl2, l1 - firstl2); }catch (IOException e) { throw new IllegalArgumentException(e); } } }//End of Comparator static { // register this comparator WritableComparator.define(TextPair.class, new Comparator()); } // Compare only the first part of the pair, so that reduce is called once for each value of the first part. 
public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ try { int firstl1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstl2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2, firstl2); }catch (IOException e) { throw new IllegalArgumentException(e); } } /* @Override public int compare(WritableComparator a, WritableComparator b) { if(a instanceof TextPair && b instanceof TextPair) return ((TextPair)a).first.compareTo(((TextPair)b).first); return super.compare(a, b); }*/ }//End of FirstComparator }//End of TextPair //Partition based on the first part of the pair. public static class FirstPartitioner extends Partitioner<TextPair,IntWritable>{ @Override public int getPartition(TextPair key, IntWritable value, int numPartitions) { return Math.abs(key.getFirst().toString().indexOf(0) * 127) % numPartitions;//May be some trouble here. 
} }//End of FirstPartitioner public static class MyMapper extends Mapper<LongWritable, Text, TextPair, IntWritable> { private final static IntWritable one = new IntWritable(1); private static Text word0 = new Text(); private static Text word1 = new Text(); private String pattern = "[^a-zA-Z0-9-']"; @Override public void map(LongWritable inKey, Text inValue, Context context)throws IOException, InterruptedException { String line = inValue.toString(); line = line.replaceAll(pattern, " "); line = line.toLowerCase(); String[] str = line.split(" +"); for(int i=0; i< str.length-1; i++) { word0.set(str[i]); word1.set(str[i+1]); TextPair pair = new TextPair(word0, word1); context.write(pair, one); } } }//End of MapClass public static class MyReducer extends Reducer<TextPair, IntWritable, TextPair, IntWritable> { private IntWritable result = new IntWritable(); @Override public void reduce(TextPair inKey, Iterable<IntWritable> inValues, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : inValues) { sum += val.get(); } result.set(sum); context.write(inKey, result); } }//End of MyReducer public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //conf.set("Hadoop.job.ugi", "sunguoli,cs402"); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); //if (otherArgs.length != 2) { // System.err.println("Usage: CoOccurrence <in> <out>"); // System.exit(2); //} Job job = new Job(conf, "Co-Occurrence"); job.setJarByClass(CoOccurrence.class); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(TextPair.class); job.setMapOutputValueClass(IntWritable.class); job.setCombinerClass(MyReducer.class); // group and partition by the first int in the pair //job.setPartitionerClass(FirstPartitioner.class); //job.setGroupingComparatorClass(FirstGroupingComparator.class); // the reduce output is Text, IntWritable job.setReducerClass(MyReducer.class); 
job.setOutputKeyClass(TextPair.class); job.setOutputValueClass(IntWritable.class); //FileInputFormat.addInputPath(job, new Path("../shakespeareinput")); //FileOutputFormat.setOutputPath(job, new Path("output")); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }//End of main }//End of CoOccurrence