Kafka幂等性原理及实现剖析 - 哥不是小萝莉 - 博客园

标签: | 发表时间:2019-11-24 19:25 | 作者:
出处:https://www.cnblogs.com

1.概述

最近和一些同学交流的时候反馈说,在面试Kafka时,被问到Kafka组件组成部分、API使用、Consumer和Producer原理及作用等问题都能详细作答。但是,问到一个平时不注意的问题,就是Kafka的幂等性,被卡主了。那么,今天笔者就为大家来剖析一下Kafka的幂等性原理及实现。

2.内容

2.1 Kafka为啥需要幂等性?

Producer在生产发送消息时,难免会重复发送消息。Producer进行retry时会产生重试机制,发生消息重复发送。而引入幂等性后,重复发送只会生成一条有效的消息。Kafka作为分布式消息系统,它的使用场景常见与分布式系统中,比如消息推送系统、业务平台系统(如物流平台、银行结算平台等)。以银行结算平台来说,业务方作为上游把数据上报到银行结算平台,如果一份数据被计算、处理多次,那么产生的影响会很严重。

2.2 影响Kafka幂等性的因素有哪些?

在使用Kafka时,需要确保Exactly-Once语义。分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack时,出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。可能出现的情况如下:

 

 

2.3 Kafka的幂等性是如何实现的?

Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用途是什么呢?

  • ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

2.3.1 幂等性引入之前的问题?

Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:

 

上图的实现流程是一种理想状态下的消息发送情况,但是实际情况中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:

 

上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(比如网络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。

2.3.2 幂等性引入之后解决了什么问题?

面对这样的问题,Kafka引入了幂等性。那么幂等性是如何解决这类重复发送消息的问题的呢?下面我们可以先来看看流程图:

 

 同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:

 

 当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

2.3.3 ProducerID是如何生成的?

客户端在生成Producer时,会实例化如下代码:

//实例化一个Producer对象Producer<String, String> producer =newKafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender类中,在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码如下:

privatevoidmaybeWaitForPid() {if(transactionState ==null)return;while(!transactionState.hasPid()) {try{
                Node node=awaitLeastLoadedNodeReady(requestTimeout);if(node !=null) {
                    ClientResponse response=sendAndAwaitInitPidRequest(node);if(response.hasResponse() && (response.responseBody()instanceofInitPidResponse)) {
                        InitPidResponse initPidResponse=(InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    }else{
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                }else{
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            }catch(Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

3.事务

与幂等性有关的另外一个特性就是事务。Kafka中的事务与数据库的事务类似,Kafka中的事务属性是指一系列的Producer生产消息和消费消息提交Offsets的操作在一个事务中,即原子性操作。对应的结果是同时成功或者同时失败。

这里需要与数据库中事务进行区别,操作数据库中的事务指一系列的增删查改,对Kafka来说,操作事务是指一系列的生产和消费等原子性操作。

3.1 Kafka引入事务的用途?

在事务属性引入之前,先引入Producer的幂等性,它的作用为:

  • Producer多次发送消息可以封装成一个原子性操作,即同时成功,或者同时失败;
  • 消费者&生产者模式下,因为Consumer在Commit Offsets出现问题时,导致重复消费消息时,Producer重复生产消息。需要将这个模式下Consumer的Commit Offsets操作和Producer一系列生产消息的操作封装成一个原子性操作。

产生的场景有:

比如,在Consumer中Commit Offsets时,当Consumer在消费完成时Commit的Offsets为100(假设最近一次Commit的Offsets为50),那么执行触发Balance时,其他Consumer就会重复消费消息(消费的Offsets介于50~100之间的消息)。

3.2 事务提供了哪些可使用的API?

Producer提供了五种事务方法,它们分别是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代码定义在org.apache.kafka.clients.producer.Producer<K,V>接口中,具体定义接口如下:

//初始化事务,需要注意确保transation.id属性被分配voidinitTransactions();//开启事务voidbeginTransaction()throwsProducerFencedException;//为Consumer提供的在事务内Commit Offsets的操作voidsendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>offsets,
                              String consumerGroupId)throwsProducerFencedException;//提交事务voidcommitTransaction()throwsProducerFencedException;//放弃事务,类似于回滚事务的操作voidabortTransaction()throwsProducerFencedException;

3.3 事务的实际应用场景有哪些?

在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况。情况如下:

  • 只有Producer生产消息,这种场景需要事务的介入;
  • 消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
  • 只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,而且这种场景不是事务的引入目的。

4.总结

Kafka的幂等性和事务是比较重要的特性,特别是在数据丢失和数据重复的问题上非常重要。Kafka引入幂等性,设计的原理也比较好理解。而事务与数据库的事务特性类似,有数据库使用的经验对理解Kafka的事务也比较容易接受。

5.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《 Kafka并不难学》和《 Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。 

相关 [kafka 幂等 原理] 推荐:

Kafka幂等性原理及实现剖析 - 哥不是小萝莉 - 博客园

- -
最近和一些同学交流的时候反馈说,在面试Kafka时,被问到Kafka组件组成部分、API使用、Consumer和Producer原理及作用等问题都能详细作答. 但是,问到一个平时不注意的问题,就是Kafka的幂等性,被卡主了. 那么,今天笔者就为大家来剖析一下Kafka的幂等性原理及实现. 2.1 Kafka为啥需要幂等性.

Kafka笔记—可靠性、幂等性和事务 - luozhiyun - 博客园

- -
这几天很忙,但是我现在给我的要求是一周至少要出一篇文章,所以先拿这篇笔记来做开胃菜,源码分析估计明后两天应该能写一篇. Kafka只对“已提交”的消息(committed message)做有限度的持久化保证. 当Kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交.

Apache kafka原理与特性(转)

- - 互联网 - ITeye博客
 转自:http://shift-alt-ctrl.iteye.com/blog/1930345. 前言: kafka是一个轻量级的/分布式的/具备replication能力的日志采集组件,通常被集成到应用系统中,收集"用户行为日志"等,并可以使用各种消费终端(consumer)将消息转存到HDFS等其他结构化数据存储系统中.因为日志消息通常为文本数据,尺寸较小,且对实时性以及数据可靠性要求不严格,但是需要日志存储端具备较高的数据吞吐能力,这种"宽松"的设计要求,非常适合使用kafka. .

Kafka 设计与原理详解

- - IT瘾-geek
本文综合了我之前写的kafka相关文章,可作为一个全面了解学习kafka的培训学习资料. 转载请注明出处 : 本文链接. 当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战:. 以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统.

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

Kafka优化

- - ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

Kafka Connect简介

- - 鸟窝
Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道. 它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统. Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理.

kafka consumer group offset

- - 开源软件 - ITeye博客
     kafka0.9及以前版本kafka offset 保存在zookeeper, 因频繁读写zookeeper性能不高;从0.10开始,主题分区offset存储于kafka独立主题中.     管理监控kafka主题及分区offset至关重要,原网上很开源流行工具KafkaOffsetMonitor、kafka-manager,旧版offset保存于zookeeper,kafka consumer无相应API,从kafka0.10.1.1以后提供相应API读取主题分区offset(也可以调用KafkaClient API,kafka管理API由scala语言编写).

Kafka跨集群迁移方案MirrorMaker原理、使用以及性能调优实践 - CSDN博客

- -
Kakfa MirrorMaker是Kafka 官方提供的跨数据中心的流数据同步方案. 其实现原理,其实就是通过从Source Cluster消费消息然后将消息生产到Target Cluster,即普通的消息生产和消费. 用户只要通过简单的consumer配置和producer配置,然后启动Mirror,就可以实现准实时的数据同步.