谈谈 Flink Shuffle 演进

标签: Flink Flink 流批一体 | 发表时间:2022-08-14 22:27 | 作者:
出处:https://link3280.github.io/

在分布式计算中,Shuffle 是非常关键但常常容易被忽视的一环。比如著名的 MapReduce 的命名跳过 Shuffle ,只包含其前后的 Map 跟 Reduce。背后原因一方面是 Shuffle 是底层框架在做的事情,用户基本不会感知到其存在,另一方面是 Shuffle 听起来似乎是比较边缘的基础服务。然而,笔者认为大数据计算与在线服务最基础的区别正在于 Shuffle。

众所周知,分布式算法的基础在于分治,而分治的三步为: 分解(Divide)、解决(Conquer)、合并(Combine),其中最为核心的分解与合并两步都与 Shuffle 密不可分。除了数据同步之类的完全并行(Embarrassingly parallel)作业,大多数分布式计算作业都会包含一到多轮 Shuffle,而这些 Shuffle 的本质则是将上一轮计算的中间结果按照下一轮计算需要的方式重新组织。例如,在著名的 WordCount 案例中,在 Map 阶段的数据是随机分布的,而在 Shuffle 过后则是按照单词为分区 Key 来分布。而剩下的完全并行作业,本质上并不是在处理一个需要分治的大问题,而是处理重复的大量小问题,这样的需求其实跟普通的 Web 服务是类似的,若不考虑效率,完全可以用微服务框架来实现。

如果说数据分治的核心在于 Shuffle,那么计算分治的核心则在于调度器,两者相辅相成,比如流式调度和批式调度会搭配不同的 Shuffle。对于实践流批一体的 Flink 而言,Shuffle 面临的问题比其他计算引擎更加复杂,因此 Flink 做了更多的优化,包括流计算的 Pipeline Shuffle、批计算的 Blocking Shuffle 以及结合二者特点的 Hybird Shuffle。

流计算的 Pipeline Shuffle

相比起 MapReduce 和 Spark 等批计算引擎,Flink 流计算的 Shuffle 相对简单,主要原因是所有 Task 同时在运行,上下游 Task 可以通过网络流式地传输中间结果,不需要落盘,这种 Shuffle 被称为 Pipeline Shuffle。

相信不少读者都接触过 Flink DAG 的边类型。当我们在 DAG 构建中使用 Partition (将数据分区)相关的操作,比如 DataStream 的 keyByrescale、SQL 中的 Group By,Flink 会引入一轮 Shuffle,体现在可视化的 DAG 上就是上下游划分到不同的两个节点,两者以一条边相连。边的类型有 HASHBROADCASTREBALANCE 等等(见下图)。

图1. Flink DAG 的边

尽管逻辑上的 Partition 有多种多样的算法,产生的边五花八门,但它们的区别仅在于产出的结果如何划分给不同的下游 Subtask,所以从底层的 Shuffle 看来要做的事情是一样的:将中间结果提供给不同的下游 Subtask 读取。结合下图用 Flink 的话语讲,Partition 算法决定如何划分出 SubPartition,而 Shuffle 决定如何将 SubPartition 传递给 InputGate。

图2. Flink Shuffle 实现

面对 Pipeline Shuffle 的需求,最容易想到的实现方式便是上游 Subtask 所在 TaskManager 直接通过网络推给下游 Subtask 的 TaskManager。事实上,Flink 也的确是这么做的。Flink 在 TaskManager 里内嵌了基于 Netty 的 Shuffle Service,计算得出的中间数据会存到 TaskManager 的缓存池中,由 Netty 去定时轮询发送给下游。

图3. 内置 Netty Shuffle

Pipeline Shuffle 实现上有很多值得研究的地方,其中最重要的是 Flink 1.5 版本引入的 Credit-Based 流控机制。简单来说,Credit-Based 流控实现了类似 TCP 滑动窗口的机制,让上游 Subtask 依据下游 Subtask 的空闲 buffer(Credit)来发送数据,避免多个 Subtask 共用的一条 TCP 链接因为其中一个 Subtask 被阻塞。感兴趣的读者推荐阅读《批流统一计算引擎的动力源泉—Shuffle机制的重构与优化》这篇博客[11]。

