Kafka 源码解析之 Producer 单 Partition 顺序性实现及配置说明(五) | Matt's Blog

标签: | 发表时间:2020-02-17 19:37 | 作者:
出处:https://matt33.com

今天把 Kafka Producer 最后一部分给讲述一下,Producer 大部分内容都已经在前面几篇文章介绍过了,这里简单做个收尾,但并不是对前面的总结,本文从两块来讲述:RecordAccumulator 类的实现、Kafka Producer 如何保证其顺序性以及 Kafka Producer 的配置说明,每个 Producer 线程都会有一个 RecordAccumulator 对象,它负责缓存要发送 RecordBatch、记录发送的状态并且进行相应的处理,这里会详细讲述 Kafka Producer 如何保证单 Partition 的有序性。最后,简单介绍一下 Producer 的参数配置说明,只有正确地理解 Producer 相关的配置参数,才能更好地使用 Producer,发挥其相应的作用。

RecordAccumulator

这里再看一下 RecordAccumulator 的数据结构,如下图所示,每个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操作(但并不是只有达到这两个条件之一才会被发送,这点要理解清楚)。

RecordAccumulator 模型

再看一下 RecordAccumulator 类的主要方法介绍,如下图所示。

RecordAccumulator 主要方法及其说明

这张图基本上涵盖了 RecordAccumulator 的主要方法,下面会选择其中几个方法详细讲述,会围绕着 Kafka Producer 如何实现单 Partition 顺序性这个主题来讲述。

mutePartition() 与 unmutePartition()

先看下  mutePartition() 与  unmutePartition() 这两个方法,它们是保证有序性关键之一,其主要做用就是将指定的 topic-partition 从 muted 集合中加入或删除,后面会看到它们的作用。

1     
2
3
4
5
6
7
8
9
private final Set<TopicPartition> muted;     

public void mutePartition(TopicPartition tp) {
muted.add(tp);
}

public void unmutePartition(TopicPartition tp) {
muted.remove(tp);
}

这里先说一下这两个方法调用的条件,这样的话,下面在介绍其他方法时才会更容易理解:

  • mutePartition():如果要求保证顺序性,那么这个 tp 对应的 RecordBatch 如果要开始发送,就将这个 tp 加入到  muted 集合中;
  • unmutePartition():如果 tp 对应的 RecordBatch 发送完成,tp 将会从  muted 集合中移除。

也就是说, muted 是用来记录这个 tp 是否有还有未完成的 RecordBatch。

ready()

ready() 是在 Sender 线程中调用的,其作用选择那些可以发送的 node,也就是说,如果这个 tp 对应的 batch 可以发送(达到时间或大小要求),就把 tp 对应的 leader 选出来。

1     
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public ReadyCheckResult ready(Cluster cluster, long nowMs) {     
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();

Node leader = cluster.leaderFor(part);
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {//note: part 如果 mute 就不会遍历
RecordBatch batch = deque.peekFirst();
if (batch != null) {
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
//note: 是否是在重试
long waitedTimeMs = nowMs - batch.lastAttemptMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.isFull(); //note: batch 满了
boolean expired = waitedTimeMs >= timeToWaitMs; //note: batch 超时
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);// note: 将可以发送的 leader 添加到集合中
} else {
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}

return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

可以看到这一行  (!readyNodes.contains(leader) && !muted.contains(part)),如果  muted 集合包含这个 tp,那么在遍历时将不会处理它对应的 deque,也就是说,如果一个 tp 加入了  muted 集合中,即使它对应的 RecordBatch 可以发送了,也不会触发引起其对应的 leader 被选择出来。

drain()

drain() 是用来遍历可发送请求的 node,然后再遍历在这个 node 上所有 tp,如果 tp 对应的 deque 有数据,将会被选择出来直到超过一个请求的最大长度( max.request.size)为止,也就说说即使 RecordBatch 没有达到条件,但为了保证每个 request 尽快多地发送数据提高发送效率,这个 RecordBatch 依然会被提前选出来并进行发送。

1     
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//note: 返回该 node 对应的可以发送的 RecordBatch 的 batches,并从 queue 中移除(最大的大小为maxSize,超过的话,下次再发送)     
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
Set<Node> nodes,
int maxSize,
long now) {
if (nodes.isEmpty())
return Collections.emptyMap();

Map<Integer, List<RecordBatch>> batches = new HashMap<>();
for (Node node : nodes) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<RecordBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
// Only proceed if the partition has no in-flight batches.
if (!muted.contains(tp)) {//note: 被 mute 的 tp 依然不会被遍历
Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
if (deque != null) {
synchronized (deque) {
RecordBatch first = deque.peekFirst();
if (first != null) {
boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
// Only drain the batch if it is not during backoff period.
if (!backoff) {
if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
// there is a rare case that a single batch size is larger than the request size due
// to compression; in this case we will still eventually send this batch in a single
// request
break;
} else {
RecordBatch batch = deque.pollFirst();
batch.close();
size += batch.sizeInBytes();
ready.add(batch);
batch.drainedMs = now;
}
}
}
}
}
}
this.drainIndex = (this.drainIndex + 1) % parts.size();
} while (start != drainIndex);
batches.put(node.id(), ready);
}
return batches;
}

