Spark对数据倾斜的八种处理方法 | Peripateticism

标签: | 发表时间:2018-01-10 16:04 | 作者:
出处:http://yuenshome.cn

Spark对数据倾斜的八种处理方法

本文主要讲Spark针对数据倾斜的解决方案(来自数盟的一篇文章《数据倾斜是多么痛?spark作业/面试/调优必备秘籍》,见文末参考),但核心思想也可迁移到其它框架的使用上,部分需要看图更好理解(毕竟本文只是对其理解,相当于摘要,建议直接打开文末参考的连接)。

之前在做垃圾短信分类中,也遇到过OOM的问题,我的解决方案是使用RDD.randomSplit对RDD进行指定比例切分出多个subRDD,没有本文考虑地如此细致。

本文目录

  1. 1. 什么是数据倾斜
  2. 2. 解决数据倾斜需要
  3. 3. 导致Spark数据倾斜的本质
  4. 4. 定位最慢的Task所处的源码位置
  5. 5. 解决方案
    1. 方案一:使用Hive ETL预处理
    2. 方案二:过滤导致倾斜的key
    3. 方案三:提高Shuffle操作并行度
    4. 方案四:两阶段聚合(局部聚合+全局聚合)
    5. 方案五:将reduce join转为map join
    6. 方案六:采样倾斜key并分拆join操作
    7. 方案七:用随机前缀和扩容RDD进行join
    8. 方案八:多种方案组合
  6. 6. 参考
 

1. 什么是数据倾斜

数据倾斜是一种很常见的问题(依据二八定律),简单来说,比方WordCount中某个Key对应的数据量非常大的话,就会产生数据倾斜,导致两个后果:

  1. OOM(单或少数的节点);
  2. 拖慢整个Job执行时间(其他已经完成的节点都在等这个还在做的节点)。

2. 解决数据倾斜需要

  1. 搞定 Shuffle;
  2. 搞定业务场景;
  3. 搞定 CPU core 的使用情况;
  4. 搞定 OOM 的根本原因等:一般都因为数据倾斜(某task任务的数据量过大,GC压力大,和Kafka不同在于Kafka的内存不经过JVM,其基于Linux的Page)。

3. 导致Spark数据倾斜的本质

Shuffle时,需将各节点的相同key的数据拉取到某节点上的一个task来处理,若某个key对应的数据量很大就会发生数据倾斜。比方说大部分key对应10条数据,某key对应10万条,大部分task只会被分配10条数据,很快做完,个别task分配10万条数据,不仅运行时间长,且整个stage的作业时间由最慢的task决定。

数据倾斜只会发生在Shuffle过程,以下算法可能触发Shuffle操作: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。

4. 定位最慢的Task所处的源码位置

步骤一: 看数据倾斜发生在哪个stage(也就是看以上算子出现在哪个阶段)。yarn-client模式下查看本地log或Spark Web UI中当前运行的是哪个stage;yarn-cluster模式下,通过Spark Web UI查看运行到了哪个Stage。
主要看最慢的Stage各task分配的数据量,来确定是否是数据倾斜。

步骤二:根据Stage划分,推算倾斜发生的代码(必然有Shuffle类算子)。简单实用方法:只要看到shuffle类算子或Spark SQL的SQL语句会有Shuffle类的算子的句子,就可以该地方划分为前后两个Stage。(之前用Python的PySpark接口,Spark Web UI会查看task在源码中的行数,Java或者Scala虽没用过,但我想应该有)

5. 解决方案

方案一:使用Hive ETL预处理

  • 场景:若Hive表中数据不均匀,且业务中会频繁用Spark对Hive表分析;
  • 思路:用Hive对数据预处理(对key聚合等操作),原本是Spark对Hive的原表操作,现在就是对Hive预处理后的表操作;
  • 原理:从根源解决了数据倾斜,规避了了Spark进行Shuffle类算子操作。但Hive ETL中进行聚合等操作会发生数据倾斜,只是把慢转移给了Hive ETL;
  • 优点:方便,效果好,规避了Spark数据倾斜;
  • 缺点:治标不治本,Hive ETL会数据倾斜。

方案二:过滤导致倾斜的key

  • 场景:发生倾斜的key很少且不重要;
  • 思路:对发生倾斜的key过滤掉。比方在Spark SQL中用where子句或filter过滤,若每次作业执行,需要动态判定可使用sample算子对RDD采样后取数据量最多的key过滤;
  • 原理:对倾斜的key过滤后,这些key便不会参与后面的计算,从本质上消除数据倾斜;
  • 优点:简单,效果明显;
  • 缺点:适用场景少,实际中导致倾斜的key很多。

方案三:提高Shuffle操作并行度

  • 场景:任何场景都可以,优先选择的最简单方案;
  • 思路:对RDD操作的Shuffle算子传入一个参数,也就是设置Shuffle算子执行时的Shuffle read task数量。对于Spark SQL的Shuffle类语句(如group by,join)即spark.sql.shuffle.partitions,代表shuffle read task的并行度,默认值是200可修改;
  • 原理:增大shuffle read task参数值,让每个task处理比原来更少的数据;
  • 优点:简单,有效;
  • 缺点:缓解的效果很有限。