批计算的 Blocking Shuffle

批计算的上下游 Subtask 通常不会同时调度起来,所以上游产出数据首先需要落盘存储,等下游调度起来再去读取,这种方式被称为 Blocking Shuffle。自 Flink 开始定位为流批一体计算引擎后,社区便持续对 Flink 批计算的 Blocking Shuffle 进行改良。

首先是 Flink 1.9 将 Shuffle Service 与计算解耦,改为插件化的架构(见 FLIP-31[31])。在此之前,Shuffle Service 作为 TaskManager 职责之一,绑定使用 TaskManager 内置的 Netty Shuffle Service。Netty Shuffle Service 在 Pipeline Shuffle 的场景下会直接通过 TCP 流式发送数据,而在 Blocking Shuffle 的场景下则会先写本地文件,再等下游 Subtask 拉取。然而,后一种情况会导致问题是,上游已经结束的 Subtask 想要释放 TM 的资源,必须先等下游 Subtask 被调度起来并拉完数据,这会造成资源的浪费甚至死锁。更加重要的是,在某些批计算场景下(比如交互式查询),同一批中间数据可能会被消费多次,这是 TaskManager 兼任的 Shuffle Service 无法满足的。

熟悉 Spark 的读者可能会想起 Spark 的 ESS (External Shuffle Service) 和 RSS (Remote Shuffle Service)。前者支持 Spark Executor 本地部署 Shuffle Service,比如部署在 YARN NodeManager 里的 YARN Shuffle Service,而后者支持在远端部署 Shuffle Service,比如阿里的 Aliyun RSS[12]、腾讯刚贡献给 Apache 的 Uniffle(原名 Firestorm)[13]。Flink 参考Spark 的经验,在 FLIP-31 中同时考虑了 ESS 和 RSS 的需求,为后续迭代奠定了良好基础。

其次,Flink 1.12、1.13 引入并完善了 Blocking Shuffle 的 Sort-Merge 实现(见 FLIP-148[4])。Blocking Shuffle 有 Hash Shuffle 和 Sort-Merge Shuffle 两种常见策略。在此之前,Flink 只支持比较简单的 Hash Shuffle,而缺少性能更好更适合生产使用的 Sort-Merge Shuffle。

简单而言,Hash Shuffle 是将数据按照下游每个消费者一个文件的形式组织,当并行度高时会产生大量的文件,容易耗光操作系统的文件描述符,并产生大量随机 IO 对 HDD 磁盘不友好,此外每个文件需要一个独立 Buffer 占内存过多。

图4. Hash Shuffle

相比之下,Sort-Merge Shuffle 是将上游所有的结果写入同一个文件,文件内部再按照下游消费者的 ID 进行排序并维护索引,下游有读取数据请求时,则按照索引来读取大文件中的某一段。

图5. Sort Shuffle

参考 Spark,Spark 在 1.1 版本引入 Sort-Merge Shuffle,并在 1.2 版本用其替代 Hase Shuffle,成为默认的 Shuffle 策略。说句题外话,一方面 Spark 1.1 2014 年发布,而 Flink 1.12 2020 年发布,Flink 在批计算落后于 Spark 6 年,而另一方面,Spark 今年(2022)新宣布的流计算 ProjectLightspeed(Structured Streaming 升级版)[14]要做的特性基本上是 Flink 5 年前已经实现的,可谓有趣的对称。Flink 在批计算上落后于 Spark,正如同 Spark 在流计算上落后于 Flink。

批场景下流批一体的 Hybrid Shuffle

如上文所讲,流计算用 Pipeline Shuffle,批计算用 Blocking Shuffle,那么流批一体用什么 Shuffle 呢?大家很容易联系到本节要讨论的 Hybrid Shuffle,但遗憾的是这句话大概只对一半,因为目前的 Hybrid Shuffle 只针对批场景有效。

