Flink 如何实时分析 Iceberg 数据湖的 CDC 数据

标签: flink 实时 分析 | 发表时间:2021-02-24 06:50 | 作者:Flink_China
出处:https://juejin.cn/backend

本文由李劲松、胡争分享,社区志愿者杨伟海、李培殿整理。主要介绍在数据湖的架构中,CDC 数据实时读写的方案和原理。文章主要分为 4 个部分内容:

  1. 常见的 CDC 分析方案
  2. 为何选择 Flink + Iceberg
  3. 如何实时写入读取
  4. 未来规划

一、常见的 CDC 分析方案

我们先看一下今天的 topic 需要设计的是什么?输入是一个 CDC 或者 upsert 的数据,输出是 Database 或者是用于大数据 OLAP 分析的存储。

我们常见的输入主要有两种数据,第一种数据是数据库的 CDC 数据,不断的产生 changeLog;另一种场景是流计算产生的 upsert 数据,在最新的 Flink 1.12 版本已经支持了 upsert 数据。

1.1 离线 HBase 集群分析 CDC 数据

我们通常想到的第一个方案,就是把 CDC upsert 的数据通过 Flink 进行一些处理之后,实时的写到 HBase 当中。HBase 是一个在线的、能提供在线点查能力的一种数据库,具有非常高的实时性,对写入操作是非常友好的,也可以支持一些小范围的查询,而且集群可扩展。

image.png

这种方案其实跟普通的点查实时链路是同一套,那么用 HBase 来做大数据的 OLAP 的查询分析有什么问题呢?

首先,HBase 是一个面向点查设计的一种数据库,是一种在线服务,它的行存的索引不适合分析任务。典型的数仓设计肯定是要列存的,这样压缩效率和查询效率才会高。第二,HBase 的集群维护成本比较高。最后,HBase 的数据是 HFile,不方便与大数据里数仓当中典型的 Parquet、Avro、Orc 等结合。

1.2 Apache Kudu 维护 CDC 数据集

针对 HBase 分析能力比较弱的情况,社区前几年出现了一个新的项目,这就是 Apache Kudu 项目。Kudu 项目拥有 HBase 的点查能力的同时,采用列存,这样列存加速非常适合 OLAP 分析。

image.png

这种方案会有什么问题呢?

首先 Kudu 是比较小众的、独立的集群,维护成本也比较高,跟 HDFS、S3、OSS 比较割裂。其次由于 Kudu 在设计上保留了点查能力,所以它的批量扫描性能不如 parquet,另外 Kudu 对于 delete 的支持也比较弱,最后它也不支持增量拉取。

1.3 直接导入 CDC 到 Hive 分析

第三种方案,也是大家在数仓中比较常用的方案,就是把 MySQL 的数据写到 Hive,流程是:维护一个全量的分区,然后每天做一个增量的分区,最后把增量分区写好之后进行一次 Merge ,写入一个新的分区,流程上这样是走得通的。Hive 之前的全量分区是不受增量的影响的,只有当增量 Merge 成功之后,分区才可查,才是一个全新的数据。这种纯列存的 append 的数据对于分析是非常友好的。

image.png

这种方案会有什么问题呢?

增量数据和全量数据的 Merge 是有延时的,数据不是实时写入的,典型的是一天进行一次 Merge,这就是 T+1 的数据了。所以,时效性很差,不支持实时 upsert。每次 Merge 都需要把所有数据全部重读重写一遍,效率比较差、比较浪费资源。

1.4 Spark + Delta 分析 CDC 数据

针对这个问题,Spark + Delta 在分析 CDC 数据的时候提供了 MERGE INTO 的语法。这并不仅仅是对 Hive 数仓的语法简化,Spark + Delta 作为新型数据湖的架构(例如 Iceberg、Hudi),它对数据的管理不是分区,而是文件,因此 Delta 优化 MERGE INTO 语法,仅扫描和重写发生变化的文件即可,因此高效很多。

image.png

我们评估一下这个方案,他的优点是仅依赖 Spark + Delta 架构简洁、没有在线服务、列存,分析速度非常快。优化之后的 MERGE INTO 语法速度也够快。

这个方案,业务上是一个 Copy On Write 的一个方案,它只需要 copy 少量的文件,可以让延迟做的相对低。理论上,在更新的数据跟现有的存量没有很大重叠的话,可以把天级别的延迟做到小时级别的延迟,性能也是可以跟得上的。

image.png

这个方案在 Hive 仓库处理 upsert 数据的路上已经前进了一小步了。但小时级别的延迟毕竟不如实时更有效,因此这个方案最大的缺点在 Copy On Write 的 Merge 有一定的开销,延迟不能做的太低。