在遍历 node 的所有 tp 时,可以看到是有条件的 ——  !muted.contains(tp),如果这个 tp 被添加到  muted 集合中,那么它将不会被遍历,也就不会作为 request 一部分被发送出去,这也就保证了 tp 如果还有未完成的 RecordBatch,那么其对应 deque 中其他 RecordBatch 即使达到条件也不会被发送,就保证了 tp 在任何时刻只有一个 RecordBatch 在发送。

顺序性如何保证?

是否保证顺序性,还是在 Sender 线程中实现的, mutePartition() 与  unmutePartition() 也都是在 Sender 中调用的,这里看一下 KafkaProducer 是如何初始化一个 Sender 对象的。

1     
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// from KafkaProducer     
this.sender = new Sender(client,
this.metadata,
his.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
Time.SYSTEM,
this.requestTimeoutMs);//NOTE: Sender 实例,发送请求的后台线程

// from Sender
public Sender(KafkaClient client,
Metadata metadata,
RecordAccumulator accumulator,
boolean guaranteeMessageOrder,
int maxRequestSize,
short acks,
int retries,
Metrics metrics,
Time time,
int requestTimeout) {
this.client = client;
this.accumulator = accumulator;
this.metadata = metadata;
this.guaranteeMessageOrder = guaranteeMessageOrder;
this.maxRequestSize = maxRequestSize;
this.running = true; //note: 默认为 true
this.acks = acks;
this.retries = retries;
this.time = time;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
}

对于上述过程可以这样进行解读

this.guaranteeMessageOrder = (config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1)

如果 KafkaProducer 的  max.in.flight.requests.per.connection 设置为1,那么就可以保证其顺序性,否则的话,就不保证顺序性,从下面这段代码也可以看出。