众所周知,Flink 遵循 “批是流的特殊案例” 的流批一体理念,因而 Flink 中的批计算是能以流计算的方式去跑的。然而,大多数情况下我们不会这么做,因为批场景有额外的特点能让我们进行优化,比如借助 Blocking Shuffle 可以解耦上下游,让它们无需同时运行,相当于用时间换空间,让作业资源门槛比 Pipeline Shuffle 更低。这点也体现在 Flink 的配置上: Flink 的批作业可以通过 execution.batch-shuffle-mode 指定 Shuffle 模式,默认为 Blocking 模式(其余模式还有 Pipelined 和 Hybird)。

Blocking Shuffle 带来的一个限制是排斥上下游同时运行,因为上游计算结束之前,下游是没办法访问到其不完整的结果数据的,即使调度下游 Subtask 也只会让其空跑。这点在批计算的角度看来很正常,但对于流批一体的 Flink 而言其实是有优化空间的。设想如果在执行上游作业时,集群有空余资源能跑下游作业,那么我们是不是可以尽量 fallback 回 Pipeline Shuffle,用空间换时间,让作业更快完成?

基于这个思路,Flink 社区在 FLIP-235 [8] 中提出了 Hybird Shuffle。Hybird Shuffle 支持以内存(Pipeline Shuffle 风格)或文件(Blocking Shuffle 风格)的方式存储上游产出的结果数据,原则是优先内存,内存满了后 spill 到文件(见下图)。无论是在内存或者文件中,所有数据在产出后即对下游可见,因此可以同时支持流式的消费或批式的消费。

图6. Hybird Shuffle 的数据生产和消费

以 WordCount 作业为例,假设一共有 2 个 Map 和 2 个 Reduce,但现在计算资源只有 3 个 slot,采用不同的 Shuffle 有以下效果:

  • Blocking Shuffle: 先调度 2 个 Map,再调度 2 个 Reduce,有 1 个 slot 被浪费。
  • Pipeline Shuffle: 要求 4 个 slot,因此作业无法运行。
  • Hybird Shuffle: 先调度 2 个 Map 和 1 个 Reduce,剩余一个 Reduce 等三者任意一个完成后再调度(见图 7)。

图7. Hybird Shuffle 下的 Wordcount

从图中可以看到,Map 产出的 Subpartition 1 被下游的 Reduce 1 流式读取,因此数据很可能是缓存在内存中;而 Subpartition 2 由于消费者 Reduce 2 还未运行,所以数据可能会在内存满之后 spill 到磁盘,等 Reduce 2 启动后再读取。

总结

Shuffle 是分布式计算中关键的一环,它与计算调度相辅相成,成为分布式计算分治的基础。对于流批一体的 Flink 而言,Shuffle 不仅要满足流计算调度、批计算调度,还要满足流批一体的调度。前两个场景的 Shuffle 经过多年的发展目前在业界已经比较成熟,而最后的流批一体 Shuffle 还有不少探索的空间。Flink 1.16 版本即将引入的 Hybird Shuffle 针对批场景的流批一体 Shuffle 进行优化,使得 Flink 可以在运行时根据资源情况灵活决定使用类似流计算的 Shuffle 还是批计算的 Shuffle,以提高空闲资源利用率。

参考

  1. Sort-Based Blocking Shuffle Implementation in Flink - Part One
  2. Sort-Based Blocking Shuffle Implementation in Flink - Part Two
  3. FLIP-31: Pluggable Shuffle Service
  4. FLIP-148: Introduce Sort-Based Blocking Shuffle to Flink
  5. FLIP-184: Refine ShuffleMaster lifecycle management for pluggable shuffle service framework
  6. FLIP-199: Change some default config values of blocking shuffle for better usability
  7. FLIP-209: Support to run multiple shuffle plugins in one session cluster
  8. FLIP-235: Hybrid Shuffle Mode Skip to end of metadata
  9. Flink 1.13,面向流批一体的运行时与 DataStream API 优化
  10. Advanced Flink Application Patterns Vol.2: Dynamic Updates of Application Logic
  11. 批流统一计算引擎的动力源泉—Shuffle机制的重构与优化
  12. Aliyun Remote Shuffle Service
  13. Firestorm
  14. Project Lightspeed: Faster and Simpler Stream Processing With Apache Spark
  15. Improvements in task scheduling for batch workloads in Apache Flink 1.12

