Flink 流批一体中的数据边界

标签: Flink Flink 随想 | 发表时间:2022-02-13 15:27 | 作者:
出处:https://link3280.github.io/

众所周知,流场景和批场景最为根本的区别在于 Data Boundness(数据集有界性)。Data Boundness 将数据分为 Bounded 和 Un-Bounded。在业界过去多年的实践中,两者分别绑定对应领域的存储系统和计算引擎,然而在流批一体的趋势下,领域的边界在逐渐弱化。例如,消息队列通常用作流场景,但 Pravega 的 StreamCut 支持将指定队列中某一段消息作为批处理的输入[1]。在混合使用流批的场景下,不少原本大家习以为常的设定都需要重新去审视,其中的一项便是数据集内部的边界。

存储边界与计算边界

数据边界不仅包括数据集整体的逻辑边界,也包括数据集内部的存储单元逻辑边界,比如 HDFS 等文件系统的文件及底层的 Block、Kafka 等消息队列的 Partition 等等。数据边界在批处理中扮演着十分关键的角色,比如作为分治基础,比如标识计算的结束。

在批计算中,整个 Job 会被数据边界划分为多个小 Task,每个小 Task 都可以视为一个事务,计算是由数据的边界驱动的。如果将事务看成计算的逻辑单元,那么一个计算的逻辑单元的数据输入就对应一至多个存储的逻辑单元,因此我们可以说计算和存储是 对齐的。例如,在 MapReduce 中,一个 Map 的输入对应一个 Split,而一个 Split 由一至多个 HDFS Block 组成,但不会出现一个 Map 对应 1.5 个 Block 的情况。

图1. 批计算的对齐边界

在流计算中,虽然计算是连续不断的,但出于容错等原因,仍然会将计算划分为多个事务处理。以主流实时计算引擎 Apache Flink 来说,Flink 通常会定时触发两阶段提交(2PC)事务,也就是常说的 Checkpoint。Checkpoint 会向数据流注入 Checkpoint Barrier,作为每个 Checkpoint 对应数据的边界。以 Checkpoint Barrier 划分的数据单元和数据源本身的逻辑存储单元并无关系,因此两者的边界通常不会重合,我们可以说它们是 非对齐的。例如 Flink 读取 Kafka 数据,并不需要感知到 Partition 底层的 Segment,而 Kafka 也没有将这样的数据边界暴露给用户。

图2. 流计算的非对齐边界

在流批一体场景下,引擎常要读写有边界的数据集。取决于不同存储系统,不对齐的边界可能导致流计算的容错、可维护性都大打折扣。主要问题有数据血缘、结束条件和可重复读,下文逐一分析。

数据血缘

数据血缘指的是输入数据到输出数据之间依赖关系。如上文所说,批计算的输入数据边界与计算边界是对齐的,而计算边界很自然地又体现在输出数据的边界上。这点很容易理解,因为一个计算事务结束必然会 commit 数据,而这些数据会以文件、对象为单元独立存储,不会跟其他事务的数据混在一起。以文件、对象为单位,我们很容易追踪到数据上下游的血缘关系。

图3. 流计算的血缘关系

清晰的血缘关系能大大提高数据的可维护性。如果出现脏数据或者程序 bug 等异常,需要回滚计算时,我们可以方便地识别出异常的数据,删掉重新计算或者写 ad-hoc 脚本修复数据。比如在上图中的输入文件 1 出现问题,那么我们只需要处理事务 1 的输出数据即可,影响范围是十分明确的。此外,批计算的输入输出通常是以时间索引的(比如 Hive 中常用的天或小时分区),因此我们还可以依据时间来回滚事务。

然而,在流式计算中,即使输入数据是存在边界的,这样的边界信息并不会体现在计算上,计算仍是连续不断的,辅以周期性的事务。在触发 Checkpoint 快照的时候,Flink 会记录当前正在读取和正在写的文件的 Offset,作为对应事务的数据边界。

图4. 流计算的血缘关系

这意味着 Flink 计算时是无视存储逻辑单元边界的,边界信息被限制在与存储系统打交道的 Connector 中,这样的设计更符合单一职责原则,更加优雅,但也导致了存储边界信息以及血缘关系的丢失。当出需要回滚事务时,我们很难识别出影响范围,只能基于时间来过滤数据而不能直接回滚对应事务。

比如若发现上图中的文件 1 某条数据不准确,我们很难识别出需要回滚事务 1 还是事务 2,或者两者都需要,因此只能选择比较安全的做法,回滚全部事务。更加严重的问题在于,如果异常作业除了 Source/Sink 有还别的有状态的算子,那么我们无法直接丢弃原先的 Checkpoint 重新开始,只能从有限的几个可选 Checkpoint 中选一个来恢复,而这个 Checkpoint 记录的输入输出文件及其 Offset 又不一定符合当前最新状态,可能造成作业恢复状态后提交事务失败。

