Spark算子:RDD键值转换操作(4)–cogroup、join – lxw的大数据田地

标签: spark 算子 rdd | 发表时间:2017-09-10 13:08 | 作者:
出处:http://lxw1234.com

关键字:Spark算子、Spark RDD键值转换、cogroup、join

cogroup

##参数为1个RDD

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

 

##参数为2个RDD

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

 

##参数为3个RDD

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

 

cogroup相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。

参数numPartitions用于指定结果的分区数。

参数partitioner用于指定分区函数。

##参数为1个RDD的例子

  1. var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
  2. var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
  3.  
  4. scala> var rdd3 =rdd1.cogroup(rdd2)rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[12] at cogroup at :25
  5.  
  6. scala> rdd3.partitions.size
  7. res3: Int = 2
  8.  
  9. scala> rdd3.collect
  10. res1: Array[(String, (Iterable[String], Iterable[String]))] = Array(
  11. (B,(CompactBuffer(2),CompactBuffer())),
  12. (D,(CompactBuffer(),CompactBuffer(d))),
  13. (A,(CompactBuffer(1),CompactBuffer(a))),
  14. (C,(CompactBuffer(3),CompactBuffer(c)))
  15. )
  16.  
  17.  
  18. scala> var rdd4 =rdd1.cogroup(rdd2,3)rdd4: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[14] at cogroup at :25
  19.  
  20. scala> rdd4.partitions.size
  21. res5: Int = 3
  22.  
  23. scala> rdd4.collect
  24. res6: Array[(String, (Iterable[String], Iterable[String]))] = Array(
  25. (B,(CompactBuffer(2),CompactBuffer())),
  26. (C,(CompactBuffer(3),CompactBuffer(c))),
  27. (A,(CompactBuffer(1),CompactBuffer(a))),
  28. (D,(CompactBuffer(),CompactBuffer(d))))

##参数为2个RDD的例子

  1. var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
  2. var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
  3. var rdd3 = sc.makeRDD(Array(("A","A"),("E","E")),2)
  4.  
  5. scala> var rdd4 =rdd1.cogroup(rdd2,rdd3)rdd4: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String], Iterable[String]))] =
  6. MapPartitionsRDD[17] at cogroup at :27
  7.  
  8. scala> rdd4.partitions.size
  9. res7: Int = 2
  10.  
  11. scala> rdd4.collect
  12. res9: Array[(String, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
  13. (B,(CompactBuffer(2),CompactBuffer(),CompactBuffer())),
  14. (D,(CompactBuffer(),CompactBuffer(d),CompactBuffer())),
  15. (A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A))),
  16. (C,(CompactBuffer(3),CompactBuffer(c),CompactBuffer())),
  17. (E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E))))

##参数为3个RDD示例略,同上。

join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

 

join相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

参数numPartitions用于指定结果的分区数

参数partitioner用于指定分区函数

  1. var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
  2. var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
  3.  
  4. scala> rdd1.join(rdd2).collect
  5. res10: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))

更多关于Spark算子的介绍,可参考Spark算子系列文章

http://lxw1234.com/archives/2015/07/363.htm

如果觉得本博客对您有帮助,请赞助作者

转载请注明:lxw的大数据田地»Spark算子:RDD键值转换操作(4)–cogroup、join

喜欢 (10)分享 (0)

相关 [spark 算子 rdd] 推荐:

Spark和RDD模型研究

- - CSDN博客云计算推荐文章
现今分布式计算框架像MapReduce和Dryad都提供了高层次的原语,使用户不用操心任务分发和错误容忍,非常容易地编写出并行计算程序. 然而这些框架都缺乏对分布式内存的抽象和支持,使其在某些应用场景下不够高效和强大. RDD(Resilient Distributed Datasets弹性分布式数据集)模型的产生动机主要来源于两种主流的应用场景:.

Spark算子:RDD键值转换操作(4)–cogroup、join – lxw的大数据田地

- -
关键字:Spark算子、Spark RDD键值转换、cogroup、join. cogroup相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空. 参数numPartitions用于指定结果的分区数. 参数partitioner用于指定分区函数. ##参数为3个RDD示例略,同上.

Spark RDD弹性表现和来源

- - 开源软件 - ITeye博客
hadoop 的MapReduce是基于数据集的,位置感知,容错 负载均衡. 基于数据集的处理:从物理存储上加载数据,然后操作数据,然后写入物理存储设备;. 基于数据集的操作不适应的场景:. 重点是:基于数据流的方式 不能够复用曾经的结果或者中间计算结果;. spark RDD是基于工作集的. 工作流和工作集的共同特点:位置感知,自动容错,负载均衡等.

Spark常用函数讲解之键值RDD转换 - MOBIN - 博客园

- -
RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集.         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.

Spark 优化

- - CSDN博客推荐文章
提到Spark与Hadoop的区别,基本最常说的就是Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等. 效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存.

Spark&Spark性能调优实战

- - CSDN博客互联网推荐文章
       Spark特别适用于多次操作特定的数据,分mem-only和mem & disk. 其中mem-only:效率高,但占用大量的内存,成本很高;mem & disk:内存用完后,会自动向磁盘迁移,解决了内存不足的问题,却带来了数据的置换的消费. Spark常见的调优工具有nman、Jmeter和Jprofile,以下是Spark调优的一个实例分析:.

Mesos上部署spark

- - 开源小站
还是回到之前一直持续的 Mesos话题. 在之前的环节里,我们已经尝试了Mesos的安装,Marathon守护服务以及相对比较主流的Mesos作为Hadoop的资源管理器的实际操作. 这次就说说同属于伯克利出品的Spark. 其实spark最初0.7以前的版本还没有自己的资源管理系统,资源的调度都是通过Mesos来执行的.