第一部分大概现有的方案就是这么多,同时还需要再强调一下,upsert 之所以如此重要,是因为在数据湖的方案中,upsert 是实现数据库准实时、实时入湖的一个关键技术点。

二、为何选择 Flink + Iceberg

2.1 Flink 对 CDC 数据消费的支持

第一,Flink 原生支持 CDC 数据消费。在前文 Spark + Delta 的方案中,MARGE INTO 的语法,用户需要感知 CDC 的属性概念,然后写到 merge 的语法上来。但是 Flink 是原生支持 CDC 数据的。用户只要声明一个 Debezium 或者其他 CDC 的 format,Flink 上面的 SQL 是不需要感知任何 CDC 或者 upsert 的属性的。Flink 中内置了 hidden column 来标识它 CDC 的类型数据,所以对用户而言比较简洁。

image.png

如下图示例,在 CDC 的处理当中,Flink 在只用声明一个 MySQL Binlog 的 DDL 语句,后面的 select 都不用感知 CDC 属性。

image.png

2.2 Flink 对 Change Log Stream 的支持

下图介绍的是 Flink 原生支持 Change Log Stream,Flink 在接入一个 Change Log Stream 之后,拓扑是不用关心 Change Log flag 的 SQL。拓扑完全是按照自己业务逻辑来定义,并且一直到最后写入 Iceberg,中间不用感知 Change Log 的 flag。

image.png

2.3 Flink + Iceberg CDC 导入方案评估

最后,Flink + Iceberg 的 CDC 导入方案的优点是什么?

对比之前的方案,Copy On Write 跟 Merge On Read 都有适用的场景,侧重点不同。Copy On Write 在更新部分文件的场景中,当只需要重写其中的一部分文件时是很高效的,产生的数据是纯 append 的全量数据集,在用于数据分析的时候也是最快的,这是 Copy On Write 的优势。

另外一个是 Merge On Read,即将数据连同 CDC flag 直接 append 到 Iceberg 当中,在 merge 的时候,把这些增量的数据按照一定的组织格式、一定高效的计算方式与全量的上一次数据进行一次 merge。这样的好处是支持近实时的导入和实时数据读取;这套计算方案的 Flink SQL 原生支持 CDC 的摄入,不需要额外的业务字段设计。

Iceberg 是统一的数据湖存储,支持多样化的计算模型,也支持各种引擎(包括 Spark、Presto、hive)来进行分析;产生的 file 都是纯列存的,对于后面的分析是非常快的;Iceberg 作为数据湖基于 snapshot 的设计,支持增量读取;Iceberg 架构足够简洁,没有在线服务节点,纯 table format 的,这给了上游平台方足够的能力来定制自己的逻辑和服务化。

image.png

三、如何实时写入读取

3.1 批量更新场景和 CDC 写入场景

首先我们来了解一下在整个数据湖里面批量更新的两个场景。

  • 第一批量更新的这种场景,在这个场景中我们使用一个 SQL 更新了成千上万行的数据,比如欧洲的 GDPR 策略,当一个用户注销掉自己的账户之后,后台的系统是必须将这个用户所有相关的数据全部物理删除。
  • 第二个场景是我们需要将 date lake 中一些拥有共同特性的数据删除掉,这个场景也是属于批量更新的一个场景,在这个场景中删除的条件可能是任意的条件,跟主键(Primary key)没有任何关系,同时这个待更新的数据集是非常大,这种作业是一个长耗时低频次的作业。

另外是 CDC 写入的场景,对于对 Flink 来说,一般常用的有两种场景,第一种场景是上游的 Binlog 能够很快速的写到 data lake 中,然后供不同的分析引擎做分析使用; 第二种场景是使用 Flink 做一些聚合操作,输出的流是 upsert 类型的数据流,也需要能够实时的写到数据湖或者是下游系统中去做分析。如下图示例中 CDC 写入场景中的 SQL 语句,我们使用单条 SQL 更新一行数据,这种计算模式是一种流式增量的导入,而且属于高频的更新。

image.png

3.2 Apache Iceberg 设计 CDC 写入方案需要考虑的问题

