如何收集 Yarn/K8s 集群中的 Flink 任务日志?
从一个小需求(任务异常或者失败时排查问题不便)出发,开始调研业界常见的解决方案进行解决我们的需求,接着有对应的代码实现和效果展示。
背景
不管是 Flink On Yarn 还是 On k8s,如果任务正常运行,我们是可以通过 Flink Web UI 去查看 JobManager 和 TaskManager 日志,虽然日志量大的时候去不同的 TaskManager 找日志有点困难(如何快速知道日志在哪个 TaskManager 上;在 TaskManager 里面可能有多个滚动的日志文件,如何快速找到 root cause 异常;如果 TaskManager OOM 掉了该容器的日志就看不到了),但是起码给了一个可以看日志的途径。
熟悉 Flink On Yarn 的应该知道 Flink 任务运行结束/失败后,只能去 Yarn UI 看到任务的 Jobmanager 日志,对于 TaskManager 日志这些是看不到的,这对于有时候想排查下任务失败的原因日志会比较困难(不过大多数任务挂掉的原因日志都会在 Jobmanager 存在)。
熟悉 Flink On K8s 的那更能体验到查看日志的痛苦了,在任务运行失败和结束后,所有的 Pod 都会退出,如果没有收集这些运行日志,那几乎很难知道任务为啥会失败。
Flink History Server 不像 Spark History Server 一样可以看到任务所有运行的 Excutor 日志,所以对于故障定位 Flink 任务异常日志这个场景,Flink 自带的那些体验不是很友好。因此也有本文的出现,来讲述一下如何针对上面两种运行模式下 Flink 任务的日志收集,来解决我们不方便定位任务异常失败的需求。
当然了,我们收集到这些日志数据后,可以用来做异常日志告警提醒任务负责人作业异常信息(这个后面可以专门开篇文章来写),也可以收集起来存储到 ES,方便用户排查任务异常日志。
方案选择
常见的收集日志方案有下面两种:
1、统一 LogAgent 收集。不管是使用 Flink On Yarn 还是 Flink On K8s,日志都可以配置一个路径(路径有规则),然后每台计算节点机器专门部署一个 LogAgent (比如有 Filebeat)去收集这些运行日志。K8s 的话会比 Yarn 的日志要收集的话稍微会复杂一些,需要 Flink 任务挂载磁盘,这样日志文件数据路径比较固定,否则日志文件是在容器 Pod 内,会随着 Pod 的生命周期而消失。这种方式需要在每台机器都部署一个专门用来收集日志的 Agent,还要额外维护它的稳定性,不然可能会漏收集到任务的日志。
2、自定义 Kafka Appender。这种方式要根据日志框架进行自定义一个 Appender,将定义好的 Appender 打包后放到 Flink lib 目录,然后配置好 log4j 配置,任务启动后会自动加载这个依赖,运行过程中会自动实时将日志发送到 Kafka。这个 Appender 定义可以比较灵活(具体的可以看下文的代码实现),比如加入一些过滤条件:只收集 warn 级别以上的日志(因为任务多了的话收集所有的级别日志数据量会很大,但是对排查问题带来的作用有限)。这种方式和任务运行在 Yarn 和 K8s 无关,都可以正常收集日志,不用单独配置,也不用单独去维护什么组件的稳定性,唯一的缺点就是对已经在运行的任务如果想要收集日志需要重启一下即可,相比来说我个人觉得还是这种方式会比较合适。
整个架构如上,你理解图中的 Reporter 包含三个:第一个是自定义的 Kafka Appender,第二个是自定义的 Kafka Metrics Reporter,第三个是根据官方的 Prometheus PushGateway Metrics Reporter 做了内部改造的。前两个是本篇要讲解的,后两个后面也可以单独再开文章来讲。
自定义 Kafka Appender
需求
自定义 Kafka Appender,将日志数据发到 kafka,但是如何对日志数据进行标识,需要利用 log.file 环境变量来获取作业的 application id、container id、host、jobmanager/taskmanager 信息等。
1 | 2021-07-27 20:39:55,777 INFO org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Dout.file=/data/HDATA/yarn/logs/application_1596708697506_12674/container_e11_1596708697506_12674_01_000005/taskmanager.out |
上面这个是针对 Yarn 可以根据 log.file 这个环境变量拿到作业的一些维度数据,其实 K8s 也可以在环境变量中拿到这些想要的信息,可能需要改造点 Flink-K8s 模块代码(其实就是添加一点容器运行环境变量,比如 K8s 中的任务 ClusterId、运行的物理机器 IP、Pod IP),如下图所示:
修改框架源码
在 1.10 和 1.12 中,Flink 作业在发生状态转换成 FAILED 时,作业打印出来的日志级别竟然是 info,而这种日志是作业异常的根因,对于我们要收集作业的异常日志,那么就得更改源码,提高日志级别。
提高这些地方的日志级别后,即可收集作业出现 failed 或者 restarting 时的异常日志,从而可以方便用户定位作业异常的原因(当然了,上面这些属于锦上添花修改了,如果不改其实有办法能够在自定义收集器里面去收集到这些数据)。
因为早期开发的时候公司内部 Flink 1.10 和 1.12 两个版本共存,1.10 使用的是log4j,1.12 使用的是 log4j2,两个版本不一致所以自定义 Kafka Appender 也有点区别,所以分别开发了两个版本,这里也都讲一下,请挑选自己需要的阅读。
我已经将代码上传至 Github 了,感兴趣可以参考一下: https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-extends/FlinkLogKafkaAppender 如果对你有帮助的话可以帮忙点个 star。代码结构如下图所示:
FlinkLogKafkaAppender 父模块引入了基础需要的依赖:
1 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
- KafkaAppenderCommon 模块:定义了日志类和工具类
- Log4j2KafkaAppender 模块:针对使用了 log4j2 的 Flink 版本
- Log4jKafkaAppender 模块:针对使用了 log4j 的 Flink 版本
KafkaAppenderCommon
定义的要发送到 Kafka 的数据结构类:
1 | public class LogEvent { |
序列化工具类:
1 | public class JacksonUtil { |
对异常日志数据处理的工具类:
1 | public class ExceptionUtil { |
自定义 Log4j2 Kafka Appender
引入依赖:
1 | <?xml version="1.0" encoding="UTF-8"?> |
自定义的 Kafka Appender 类:
1 | package com.zhisheng.log.appender; |
修改 Log4j2 配置:
1 | ################################################################################ |
注意,针对 K8s 需要修改的是 log4j-console.properties 才能生效
发到 Kafka 数据的结构:
1 | { |
自定义 Log4j Kafka Appender
引入依赖:
1 | <?xml version="1.0" encoding="UTF-8"?> |
KafkaLog4jAppender: 继承 AppenderSkeleton 抽象类
1 | public class KafkaLog4jAppender extends AppenderSkeleton { |
将上面打出来的 jar,放到 flink 的 lib 目录下面,修改 log4j.properties 配置文件:
1 | # This affects logging for both user code and Flink |
注意上面的一个解决死锁问题的配置,我们在生产也遇到过,打出来的 jstack 的信息为:
1 | 2021-08-20 01:19:24 |
发到 Kafka 数据的结构:
1 | { |
Flink SQL 存储日志到 ES
通过 Flink SQL 将异常日志数据存储到 ES,按天索引:
1 | CREATE TABLE yarn_flink_warn_logs ( |
单个作业日志查看,可以通过 application_id/container_type/container_id 过滤,当然你可以看到上面自定义里面我们还加入了 taskId 和 taskName 的维度数据,这两个是我们实时计算平台的维度数据,也可以根据这两个进行过滤。
单条日志查看:
实时监控集群作业异常日志数据量
方便知道任务的异常日志情况,有的任务如果任务突然报出很多异常日志说明有抖动或者报错,对任务负责人和集群负责人都可以做一个提醒通知,起到预警作用。
1 | set table.exec.emit.early-fire.enabled=true; |
总结
本文从一个小需求(任务异常或者失败时排查问题不便)出发,开始调研业界常见的解决方案进行解决我们的需求,接着有对应的代码实现和效果展示。