1     
2
3
4
5
6
7
8
9
//from Sender     
//note: max.in.flight.requests.per.connection 设置为1时会保证
if (guaranteeMessageOrder) {
// Mute all the partitions draine
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

也就是说,如果要保证单 Partition 的顺序性,需要在 Producer 中配置  max.in.flight.requests.per.connection=1,而其实现机制则是在 RecordAccumulator 中实现的。

Producer Configs

这里是关于 Kafka Producer 一些配置的说明,内容来自官方文档 Producer Configs以及自己的一些个人理解,这里以官方文档保持一致,按其重要性分为三个级别进行讲述(涉及到权限方面的参数,这里先不介绍)。

high importance

参数名 说明 默认值
bootstrap.servers Kafka Broker 的一个列表,不用包含所有的 Broker,它用于初始化连接时,通过这几个 broker 来获取集群的信息,比如: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092 -
key.serializer 对 key 进行序列化的 class,一般使用 StringSerializer -
value.serializer 对 value 进行序列化的 class,一般使用  StringDeserializer -
acks 用于设置在什么情况一条才被认为已经发送成功了。acks=0:msg 只要被 producer 发送出去就认为已经发送完成了;acks=1:如果 leader 接收到消息并发送 ack (不会等会该 msg 是否同步到其他副本)就认为 msg 发送成功了; acks=all或者-1:leader 接收到 msg 并从所有 isr 接收到 ack 后再向 producer 发送 ack,这样才认为 msg 发送成功了,这是最高级别的可靠性保证。 1
buffer.memory producer 可以使用的最大内存,如果超过这个值,producer 将会 block  max.block.ms 之后抛出异常。 33554432(32MB)
compression.type Producer 数据的压缩格式,可以选择 none、gzip、snappy、lz4 none
retries msg 发送失败后重试的次数,允许重试,如果  max.in.flight.requests.per.connection 设置不为1,可能会导致乱序 0

medium importance

下面的这些参数虽然被描述为 medium,但实际上对 Producer 的吞吐量等影响也同样很大,在实践中跟 high 参数的重要性基本一样。

参数名 说明 默认值
batch.size producer 向 partition 发送数据时,是以 batch 形式的发送数据,当 batch 的大小超过  batch.size 或者时间达到  linger.ms 就会发送 batch,根据经验,设置为1MB 吞吐会更高,太小的话吞吐小,太大的话导致内存浪费进而影响吞吐量 16384(16KB)
linger.ms 在一个 batch 达不到  batch.size 时,这个 batch 最多将会等待  linger.ms 时间,超过这个时间这个 batch 就会被发送,但也会带来相应的延迟,可以根据具体的场景进行设置 0
client.id client 的 id,主要用于追踪 request 的来源 null
connections.max.idle.ms 如果 connection 连续空闲时间超过了这个值,将会被关闭,主要使用 Selector 的  maybeCloseOldestConnection 方法 540000(9min)
max.block.ms 控制  KafkaProducer.send() 和  KafkaProducer.partitionsFor() block 的最大时间,block 的原因是 buffer 满了或者 metadata 不可用导致。 60000
max.request.size 一个请求的最大长度 1048576(1MB)
partitioner.class 获取 topic 分区的 class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes 在读取数据时 TCP receive buffer (SO_RCVBUF)的大小 32768(32KB)
request.timeout.ms 如果 producer 超过这么长时间没有收到 response,将会再次发送请求 30000
timeout.ms 用于配置 leader 等待 isr 返回 ack 的最大时间,如果超过了这个时间,将会返回给 producer 一个错误。 30000

low importance

参数名 说明 默认值
block.on.buffer.full 当 Producer 使用 buffer 达到最大设置时,如果设置为 false,将会 block  max.block.ms 后然后抛出  TimeoutException 异常,如果设置为 true,将会把  max.block.ms 设置为  Long.MAX_VALUE false
interceptor.classes 使用拦截器,实现这个  ProducerInterceptor 接口,可以对 topic 进行简单的处理。 null
max.in.flight.requests.per.connection 对一个 connection,同时发送最大请求数,不为1时,不能保证顺序性。 5
metadata.fetch.timeout.ms 获取 metadata 时的超时时间 60000
metadata.max.age.ms 强制 metadata 定时刷新的间隔 300000(5min)
metric.reporters A list of classes to use as metrics reporters. Implementing the MetricReporter interface,JmxReporter 是默认被添加的。 “”
metrics.num.samples 统计 metrics 时采样的次数 2
metrics.sample.window.ms metrics 采样计算的时间窗口 30000
reconnect.backoff.ms 重新建立建立连接的间隔 50
retry.backoff.ms 发送重试的间隔 100

对于不同的场景,合理配置相应的 Kafka Producer 参数。

至此,Kafka Producer 部分的源码分析已经结束,从下周开始将开始对 Kafka Consumer 部分进行分析。

相关 [kafka 源码 解析] 推荐:

Kafka深度解析

- - zzm
原创文章,转载请务必将下面这段话置于文章开头处. 本文转发自Jason’s Blog,原文链接. http://www.jasongj.com/2015/01/02/Kafka深度解析. Kafka是一种分布式的,基于发布/订阅的消息系统. 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能.

Kafka设计解析(二):Kafka High Availability (上)

- -
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务. 若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失. 而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.

Kafka 源码解析之 Producer 单 Partition 顺序性实现及配置说明(五) | Matt's Blog

- -
最后,简单介绍一下 Producer 的参数配置说明,只有正确地理解 Producer 相关的配置参数,才能更好地使用 Producer,发挥其相应的作用. 这里再看一下 RecordAccumulator 的数据结构,如下图所示,每个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操作(但并不是只有达到这两个条件之一才会被发送,这点要理解清楚).

深入解析Kafka高可用设计如何步步为营

- - IT瘾-bigdata
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务. 若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失. 而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对于Failover机制的需求非常高.

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

Kafka优化

- - ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

Kafka Connect简介

- - 鸟窝
Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道. 它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统. Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理.

kafka consumer group offset

- - 开源软件 - ITeye博客
     kafka0.9及以前版本kafka offset 保存在zookeeper, 因频繁读写zookeeper性能不高;从0.10开始,主题分区offset存储于kafka独立主题中.     管理监控kafka主题及分区offset至关重要,原网上很开源流行工具KafkaOffsetMonitor、kafka-manager,旧版offset保存于zookeeper,kafka consumer无相应API,从kafka0.10.1.1以后提供相应API读取主题分区offset(也可以调用KafkaClient API,kafka管理API由scala语言编写).

GitHub - andreas-schroeder/kafka-health-check: Health Check for Kafka Brokers.

- -
At AutoScout24, to keep the OS up to date of our clusters running on AWS, we perform regular in-place rolling updates. As we run immutable servers, we terminate each broker and replace them with fresh EC2 instances (keeping the previous broker ids).