相关 [flink shuffle] 推荐:

谈谈 Flink Shuffle 演进

- - 时间与精神的小屋
在分布式计算中,Shuffle 是非常关键但常常容易被忽视的一环. 比如著名的 MapReduce 的命名跳过 Shuffle ,只包含其前后的 Map 跟 Reduce. 背后原因一方面是 Shuffle 是底层框架在做的事情,用户基本不会感知到其存在,另一方面是 Shuffle 听起来似乎是比较边缘的基础服务.

HADOOP SHUFFLE(转载)

- - 数据库 - ITeye博客
Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方. 要想理解MapReduce,Shuffle是必须要了解的. 我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混. 前段时间在做MapReduce job性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.

flink-watermark

- - ITeye博客
     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计.        模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计. /** 实际时间-偏移量 偏移后的时间*/.

不可不知的spark shuffle

- - IT瘾-geek
shuffle概览 一个spark的RDD有一组固定的分区组成,每个分区有一系列的记录组成. 对于由窄依赖变换(例如map和filter)返回的RDD,会延续父RDD的分区信息,以pipeline的形式计算. 每个对象仅依赖于父RDD中的单个对象. 诸如coalesce之类的操作可能导致任务处理多个输入分区,但转换仍然被认为是窄依赖的,因为一个父RDD的分区只会被一个子RDD分区继承.

[转]MapReduce:详解Shuffle(copy,sort,merge)过程

- - 芒果先生Mango的专栏
Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方. 要想理解MapReduce, Shuffle是必须要了解的. 我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混. 前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.

Spark性能优化——和shuffle搏斗

- - 四火的唠叨
Spark的性能分析和调优很有意思,今天再写一篇. 主要话题是shuffle,当然也牵涉一些其他代码上的小把戏. 以前写过一篇文章,比较了 几种不同场景的性能优化,包括portal的性能优化,web service的性能优化,还有Spark job的性能优化. Spark的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用Spark来处理的数据,都是要求异步得到结果的数据;再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等.

一文精通 Flink on YARN

- - IT瘾-dev
本文主要是讲解flink on yarn的部署过程,然后yarn-session的基本原理,如何启动多个yarn-session的话如何部署应用到指定的yarn-session上,然后是用户jar的管理配置及故障恢复相关的参数. flink on yarn的整个交互过程图,如下:. 要使得flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs.

Flink SQL 编程实践

- - Jark's Blog
注: 本教程实践基于 Ververica 开源的. sql-training 项目. 基于 Flink 1.7.2. 本文将通过五个实例来贯穿 Flink SQL 的编程实践,主要会涵盖以下几个方面的内容. 如何使用 SQL CLI 客户端. 如何在流上运行 SQL 查询. 运行 window aggregate 与 non-window aggregate,理解其区别.

Flink 1.16:Hive SQL 如何平迁到 Flink SQL

- - Jark's Blog
Hive SQL 迁移的动机. Flink 已经是流计算的事实标准,当前国内外做实时计算或流计算一般都会选择 Flink 和 Flink SQL. 另外,Flink 也是是家喻户晓的流批一体大数据计算引擎. 然而,目前 Flink 也面临着挑战. 比如虽然现在大规模应用都以流计算为主,但 Flink 批计算的应用并不广泛,想要进一步推动真正意义上的流批一体落地,需要推动业界更多地落地 Flink 批计算,需要更积极地拥抱现有的离线生态.

消息称苹果将结束iPod Shuffle和Classic

- 品味视界 - cnBeta.COM
国外媒体TUAW的消息,苹果可能会结束赚钱效应越来越差的部分iPod产品线,特别是功能简单价格也不占优势的Shuffle和出现已经有十年之久的硬盘版iPod Classic很可能是iPod产品线中率先被砍的两个产品.