方案四:两阶段聚合(局部聚合+全局聚合)

  • 场景:对RDD进行reduceByKey等聚合类shuffle算子,SparkSQL的groupBy做分组聚合这两种情况
  • 思路:首先通过map给每个key打上n以内的随机数的前缀并进行局部聚合,即(hello, 1) (hello, 1) (hello, 1) (hello, 1)变为(1_hello, 1) (1_hello, 1) (2_hello, 1),并进行reduceByKey的局部聚合,然后再次map将key的前缀随机数去掉再次进行全局聚合;
  • 原理:对原本相同的key进行随机数附加,变成不同key,让原本一个task处理的数据分摊到多个task做局部聚合,规避单task数据过量。之后再去随机前缀进行全局聚合;
  • 优点:效果非常好(对聚合类Shuffle操作的倾斜问题);
  • 缺点:范围窄(仅适用于聚合类的Shuffle操作,join类的Shuffle还需其它方案)。

方案五:将reduce join转为map join

  • 场景:对RDD或Spark SQL使用join类操作或语句,且join操作的RDD或表比较小(百兆或1,2G);
  • 思路:使用broadcast和map类算子实现join的功能替代原本的join,彻底规避shuffle。对较小RDD直接collect到内存,并创建broadcast变量;并对另外一个RDD执行map类算子,在该算子的函数中,从broadcast变量(collect出的较小RDD)与当前RDD中的每条数据依次比对key,相同的key执行你需要方式的join;
  • 原理:若RDD较小,可采用广播小的RDD,并对大的RDD进行map,来实现与join同样的效果。简而言之,用broadcast-map代替join,规避join带来的shuffle(无Shuffle无倾斜);
  • 优点:效果很好(对join操作导致的倾斜),根治;
  • 缺点:适用场景小(大表+小表),广播(driver和executor节点都会驻留小表数据)小表也耗内存。

方案六:采样倾斜key并分拆join操作

  • 场景:两个较大的(无法采用方案五)RDD/Hive表进行join时,且一个RDD/Hive表中少数key数据量过大,另一个RDD/Hive表的key分布较均匀(RDD中两者之一有一个更倾斜);
  • 思路:
    • 1. 对更倾斜rdd1进行采样(RDD.sample)并统计出数据量最大的几个key;
    • 2. 对这几个倾斜的key从原本rdd1中拆出形成一个单独的rdd1_1,并打上0~n的随机数前缀,被拆分的原rdd1的另一部分(不包含倾斜key)又形成一个新rdd1_2;
    • 3. 对rdd2过滤出rdd1倾斜的key,得到rdd2_1,并将其中每条数据扩n倍,对每条数据按顺序附加0~n的前缀,被拆分出key的rdd2也独立形成另一个rdd2_2;
      【个人认为,这里扩了n倍,最后union完还需要将每个倾斜key对应的value减去(n-1)】
    • 4. 将加了随机前缀的rdd1_1和rdd2_1进行join(此时原本倾斜的key被打散n份并被分散到更多的task中进行join);
      【个人认为,这里应该做两次join,两次join中间有一个map去前缀】
    • 5. 另外两个普通的RDD(rdd1_2、rdd2_2)照常join;
    • 6. 最后将两次join的结果用union结合得到最终的join结果。
  • 原理:对join导致的倾斜是因为某几个key,可将原本RDD中的倾斜key拆分出原RDD得到新RDD,并以加随机前缀的方式打散n份做join,将倾斜key对应的大量数据分摊到更多task上来规避倾斜;
  • 优点:前提是join导致的倾斜(某几个key倾斜),避免占用过多内存(只需对少数倾斜key扩容n倍);
  • 缺点:对过多倾斜key不适用。

方案七:用随机前缀和扩容RDD进行join

  • 场景:RDD中有大量key导致倾斜;
  • 思路:与方案六类似。
    1. 查看RDD/Hive表中数据分布并找到造成倾斜的RDD/表;
    2. 对倾斜RDD中的每条数据打上n以内的随机数前缀;
    3. 对另外一个正常RDD的每条数据扩容n倍,扩容出的每条数据依次打上0到n的前缀;
    4. 对处理后的两个RDD进行join。
  • 原理:与方案六只有唯一不同在于这里对不倾斜RDD中所有数据进行扩大n倍,而不是找出倾斜key进行扩容(这是方案六);
  • 优点:对join类的数据倾斜都可处理,效果非常显著;
  • 缺点:缓解,扩容需要大内存。
    【个人认为,这里和方案六一样,也需要对扩容的key对应的value最后减去(n-1),除非只需大小关系,对值没有要求】

方案八:多种方案组合

