Mapreduce实例-分组排重(group by distinct)

标签: mapreduce 实例 分组 | 发表时间:2013-09-06 06:17 | 作者:liuzhoulong
出处:http://blog.csdn.net

需要实现以下几个类,代码太多,列了下主要代码,可根据排重数据的特征判读是否需要添加combiner来提速。

 

public class GroupComparator implements RawComparator<MyBinaryKey> {
 
 @Override
 public int compare(MyBinaryKey o1, MyBinaryKey o2) {
  return o1.toString().compareTo(o2.toString());
 }

 @Override
 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2,  Long.SIZE / 8 + Integer.SIZE / 8 * 3);
 }

}

public abstract class UVBinaryKey  extends BinaryComparable implements WritableComparable<BinaryComparable>{
 //根据需要添加属性;
  @Override
 public void readFields(DataInput in) throws IOException {

} 

@Override
 public byte[] getBytes() {

}

}

public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> {

 /**
  * 根据uv/ip取模分区,保证相同uv/ip落在同一分区
  */
 @Override
 public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) {
  
  int k=0;
  for(byte b : key.getAttr()){
   k+=b&0xff;
  }
  return k%numPartitions;
 }

}



  job.setMapOutputKeyClass(UVBinaryKey.class);
  job.setGroupingComparatorClass(GroupComparator.class);
   job.setPartitionerClass(MyPartitioner.class);

map略
combiner(根据需要添加)
reduce中的实现:
       @Override
        protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context)
                throws IOException,
                InterruptedException {
            long count = 0;
            byte[] tbsign = null;
            for (NullWritable nullWritable : values) {
                byte[] attr = key.getAttr();
                if (tbsign == null) {
                    tbsign = attr;
                    count++;
                }
                if (tbsign != null) {
                    if (tbsign.length != attr.length) {
                        count++;
                        tbsign = attr;
                    } else {
                        for (int i = 0; i < tbsign.length; i++) {
                            if (tbsign[i] != attr[i]) {
                                count++;
                                tbsign = attr;
                                break;
                            }
                        }
                    }
                }

            }
            StringBuffer out = new StringBuffer();
            out.append(new String(key.getCity()))
                    .append(Constants.FIELDS_TERMINATED).append(count);
            context.write(new Text(out.toString()), NullWritable.get());

        }


 

 

 

 

作者:liuzhoulong 发表于2013-9-5 22:17:26 原文链接
阅读:74 评论:0 查看评论

相关 [mapreduce 实例 分组] 推荐:

Mapreduce实例-分组排重(group by distinct)

- - CSDN博客云计算推荐文章
需要实现以下几个类,代码太多,列了下主要代码,可根据排重数据的特征判读是否需要添加combiner来提速. job.setPartitionerClass(MyPartitioner.class); map略. combiner(根据需要添加) reduce中的实现:. 作者:liuzhoulong 发表于2013-9-5 22:17:26 原文链接.

mapreduce实例-Join连接 (reduce Side Join)

- - CSDN博客云计算推荐文章
//根据连接类型做不同处理. //设置不同map处理不同输入. 外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接. 作者:liuzhoulong 发表于2013-9-5 21:35:16 原文链接. 阅读:83 评论:0 查看评论.

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

MapReduce原理

- - C++博客-牵着老婆满街逛
       MapReduce 是由Google公司的Jeffrey Dean 和 Sanjay Ghemawat 开发的一个针对大规模群组中的海量数据处理的分布式编程模型. MapReduce实现了两个功能. Map把一个函数应用于集合中的所有成员,然后返回一个基于这个处理的结果集. 而Reduce是把从两个或更多个Map中,通过多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳.

MapReduce优化

- - 行业应用 - ITeye博客
相信每个程序员在 编程时都会问自己两个问题“我如何完成这个任务”,以及“怎么能让程序运行得更快”. 同样,MapReduce计算模型的多次优化也是为了更好地解答这两个问题. MapReduce计算模型的优化涉及了方方面面的内容,但是主要集中在两个方面:一是计算性能方面的优化;二是I/O操作方面的优化.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Google Percolator替代MapReduce

- Hao - Solidot
Google在新一代内容索引系统中放弃了MapReduce,替代者是尚不为人知的分布式数据处理系统Percolator. The Register报道,Percolator是一种增量处理平台,它能持续更新索引系统,无需从头重新处理一遍整个系统. Google的工程师计划在下个月举行的年度USENIX Symposium 会议上公布Percolator相关论文.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

MapReduce执行流程

- - CSDN博客云计算推荐文章
MapReduce的大体流程是这样的,如图所示:. 由图片可以看到mapreduce执行下来主要包含这样几个步骤. 1.首先对输入数据源进行切片. 2.master调度worker执行map任务. 3.worker读取输入源片段. 4.worker执行map任务,将任务输出保存在本地. 5.master调度worker执行reduce任务,reduce worker读取map任务的输出文件.