实用教程|Spark性能优化之道——解决Spark数据倾斜

标签: geek | 发表时间:2017-03-23 08:00 | 作者:
出处:http://itindex.net/admin/pagedetail

实用教程|Spark性能优化之道——解决Spark数据倾斜

    2017-03-16 11:31  浏览次数:108

 

为何要处理数据倾斜(Data Skew)

 

什么是数据倾斜

对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。

何谓数据倾斜?数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。

数据倾斜是如何造成的

 在Spark中,同一个Stage的不同Partition可以并行处理,而具有依赖关系的不同Stage之间是串行处理的。假设某个Spark Job分为Stage 0和Stage 1两个Stage,且Stage 1依赖于Stage 0,那Stage 0完全处理结束之前不会处理Stage 1。而Stage 0可能包含N个Task,这N个Task可以并行进行。如果其中N-1个Task都在10秒内完成,而另外一个Task却耗时1分钟,那该Stage的总时间至少为1分钟。换句话说,一个Stage所耗费的时间,主要由最慢的那个Task决定。

由于同一个Stage内的所有Task执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同Task之间耗时的差异主要由该Task所处理的数据量决定。

Stage的数据来源主要分为如下两类

  • 从数据源直接读取。如读取HDFS,Kafka
  • 读取上一个Stage的Shuffle数据

如何缓解/消除数据倾斜

尽量避免数据源的数据倾斜

以Spark Stream通过DirectStream方式读取Kafka数据为例。由于Kafka的每一个Partition对应Spark的一个Task(Partition),所以Kafka内相关Topic的各Partition之间数据是否平衡,直接决定Spark处理该数据时是否会产生数据倾斜。

如《Kafka设计解析(一)- Kafka背景及架构介绍》一文所述,Kafka某一Topic内消息在不同Partition之间的分布,主要由Producer端所使用的Partition实现类决定。如果使用随机Partitioner,则每条消息会随机发送到一个Partition中,从而从概率上来讲,各Partition间的数据会达到平衡。此时源Stage(直接读取Kafka数据的Stage)不会产生数据倾斜。

但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费,此时就需要将具有相同特征的数据放于同一个Partition中。一个典型的场景是,需要将同一个用户相关的PV信息置于同一个Partition中。此时,如果产生了数据倾斜,则需要通过其它方式处理。

调整并行度分散同一个Task的不同Key

原理

Spark在做Shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。

 如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。

案例

现有一张测试表,名为student_external,内有10.5亿条数据,每条数据有一个唯一的id值。现从中取出id取值为9亿到10.5亿的共1.5条数据,并通过一些处理,使得id为9亿到9.4亿间的所有数据对12取模后余数为8(即在Shuffle并行度为12时该数据集全部被HashPartition分配到第8个Task),其它数据集对其id除以100取整,从而使得id大于9.4亿的数据在Shuffle时可被均匀分配到所有Task中,而id小于9.4亿的数据全部分配到同一个Task中。处理过程如下

     INSERT OVERWRITE TABLE test
SELECT CASE WHEN id < 940000000 THEN (9500000  + (CAST (RAND() * 8 AS INTEGER)) * 12 )
       ELSE CAST(id/100 AS INTEGER)
       END,
       name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

通过上述处理,一份可能造成后续数据倾斜的测试数据即以准备好。接下来,使用Spark读取该测试数据,并通过 groupByKey(12)对id分组处理,且Shuffle并行度为12。代码如下

       public class SparkDataSkew {
  public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
      .appName("SparkDataSkewTunning")
      .config("hive.metastore.uris", "thrift://hadoop1:9083")
      .enableHiveSupport()
      .getOrCreate();     Dataset dataframe = sparkSession.sql( "select * from test");
    dataframe.toJavaRDD()
      .mapToPair((Row row) -> new Tuple2(row.getInt(0),row.getString(1)))
      .groupByKey(12)
      .mapToPair((Tuple2> tuple) -> {
        int id = tuple._1();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
        return new Tuple2(id, atomicInteger.get());
      }).count();

      sparkSession.stop();
      sparkSession.close();
  }
  
}

本次实验所使用集群节点数为4,每个节点可被Yarn使用的CPU核数为16,内存为16GB。使用如下方式提交上述应用,将启动4个Executor,每个Executor可使用核数为12(该配置并非生产环境下的最优配置,仅用于本文实验),可用内存为12GB。

   spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

GroupBy Stage的Task状态如下图所示,Task 8处理的记录数为4500万,远大于(9倍于)其它11个Task处理的500万记录。而Task 8所耗费的时间为38秒,远高于其它11个Task的平均时间(16秒)。整个Stage的时间也为38秒,该时间主要由最慢的Task 8决定。

在这种情况下,可以通过调整Shuffle并行度,使得原来被分配到同一个Task(即该例中的Task 8)的不同Key分配到不同Task,从而降低Task 8所需处理的数据量,缓解数据倾斜。

通过 groupByKey(48)将Shuffle并行度调整为48,重新提交到Spark。新的Job的GroupBy Stage所有Task状态如下图所示。

从上图可知,记录数最多的Task 20处理的记录数约为1125万,相比于并行度为12时Task 8的4500万,降低了75%左右,而其耗时从原来Task 8的38秒降到了24秒。

 在这种场景下,调整并行度,并不意味着一定要增加并行度,也可能是减小并行度。如果通过 groupByKey(11)将Shuffle并行度调整为11,重新提交到Spark。新Job的GroupBy Stage的所有Task状态如下图所示。

从上图可见,处理记录数最多的Task 6所处理的记录数约为1045万,耗时为23秒。处理记录数最少的Task 1处理的记录数约为545万,耗时12秒。

总结

