踩坑记:Flink 事件时间语义下数据乱序丢数

标签: dev | 发表时间:2020-09-23 00:00 | 作者:
出处:http://itindex.net/relian

本文详细介绍了在上游使用处理时间语义的 flink 任务出现故障后,重启消费大量积压在上游的数据并产出至下游数据乱序特别严重时,下游 flink 任务使用事件时间语义时遇到的大量丢数问题以及相关的解决方案。

本文分为以下几个部分:

  • 「1.本次踩坑的应用场景」
  • 「2.应用场景中发生的丢数故障分析」
  • 「3.待修复的故障点」
  • 「4.丢数故障解决方案及原理」
  • 「5.总结」

应用场景

应用场景如下:

  • 「flink 任务 A」「处理时间」语义做过滤产出新增 xx 明细数据至 「Kafka Y」
  • 「flink 任务 B」「事件时间」语义消费 「Kafka Y」做窗口聚合操作产出分钟级别聚合指标至 「Kafka Z」
  • 「Kafka Z」实时导入至 「Druid」以做即时 OLAP 分析,并且展示在 BI 应用看板

丢数故障分析

简要介绍下这次生产中故障场景。整条故障追踪链路如下:

故障一:

  • 收到报警反馈 「flink 任务 A」入口流量为 0
  • 定位 「flink 任务 A」中某个算子的故障导致整个 job 卡住
  • 导致此 「flink 任务 A」上游 「kafka X」积压了大量数据
  • 重启 「flink 任务 A」后,消费大量积压在上游 「kafka X」数据完成,任务恢复正常

故障一从而引发下游的故障二:

  • 由于 「flink 任务 A」使用了 「处理时间」语义处理数据,并且有过滤和 keyBy 分桶窗口逻辑,在重启后消费大量积压在上游的数据时,导致 sink rebalance 后产出到下游 「kafka Y」各个分区数据中的 server_timestamp 是乱序的
  • 下游 「flink 任务 B」在消费 「Kafka Y」时使用了 「事件时间」语义处理数据,并且使用了数据中的 server_timestamp 作为 「事件时间」时间戳
  • 「flink 任务 B」消费了乱序很严重的数据之后,导致在窗口聚合计算时丢失了大量数据
  • 最终展示在 BI 应用中的报表有丢失数据的情况

待修复的故障点

  • 1. 「flink 任务 A」的稳定性故障,这部分解决方案暂不在本文中介绍
  • 2. 「flink 任务 B」消费上游乱序丢数故障,解决方案在下文介绍

解决方案以及原理

丢数故障解决方案

解决方案是以下游 「flink 任务 B」作为切入点,直接给出 「flink 任务 B」的 sql 代码解决方案,java code 也可以按照这个方案实现,其本质原理相同。下文进行原理解释。

  SELECT   
  to_unix_timestamp(server_timestamp / bucket) AS timestamp, -- format 成原有的事件时间戳
  count(id) as id_cnt,
  sum(duration) as duration_sum
FROM
  source_table
GROUP BY
  TUMBLE(proctime, INTERVAL '1' MINUTE),
  server_timestamp / bucket -- 根据事件时间分桶计算,将相同范围(比如 1 分钟)事件时间的数据分到一个桶内

解决方案原理

首先明确一个无法避免的问题,在不考虑 watermark 允许延迟设置特别大的情况下,只要上游使用到了处理时间语义,下游使用事件时间语义,一旦上游发生故障重启并在短时间内消费大量数据,就不可避免的会出现上述错误以及故障。

在下游消费方仍然需要将对应事件时间戳的数据展示在 BI 平台报表中、并且全链路时间语义都为处理时间保障不丢数的前提下。解决方案就是在聚合并最终产出对应事件时间戳的数据。

最后的方案如下:整条链路全部为处理时间语义,窗口计算也使用处理时间,但是产出数据中的时间戳全部为事件时间戳。在出现故障的场景下,一分钟的窗口内的数据的事件时间戳可能相差几个小时,但在最终窗口聚合时可以根据事件时间戳划分到对应的事件时间窗口内,下游 BI 应用展示时使用此事件时间戳即可。

注意:sql 中的 bucket 需要根据具体使用场景进行设置,如果设置过于小,比如非故障场景下按照处理时间开 1 分钟的窗口,bucket 设为 60000(1 分钟),那么极有可能,这个时间窗口中所有数据的 server_timestamp 都集中在某两分钟内,那么这些数据就会被分到两个桶(bucket)内,则会导致严重的数据倾斜。

输入数据样例

模拟上述故障, 「flink B」的任务某一个窗口内的数据输入如下。

server_timestamp id duration
2020/9/01 21:14:38 1 300
2020/9/01 21:14:50 1 500
2020/9/01 21:25:38 2 600
2020/9/01 21:25:38 3 900
2020/9/01 21:25:38 2 800

输出数据样例

按照上述解决方案中的 sql 处理过后,输出数据如下,则可以解决此类型丢数故障。