解决数据血缘丢失的关键在于,Flink Checkpoint 记录的数据存储 Offset 应当同步持久化到外部,最好可以有存储系统的原生支持。如此一来,即使事务数据即使没有对齐存储单元,要追踪和操作事务涉及的数据也比较方便。举个大家熟悉的例子就是 Kafka 的 Consumer Group Offset。不过 Kafka 仍有个问题在于 Consumer Group Offset 没有版本控制,所以只能记录最新的一组 Offset。在这点上,Pravega 允许多组 StreamCut 则更加友好。

结束条件

相对于数据血缘主要是业务应用层面问题,结束条件则更多是计算引擎层面的问题,而且是流批一体最大的障碍之一。幸运的是这些问题在最新的 Flink 1.14 都得到了基本解决。

在批计算中,计算的事务和输入数据的边界是对齐的,因此输入数据结束则代表事务可提交;而在流计算中,计算的事务是由周期性 Checkpoint 而不是输入数据边界驱动的,因此事务可提交的标识是输入数据结束加上 Checkpoint 快照成功。这点在 Flink 1.14 中有所体现,现在 Flink 可以在 Bounded Source 结束以后会马上触发一个 Checkpoint,来提交最后一个事务的数据,不过为保持与之前版本的行为一致,这个功能暂时在默认情况下是关闭的。

另外一个跟结束条件相关的问题是,在混合使用 Bounded 数据集和 Un-Bounded 数据集的情况下,会遇到 Bounded 数据集已经输入完毕(因此 Task 为 Finished)但整体作业还在运行的情况,这时 Flink 需要继续能进行正确 Checkpoint。这个问题听起来不算难,但其实有非常复杂的实现细节需要考虑,感兴趣的同学可以阅读 FLIP-147 [4]。本文只列举其中最为核心的三个备选解决方案,其中最后一个为被采纳的最终方案:

  • 让已经结束的 Source Task 继续保持在 Running 状态,不要转为 Finished 状态。这个方案比较投机取巧,但有点滥用了 Task 状态,带来的后果就是不能依靠 Source 结束产生的 EndOfPartition 事件来代表输出结束,而是要另外引入新的事件。
  • 让 Task 转为 Finished,同时记录 Finished Task 的 State 到 Checkpoint。这个行为听起来很自然,但实现起来有诸多问题,比如 Task 转为 Finished 前的最后一次 Checkpoint 包含着这个 Task 最终的 State,而作业后续的每次 Checkpoint 都会引用它,导致 Checkpoint 难以清理。这是因为 Task 变为 Finished 后状态不再可以访问,所以不能从当前的算子从获取。
  • 让 Task 转为 Finished,但不记录 Finished Task的 State 到 Checkpoint。这个方案相当于将 Finished Task 的 State 丢弃掉,因此在这之前的一个 Checkpoint 需要触发相关算子将中间结果全部 flush,效果类似 stop-with-savepoint --drain 命令。

数据源可重复读

数据源可重复读是 Flink Checkpoint 机制对数据源的要求之一,意味着 Flink 作业在进行主动或被动的重启之后,仍然可以依据 Checkpoint 记录的状态重新读取跟之前相同的数据。数据源可重复读是数据准确性基本保证。跟数据库领域的可重复读类似,数据源可重复读要求数据源在被读取期间不被同时发生的更新修改操作所影响。

从严格意义来说,数据源可重复与本文主题数据边界并没有必然关系。但 Bounded 存储系统通常基于文件、对象等可更新的抽象概念,而 Un-Bounded 存储系统通常基于消息队列这样不可更新的抽象概念,所以比起 Un-Bounded 存储,Bounded 存储需要额外考虑可重复读的问题。

如果一个文件、对象被流计算作业所读,可以认为它涉入了一个生命周期等同于作业能回滚的最大时长的长事务。由于 Bounded 存储系统通常没有类似数据库 MVCC 的多版本控制,因此在这个长事务期间,文件必须保持不变,以确保若作业出现事务回滚(也就是作业恢复至之前的某个 Checkpoint)时,读取到的数据还是跟以前一致的。这对于主要用作数据仓库或者数据归档场景的 HDFS、S3 来说问题并不是很大,因为数据写入之后常常不会再更新,但也有一些例外的情况,比如要对数据进行压缩合并或者作为冷数据降级到更便宜的存储系统上。所以在实际生产中,一般还是明确需要限制流计算可以回滚的最大时长,在超过这个阈值之后解除数据不可更新的限制。

另外一个更加有趣的场景发生在流计算直接读取数据库时(虽然生产环境很少这么做)。数据库的更新操作要比大数据存储频繁得多,而且优先级更高,没有办法要求数据库锁表不更新,只能依靠 MVCC 来保证写不影响读。然而,MVCC 的作用范围只有单个数据库事务,对齐到 Flink 端就是单个 Checkpoint,而 Flink 要求的可重复读是横跨多个 Checkpoint 的。这个问题是笔者在开发一个第三方的 Flink MongoDB Connector [5]时遇到的,以直接读取的方式实现的 Source 很难配合 Flink Checkpoint 机制,因此还是应该以 CDC 方式来读取数据库。

总结