接下来我们看下 iceberg 对于 CDC 写入这种场景在方案设计时需要考虑哪些问题。

  • 第一是正确性,即需要保证语义及数据的正确性,如上游数据 upsert 到 iceberg 中,当上游 upsert 停止后, iceberg 中的数据需要和上游系统中的数据保持一致。
  • 第二是高效写入,由于 upsert 的写入频率非常高,我们需要保持高吞吐、高并发的写入。
  • 第三是快速读取,当数据写入后我们需要对数据进行分析,这其中涉及到两个问题,第一个问题是需要支持细粒度的并发,当作业使用多个 task 来读取时可以保证为各个 task 进行均衡的分配以此来加速数据的计算;第二个问题是我们要充分发挥列式存储的优势来加速读取。
  • 第四是支持增量读,例如一些传统数仓中的 ETL,通过增量读取来进行进一步数据转换。

image.png

3.3 Apache Iceberg Basic

在介绍具体的方案细节之前,我们先了解一下 Iceberg 在文件系统中的布局,总体来讲 Iceberg 分为两部分数据,第一部分是数据文件,如下图中的 parquet 文件,每个数据文件对应一个校验文件(.crc文件)。第二部分是表元数据文件(Metadata 文件),包含 Snapshot 文件(snap- .avro)、Manifest 文件(.avro)、TableMetadata 文件(*.json)等。

image.png

下图展示了在 iceberg 中 snapshot、manifest 及 partition 中的文件的对应关系。下图中包含了三个 partition,第一个 partition 中有两个文件 f1、f3,第二个 partition 有两个文件f4、f5,第三个 partition 有一个文件f2。对于每一次写入都会生成一个 manifest 文件,该文件记录本次写入的文件与 partition 的对应关系。再向上层有 snapshot 的概念,snapshot 能够帮助快速访问到整张表的全量数据,snapshot 记录多个 manifest,如第二个 snapshot 包含 manifest2 和 manifest3。

image.png

3.4 INSERT、UPDATE、DELETE 写入

在了解了基本的概念,下面介绍 iceberg 中 insert、update、delete 操作的设计。

下图示例的 SQL 中展示的表包含两个字段即 id、data,两个字段都是 int 类型。在一个 transaction 中我们进行了图示中的数据流操作,首先插入了(1,2)一条记录,接下来将这条记录更新为(1,3),在 iceberg 中 update 操作将会拆为 delete 和 insert 两个操作。

这么做的原因是考虑到 iceberg 作为流批统一的存储层,将 update 操作拆解为 delete 和 insert 操作可以保证流批场景做更新时读取路径的统一,如在批量删除的场景下以 Hive 为例,Hive 会将待删除的行的文件 offset 写入到 delta 文件中,然后做一次 merge on read,因为这样会比较快,在 merge 时通过 position 将原文件和 delta 进行映射,将会很快得到所有未删除的记录。

接下来又插入记录(3,5),删除了记录(1,3),插入记录(2,5),最终查询是我们得到记录(3,5)(2,5)。

image.png

上面操作看上去非常简单,但在实现中是存在一些语义上的问题。如下图中,在一个 transaction 中首先执行插入记录(1,2)的操作,该操作会在 data file1 文件中写入 INSERT(1,2),然后执行删除记录(1,2)操作,该操作会在 equalify delete file1 中写入 DELETE(1,2),接着又执行插入记录(1,2)操作,该操作会在 data file1 文件中再写入INSERT(1,2),然后执行查询操作。

在正常情况下查询结果应该返回记录 INSERT(1,2),但在实现中,DELETE(1,2)操作无法得知删除的是 data file1 文件中的哪一行,因此两行 INSERT(1,2)记录都将被删除。

image.png

那么如何来解决这个问题呢,社区当前的方式是采用了 Mixed position-delete and equality-delete。Equality-delete 即通过指定一列或多列来进行删除操作,position-delete 是根据文件路径和行号来进行删除操作,通过将这两种方法结合起来以保证删除操作的正确性。

如下图我们在第一个 transaction 中插入了三行记录,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),然后执行 commit 操作进行提交。接下来我们开启一个新的 transaction 并执行插入一行数据(1,5),由于是新的 transaction,因此新建了一个 data file2 并写入 INSERT(1,5)记录,接下来执行删除记录(1,5),实际写入 delete 时是:

在 position delete file1 文件写入(file2, 0),表示删除 data file2 中第 0 行的记录,这是为了解决同一个 transaction 内同一行数据反复插入删除的语义的问题。 在 equality delete file1 文件中写入 DELETE (1,5),之所以写入这个 delete 是为了确保本次 txn 之前写入的 (1,5) 能被正确删除。

然后执行删除(1,4)操作,由于(1,4)在当前 transaction 中未曾插入过,因此该操作会使用 equality-delete 操作,即在 equality delete file1 中写入(1,4)记录。在上述流程中可以看出在当前方案中存在 data file、position delete file、equality delete file 三类文件。

image.png