实际中,需综合着对业务全盘考虑,可先用方案一和二进行预处理,同时在需要Shuffle的操作提升Shuffle的并行度,最后针对数据分布选择后面方案中的一种或多种。实际中需要对数据和方案思路理解灵活应用。

6. 参考

  • 数据倾斜是多么痛?spark作业/面试/调优必备秘籍
    http://mp.weixin.qq.com/s?__biz=MzA4NjA4MTkzMw==&mid=2651987825&idx=1&sn=0b253fc2e636547a4501bb8140b5b37b&scene=1&srcid=07147NC7qNbqP5r5HjOlE9Gu#wechat_redirect

相关 [spark 数据 方法] 推荐:

Spark对数据倾斜的八种处理方法 | Peripateticism

- -
Spark对数据倾斜的八种处理方法. 本文主要讲Spark针对数据倾斜的解决方案(来自数盟的一篇文章《数据倾斜是多么痛. spark作业/面试/调优必备秘籍》,见文末参考),但核心思想也可迁移到其它框架的使用上,部分需要看图更好理解(毕竟本文只是对其理解,相当于摘要,建议直接打开文末参考的连接). 之前在做垃圾短信分类中,也遇到过OOM的问题,我的解决方案是使用RDD.randomSplit对RDD进行指定比例切分出多个subRDD,没有本文考虑地如此细致.

spark结构化数据处理:Spark SQL、DataFrame和Dataset | smallx's sth.

- -
本文讲解Spark的结构化数据处理,主要包括:Spark SQL、DataFrame、Dataset以及Spark SQL服务等相关内容. 本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注.

Spark-ML-数据获取/处理/准备

- - CSDN博客综合推荐文章
UCL机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类 和推荐系统任务. 数据集列表位于: http://archive.ics.uci.edu/ml/. Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问. 这些数据 集包括人类 基因组项目 、 Common Crawl 网页语料 库、维基百 科数据和 Google Books Ngrams.

Spark Streaming 数据限流简述

- - IT瘾-dev
  Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理;.   流处理与批处理有明显区别,批处理中的数据有明显的边界、数据规模已知;而流处理数据流并没有边界,也未知数据规模;.   由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件、网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障、网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark Streaming程序崩溃;.

实用教程|Spark性能优化之道——解决Spark数据倾斜

- - IT瘾-geek
实用教程|Spark性能优化之道——解决Spark数据倾斜.     2017-03-16 11:31  浏览次数:108. 为何要处理数据倾斜(Data Skew). 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜. 数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈.

虚拟座谈会:大数据一栈式方案Spark

- - CSDN博客架构设计推荐文章
Spark正在占据越来越多的大数据新闻的重要位置,除了性能优异,Spark到底具备了那些特性,让学术界和工业界对其充满了兴趣. 同时,Spark还处在快速发展的阶段,开发者和用户不得不解决不稳定和bug,Scala语言也有较高的学习门槛,这些也会成为Spark普及的障碍. 当然,尽管Spark提供了一栈式的大数据方案,但并不意味着他适合任何场景.

Spark:比Hadoop更强大的分布式数据计算项目

- - 标点符
Spark是一个由加州大学伯克利分校(UC Berkeley AMP)开发的一个分布式数据快速分析项目. 它的核心技术是弹性分布式数据集(Resilient distributed datasets),提供了比Hadoop更加丰富的MapReduce模型,可以快速在内存中对数据集进行多次迭代,来支持复杂的数据挖掘算法和图计算算法.

如何用 Hadoop/Spark 构建七牛数据平台

- - leejun_2005的个人页面
数据平台在大部分公司都属于支撑性平台,做的不好立刻会被吐槽,这点和运维部门很像. 所以在技术选型上优先考虑现成的工具,快速出成果,没必要去担心有技术负担. 早期,我们走过弯路,认为没多少工作量,收集存储和计算都自己研发,发现是吃力不讨好. 去年上半年开始,我们全面拥抱开源工具,搭建自己的数据平台. 公司的主要数据来源是散落在各个业务服务器上的半结构化日志,比如系统日志、程序日志、访问日志、审计日志等.

实用 | 从Apache Kafka到Apache Spark安全读取数据

- - IT瘾-bigdata
随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要. 本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两个关键组件进行了说明. Cloudera Distribution of Apache Kafka 2.0.0版本(基于Apache Kafka 0.9.0)引入了一种新型的Kafka消费者API,可以允许消费者从安全的Kafka集群中读取数据.

60 TB 数据:Facebook 是如何大规模使用 Apache Spark 的

- - IT技术博客大学习
标签:   facebook   spark.    Facebook 经常使用数据驱动的分析方法来做决策. 在过去的几年,用户和产品的增长已经需要我们的分析工程师一次查询就要操作数十 TB 大小的数据集. 我们的一些批量分析执行在古老的 Hive 平台( Apache Hive 由 Facebook 贡献于 2009 年)和 Corona 上——这是我们定制的 MapReduce 实现.