传说中 Kafka 的 0 消息丢失配置是怎么回事
这是我参与11月更文挑战的第16天,活动详情查看: 2021最后一次更文挑战
本文讨论 Kafka 的 0 消息丢失配置的实现。
确立问题边界
之所以要先确立问题的边界,是因为,任何事都不是 100% 绝对的,要达到 0 消息丢失一定需要先确立前提。要保证 Kafka 消息不丢失的前提有两个:
- 消息必须是已经被提交的。已提交的意思是说,消息被生产者发送给 Broker,Broker 成功保存消息并告诉生产者消息已被成功提交。我们可以通过配置参数决定集群中有几个 Broker 成功保存消息后算成功提交。
- 集群中必须有至少一个 Broker 是存活的。这一点容易理解,如果 Kafka 集群整体都不可用了,比如所有的机房都停电了,那一定是不可能保证消息不丢失的。
消息如何丢失
要达到消息不丢失的结果,我们先要知道消息是如何丢失的,可以从生产者、Broker、消费者三方面分析。
生产者
Kafka producer 有一个发送消息的方法是 producer.send(msg)
,这个方法是异步处理消息的,也就是,当这个方法调用成功的时候,只是消息发出去了,而并不代表着发送成功了,或者消息被成功提交了。如果出现了网络不可用、消息本身不合格等原因导致消息根本没有被 Broker 接收,那就相当于消息在生产者端就消失了。
因此,建议在生产者端发送消息的时候,使用 producer.send(msg, callback)
带有回调的方法,这样我们就知道消息发送是成功了还是失败了,消息失败后,可以做针对性的处理。
对于发送失败的情况造成的消息丢失,责任在生产者一方,应该由生产者做相应的处理。
Broker
Broker 端的消息丢失,一般是由 Broker 服务不可用造成的,如果 Broker 都宕机了导致消息丢失,那么 Kafka 不会认为这属于已经提交的消息,因此这在我们讨论问题的边界之外。
消费者
最后一种就是消费者端造成的消息丢失。
消费者在消费消息的过程中,会同时更新消费者位移,也就是「已经消费到哪一条消息了」。这里就存在一个问题,当消费一个消息的时候,是先处理消息,成功后再更新位移,还是先更新位移,再处理消息。
如果先更新位移,在处理消息,当消息处理出现问题,或者更新完位移、消息还未处理,消费者出现宕机等问题的时候,消息就会丢失。
而如果先处理消息再更新位移,虽然可能会出现重复消费同一个消息的问题,但是,我们可以通过消费者处理逻辑实现幂等的方式来解决。
如何配置
根据上面的分析,下面给出大概的配置方案:
- 生产者端:
- 发送消息使用
producer.send(msg, callback)
而非producer.send(msg)
。 - 配置
acks = all
,也就是所有的副本 Broker 都接收到消息,才算「已提交」。 - 给消费者设置足够大的重试次数,避免消息因为网络原因等发送失败。
- 发送消息使用
- Broker 端
- 禁止落后的 Broker 竞选新 Leader,通过配置
unclean.leader.election.enable = false
来实现。 - 提供超过 3 个副本,通过冗余来防止丢失。
- 配置
min.insync.replicas
的值,指定需要保存到多少个副本才算「已提交」,这个值要大于 1。 - 确保副本数量大于
min.insync.replicas
的值,否则只要有一个副本宕机,服务就不可用了,建议比它的值大 1 即可。
- 禁止落后的 Broker 竞选新 Leader,通过配置
- 消费者端
- 配置
enable.auto.commit=false
取消自动提交位移,采用消息处理完手动提交的方式。
- 配置