在了解了写入流程后,如何来读取呢。如下图所示,对于 position delete file 中的记录(file2, 0)只需和当前 transaction 的 data file 进行 join 操作,对于 equality delete file 记录(1,4)和之前的 transaction 中的 data file 进行 join 操作。最终得到记录 INSERT(1,3)、INSERT(1,2)保证了流程的正确性。

image.png

3.5 Manifest 文件的设计

上面介绍了 insert、update 及 delete,但在设计 task 的执行计划时我们对 manifest 进行了一些设计,目的是通过 manifest 能够快速到找到 data file,并按照数据大小进行分割,保证每个 task 处理的数据尽可能的均匀分布。

如下图示例,包含四个 transaction,前两个 transaction 是 INSERT 操作,对应 M1、M2,第三个 transaction 是 DELETE 操作,对应 M3,第四个 transaction 是 UPDATE 操作,包含两个 manifest 文件即 data manifest 和 delete manifest。

image.png

对于为什么要对 manifest 文件拆分为 data manifest 和 delete manifest 呢,本质上是为了快速为每个 data file 找到对应的 delete file 列表。可以看下图示例,当我们在 partition-2 做读取时,需要将 deletefile-4 与datafile-2、datafile-3 做一个 join 操作,同样也需要将 deletefile-5 与 datafile-2、datafile-3 做一个 join 操作。

以 datafile-3 为例,deletefile 列表包含 deletefile-4 和 deletefile-5 两个文件,如何快速找到对应的 deletefIle 列表呢,我们可以根据上层的 manifest 来进行查询,当我们将 manifest 文件拆分为 data manifest 和 delete manifest 后,可以将 M2(data manifest)与 M3、M4(delete manifest)先进行一次 join 操作,这样便可以快速的得到 data file 所对应的 delete file 列表。

image.png

3.6 文件级别的并发

另一个问题是我们需要保证足够高的并发读取,在 iceberg 中这点做得非常出色。在 iceberg 中可以做到文件级别的并发读取,甚至文件中更细粒度的分段的并发读取,比如文件有 256MB,可以分为两个 128MB 进行并发读取。这里举例说明,假设 insert 文件跟 delete 文件在两个 Bucket 中的布局方式如下图所示。

image.png

我们通过 manifest 对比发现,datafile-2 的 delete file 列表只有 deletefile-4,这样可以将这两个文件作为一个单独的 task(图示中Task-2)进行执行,其他的文件也是类似,这样可以保证每个 task 数据较为均衡的进行 merge 操作。

image.png

对于这个方案我们做了简单的总结,如下图所示。首先这个方案的优点可以满足正确性,并且可以实现高吞吐写入和并发高效的读取,另外可以实现 snapshot 级别的增量的拉取。

当前该方案还是比较粗糙,下面也有一些可以优化的点。

  • 第一点,如果同一个 task 内的 delete file 有重复可以做缓存处理,这样可以提高 join 的效率。
  • 第二点,当 delete file 比较大需要溢写到磁盘时可以使用 kv lib 来做优化,但这不依赖外部服务或其他繁重的索引。
  • 第三点,可以设计 Bloom filter(布隆过滤器)来过滤无效的 IO,因为对于 Flink 中常用的 upsert 操作会产生一个 delete 操作和一个 insert 操作,这会导致在 iceberg 中 data file 和 delete file 大小相差不大,这样 join 的效率不会很高。如果采用 Bloom Filter,当 upsert 数据到来时,拆分为 insert 和 delete 操作,如果通过 bloom filter 过滤掉那些之前没有 insert 过数据的 delete 操作(即如果这条数据之前没有插入过,则不需要将 delete 记录写入到 delete file 中),这将极大的提高 upsert 的效率。
  • 第四点,是需要一些后台的 compaction 策略来控制 delete file 文件大小,当 delete file 越少,分析的效率越高,当然这些策略并不会影响正常的读写。

image.png

3.7 增量文件集的 Transaction 提交

前面介绍了文件的写入,下图我们介绍如何按照 iceberg 的语义进行写入并且供用户读取。主要分为数据和 metastore 两部分,首先会有 IcebergStreamWriter 进行数据的写入,但此时写入数据的元数据信息并没有写入到 metastore,因此对外不可见。第二个算子是 IcebergFileCommitter,该算子会将数据文件进行收集, 最终通过 commit transaction 来完成写入。

在 Iceberg 中并没有其他任何其他第三方服务的依赖,而 Hudi 在某些方面做了一些 service 的抽象,如将 metastore 抽象为独立的 Timeline,这可能会依赖一些独立的索引甚至是其他的外部服务来完成。

image.png

四、未来规划