适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
调整并行度。一般是增大并行度,但有时如本例减小并行度也可达到效果。

优势
实现简单,可在需要Shuffle的操作算子上直接设置并行度或者使用 spark.default.parallelism设置。如果是Spark SQL,还可通过 SET spark.sql.shuffle.partitions=[num_tasks]设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。

劣势
适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。

自定义Partitioner

原理

使用自定义的Partitioner(默认为HashPartitioner),将原本被分配到同一个Task的不同Key分配到不同Task。

案例

以上述数据集为例,继续将并发度设置为12,但是在 groupByKey算子上,使用自定义的 Partitioner(实现如下)

       .groupByKey(new Partitioner() {
  @Override
  public int numPartitions() {
    return 12;
  }

  @Override
  public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
      return (id - 9500000) / 12;
    } else {
      return id % 12;
    }
  }
})

由下图可见,使用自定义Partition后,耗时最长的Task 6处理约1000万条数据,用时15秒。并且各Task所处理的数据集大小相当。

总结

适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。

优势
不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。

劣势
适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。

将Reduce side Join转变为Map side Join

原理通过Spark的Broadcast机制,将Reduce侧Join转化为Map侧Join,避免Shuffle从而完全消除Shuffle带来的数据倾斜。

案例

通过如下SQL创建一张具有倾斜Key且总记录数为1.5亿的大表test。

       INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 980000000 THEN (95000000  + (CAST (RAND() * 4 AS INT) + 1) * 48 )
       ELSE CAST(id/10 AS INT) END AS STRING),
       name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

使用如下SQL创建一张数据分布均匀且总记录数为50万的小表test_new。

       INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/10 AS INT) AS STRING),
       name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

直接通过Spark Thrift Server提交如下SQL将表test与表test_new进行Join并将Join结果存于表test_join中。

       INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

该SQL对应的DAG如下图所示。从该图可见,该执行过程总共分为三个Stage,前两个用于从Hive中读取数据,同时二者进行Shuffle,通过最后一个Stage进行Join并将结果写入表test_join中。

从下图可见,最近Join Stage各Task处理的数据倾斜严重,处理数据量最大的Task耗时7.1分钟,远高于其它无数据倾斜的Task约2s秒的耗时。

 接下来,尝试通过Broadcast实现Map侧Join。实现Map侧Join的方法,并非直接通过 CACHE TABLE test_new将小表test_new进行cache。现通过如下SQL进行Join。

       CACHE TABLE test_new;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通过如下DAG图可见,该操作仍分为三个Stage,且仍然有Shuffle存在,唯一不同的是,小表的读取不再直接扫描Hive表,而是扫描内存中缓存的表。

并且数据倾斜仍然存在。如下图所示,最慢的Task耗时为7.1分钟,远高于其它Task的约2秒。

正确的使用Broadcast实现Map侧Join的方式是,通过 SET spark.sql.autoBroadcastJoinThreshold=104857600;将Broadcast的阈值设置得足够大。

再次通过如下SQL进行Join。

       SET spark.sql.autoBroadcastJoinThreshold=104857600;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通过如下DAG图可见,该方案只包含一个Stage。

并且从下图可见,各Task耗时相当,无明显数据倾斜现象。并且总耗时为1.5分钟,远低于Reduce侧Join的7.3分钟。

总结

适用场景
参与Join的一边数据集足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。

解决方案
在Java/Scala代码中将小数据集数据拉取到Driv

更多行业资讯,更新鲜的技术动态,尽在 慧都学院

 

相关 [spark 性能优化 spark] 推荐:

Spark性能优化指南——基础篇

- - 美团点评技术团队
在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一. Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛. 在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark. 大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快、性能更高.

Spark性能优化指南——高级篇

- - 美团点评技术团队
继 基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多. 数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能.

Spark性能优化——和shuffle搏斗

- - 四火的唠叨
Spark的性能分析和调优很有意思,今天再写一篇. 主要话题是shuffle,当然也牵涉一些其他代码上的小把戏. 以前写过一篇文章,比较了 几种不同场景的性能优化,包括portal的性能优化,web service的性能优化,还有Spark job的性能优化. Spark的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用Spark来处理的数据,都是要求异步得到结果的数据;再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等.

实用教程|Spark性能优化之道——解决Spark数据倾斜

- - IT瘾-geek
实用教程|Spark性能优化之道——解决Spark数据倾斜.     2017-03-16 11:31  浏览次数:108. 为何要处理数据倾斜(Data Skew). 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜. 数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈.

Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势

- - IT瘾-bigdata
本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义Partitioner,使用Map侧Join代替Reduce侧Join,给倾斜Key加上随机前缀等. 为何要处理数据倾斜(Data Skew). 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜.

HBase最佳实践-写性能优化策略 – 有态度的HBase/Spark/BigData

- -
上一篇文章主要介绍了HBase读性能优化的基本套路,本篇文章来说道说道如何诊断HBase写数据的异常问题以及优化写性能. 和读相比,HBase写数据流程倒是显得很简单:数据先顺序写入HLog,再写入对应的缓存Memstore,当Memstore中数据大小达到一定阈值(128M)之后,系统会异步将Memstore中数据flush到HDFS形成小文件.

HBase最佳实践-读性能优化策略 – 有态度的HBase/Spark/BigData

- -
任何系统都会有各种各样的问题,有些是系统本身设计问题,有些却是使用姿势问题. HBase也一样,在真实生产线上大家或多或少都会遇到很多问题,有些是HBase还需要完善的,有些是我们确实对它了解太少. 总结起来,大家遇到的主要问题无非是Full GC异常导致宕机问题、RIT问题、写吞吐量太低以及读延迟较大.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.