timestamp id_cnt duration_sum
2020/9/01 21:14:00 2 900
2020/9/01 21:25:00 3 2300

总结

本文分析了在 flink 应用中:

  • 「上游使用处理时间语义的 flink 任务出现故障、重启消费大量积压数据并产出至下游数据乱序特别严重时,下游使用事件时间语义时遇到的大量丢数问题」
  • 「以整条链路为处理时间语义的前提下,产出的数据时间戳为事件时间戳解决上述问题」
  • 「以 sql 代码给出了丢数故障解决方案样例」

学习资料

flink

  • https://github.com/flink-china/flink-training-course/blob/master/README.md
  • https://ververica.cn/developers-resources/
  • https://space.bilibili.com/33807709


相关 [flink 事件 时间] 推荐:

踩坑记:Flink 事件时间语义下数据乱序丢数

- - IT瘾-dev
❝ 本文详细介绍了在上游使用处理时间语义的 flink 任务出现故障后,重启消费大量积压在上游的数据并产出至下游数据乱序特别严重时,下游 flink 任务使用事件时间语义时遇到的大量丢数问题以及相关的解决方案. 「1.本次踩坑的应用场景」. 「2.应用场景中发生的丢数故障分析」. 「4.丢数故障解决方案及原理」.

flink-watermark

- - ITeye博客
     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计.        模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计. /** 实际时间-偏移量 偏移后的时间*/.

一文精通 Flink on YARN

- - IT瘾-dev
本文主要是讲解flink on yarn的部署过程,然后yarn-session的基本原理,如何启动多个yarn-session的话如何部署应用到指定的yarn-session上,然后是用户jar的管理配置及故障恢复相关的参数. flink on yarn的整个交互过程图,如下:. 要使得flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs.

Flink SQL 编程实践

- - Jark's Blog
注: 本教程实践基于 Ververica 开源的. sql-training 项目. 基于 Flink 1.7.2. 本文将通过五个实例来贯穿 Flink SQL 的编程实践,主要会涵盖以下几个方面的内容. 如何使用 SQL CLI 客户端. 如何在流上运行 SQL 查询. 运行 window aggregate 与 non-window aggregate,理解其区别.

谈谈 Flink Shuffle 演进

- - 时间与精神的小屋
在分布式计算中,Shuffle 是非常关键但常常容易被忽视的一环. 比如著名的 MapReduce 的命名跳过 Shuffle ,只包含其前后的 Map 跟 Reduce. 背后原因一方面是 Shuffle 是底层框架在做的事情,用户基本不会感知到其存在,另一方面是 Shuffle 听起来似乎是比较边缘的基础服务.

Flink 1.16:Hive SQL 如何平迁到 Flink SQL

- - Jark's Blog
Hive SQL 迁移的动机. Flink 已经是流计算的事实标准,当前国内外做实时计算或流计算一般都会选择 Flink 和 Flink SQL. 另外,Flink 也是是家喻户晓的流批一体大数据计算引擎. 然而,目前 Flink 也面临着挑战. 比如虽然现在大规模应用都以流计算为主,但 Flink 批计算的应用并不广泛,想要进一步推动真正意义上的流批一体落地,需要推动业界更多地落地 Flink 批计算,需要更积极地拥抱现有的离线生态.

Flink Kafka Connector与Exactly Once剖析

- - SegmentFault 最新的文章
Flink Kafa Connector是Flink内置的Kafka连接器,它包含了从Kafka Topic读入数据的 Flink Kafka Consumer以及向Kafka Topic写出数据的 Flink Kafka Producer,除此之外Flink Kafa Connector基于Flink Checkpoint机制提供了完善的容错能力.

Flink在唯品会的实践

- - DockOne.io
唯品会自2017年开始基于Kubernetes深入打造高性能、稳定、可靠、易用的实时计算平台,支持唯品会内部业务在平时以及大促的平稳运行. 现平台支持Flink、Spark、Storm等主流框架. 本文主要分享Flink的容器化实践应用以及产品化经验. 平台支持公司内部所有部门的实时计算应用. 主要的业务包括实时大屏,推荐,实验平台,实时监控和实时数据清洗等.

使用 Kubernetes 部署 Flink 应用

- - 张吉的博客
Kubernetes 是目前非常流行的容器编排系统,在其之上可以运行 Web 服务、大数据处理等各类应用. 这些应用被打包在一个个非常轻量的容器中,我们通过声明的方式来告知 Kubernetes 要如何部署和扩容这些程序,并对外提供服务. Flink 同样是非常流行的分布式处理框架,它也可以运行在 Kubernetes 之上.

Flink CDC 核心:Debezium 1.9.0.Beta1 发布!

- - IT瘾-dev
我很高兴地宣布 Debezium  1.9.0.Beta1的发布. 此版本包括 Debezium Server 的许多新功能,包括 Knative Eventing 支持和使用 Redis 接收器的偏移存储管理、SQL Server 连接器的多分区缩放以及各种错误修复和改进. 总体而言,此版本已修复56 个问题.