不可不知的spark shuffle

标签: geek | 发表时间:2018-12-21 00:00 | 作者:
出处:http://itindex.net/relian


640?wx_fmt=pngshuffle概览

一个spark的RDD有一组固定的分区组成,每个分区有一系列的记录组成。对于由窄依赖变换(例如map和filter)返回的RDD,会延续父RDD的分区信息,以pipeline的形式计算。每个对象仅依赖于父RDD中的单个对象。诸如coalesce之类的操作可能导致任务处理多个输入分区,但转换仍然被认为是窄依赖的,因为一个父RDD的分区只会被一个子RDD分区继承。

Spark还支持宽依赖的转换,例如groupByKey和reduceByKey。在这些依赖项中,计算单个分区中的记录所需的数据可以来自于父数据集的许多分区中。要执行这些转换,具有相同key的所有元组必须最终位于同一分区中,由同一任务处理。为了满足这一要求,Spark产生一个shuffle,它在集群内部传输数据,并产生一个带有一组新分区的新stage。

可以看下面的代码片段:

   sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()

上面的代码片段只有一个action操作,count,从输入textfile到action经过了三个转换操作。这段代码只会在一个stage中运行,因为,三个转换操作没有shuffle,也即是三个转换操作的每个分区都是只依赖于它的父RDD的单个分区。

但是,下面的单词统计就跟上面有很大区别:

   val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))    
val wordCounts = tokenized.map((_,1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >=1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_,1)).reduceByKey(_ + _)
charCounts.collect()

这段代码里有两个reducebykey操作,三个stage。

下面图更复杂,因为有一个join操作:

640?wx_fmt=png

粉框圈住的就是整个DAG的stage划分。

640?wx_fmt=png

在每个stage的边界,父stage的task会将数据写入磁盘,子stage的task会将数据通过网络读取。由于它们会导致很高的磁盘和网络IO,所以shuffle代价相当高,应该尽量避免。父stage的数据分区往往和子stage的分区数不同。触发shuffle的操作算子往往可以指定分区数的,也即是numPartitions代表下个stage会有多少个分区。就像mr任务中reducer的数据是非常重要的一个参数一样,shuffle的时候指定分区数也将在很大程度上决定一个应用程序的性能。

640?wx_fmt=png优化shuffle

通常情况可以选择使用产生相同结果的action和transform相互替换。但是并不是产生相同结果的算子就会有相同的性能。通常避免常见的陷阱并选择正确的算子可以显著提高应用程序的性能。

当选择转换操作的时候,应最小化shuffle次数和shuffle的数据量。shuffle是非常消耗性能的操作。所有的shuffle数据都会被写入磁盘,然后通过网络传输。repartition , join, cogroup, 和  *By 或者 *ByKey 类型的操作都会产生shuffle。我们可以对一下几个操作算子进行优化:

1. groupByKey某些情况下可以被reducebykey代替。

2. reduceByKey某些情况下可以被 aggregatebykey代替。

3. flatMap-join-groupBy某些情况下可以被cgroup代替。

具体细节,知识星球球友可以点击 阅读 原文进入知识星球阅读。

640?wx_fmt=pngno shuffle

在某些情况下,前面描述的转换操作不会导致shuffle。当先前的转换操作已经使用了和shuffle相同的分区器分区数据的时候,spark就不会产生shuffle。

举个例子:

   rdd1= someRdd.reduceByKey(...)    

rdd2= someOtherRdd.reduceByKey(...)

rdd3= rdd1.join(rdd2)

由于使用redcuebykey的时候没有指定分区器,所以都是使用的默认分区器,会导致rdd1和rdd2都采用的是hash分区器。两个reducebykey操作会产生两个shuffle过程。如果,数据集有相同的分区数,执行join操作的时候就不需要进行额外的shuffle。由于数据集的分区相同,因此rdd1的任何单个分区中的key集合只能出现在rdd2的单个分区中。 因此,rdd3的任何单个输出分区的内容仅取决于rdd1中单个分区的内容和rdd2中的单个分区,并且不需要第三个shuffle。

例如,如果someRdd有四个分区,someOtherRdd有两个分区,而reduceByKeys都使用三个分区,运行的任务集如下所示:

640?wx_fmt=png

如果rdd1和rdd2使用不同的分区器或者相同的分区器不同的分区数,仅仅一个数据集在join的过程中需要重新shuffle

640?wx_fmt=png


在join的过程中为了避免shuffle,可以使用广播变量。当executor内存可以存储数据集,在driver端可以将其加载到一个hash表中,然后广播到executor。然后,map转换可以引用哈希表来执行查找。

640?wx_fmt=png增加shuffle

有时候需要打破最小化shuffle次数的规则。

当增加并行度的时候,额外的shuffle是有利的。例如,数据中有一些文件是不可分割的,那么该大文件对应的分区就会有大量的记录,而不是说将数据分散到尽可能多的分区内部来使用所有已经申请cpu。在这种情况下,使用reparition重新产生更多的分区数,以满足后面转换算子所需的并行度,这会提升很大性能。

使用reduce和aggregate操作将数据聚合到driver端,也是修改区数的很好的例子。

在对大量分区执行聚合的时候,在driver的单线程中聚合会成为瓶颈。要减driver的负载,可以首先使用reducebykey或者aggregatebykey执行一轮分布式聚合,同时将结果数据集分区数减少。实际思路是首先在每个分区内部进行初步聚合,同时减少分区数,然后再将聚合的结果发到driver端实现最终聚合。典型的操作是treeReduce 和 treeAggregate。

当聚合已经按照key进行分组时,此方法特别适用。例如,假如一个程序计算语料库中每个单词出现的次数,并将结果使用map返回到driver。一种方法是可以使用聚合操作完成在每个分区计算局部map,然后在driver中合并map。可以用aggregateByKey以完全分布的方式进行统计,然后简单的用collectAsMap将结果返回到driver。

更多spark技巧,大数据技巧,欢迎点击 阅读原文加入 知识星球

推荐阅读:

经验|如何设置Spark资源

戳破 | hive on spark 调优点

640?wx_fmt=png

相关 [spark shuffle] 推荐:

不可不知的spark shuffle

- - IT瘾-geek
shuffle概览 一个spark的RDD有一组固定的分区组成,每个分区有一系列的记录组成. 对于由窄依赖变换(例如map和filter)返回的RDD,会延续父RDD的分区信息,以pipeline的形式计算. 每个对象仅依赖于父RDD中的单个对象. 诸如coalesce之类的操作可能导致任务处理多个输入分区,但转换仍然被认为是窄依赖的,因为一个父RDD的分区只会被一个子RDD分区继承.

Spark性能优化——和shuffle搏斗

- - 四火的唠叨
Spark的性能分析和调优很有意思,今天再写一篇. 主要话题是shuffle,当然也牵涉一些其他代码上的小把戏. 以前写过一篇文章,比较了 几种不同场景的性能优化,包括portal的性能优化,web service的性能优化,还有Spark job的性能优化. Spark的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用Spark来处理的数据,都是要求异步得到结果的数据;再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等.

Spark Shuffle过程分析:Map阶段处理流程

- - 简单之美
默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算. 我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示:. 如果需要修改ShuffleManager实现,则只需要修改配置项spark.shuffle.manager即可,默认支持sort和 tungsten-sort,可以指定自己实现的ShuffleManager类.

HADOOP SHUFFLE(转载)

- - 数据库 - ITeye博客
Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方. 要想理解MapReduce,Shuffle是必须要了解的. 我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混. 前段时间在做MapReduce job性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.

谈谈 Flink Shuffle 演进

- - 时间与精神的小屋
在分布式计算中,Shuffle 是非常关键但常常容易被忽视的一环. 比如著名的 MapReduce 的命名跳过 Shuffle ,只包含其前后的 Map 跟 Reduce. 背后原因一方面是 Shuffle 是底层框架在做的事情,用户基本不会感知到其存在,另一方面是 Shuffle 听起来似乎是比较边缘的基础服务.

[转]MapReduce:详解Shuffle(copy,sort,merge)过程

- - 芒果先生Mango的专栏
Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方. 要想理解MapReduce, Shuffle是必须要了解的. 我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混. 前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.

Spark 优化

- - CSDN博客推荐文章
提到Spark与Hadoop的区别,基本最常说的就是Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等. 效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存.