How to Collect Flink Job Logs on Yarn/K8s Clusters?

Tags: Big Data, Flink, Stream Processing | Published: 2023-03-17 00:00 | Author:
Source: http://www.54tianzhisheng.cn/

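This article starts from a small requirement (troubleshooting is inconvenient when a job throws exceptions or fails), surveys the solutions commonly used in the industry, picks one that meets the requirement, and then walks through the code implementation and shows the result.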

Background

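Whether Flink runs on Yarn or on K8s, as long as a job is running normally we can view the JobManager and TaskManager logs through the Flink Web UI. When the log volume is large, hunting through different TaskManagers is painful: how do you quickly tell which TaskManager holds the relevant log? A TaskManager may have several rolled log files, so how do you quickly find the root-cause exception? And if a TaskManager is killed by an OOM, that container's logs are gone. Still, the Web UI at least gives us a way to look at the logs.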

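Anyone familiar with Flink on Yarn knows that once a job finishes or fails, only its JobManager log can be viewed from the Yarn UI; the TaskManager logs are not visible. This makes it hard to investigate why a job failed (although for most failures, the cause is logged in the JobManager).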

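Users of Flink on K8s feel this pain even more: after a job fails or finishes, all of its Pods exit, and unless their runtime logs were collected, it is nearly impossible to find out why the job failed.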

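Unlike the Spark History Server, which shows the logs of every Executor a job ran, the Flink History Server does not expose these logs, so Flink's built-in tooling gives a poor experience when you need to locate the exception logs of a failed job. That is the motivation for this article: it describes how to collect Flink job logs in both of the deployment modes above, so that failed jobs become easier to diagnose.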

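Once these logs are collected, they can drive exception alerts that notify job owners (a topic for a separate article), or be stored in Elasticsearch (ES) so that users can search a job's exception logs.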

Choosing an Approach

There are two common approaches to log collection:

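1. A unified LogAgent. Whether Flink runs on Yarn or on K8s, its logs can be written to a configured path that follows a naming convention, and a dedicated LogAgent (for example Filebeat) is deployed on every compute node to collect these runtime logs. K8s is somewhat more complicated than Yarn here: the Flink job has to mount a disk so that log files land on a fixed path; otherwise the files live inside the Pod and disappear with the Pod's lifecycle. This approach requires deploying a log-collection agent on every machine and keeping it stable, or some job logs may be missed.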

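2. A custom Kafka Appender. Here you implement a custom Appender for the logging framework, package it, put the jar in Flink's lib directory, and reference it in the log4j configuration. When the job starts, the dependency is loaded automatically, and logs are sent to Kafka in real time while the job runs. The Appender can be quite flexible (see the code below); for example, it can filter events so that only WARN and above are collected, because with many jobs, collecting every level produces a large volume of data while adding little value for troubleshooting. This approach works the same whether the job runs on Yarn or K8s, needs no per-environment setup, and introduces no extra component whose stability must be maintained. Its only drawback is that jobs that are already running must be restarted before their logs are collected. On balance, I personally think this approach is the better fit.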
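The "WARN and above" filter mentioned in option 2 can also be expressed as a severity threshold rather than a fixed list of level names (the appenders below use a comma-separated list such as ERROR,WARN). A minimal sketch, assuming the usual TRACE < DEBUG < INFO < WARN < ERROR < FATAL ordering; the class name LevelThreshold is hypothetical:

```java
import java.util.Locale;

/** Hypothetical helper: is a log event severe enough to be shipped to Kafka? */
final class LevelThreshold {
    // Levels ordered from least to most severe (assumed log4j-style ordering)
    private static final String[] ORDER = {"TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"};

    private LevelThreshold() {
    }

    /** Position of a level name in ORDER (case-insensitive), or -1 if the name is unknown. */
    static int rank(String level) {
        if (level == null) {
            return -1;
        }
        String normalized = level.trim().toUpperCase(Locale.ROOT);
        for (int i = 0; i < ORDER.length; i++) {
            if (ORDER[i].equals(normalized)) {
                return i;
            }
        }
        return -1;
    }

    /** True if eventLevel is at or above threshold; unknown levels are never collected. */
    static boolean atLeast(String eventLevel, String threshold) {
        int event = rank(eventLevel);
        int min = rank(threshold);
        return event >= 0 && min >= 0 && event >= min;
    }
}
```

Inside an appender's append method this would read as `LevelThreshold.atLeast(event.getLevel().toString(), "WARN")`. A threshold keeps the configuration to a single value, while a list (as used below) allows arbitrary combinations such as ERROR only plus FATAL.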

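The overall architecture is shown in the figure above. The Reporters in the figure are three components: the first is the custom Kafka Appender, the second is a custom Kafka Metrics Reporter, and the third is an internally modified version of the official Prometheus PushGateway Metrics Reporter. The first is covered in this article; the other two may get separate articles later.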

Custom Kafka Appender

Requirements

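Build a custom Kafka Appender that sends log data to Kafka. The open question is how to tag each log record: we use the log.file system property (passed to the JVM as -Dlog.file, as the startup log below shows) to derive the job's application id, container id, host, and whether the process is a JobManager or a TaskManager.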

2021-07-27 20:39:55,777 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dout.file=/data/HDATA/yarn/logs/application_1596708697506_12674/container_e11_1596708697506_12674_01_000005/taskmanager.out     
2021-07-27 20:39:55,777 INFO org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Dlog.file=/data/HDATA/yarn/logs/application_1596708697506_12674/container_e11_1596708697506_12674_01_000005/taskmanager.log
2021-07-27 20:39:55,777 INFO org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Derr.file=/data/HDATA/yarn/logs/application_1596708697506_12674/container_e11_1596708697506_12674_01_000005/taskmanager.err
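Both appenders below derive their Yarn tags from this path layout (.../<application id>/<container id>/<jobmanager|taskmanager>.log). As a standalone sketch, the parsing can use java.nio.file so the platform handles the path separator. The class name YarnLogTags is hypothetical, and the application_/container_ prefix check is an extra assumption that the appenders themselves do not make:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical value class for the tags derived from a Yarn log.file path. */
final class YarnLogTags {
    final String appId;
    final String containerId;
    final String containerType;

    private YarnLogTags(String appId, String containerId, String containerType) {
        this.appId = appId;
        this.containerId = containerId;
        this.containerType = containerType;
    }

    /** Parses .../<application id>/<container id>/<file>.log; returns null if the layout does not match. */
    static YarnLogTags parse(String logFile) {
        if (logFile == null || logFile.isEmpty()) {
            return null;
        }
        Path path = Paths.get(logFile);
        int n = path.getNameCount();
        if (n < 3) {
            return null;
        }
        String appId = path.getName(n - 3).toString();
        String containerId = path.getName(n - 2).toString();
        String fileName = path.getName(n - 1).toString();
        // Assumption: Yarn names these directories application_* and container_*
        if (!appId.startsWith("application_") || !containerId.startsWith("container_")) {
            return null;
        }
        String type;
        if (fileName.contains("jobmanager")) {
            type = "jobmanager";
        } else if (fileName.contains("taskmanager")) {
            type = "taskmanager";
        } else {
            type = "others";
        }
        return new YarnLogTags(appId, containerId, type);
    }
}
```

In an appender it would be called as `YarnLogTags.parse(System.getProperty("log.file"))`, falling back to "unknown" tags when it returns null instead of failing appender initialization.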

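On Yarn, the log.file property yields these job dimensions. On K8s the same information can be obtained from environment variables, but this may require a small change to the Flink K8s module (essentially adding a few container environment variables, such as the job's ClusterId in K8s, the IP of the physical machine it runs on, and the Pod IP), as shown in the figure below: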
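Whatever the exact source change looks like, the K8s tagging logic itself is small. The sketch below reads the same variable names the Log4j2 appender later in this article uses (CLUSTER_ID, HOSTNAME, _HOST_IP_ADDRESS); whether CLUSTER_ID and _HOST_IP_ADDRESS are present depends on your own Flink K8s changes, so treat them as assumptions. Unlike the appender, it does not throw if HOSTNAME is missing. The class name K8sLogTags is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds log tags from a Pod's environment variables. */
final class K8sLogTags {
    private K8sLogTags() {
    }

    /** Returns app_id/container_id/container_type/node_ip tags, or an empty map if CLUSTER_ID is absent. */
    static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> tags = new HashMap<>();
        String clusterId = env.get("CLUSTER_ID");
        if (clusterId == null) {
            return tags; // most likely not running on K8s
        }
        tags.put("app_id", clusterId);
        // Inside a K8s container, HOSTNAME normally holds the Pod name
        String podName = env.get("HOSTNAME");
        if (podName != null) {
            tags.put("container_id", podName);
            tags.put("container_type", podName.contains("taskmanager") ? "taskmanager" : "jobmanager");
        }
        // Assumed variable carrying the IP of the node (physical machine) the Pod runs on
        String nodeIp = env.get("_HOST_IP_ADDRESS");
        if (nodeIp != null) {
            tags.put("node_ip", nodeIp);
        }
        return tags;
    }
}
```

In production it would be called as `K8sLogTags.fromEnv(System.getenv())`; taking a Map parameter keeps the logic testable without a K8s cluster.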

Modifying the Framework Source Code

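In 1.10 and 1.12, when a Flink job transitions to FAILED, the log line it prints is, surprisingly, at INFO level, even though this line carries the root cause of the failure. Since we want to collect jobs' exception logs, we have to modify the source code to raise the log level.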

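After raising the log level in these places, the exception logs emitted when a job goes to FAILED or RESTARTING can be collected, making it easy for users to find the cause of a failure. (These changes are a nice-to-have; even without them, the custom collector can be made to capture this data.)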
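One way the collector could capture these events without patching Flink (an assumption, not necessarily the author's method) is to also accept INFO events whose message looks like a transition into FAILED or RESTARTING. The matched phrases are taken from the sample record later in this article ("switched from RUNNING to FAILED") and may differ across Flink versions; the class name FailureTransitionFilter is hypothetical:

```java
import java.util.Locale;

/** Hypothetical collector-side rule that also keeps INFO-level failure transitions. */
final class FailureTransitionFilter {
    private FailureTransitionFilter() {
    }

    static boolean shouldCollect(String level, String message) {
        if (level == null) {
            return false;
        }
        String upper = level.toUpperCase(Locale.ROOT);
        // WARN and above are always collected
        if (upper.equals("WARN") || upper.equals("ERROR") || upper.equals("FATAL")) {
            return true;
        }
        if (!upper.equals("INFO") || message == null) {
            return false;
        }
        // e.g. "... switched from RUNNING to FAILED on container_..." (assumed message format)
        return message.contains("switched from")
                && (message.contains(" to FAILED") || message.contains(" to RESTARTING"));
    }
}
```

Matching on message text is more fragile than raising the log level in the source, which is why the source change above is the more robust option; the filter is a stopgap for clusters running unmodified Flink.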

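In the early days of development, Flink 1.10 and 1.12 coexisted inside the company. 1.10 uses log4j and 1.12 uses log4j2, so the custom Kafka Appender differs slightly between them and two versions were developed. Both are covered here; read whichever you need.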

I have uploaded the code to GitHub for reference: https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-extends/FlinkLogKafkaAppender If it helps you, a star would be appreciated. The code structure is shown in the figure below:

The FlinkLogKafkaAppender parent module pulls in the basic dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flink-learning-extends</artifactId>
        <groupId>com.zhisheng.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>FlinkLogKafkaAppender</artifactId>
    <packaging>pom</packaging>

    <name>FlinkLogKafkaAppender</name>

    <modules>
        <module>Log4jKafkaAppender</module>
        <module>Log4j2KafkaAppender</module>
        <module>KafkaAppenderCommon</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <kafka.version>2.4.1</kafka.version>
        <!-- referenced as ${scope} by the child modules: the logging and Flink jars are already in Flink's lib -->
        <scope>provided</scope>
        <jackson.version>2.11.0</jackson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
  • KafkaAppenderCommon module: defines the log event class and the utility classes
  • Log4j2KafkaAppender module: for Flink versions that use log4j2
  • Log4jKafkaAppender module: for Flink versions that use log4j

KafkaAppenderCommon

The data structure sent to Kafka:

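package com.zhisheng.flink.model;

import lombok.Data;

import java.util.HashMap;
import java.util.Map;

// Lombok generates the getters/setters used by the appenders (setContent, setTags, ...) and by Jackson
@Data
public class LogEvent {

    private String source; // default is flink, maybe others will use this kafka appender in future

    private String id; // log id, default it is UUID

    private Long timestamp;

    private String content; // log message

    private Map<String, String> tags = new HashMap<>(); // tags of the log, eg: host_name, application_id, job_name etc

}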

Serialization utility class:

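package com.zhisheng.flink.util;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonUtil {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough
    private final static ObjectMapper mapper = new ObjectMapper();

    /**
     * Serializes an object to compact JSON.
     *
     * @param value the object to serialize
     * @return the JSON string
     * @throws JsonProcessingException if serialization fails
     */
    public static String toJson(Object value) throws JsonProcessingException {
        return mapper.writeValueAsString(value);
    }

    /**
     * Serializes an object to pretty-printed JSON.
     *
     * @param value the object to serialize
     * @return the formatted JSON string
     * @throws JsonProcessingException if serialization fails
     */
    public static String toFormatJson(Object value) throws JsonProcessingException {
        return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(value);
    }
}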

Utility class for turning exception stack traces into strings:

package com.zhisheng.flink.util;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;

public class ExceptionUtil {

    private static final char TAB = '\t';
    private static final char CR = '\r';
    private static final char LF = '\n';
    private static final String SPACE = " ";
    private static final String EMPTY = "";

    /**
     * Converts a stack trace into a single-line string.
     *
     * @param throwable the exception
     * @param limit     maximum length (no limit if <= 0)
     * @return the stack trace as a string
     */
    public static String stacktraceToOneLineString(Throwable throwable, int limit) {
        Map<Character, String> replaceCharToStrMap = new HashMap<>();
        replaceCharToStrMap.put(CR, SPACE);
        replaceCharToStrMap.put(LF, SPACE);
        replaceCharToStrMap.put(TAB, SPACE);
        return stacktraceToString(throwable, limit, replaceCharToStrMap);
    }

    /**
     * Converts a stack trace into a string, exactly as printed by printStackTrace().
     */
    public static String stacktraceToString(Throwable throwable) {
        final OutputStream baos = new ByteArrayOutputStream();
        throwable.printStackTrace(new PrintStream(baos));
        return baos.toString();
    }

    /**
     * Converts a stack trace into a string.
     *
     * @param throwable           the exception
     * @param limit               maximum length (no limit if <= 0)
     * @param replaceCharToStrMap characters to replace, mapped to their replacement strings
     * @return the stack trace as a string
     */
    private static String stacktraceToString(Throwable throwable, int limit, Map<Character, String> replaceCharToStrMap) {
        final OutputStream baos = new ByteArrayOutputStream();
        throwable.printStackTrace(new PrintStream(baos));
        String exceptionStr = baos.toString();
        int length = exceptionStr.length();
        if (limit > 0 && limit < length) {
            length = limit;
        }

        if (!replaceCharToStrMap.isEmpty()) {
            final StringBuilder sb = new StringBuilder();
            char c;
            String value;
            for (int i = 0; i < length; i++) {
                c = exceptionStr.charAt(i);
                value = replaceCharToStrMap.get(c);
                if (null != value) {
                    sb.append(value);
                } else {
                    sb.append(c);
                }
            }
            return sb.toString();
        } else {
            // use the effective length so that limit <= 0 also means "no limit" here
            return sub(exceptionStr, 0, length);
        }
    }

    /**
     * An improved JDK substring.<br>
     * Indexes start at 0; the last character is -1.<br>
     * If from and to are equal, returns "".<br>
     * Negative from/to count backwards from the end; if the absolute value exceeds the length, from becomes 0 and to becomes length.<br>
     * If, after adjustment, from is greater than to, the two are swapped. Examples:<br>
     * abcdefgh 2 3 => c <br>
     * abcdefgh 2 -3 => cde <br>
     *
     * @param str       String
     * @param fromIndex start index (inclusive)
     * @param toIndex   end index (exclusive)
     * @return the substring
     */
    private static String sub(CharSequence str, int fromIndex, int toIndex) {
        if (isEmpty(str)) {
            return str(str);
        }
        int len = str.length();

        if (fromIndex < 0) {
            fromIndex = len + fromIndex;
            if (fromIndex < 0) {
                fromIndex = 0;
            }
        } else if (fromIndex > len) {
            fromIndex = len;
        }

        if (toIndex < 0) {
            toIndex = len + toIndex;
            if (toIndex < 0) {
                toIndex = len;
            }
        } else if (toIndex > len) {
            toIndex = len;
        }

        if (toIndex < fromIndex) {
            int tmp = fromIndex;
            fromIndex = toIndex;
            toIndex = tmp;
        }

        if (fromIndex == toIndex) {
            return EMPTY;
        }

        return str.toString().substring(fromIndex, toIndex);
    }

    /**
     * Converts a {@link CharSequence} to a String, null-safe.
     *
     * @param cs {@link CharSequence}
     * @return the string, or null if cs is null
     */
    private static String str(CharSequence cs) {
        return null == cs ? null : cs.toString();
    }

    /**
     * Whether a string is empty, defined as:<br>
     * 1. null <br>
     * 2. ""<br>
     *
     * @param str the string to check
     * @return whether it is empty
     */
    private static boolean isEmpty(CharSequence str) {
        return str == null || str.length() == 0;
    }
}
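If only the two behaviors the appenders need are required (the full trace, and an optionally truncated single-line trace), the JDK's StringWriter/PrintWriter is enough. A compact alternative sketch; the class name StackTraces is hypothetical, and it keeps ExceptionUtil's semantics of truncating first and then replacing CR, LF and TAB with spaces:

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical JDK-only alternative to ExceptionUtil. */
final class StackTraces {
    private StackTraces() {
    }

    /** Full stack trace, as printed by Throwable.printStackTrace. */
    static String full(Throwable throwable) {
        StringWriter writer = new StringWriter();
        try (PrintWriter printer = new PrintWriter(writer)) {
            throwable.printStackTrace(printer);
        }
        return writer.toString();
    }

    /** Stack trace truncated to limit chars (if limit > 0), with CR, LF and TAB replaced by spaces. */
    static String oneLine(Throwable throwable, int limit) {
        String trace = full(throwable);
        if (limit > 0 && limit < trace.length()) {
            trace = trace.substring(0, limit);
        }
        return trace.replace('\r', ' ').replace('\n', ' ').replace('\t', ' ');
    }
}
```

A single-line form is handy when the content is indexed line by line downstream; the appenders in this article keep the multi-line form and let JSON escaping handle the newlines.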

Custom Log4j2 Kafka Appender

Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- same parent as Log4jKafkaAppender, so ${scope} and kafka-clients are inherited -->
    <parent>
        <artifactId>FlinkLogKafkaAppender</artifactId>
        <groupId>com.zhisheng.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Log4j2KafkaAppender</artifactId>

    <properties>
        <log4j.version>2.12.1</log4j.version>
        <flink.shaded.version>12.0</flink.shaded.version>
        <jackson.version>2.10.1</jackson.version>
    </properties>


    <dependencies>
        <!-- LogEvent, JacksonUtil and ExceptionUtil live in the common module -->
        <dependency>
            <groupId>com.zhisheng.flink</groupId>
            <artifactId>KafkaAppenderCommon</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-jackson</artifactId>
            <version>${jackson.version}-${flink.shaded.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- bundle the Kafka client and the common module into the appender jar -->
                            <artifactSet>
                                <includes>
                                    <include>org.apache.kafka:*</include>
                                    <include>com.zhisheng.flink:KafkaAppenderCommon</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The custom Kafka Appender class:

package com.zhisheng.log.appender;

import com.zhisheng.flink.model.LogEvent;
import com.zhisheng.flink.util.ExceptionUtil;
import com.zhisheng.flink.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;

import java.io.File;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

@Slf4j
@Plugin(name = "KafkaLog4j2Appender", category = "Core", elementType = "appender", printObject = true)
public class KafkaLog4j2Appender extends AbstractAppender {

    private final String source;

    private final String topic;

    private final String level;

    private final Producer<String, String> producer;

    private String appId;

    private String containerId;

    private String containerType;

    private final String taskName;

    private final String taskId;

    private String nodeIp;

    protected KafkaLog4j2Appender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
                                  Property[] properties, String source, String bootstrapServers, String topic, String level) {
        super(name, filter, layout, ignoreExceptions, properties);
        this.source = source;
        this.topic = topic;
        this.level = level;

        Properties envProperties = System.getProperties();
        Map<String, String> envs = System.getenv();
        String clusterId = envs.get("CLUSTER_ID");
        if (clusterId != null) {
            // K8s cluster: the tags come from the container's environment variables
            appId = clusterId;
            containerId = envs.get("HOSTNAME");
            if (containerId != null && containerId.contains("taskmanager")) {
                containerType = "taskmanager";
            } else {
                containerType = "jobmanager";
            }
            // IP of the physical machine (K8s node) the Pod runs on; may be null
            nodeIp = envs.get("_HOST_IP_ADDRESS");
        } else {
            // Yarn cluster: log.file looks like .../<application id>/<container id>/<jobmanager|taskmanager>.log
            String logFile = envProperties.getProperty("log.file");
            String[] values = logFile == null ? new String[0] : logFile.split(File.separator);
            if (values.length >= 3) {
                appId = values[values.length - 3];
                containerId = values[values.length - 2];
                String fileName = values[values.length - 1];
                if (fileName.contains("jobmanager")) {
                    containerType = "jobmanager";
                } else if (fileName.contains("taskmanager")) {
                    containerType = "taskmanager";
                } else {
                    containerType = "others";
                }
            } else {
                log.error("log.file Property ({}) doesn't contains yarn application id or container id", logFile);
            }
        }

        // optional system properties (not set by Flink itself), e.g. passed as -DtaskName / -DtaskId
        taskName = envProperties.getProperty("taskName", null);
        taskId = envProperties.getProperty("taskId", null);

        // producer settings passed as <Property> elements in the appender configuration
        Properties props = new Properties();
        if (properties != null) {
            for (Property property : properties) {
                props.put(property.getName(), property.getValue());
            }
        }

        if (bootstrapServers != null) {
            props.setProperty("bootstrap.servers", bootstrapServers);
        } else {
            throw new ConfigException("The bootstrap servers property must be specified");
        }
        if (this.topic == null) {
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        }

        String clientIdPrefix = taskId != null ? taskId : appId;

        if (clientIdPrefix != null) {
            props.setProperty("client.id", clientIdPrefix + "_log");
        }

        // defaults, applied only when not configured explicitly
        if (props.getProperty("acks") == null) {
            props.setProperty("acks", "0");
        }

        if (props.getProperty("retries") == null) {
            props.setProperty("retries", "0");
        }

        if (props.getProperty("batch.size") == null) {
            props.setProperty("batch.size", "16384");
        }

        if (props.getProperty("linger.ms") == null) {
            props.setProperty("linger.ms", "5");
        }

        if (props.getProperty("compression.type") == null) {
            props.setProperty("compression.type", "lz4");
        }

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }


    @Override
    public void append(org.apache.logging.log4j.core.LogEvent event) {
        try {
            // Only collect the configured levels; "xxx" is a placeholder for loggers whose logs should not be
            // collected (for example org.apache.kafka, so the producer's own warnings are not fed back into Kafka)
            if (level.contains(event.getLevel().toString().toUpperCase()) && !event.getLoggerName().contains("xxx")) {
                // the application id is the record key, so one job's logs stay in one partition
                producer.send(new ProducerRecord<>(topic, appId, subAppend(event)));
            }
        } catch (Exception e) {
            log.warn("Parsing the log event or send log event to kafka has exception", e);
        }
    }

    private String subAppend(org.apache.logging.log4j.core.LogEvent event) throws JsonProcessingException {
        LogEvent logEvent = new LogEvent();
        Map<String, String> tags = new HashMap<>();
        try {
            InetAddress inetAddress = InetAddress.getLocalHost();
            tags.put("host_name", inetAddress.getHostName());
            tags.put("host_ip", inetAddress.getHostAddress());
        } catch (Exception e) {
            log.error("Error getting the ip and host name of the node where the job({}) is running", appId, e);
        }

        // message text, followed by the full stack trace when the event carries an exception
        String logMessage = event.getMessage().getFormattedMessage();
        if (event.getThrown() != null) {
            logMessage = logMessage + "\n" + ExceptionUtil.stacktraceToString(event.getThrown());
        }
        logEvent.setId(UUID.randomUUID().toString());
        logEvent.setTimestamp(event.getTimeMillis());
        logEvent.setSource(source);
        logEvent.setContent(logMessage);

        // location information may be unavailable (e.g. with async loggers), so guard against null
        StackTraceElement eventSource = event.getSource();
        if (eventSource != null) {
            tags.put("class_name", eventSource.getClassName());
            tags.put("method_name", eventSource.getMethodName());
            tags.put("file_name", eventSource.getFileName());
            tags.put("line_number", String.valueOf(eventSource.getLineNumber()));
        }

        tags.put("logger_name", event.getLoggerName());
        tags.put("level", event.getLevel().toString());
        tags.put("thread_name", event.getThreadName());
        tags.put("app_id", appId);
        tags.put("container_id", containerId);
        tags.put("container_type", containerType);
        if (taskId != null) {
            tags.put("task_id", taskId);
        }
        if (taskName != null) {
            tags.put("task_name", taskName);
        }
        if (nodeIp != null) {
            tags.put("node_ip", nodeIp);
        }
        logEvent.setTags(tags);
        return JacksonUtil.toJson(logEvent);
    }


    @PluginFactory
    public static KafkaLog4j2Appender createAppender(@PluginElement("Layout") final Layout<? extends Serializable> layout,
                                                     @PluginElement("Filter") final Filter filter,
                                                     @Required(message = "No name provided for KafkaLog4j2Appender") @PluginAttribute("name") final String name,
                                                     @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
                                                     @Required(message = "No bootstrapServers provided for KafkaLog4j2Appender") @PluginAttribute("bootstrapServers") final String bootstrapServers,
                                                     @Required(message = "No source provided for KafkaLog4j2Appender") @PluginAttribute("source") final String source,
                                                     @Required(message = "No topic provided for KafkaLog4j2Appender") @PluginAttribute("topic") final String topic,
                                                     @Required(message = "No level provided for KafkaLog4j2Appender") @PluginAttribute("level") final String level,
                                                     @PluginElement("Properties") final Property[] properties) {
        return new KafkaLog4j2Appender(name, filter, layout, ignoreExceptions, properties, source, bootstrapServers, topic, level);
    }

    @Override
    public void stop() {
        super.stop();
        if (producer != null) {
            producer.close();
        }
    }
}

Modify the Log4j2 configuration (Flink's conf/log4j.properties):

################################################################################     
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# monitorInterval=30

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
rootLogger.appenderRef.kafka.ref = KafkaLog4j2Appender


# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}


appender.kafka.name = KafkaLog4j2Appender
appender.kafka.type = KafkaLog4j2Appender
appender.kafka.source = flink-1.12.0
appender.kafka.bootstrapServers = fat-kafka1.com.cn:9092,fat-kafka2.com.cn:9092,fat-kafka3.com.cn:9092
appender.kafka.topic = yarn_flink_log
appender.kafka.level = ERROR,WARN

logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

Note: on K8s, the file that takes effect is log4j-console.properties, so that is the one to modify.

Structure of the data sent to Kafka

{
  "source": "flink-1.12.0",
  "id": "855986dd-993a-4efe-99ab-658fe6bf1683",
  "timestamp": 1628131862944,
  "content": "Source: Custom Source (1/2) (6302f688e5405815b719bb236634f341) switched from RUNNING to FAILED on container_e12_1626247520347_2269_01_000002 @ uat-hadoopuat-dc01-025187.vm.dc01.tech (dataPort=9916).\njava.lang.Exception: [2021-08-05 10:51:00.641]Container killed on request. Exit code is 137\n[2021-08-05 10:51:00.641]Container exited with a non-zero exit code 137. \n[2021-08-05 10:51:00.643]Killed by external signal\n\n\tat org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.onWorkerTerminated(ActiveResourceManager.java:219)\n\tat org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$onContainersCompleted$0(YarnResourceManagerDriver.java:522)\n\tat org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$runAsyncWithFatalHandler$2(YarnResourceManagerDriver.java:549)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat akka.actor.Actor$class.aroundReceive(Actor.scala:517)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n",
  "tags": {
    "host_ip": "10.69.1.10",
    "method_name": "transitionState",
    "level": "WARN",
    "file_name": "Execution.java",
    "line_number": "1608",
    "thread_name": "flink-akka.actor.default-dispatcher-3",
    "container_type": "jobmanager",
    "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph",
    "class_name": "org.apache.flink.runtime.executiongraph.Execution",
    "app_id": "application_1626247520347_2269",
    "host_name": "FAT-hadoopuat-69110.vm.dc01.tech",
    "container_id": "container_e12_1626247520347_2269_01_000001"
  }
}

Custom Log4j Kafka Appender

Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>FlinkLogKafkaAppender</artifactId>
        <groupId>com.zhisheng.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Log4jKafkaAppender</artifactId>

    <properties>
        <slf4j.version>1.7.15</slf4j.version>
        <flink.shaded.version>9.0</flink.shaded.version>
        <jackson.version>2.10.1</jackson.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>com.zhisheng.flink</groupId>
            <artifactId>KafkaAppenderCommon</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-jackson</artifactId>
            <version>${jackson.version}-${flink.shaded.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- bundle the Kafka client and the common module into the appender jar -->
                            <artifactSet>
                                <includes>
                                    <include>org.apache.kafka:*</include>
                                    <include>com.zhisheng.flink:KafkaAppenderCommon</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

KafkaLog4jAppender: extends the AppenderSkeleton abstract class

public class KafkaLog4jAppender extends AppenderSkeleton {     

private String bootstrapServers;

private String source;

private String topic;

private String level;

private String acks;

private String compressionType;

private String retries;

private String batchSize;

private String lingerMs;

private String maxRequestSize;

private String requestTimeoutMs;

private Producer<String, String> producer;

private String appId;

private String containerId;

private String containerType;

private String taskId;

private String taskName;

@Override
public void activateOptions() {
super.activateOptions();

Properties envProperties = System.getProperties();

String logFile = envProperties.getProperty("log.file");
String[] values = logFile.split(File.separator);
if (values.length >= 3) {
appId = values[values.length - 3];
containerId = values[values.length - 2];
String log = values[values.length - 1];
if (log.contains("jobmanager")) {
containerType = "jobmanager";
} else if (log.contains("taskmanager")) {
containerType = "taskmanager";
} else {
containerType = "others";
}
} else {
log.error("log.file Property ({}) doesn't contains yarn application id or container id", logFile);
}

taskId = envProperties.getProperty("taskId", null);
taskName = envProperties.getProperty("taskName", null);

Properties props = new Properties();
if (this.bootstrapServers != null) {
props.setProperty("bootstrap.servers", this.bootstrapServers);
} else {
throw new ConfigException("The bootstrap servers property must be specified");
}
if (this.topic == null) {
throw new ConfigException("Topic must be specified by the Kafka log4j appender");
}
if (this.source == null) {
throw new ConfigException("Source must be specified by the Kafka log4j appender");
}

String clientIdPrefix = taskId != null ? taskId : appId;

if (clientIdPrefix != null) {
props.setProperty("client.id", clientIdPrefix + "_log");
}

if (this.acks != null) {
props.setProperty("acks", this.acks);
} else {
props.setProperty("acks", "0");
}

if (this.retries != null) {
props.setProperty("retries", this.retries);
} else {
props.setProperty("retries", "0");
}

if (this.batchSize != null) {
props.setProperty("batch.size", this.batchSize);
} else {
props.setProperty("batch.size", "16384");
}

if (this.lingerMs != null) {
props.setProperty("linger.ms", this.lingerMs);
} else {
props.setProperty("linger.ms", "5");
}

if (this.compressionType != null) {
props.setProperty("compression.type", this.compressionType);
} else {
props.setProperty("compression.type", "lz4");
}

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<>(props);
}

@Override
protected void append(LoggingEvent loggingEvent) {
try {
if (level.contains(loggingEvent.getLevel().toString().toUpperCase()) && !loggingEvent.getLoggerName().contains("xxx")) { //控制哪些类的日志不收集
producer.send(new ProducerRecord<>(topic, appId, subAppend(loggingEvent)));
}
} catch (Exception e) {
log.warn("Parsing the log event or send log event to kafka has exception", e);
}
}

    private String subAppend(LoggingEvent event) throws JsonProcessingException {
        LogEvent logEvent = new LogEvent();
        Map<String, String> tags = new HashMap<>();
        try {
            InetAddress inetAddress = InetAddress.getLocalHost();
            tags.put("host_name", inetAddress.getHostName());
            tags.put("host_ip", inetAddress.getHostAddress());
        } catch (Exception e) {
            log.error("Error getting the ip and host name of the node where the job({}) is running", appId, e);
        }

        // content = log message, followed by the stack trace when the event carries a Throwable
        String logMessage = String.valueOf(event.getMessage());
        Throwable throwable = event.getThrowableInformation() != null
                ? event.getThrowableInformation().getThrowable() : null;
        if (throwable != null) {
            try {
                logMessage = logMessage + "\n" + ExceptionUtil.stacktraceToString(throwable);
            } catch (Exception e) {
                logMessage = logMessage + "\n\t" + e.getMessage();
            }
        }

        logEvent.setId(UUID.randomUUID().toString());
        logEvent.setTimestamp(event.getTimeStamp());
        logEvent.setSource(source);
        logEvent.setContent(logMessage);

        LocationInfo locationInformation = event.getLocationInformation();
        tags.put("class_name", locationInformation.getClassName());
        tags.put("method_name", locationInformation.getMethodName());
        tags.put("file_name", locationInformation.getFileName());
        tags.put("line_number", locationInformation.getLineNumber());
        tags.put("logger_name", event.getLoggerName());
        tags.put("level", event.getLevel().toString());
        tags.put("thread_name", event.getThreadName());
        tags.put("app_id", appId);
        tags.put("container_id", containerId);
        tags.put("container_type", containerType);
        if (taskName != null) {
            tags.put("task_name", taskName);
        }
        if (taskId != null) {
            tags.put("task_id", taskId);
        }
        logEvent.setTags(tags);
        return JacksonUtil.toJson(logEvent);
    }

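    @Override
    public void close() {
        if (!this.closed) {
            this.closed = true;
            // the producer is null if activateOptions() failed before creating it
            if (this.producer != null) {
                this.producer.close();
            }
        }
    }

    @Override
    public boolean requiresLayout() {
        return false;
    }

}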
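The appender tags every record with appId, containerId and containerType. As the requirement section explains, these come from the log.file path. The sketch below shows one hedged way to derive them. It assumes the Yarn layout shown earlier (.../application_<id>/container_<id>/jobmanager.log or taskmanager.log). LogFileInfo and its methods are hypothetical names, not the author's implementation. On Kubernetes the path has no application_ or container_ components, so those fields simply stay null.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: extracts Yarn identifiers from a Flink log.file path. */
final class LogFileInfo {
    final String appId;          // e.g. application_1596708697506_12674
    final String containerId;    // e.g. container_e11_1596708697506_12674_01_000005
    final String containerType;  // "jobmanager", "taskmanager", or null if unknown

    LogFileInfo(String appId, String containerId, String containerType) {
        this.appId = appId;
        this.containerId = containerId;
        this.containerType = containerType;
    }

    /** Assumes .../application_xxx/container_xxx/<type>.log; unmatched parts stay null. */
    static LogFileInfo parse(String logFile) {
        String appId = null, containerId = null, containerType = null;
        if (logFile != null) {
            Path path = Paths.get(logFile);
            // Path is Iterable over its name elements (directories and file name)
            for (Path part : path) {
                String name = part.toString();
                if (name.startsWith("application_")) {
                    appId = name;
                } else if (name.startsWith("container_")) {
                    containerId = name;
                }
            }
            Path fileName = path.getFileName();
            if (fileName != null) {
                String file = fileName.toString();
                if (file.startsWith("jobmanager")) {
                    containerType = "jobmanager";
                } else if (file.startsWith("taskmanager")) {
                    containerType = "taskmanager";
                }
            }
        }
        return new LogFileInfo(appId, containerId, containerType);
    }

    public static void main(String[] args) {
        // Flink's start scripts pass the log file location as -Dlog.file=...
        LogFileInfo info = parse(System.getProperty("log.file"));
        System.out.println(info.appId + " " + info.containerId + " " + info.containerType);
    }
}
```

Inside the appender, parse could run once in activateOptions(), with the three fields copied from the result.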

Package the code above into a jar, put the jar in Flink's lib directory, and modify the log4j.properties configuration file:

# This affects logging for both user code and Flink     
log4j.rootLogger=INFO, RFA, kafka

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${log.file}
log4j.appender.RFA.MaxFileSize=256MB
log4j.appender.RFA.Append=true
log4j.appender.RFA.MaxBackupIndex=10
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %t %-5p %-60c %x - %m%n

# Fixes the deadlock problem: keep the Kafka client's Metadata logger at WARN
log4j.logger.org.apache.kafka.clients.Metadata=WARN,kafka
log4j.appender.kafka=com.zhisheng.log.appender.KafkaLog4jAppender
log4j.appender.kafka.source=flink-1.10.0
log4j.appender.kafka.bootstrapServers=kafka1.com.cn:9092,kafka2.com.cn:9092,kafka3.com.cn:9092
log4j.appender.kafka.topic=yarn_flink_log
log4j.appender.kafka.level=ERROR,WARN

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, RFA
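The appender's level check, level.contains(...), is a substring match on the raw property value. It upper-cases only the event level, so a lowercase value such as error,warn would silently match nothing. Below is a minimal sketch of a stricter, case-insensitive filter. It assumes the same comma-separated level property. LevelFilter is a hypothetical helper, not part of log4j or the original appender.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper: exact, case-insensitive match against a list such as "ERROR,WARN". */
final class LevelFilter {
    private final Set<String> accepted;

    LevelFilter(String levelProperty) {
        if (levelProperty == null || levelProperty.trim().isEmpty()) {
            accepted = Collections.emptySet(); // nothing configured: ship nothing
        } else {
            accepted = Arrays.stream(levelProperty.split(","))
                    .map(s -> s.trim().toUpperCase(Locale.ROOT))
                    .filter(s -> !s.isEmpty())
                    .collect(Collectors.toSet());
        }
    }

    /** True if an event at this level (e.g. loggingEvent.getLevel().toString()) should be sent. */
    boolean accepts(String level) {
        return level != null && accepted.contains(level.trim().toUpperCase(Locale.ROOT));
    }
}
```

The filter could be built once in activateOptions() and consulted in append() via levelFilter.accepts(loggingEvent.getLevel().toString()). An empty or missing value accepts nothing. This mirrors the original appender: when the level property is missing, append() throws and the event is dropped.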

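Note the line in the configuration above that works around a deadlock; we ran into this deadlock in production. In the jstack output below, the main thread is logging a WARN through log4j. It holds the RootLogger lock while our KafkaLog4jAppender calls KafkaProducer.send, which waits for the lock on the Kafka client's Metadata object. At the same time, the kafka-producer-network-thread holds that Metadata lock inside Metadata.update and tries to log an INFO message, which needs the RootLogger lock. With org.apache.kafka.clients.Metadata set to WARN, that INFO message is filtered out before log4j reaches the appenders, so the cycle cannot form. The jstack output was: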

2021-08-20 01:19:24     
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.201-b09 mixed mode):

"Attach Listener" #14 daemon prio=9 os_prio=0 tid=0x00007f061c001800 nid=0x728 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"kafka-producer-network-thread | application_1610962534438_1256_log" #11 daemon prio=5 os_prio=0 tid=0x00007f06394ad800 nid=0x27e waiting for monitor entry [0x00007f060be16000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.log4j.Category.callAppenders(Category.java:205)
- waiting to lock <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
at org.apache.kafka.clients.Metadata.update(Metadata.java:379)
- locked <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1039)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:822)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
- None

"Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007f063865c800 nid=0x27c runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"C1 CompilerThread2" #8 daemon prio=9 os_prio=0 tid=0x00007f063862f000 nid=0x27b waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"C2 CompilerThread1" #7 daemon prio=9 os_prio=0 tid=0x00007f063862d000 nid=0x27a waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"C2 CompilerThread0" #6 daemon prio=9 os_prio=0 tid=0x00007f063862a800 nid=0x279 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"Signal Dispatcher" #5 daemon prio=9 os_prio=0 tid=0x00007f0638628800 nid=0x278 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"Surrogate Locker Thread (Concurrent GC)" #4 daemon prio=9 os_prio=0 tid=0x00007f0638626800 nid=0x277 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f06385f3800 nid=0x276 in Object.wait() [0x00007f0617c07000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000fff08ed0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
- locked <0x00000000fff08ed0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)

Locked ownable synchronizers:
- None

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f06385f1000 nid=0x275 in Object.wait() [0x00007f0617d08000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000fff06bf8> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000fff06bf8> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

Locked ownable synchronizers:
- None

"main" #1 prio=5 os_prio=0 tid=0x00007f0638011800 nid=0x253 waiting for monitor entry [0x00007f063e7b7000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:129)
- waiting to lock <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:960)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:866)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:733)
at com.zhisheng.log.appender.KafkaLog4jAppender.append(KafkaLog4jAppender.java:147)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
- locked <0x00000000ff542900> (a com.zhisheng.log.appender.KafkaLog4jAppender)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
- locked <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
at org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)
at org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.<init>(JniBasedUnixGroupsMappingWithFallback.java:38)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:449)
- locked <0x00000000fec67710> (a java.lang.Class for org.apache.hadoop.security.Groups)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:327)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:294)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:854)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:824)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:693)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:96)
at org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:293)
at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:96)

Locked ownable synchronizers:
- None

"VM Thread" os_prio=0 tid=0x00007f06385e7000 nid=0x274 runnable

"Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638029000 nid=0x256 runnable

"Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007f063802a800 nid=0x257 runnable

"Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007f063802c800 nid=0x258 runnable

"Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007f063802e800 nid=0x259 runnable

"Gang worker#4 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638030000 nid=0x25a runnable

"Gang worker#5 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638032000 nid=0x25b runnable

"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638034000 nid=0x25c runnable

"Gang worker#7 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638035800 nid=0x25d runnable

"G1 Main Concurrent Mark GC Thread" os_prio=0 tid=0x00007f063805e000 nid=0x26b runnable

"Gang worker#0 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638060000 nid=0x26f runnable

"Gang worker#1 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638061800 nid=0x270 runnable

"Gang worker#2 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638063800 nid=0x272 runnable

"Gang worker#3 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638065800 nid=0x273 runnable

"G1 Concurrent Refinement Thread#0" os_prio=0 tid=0x00007f0638047800 nid=0x26a runnable

"G1 Concurrent Refinement Thread#1" os_prio=0 tid=0x00007f0638045800 nid=0x265 runnable

"G1 Concurrent Refinement Thread#2" os_prio=0 tid=0x00007f0638043800 nid=0x264 runnable

"G1 Concurrent Refinement Thread#3" os_prio=0 tid=0x00007f0638042000 nid=0x263 runnable

"G1 Concurrent Refinement Thread#4" os_prio=0 tid=0x00007f0638040000 nid=0x262 runnable

"G1 Concurrent Refinement Thread#5" os_prio=0 tid=0x00007f063803e000 nid=0x261 runnable

"G1 Concurrent Refinement Thread#6" os_prio=0 tid=0x00007f063803c000 nid=0x260 runnable

"G1 Concurrent Refinement Thread#7" os_prio=0 tid=0x00007f063803a800 nid=0x25f runnable

"G1 Concurrent Refinement Thread#8" os_prio=0 tid=0x00007f0638038800 nid=0x25e runnable

"VM Periodic Task Thread" os_prio=0 tid=0x00007f0638660000 nid=0x27d waiting on condition

JNI global references: 652


Found one Java-level deadlock:
=============================
"kafka-producer-network-thread | application_1610962534438_1256_log":
waiting to lock monitor 0x00007f0624005f18 (object 0x00000000ffad7510, a org.apache.log4j.spi.RootLogger),
which is held by "main"
"main":
waiting to lock monitor 0x00007f06240047b8 (object 0x00000000ff1421d8, a org.apache.kafka.clients.Metadata),
which is held by "kafka-producer-network-thread | application_1610962534438_1256_log"

Java stack information for the threads listed above:
===================================================
"kafka-producer-network-thread | application_1610962534438_1256_log":
at org.apache.log4j.Category.callAppenders(Category.java:205)
- waiting to lock <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
at org.apache.kafka.clients.Metadata.update(Metadata.java:379)
- locked <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1039)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:822)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at java.lang.Thread.run(Thread.java:748)
"main":
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:129)
- waiting to lock <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:960)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:866)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:733)
at com.zhisheng.log.appender.KafkaLog4jAppender.append(KafkaLog4jAppender.java:147)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
- locked <0x00000000ff542900> (a com.zhisheng.log.appender.KafkaLog4jAppender)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
- locked <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
at org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)
at org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.<init>(JniBasedUnixGroupsMappingWithFallback.java:38)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:449)
- locked <0x00000000fec67710> (a java.lang.Class for org.apache.hadoop.security.Groups)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:327)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:294)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:854)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:824)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:693)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:96)
at org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:293)
at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:96)

Found 1 deadlock.
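The sketch below shows why raising the Metadata logger to WARN breaks the cycle. It is self-contained Java that mimics only the lock ordering from the jstack output, using two plain monitors. It does not use log4j or Kafka, and the class LockOrderDemo is hypothetical. ThreadMXBean.findMonitorDeadlockedThreads() detects the same kind of Java-level monitor deadlock that jstack prints.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

/** Hypothetical demo: plain monitors stand in for log4j's RootLogger and Kafka's Metadata. */
final class LockOrderDemo {

    /**
     * Returns how many of the two demo threads the JVM reports as deadlocked (0 if both finish).
     * metadataLogsInfo=false mimics log4j.logger.org.apache.kafka.clients.Metadata=WARN:
     * the INFO call is filtered before any appender (and the RootLogger lock) is reached.
     */
    static int run(boolean metadataLogsInfo, long timeoutMs) throws InterruptedException {
        final Object rootLogger = new Object(); // Category.callAppenders synchronizes on the logger
        final Object metadata = new Object();   // Metadata.update / Metadata.fetch are synchronized
        CountDownLatch bothLocked = new CountDownLatch(2);

        Thread mainLike = new Thread(() -> {
            synchronized (rootLogger) {           // main: logging a WARN, appenders run under the lock
                bothLocked.countDown();
                awaitQuietly(bothLocked);
                synchronized (metadata) { }       // KafkaProducer.send -> waitOnMetadata -> fetch
            }
        }, "main-like");
        Thread networkLike = new Thread(() -> {
            synchronized (metadata) {             // network thread: Metadata.update
                bothLocked.countDown();
                awaitQuietly(bothLocked);
                if (metadataLogsInfo) {
                    synchronized (rootLogger) { } // log.info(...) -> Category.callAppenders
                }
            }
        }, "network-like");
        mainLike.setDaemon(true);                 // let the JVM exit even if they stay deadlocked
        networkLike.setDaemon(true);
        mainLike.start();
        networkLike.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (!mainLike.isAlive() && !networkLike.isAlive()) {
                return 0;                         // both finished: no deadlock
            }
            long[] ids = mx.findMonitorDeadlockedThreads();
            if (ids != null) {
                int ours = 0;                     // count only this run's threads
                for (long id : ids) {
                    if (id == mainLike.getId() || id == networkLike.getId()) {
                        ours++;
                    }
                }
                if (ours > 0) {
                    return ours;
                }
            }
            Thread.sleep(20);
        }
        return 0;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("INFO logged under the Metadata lock: " + run(true, 2000) + " deadlocked");
        System.out.println("Metadata logger at WARN: " + run(false, 2000) + " deadlocked");
    }
}
```

The first call should report both threads as deadlocked. The second should report none, because the "network" thread never asks for the logger lock while holding the Metadata lock.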

The structure of the data sent to Kafka:

{     
"source": "flink-1.10.0",
"id": "fc527ec2-0a95-4fe8-83d2-d3e889c03658",
"timestamp": 1627886187629,
"content": "Error registering AppInfo mbean\njavax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=application_1626247520347_2075\n\tat com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)\n\tat com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)\n\tat org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)\n\tat org.apache.flink.metrics.kafka.KafkaReporter.open(KafkaReporter.java:107)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.createReporterSetup(ReporterSetup.java:130)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.lambda$setupReporters$1(ReporterSetup.java:239)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:236)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:160)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:338)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$3(TaskManagerRunner.java:362)\n\tat java.security.AccessController.doPrivileged(Native Method)\n\tat javax.security.auth.Subject.doAs(Subject.java:422)\n\tat org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:361)\n\tat org.apache.flink.yarn.YarnTaskExecutorRunner.runTaskManagerSecurely(YarnTaskExecutorRunner.java:90)\n\tat org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:70)\n",
"tags": {
"host_ip": "10.xx.xx.17",
"method_name": "checkState",
"level": "ERROR",
"file_name": "ConnectionState.java",
"line_number": "288",
"thread_name": "main-EventThread",
"container_type": "taskmanager",
"logger_name": "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState",
"class_name": "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState",
"app_id": "application_1626247520347_1831",
"host_name": "FAT-hadoopuat-69117.vm.dc01.tech",
"container_id": "container_e12_1626247520347_1831_01_000002"
}
}

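Storing the logs in ES with Flink SQL

Use Flink SQL to write the error-log data to ES, with one index per day. Note that 'elasticsearch-universal' and the 'source.parallelism' option are not part of the open-source Flink connectors (the community Elasticsearch connector uses identifiers such as elasticsearch-7); they appear to be in-house extensions.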

CREATE TABLE yarn_flink_warn_logs (     
`source` STRING,
`id` STRING,
`timestamp` BIGINT,
`content` STRING,
`tags` ROW<host_ip STRING, method_name STRING, `level` STRING, `file_name` STRING, line_number STRING, thread_name STRING, container_type STRING, logger_name STRING, class_name STRING, app_id STRING, `host_name` STRING, container_id STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'yarn_flink_log',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'fat-kafka1.com.cn:9092,fat-kafka2.com.cn:9092,fat-kafka3.com.cn:9092',
'properties.group.id' = 'yarn_flink_warn_logs',
'scan.topic-partition-discovery.interval' = '10000 ms',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'source.parallelism' = '1'
);


CREATE TABLE yarn_flink_warn_logs_es (
`source` STRING,
`id` STRING,
`logTime` BIGINT,
`log_time` TIMESTAMP(3),
`content` STRING,
`tags` ROW<host_ip STRING, method_name STRING, `level` STRING, `file_name` STRING, line_number STRING, thread_name STRING, container_type STRING, logger_name STRING, class_name STRING, app_id STRING, `host_name` STRING, container_id STRING>
) WITH (
'connector' = 'elasticsearch-universal',
'hosts' = 'http://fat-search-es.cn:9200',
'index' = 'yarn_flink_warn_logs-{log_time|yyyy.MM.dd}',
'username' = 'test-admin',
'password' = 'test-admin',
'sink.parallelism' = '2',
'failure-handler' = 'ignore',
'sink.bulk-flush.max-actions' = '1000',
'sink.bulk-flush.max-size' = '5MB',
'sink.bulk-flush.interval' = '10'
);


insert into yarn_flink_warn_logs_es select `source`, `id`, `timestamp` as logTime, TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000,'yyyy-MM-dd HH:mm:ss')) AS log_time, content, tags from yarn_flink_warn_logs;
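Here is a small Java sketch that checks what the dynamic index pattern 'yarn_flink_warn_logs-{log_time|yyyy.MM.dd}' resolves to for a given record. It assumes that FROM_UNIXTIME renders log_time in the Flink session time zone. Records written near midnight can therefore land in different daily indices depending on that setting. DailyIndex is a hypothetical helper, not part of Flink or Elasticsearch.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: mirrors 'yarn_flink_warn_logs-{log_time|yyyy.MM.dd}'. */
final class DailyIndex {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /**
     * @param epochMillis the "timestamp" field of the Kafka record (milliseconds)
     * @param sessionZone assumed zone in which FROM_UNIXTIME renders the value
     */
    static String indexFor(long epochMillis, ZoneId sessionZone) {
        LocalDateTime logTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), sessionZone);
        return "yarn_flink_warn_logs-" + logTime.format(DAY);
    }

    public static void main(String[] args) {
        // timestamp of the sample record above (2021-08-02T06:36:27.629Z)
        System.out.println(indexFor(1627886187629L, ZoneId.of("UTC")));
    }
}
```

For example, 1627862399000 (2021-08-01 23:59:59 UTC) goes to the 2021.08.01 index under UTC but to 2021.08.02 under Asia/Shanghai.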

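To view the logs of a single job, filter by application_id, container_type or container_id. As the custom appender above shows, we also attach taskId and taskName (the task_id and task_name tags). These are dimensions from our real-time computing platform and can be used as filters as well.

Viewing a single log entry:

Monitoring the volume of job error logs across the cluster in real time

This shows how many error logs each job produces. If a job suddenly emits a large number of error logs, it is probably unstable or failing, so both the job owner and the cluster administrators can be notified as an early warning.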

set table.exec.emit.early-fire.enabled=true;     
set table.exec.emit.early-fire.delay=60000ms;
set table.exec.state.ttl=90000000ms;

CREATE TABLE yarn_flink_warn_logs (
`source` STRING,
`timestamp` BIGINT,
`tags` ROW<app_id STRING>,
windows_start_time as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000,'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR windows_start_time AS windows_start_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'yarn_flink_log',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'logs-kafka1.com.cn:9092,logs-kafka2.com.cn:9092,logs-kafka3.com.cn:9092',
'properties.group.id' = 'yarn_flink_warn_logs_monitor_board',
'scan.topic-partition-discovery.interval' = '10000 ms',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'source.parallelism' = '6'
);

CREATE TABLE printSink (
`source` STRING,
app_id STRING,
logCount BIGINT,
`window_start_time` TIMESTAMP(3),
PRIMARY KEY (app_id) NOT ENFORCED
) WITH (
'connector' = 'print',
'sink.parallelism' = '1'
);

INSERT INTO
printSink
SELECT
`source`,
`tags`.app_id as app_id ,
count(`tags`.app_id) as logTotalCount,
TUMBLE_START(windows_start_time, INTERVAL '1' day) as window_start_time
FROM
yarn_flink_warn_logs
GROUP BY TUMBLE(windows_start_time, INTERVAL '1' day), `tags`.app_id, `source`;
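The query above only prints the per-job daily counts; the alerting described earlier is left to the reader. Below is a hypothetical in-memory model of the same aggregation plus a threshold check. It counts records per (app_id, day) the way TUMBLE(..., INTERVAL '1' day) groups them, assuming days are cut at midnight in the given zone, and flags jobs at or above a threshold. It is plain Java, not Flink code. The class name and the threshold are illustrative assumptions. The early-fire settings in the SQL have no counterpart here because the map is always up to date.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model: daily error/warn record counts per app_id, plus a threshold check. */
final class DailyErrorCounter {
    private final ZoneId zone;
    private final Map<String, Map<LocalDate, Long>> counts = new HashMap<>();

    DailyErrorCounter(ZoneId zone) {
        this.zone = zone;
    }

    /** One input row: tags.app_id and the "timestamp" field in epoch milliseconds. */
    void add(String appId, long epochMillis) {
        LocalDate day = Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate(); // 1-day tumbling window
        counts.computeIfAbsent(appId, k -> new HashMap<>()).merge(day, 1L, Long::sum);
    }

    long count(String appId, LocalDate day) {
        return counts.getOrDefault(appId, Collections.emptyMap()).getOrDefault(day, 0L);
    }

    /** Apps whose count for the given day is at least threshold, sorted by app_id. */
    Set<String> appsAtOrAbove(LocalDate day, long threshold) {
        Set<String> result = new TreeSet<>();
        counts.forEach((app, perDay) -> {
            if (perDay.getOrDefault(day, 0L) >= threshold) {
                result.add(app);
            }
        });
        return result;
    }
}
```

In practice, comparing against a job's own recent baseline is usually more useful than a fixed threshold, since some jobs always log a few warnings.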

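Summary

Starting from a small requirement (troubleshooting is inconvenient when a job throws exceptions or fails), this article surveyed the common industry solutions, chose one that fits our needs, and then presented the corresponding code and results.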