不难看出,Flink 虽然已经实现流批一体引擎及其跟各种存储系统的接口,但在批场景下的结合传统 Bounded 存储系统的使用体验距离传统批计算引擎还有一定的距离或差异。当然,这也是 Iceberg、Hudi 等数据湖在近年来异军突起的原因。在批计算场景下,这些数据湖屏蔽底层文件、并发写和多版本控制的特性可以很好地弥补传统 Bounded 存储系统与 Flink 的间隙,同时也支持接近数据库的 ACID,满足 Serving 需求。

参考

  1. Pravega: StreamCut with BatchClient
  2. Flink 执行引擎:流批一体的融合之路
  3. Apache Flink 1.14.0 Release Announcement
  4. FLIP-147: Support Checkpoints After Tasks Finished
  5. MongoFlink

相关 [flink 数据 边界] 推荐:

Flink 流批一体中的数据边界

- - 时间与精神的小屋
众所周知,流场景和批场景最为根本的区别在于 Data Boundness(数据集有界性). Data Boundness 将数据分为 Bounded 和 Un-Bounded. 在业界过去多年的实践中,两者分别绑定对应领域的存储系统和计算引擎,然而在流批一体的趋势下,领域的边界在逐渐弱化. 例如,消息队列通常用作流场景,但 Pravega 的 StreamCut 支持将指定队列中某一段消息作为批处理的输入[1].

Flink 如何实时分析 Iceberg 数据湖的 CDC 数据

- - 掘金 后端
本文由李劲松、胡争分享,社区志愿者杨伟海、李培殿整理. 主要介绍在数据湖的架构中,CDC 数据实时读写的方案和原理. 文章主要分为 4 个部分内容:. 常见的 CDC 分析方案. 为何选择 Flink + Iceberg. 一、常见的 CDC 分析方案. 我们先看一下今天的 topic 需要设计的是什么.

flink-watermark

- - ITeye博客
     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计.        模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计. /** 实际时间-偏移量 偏移后的时间*/.

基于 Flink SQL CDC 的实时数据同步方案 (developer.aliyun.com)

- - IT瘾-jianshu
整理:陈政羽(Flink 社区志愿者). Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化. 本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的数据同步方案,基于 Flink CDC 同步的解决方案以及更多的应用场景和 CDC 未来开发规划等方面进行介绍和演示.

用Flink SQL CDC + ES实现数据实时化真香!

- -
本人目前参与的项目属于公司里面数据密集、计算密集的一个重要项目,需要提供高效且准确的 OLAP 服务,提供灵活且实时的报表. 业务数据存储在 MySQL 中,通过主从复制同步到报表库. 作为集团级公司,数据增长多而且快,出现了多个千万级、亿级的大表. 为了实现各个维度的各种复杂的报表业务,有些千万级大表仍然需要进行 Join,计算规模非常惊人,经常不能及时响应请求.

Flink CDC 如何简化实时数据入湖入仓

- - Jark's Blog
一、Flink CDC 介绍. 从广义的概念上讲,能够捕获数据变更的技术, 我们都可以称为 CDC 技术. 通常我们说的 CDC 技术是一种用于捕获数据库中数据变更的技术. CDC 技术应用场景也非常广泛,包括:. 数据分发,将一个数据源分发给多个下游,常用于业务解耦、微服务. 数据集成,将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析.

基于 Flink SQL 构建实数据仓库:OPPO 数据中台之基石

- - IT瘾-dev
本文整理自 2019 年 4 月 13 日在深圳举行的 Flink Meetup 会议,分享嘉宾张俊,目前担任 OPPO 大数据平台研发负责人,也是 Apache Flink contributor. - OPPO 实时数仓的演进思路;. - 基于 Flink SQL 的扩展工作;. - 构建实时数仓的应用案例;.

维度数据实时关联的实践(w/ Flink、Vert.x & Guava Cache) - 简书

- -
在流式处理作业(特别是实时数仓ETL作业)中,我们的数据流可以视为无界事实表,其中往往缺乏一些维度信息. 例如,对于埋点日志流而言,为了减少传输冗余,可能只会带有城市ID、商品ID等,如果要映射到对应的名称,就需要与外部存储中的维度表进行关联. 这里的外部存储一般是指适合OLTP场景的数据库,如MySQL、Redis、HBase等.

趣头条基于Flink+ClickHouse打造实时数据分析平台

- -
趣头条一直致力于使用大数据分析指导业务发展. 目前在实时化领域主要使用 Flink+ClickHouse 解决方案,覆盖场景包括实时数据报表、Adhoc 即时查询、事件分析、漏斗分析、留存分析等精细化运营策略,整体响应 80% 在 1 秒内完成,大大提升了用户实时取数体验,推动业务更快迭代发展. Flink to Hive 的小时级场景.

踩坑记:Flink 事件时间语义下数据乱序丢数

- - IT瘾-dev
❝ 本文详细介绍了在上游使用处理时间语义的 flink 任务出现故障后,重启消费大量积压在上游的数据并产出至下游数据乱序特别严重时,下游 flink 任务使用事件时间语义时遇到的大量丢数问题以及相关的解决方案. 「1.本次踩坑的应用场景」. 「2.应用场景中发生的丢数故障分析」. 「4.丢数故障解决方案及原理」.