下面是我们未来的一些规划,首先是 Iceberg 内核的一些优化,包括方案中涉及到的全链路稳定性测试及性能的优化, 并提供一些 CDC 增量拉取的相关 Table API 接口。

在 Flink 集成上,会实现 CDC 数据的自动和手动合并数据文件的能力,并提供 Flink 增量拉取 CDC 数据的能力。

在其他生态集成上,我们会对 Spark、Presto 等引擎进行集成,并借助 Alluxio 加速数据查询。

image.png

相关 [flink 实时 分析] 推荐:

Flink 如何实时分析 Iceberg 数据湖的 CDC 数据

- - 掘金 后端
本文由李劲松、胡争分享,社区志愿者杨伟海、李培殿整理. 主要介绍在数据湖的架构中,CDC 数据实时读写的方案和原理. 文章主要分为 4 个部分内容:. 常见的 CDC 分析方案. 为何选择 Flink + Iceberg. 一、常见的 CDC 分析方案. 我们先看一下今天的 topic 需要设计的是什么.

趣头条基于Flink+ClickHouse打造实时数据分析平台

- -
趣头条一直致力于使用大数据分析指导业务发展. 目前在实时化领域主要使用 Flink+ClickHouse 解决方案,覆盖场景包括实时数据报表、Adhoc 即时查询、事件分析、漏斗分析、留存分析等精细化运营策略,整体响应 80% 在 1 秒内完成,大大提升了用户实时取数体验,推动业务更快迭代发展. Flink to Hive 的小时级场景.

flink-watermark

- - ITeye博客
     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计.        模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计. /** 实际时间-偏移量 偏移后的时间*/.

Apache Flink:特性、概念、组件栈、架构及原理分析

- - 简单之美
Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能. 现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的SLA是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案.

美团点评基于 Flink 的实时数仓建设实践

- - 美团点评技术团队
近些年,企业对数据服务实时化服务的需求日益增多. 本文整理了常见实时数据组件的性能特点和适用场景,介绍了美团如何通过 Flink 引擎构建实时数据仓库,从而提供高效、稳健的实时数据服务. 此前我们美团技术博客发布过一篇文章《 流计算框架 Flink 与 Storm 的性能对比》,对 Flink 和 Storm 俩个引擎的计算性能进行了比较.

基于 Flink SQL CDC 的实时数据同步方案 (developer.aliyun.com)

- - IT瘾-jianshu
整理:陈政羽(Flink 社区志愿者). Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化. 本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的数据同步方案,基于 Flink CDC 同步的解决方案以及更多的应用场景和 CDC 未来开发规划等方面进行介绍和演示.

用Flink SQL CDC + ES实现数据实时化真香!

- -
本人目前参与的项目属于公司里面数据密集、计算密集的一个重要项目,需要提供高效且准确的 OLAP 服务,提供灵活且实时的报表. 业务数据存储在 MySQL 中,通过主从复制同步到报表库. 作为集团级公司,数据增长多而且快,出现了多个千万级、亿级的大表. 为了实现各个维度的各种复杂的报表业务,有些千万级大表仍然需要进行 Join,计算规模非常惊人,经常不能及时响应请求.

基于Flink构建实时数仓实践

- -
随着公司用户增长业务快速发展,陆续孵化出 部落、同镇、C 端会员、游戏等非常多的业务板块. 与此同时产品及运营对实时数据需求逐渐增多,帮助他们更快的做出决策,更好的进行产品迭代,实时数仓的建设变得越发重要起来. 本文主要介绍用户增长业务基于 Flink 构建实时数仓的实践之路. 如下图是早期的实时计算架构,实时数据需求较少,架构简单,数据链路少,一路到底的开发模式能很快满足业务需求;.

Flink CDC 如何简化实时数据入湖入仓

- - Jark's Blog
一、Flink CDC 介绍. 从广义的概念上讲,能够捕获数据变更的技术, 我们都可以称为 CDC 技术. 通常我们说的 CDC 技术是一种用于捕获数据库中数据变更的技术. CDC 技术应用场景也非常广泛,包括:. 数据分发,将一个数据源分发给多个下游,常用于业务解耦、微服务. 数据集成,将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析.

Flink 零基础实战教程:如何计算实时热门商品

- - Jark's Blog
在 上一篇入门教程中,我们已经能够快速构建一个基础的 Flink 程序了. 本文会一步步地带领你实现一个更复杂的 Flink 应用程序:实时热门商品. 在开始本文前我们建议你先实践一遍上篇文章,因为本文会沿用上文的 my-flink-project项目框架. 如何基于 EventTime 处理,如何指定 Watermark.