Kafka的有且仅有一次语义与事务消息

标签: kafka 有且仅有 语义 | 发表时间:2018-07-28 09:00 | 作者:
出处:http://www.dengshenyu.com/

最近看到Kafka官方wiki上有一篇关于有且仅有一次语义与事务消息的文档(见 这里),里面说的非常详细。对于有且仅有一次语义与事务消息是什么东西,大家可以看我的上一篇博客,或者看Kafka的这篇wiki,这里不做展开。这篇文章主要整理关于该语义和事务消息的API接口、数据流和配置。

生产者接口

生产者API的改动

生产者新增了5个新的方法(initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction),并且发送接口也增加了一个新的异常。见下面:

  public interface Producer<K,V> extends Closeable {
   
  /*
   * Needs to be called before any of the other transaction methods. Assumes that
   * the transactional.id is specified in the producer configuration.
   *
   * This method does the following:
   *   1. Ensures any transactions initiated by previous instances of the producer
   *      are completed. If the previous instance had failed with a transaction in
   *      progress, it will be aborted. If the last transaction had begun completion,
   *      but not yet finished, this method awaits its completion.
   *   2. Gets the internal producer id and epoch, used in all future transactional
   *      messages issued by the producer.
   *
   * @throws IllegalStateException if the TransactionalId for the producer is not set
   *         in the configuration.
   */
  void initTransactions() throws IllegalStateException;
   
  /*
   * Should be called before the start of each new transaction.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
   */
  void beginTransaction() throws ProducerFencedException;
   
  /*
   * Sends a list of consumed offsets to the consumer group coordinator, and also marks
   * those offsets as part of the current transaction. These offsets will be considered
   * consumed only if the transaction is committed successfully.
   *
   * This method should be used when you need to batch consumed and produced messages
   * together, typically in a consume-transform-produce pattern.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                String consumerGroupId) throws ProducerFencedException;
   
  /*
   * Commits the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
   */
  void commitTransaction() throws ProducerFencedException;
   
  /*
   * Aborts the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
 
 
   */
  void abortTransaction() throws ProducerFencedException;
 
 
  /*
   * Send the given record asynchronously and return a future which will eventually contain the response information.
   *
   * @param record The record to send
   * @return A future which will eventually contain the response information
   *
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record);
 
  /*
   * Send a record and invoke the given callback when the record has been acknowledged by the server
   **/
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

OutOfOrderSequence异常

如果broker检测出数据丢失,生产者接口会抛出OutOfOrderSequenceException异常。换句话说,就是broker发现序列号比预期序列号高。异常会在Future中返回,并且如果存在callback的话会把异常传给callback。这是一个严重异常,生产者后续调用send, beginTransaction, commitTransaction等方法都会抛出一个IlegalStateException。

应用示例

以下是一个使用上述API的简单应用:

  public class KafkaTransactionsExample {
  
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
 
 
    // Note that the ‘transactional.id’ configuration _must_ be specified in the
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
 
    // We need to initialize transactions once per producer instance. To use transactions,
    // it is assumed that the application id is specified in the config with the key
    // transactional.id.
    //
    // This method will recover or abort transactions initiated by previous instances of a
    // producer with the same app id. Any other transactional messages will report an error
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will
    // require a new producer  instance. See the documentation for TransactionMetadata for a
    // list of error codes.
    producer.initTransactions();
     
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed
        // records as well
        // as an records produced as a result of processing the input records.
        //
        // We need to check the response to make sure that this producer is able to initiate
        // a new transaction.
        producer.beginTransaction();
         
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }
         
        // To ensure that the consumed and produced messages are batched, we need to commit
        // the offsets through
        // the producer and not the consumer.
        //
        // If this returns an error, we should abort the transaction.
         
        sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
         
      
        // Now that we have consumed, processed, and produced a batch of messages, let's
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.endTransaction();
      }
    }
  }
}

新的配置

broker配置

  • transactional.id.timeout.ms:事务协调者超过多长时间没有收到生产者TransactionalId的事务状态更新就认为其过期。默认值为604800000(7天),这个值使得每星期执行一次的生产者任务可以持续维护其ID。
  • max.transaction.timeout.ms:事务超时时间。如果client请求事务时间超过这个值,那么broker会在InitPidRequest中返回一个InvalidTransactionTimeout异常。这防止client出现超时时间太长,这会使得消费者消费事务相关的主题时变慢。默认值为900000(15分钟),这是一个保守的上限值。
  • transaction.state.log.replication.facto:事务状态主题的副本个数,默认值为3。
  • transaction.state.log.num.partitions:事务状态主题的分区个数,默认值为50。
  • transaction.state.log.min.isr:事务状态主题每个分区拥有多少个insync的副本才被视为上线。默认为2。
  • transaction.state.log.segment.bytes:事务状态主题的日志段大小,默认为104857600字节。

生产者配置

  • enable.idempotence:是否使用幂等写(默认为false)。如果为false,生产者发送消息请求时不会携带PID字段,保持为与之前的语义一样。如果希望使用事务,那么这个值必须置位true。如果为true,那么会额外要求acks=all,retries > 1,和 max.inflight.requests.per.connection=1。因为如果这些条件不满足,那么无法保证幂等性。如果应用没有显示指明这些属性,那么在启用幂等性时生产者会设置acks=all,retries=Integer.MAX_VALUE,和 max.inflight.requests.per.connection=1。
  • transaction.timeout.ms:生产者超过多久没有更新事务状态,事务协调者会将其进行中的事务回滚。这个值会随着InitPidRequest一起发送给事务协调者。如果这个值大于broker设置的max.transaction.timeout.ms,那么请求会抛出InvalidTransactionTimeout异常。默认值为60000,防止下游消费阻塞等待超过1分钟。
  • transactional.id:事务投递所使用的TransactionalId值。这个可以保证多个生产者会话的可靠性语义,因为这可以保证在使用相同TransactionalId的情况下,老的事务必须完成才能开启新的事务。需要注意的是,如果启用这个值,必须先设置enable.idempotence为true。此值默认为空,意味着没有使用事务。

消费者配置

  • isolation.level:以下是可以取的值(默认为read_uncommitted):1)read_uncommitted:按位移顺序按序消费消息,无论其为提交还是未提交。2)read_committed:按位移顺序按序消费消息,但只消费非事务消息和已提交的事务消息;为了保持位移顺序,read_committed会使得消费者需要在获取到同一事务中的所有消息前需要缓存消息。

语义保证

生产者幂等性保证

为了实现生产者幂等性语义,我们引入了生产者ID(也称为PID)和消息序列号的概念。每一个新的生产者在初始化的时候都会赋予一个PID。PID的设置是对使用者透明的,不会在客户端中暴露出来。

对于一个指定的PID,序列号从0开始并且单调递增,每个主题分区都有一个序列号序列。生产者发送消息到broker后会增加序列号。broker则在内存中维护每个PID发到主题分区的序列号,一旦发现当前收到的序列号没有比上一次收到的序列号刚好大1,那么就会拒绝当前的生产者请求。如果消息携带的序列号比预期低而导致重复异常,生产者会忽略掉这个异常;如果消息携带的序列号比预期高而导致乱序异常,这就意味着有一些消息可能丢失了,这个异常是非常严重的。

通过这样的方法,就保证了即便生产者在出现失败的情况下进行重试,每个消息也只会在日志中仅出现一次。由于每个新的生产者实例都会分配一个新的唯一PID,因此只能保证单个生产者会话中实现幂等性。

这些幂等的生产者语义对于像指标跟踪和审计等应用可能非常有用。

事务保证

事务保证的核心就是,使得应用能够原子性的生产消息到多个分区,写入到这些分区的消息要么都成功要么都失败。

进一步地,由于消费者也是通过写入到位移主题来进行记录的,因此事务的能力可以用来使得应用将消费动作和生产动作原子化,也就是说消息被消费了当且仅当整个“消费-转换-生产”的链条都执行完毕。

另外,有状态的应用也可以实现跨越多个会话的连续性。也就是说,Kafka可以保证跨越应用边界的生产幂等性和事务性。为了达到这个目标,应用需要提供一个唯一ID,而且这个唯一ID能够跨越应用边界保持稳定不变。在下面的阐述中,会使用TransactionalId表示这个ID。TransactionalId和PID是一一对应的,区别在于TransactionalId是用户提供的,至于为什么TransactionalId能够保证跨越生产者会话的幂等性的原因下面来分析。

当提供了这样的一个TransactionalId,Kafka保证:

  1. 对于一个TransactionalId,只会有一个活跃的生产者。当具有相同TransactionalId的生产者上线时,会把老的生产者强制下线。
  2. 事务恢复跨越应用会话。如果一个应用实例死亡,下一个实例启动时会保证之前进行中的事务会被结束(提交或回滚),这样就保证了新的实例处于一个干净的状态。

需要注意的是,这里提到的事务保证是从生产者的角度来看的。对于消费者,这个保证会稍微弱一点。具体来讲,我们不能保证一个已提交事务的所有消息可以一起被消费。原因如下:

  1. 对于compact类型的主题,一个事务中的消息可能被更新的版本所代替。
  2. 事务可能跨越日志段。因此当老的日志段被删除了,可能会损失一个事务的开始部分。
  3. 消费者可以定位到事务中的任意位置开始消费,因此可能会丢失该事务的开始部分消息。
  4. 消费者可能消费不到事务中涉及到的分区。因此不能读取到该事务的所有消息。

核心概念

为了实现事务,也就是保证一组消息可以原子性生产和消费,Kafka引入了如下概念;

  1. 引入了事务协调者(Transaction Coordinator)的概念。与消费者的组协调者类似,每个生产者会有对应的事务协调者,赋予PID和管理事务的逻辑都由事务协调者来完成。
  2. 引入了事务日志(Transaction Log)的内部主题。与消费者位移主题类似,事务日志是每个事务的持久化多副本存储。事务协调者使用事务日志来保存当前活跃事务的最新状态快照。
  3. 引入了控制消息(Control Message)的概念。这些消息是客户端产生的并写入到主题的特殊消息,但对于使用者来说不可见。它们是用来让broker告知消费者之前拉取的消息是否被原子性提交。控制消息之前在 这里被提到过。
  4. 引入了TransactionalId的概念,TransactionalId可以让使用者唯一标识一个生产者。一个生产者被设置了相同的TransactionalId的话,那么该生产者的不同实例会恢复或回滚之前实例的未完成事务。
  5. 引入了生产者epoch的概念。生产者epoch可以保证对于一个指定的TransactionalId只会有一个合法的生产者实例,从而保证事务性即便出现故障的情况下。

除了引入了上述概念之外,Kafka还有新的请求类型、已有请求类型的版本升级和新的消息格式,以支持事务。这些细节在本篇文章中不过多涉及。

数据流

data-flow

在上面这幅图中,尖角框代表不同的机器,圆角框代表Kafka的主题分区(TopicPartition),对角线圆角框代表运行在broker中的逻辑实体。

每个箭头代表一个rpc或者主题的写入。这些操作的先后顺序见旁边的数字,下面按顺序来介绍这些操作。

1. 查询事务协调者(FindCoordinatorRequest请求)

事务协调者是设置PID和管理事务的核心,因此生产者第一件事就是向broker发起FindCoordinatorRequest请求(之前命名为GroupCoordinatorRequest,此版本将其重命名)获取其协调者。

2. 获取生产者ID(InitPidRequest请求)

在查询到事务协调者之后,生产者下一步就是获取其生产者ID,这一步是通过向事务协调者发送InitPidRequest来实现。

2.1 如果指定了TransactionalId的话

如果在配置中指定了transactional.id,transactional.id会在InitPidRequest请求中传递过来,transactional.id与生产者ID的映射会在步骤2a中记录到事务日志。这样未来的生产者如果发送了相同的transactional.id则返回这个相同的PID,从而可以恢复或回滚之前未完成的事务。

在返回PID之外,InitPidRequest还会完成如下任务:

  1. 增加生产者的epoch值,这样之前的生产者僵尸实例会被断开,不能继续操作事务。
  2. 恢复(提交或回滚)之前该PID对应的生产者实例的未完成事务。

InitPidRequest请求是同步的,一旦返回,生产者可以发送数据和开启新的事务。

2.2 如果TransactionalId未指定

如果TransactionalId未指定,会赋予一个新的PID,该生产者可以在其当前会话期间实现幂等性和事务性语义。

3. 开启事务(beginTransaction方法)

新的KafkaProducer有一个beginTransaction()方法,调用该方法会开启一个新的事务。生产者在本地状态中记录事务已经开始,只有发送第一个记录时协调者才会知道事务开始状态。

4. 消费-转换-生产的循环

在这个阶段中,生产者开始事务中的消费-转换-生产循环,这个阶段比较长而且可能由多个请求组成。

4.1 AddPartitionsToTxnRequest

在一个事务中,如果需要写入一个新的主题分区,那么生产者会发送此请求到事务协调者。协调者在步骤4.1a中会记录该分区添加到事务中。这个信息是必要的,因为这样才能写入提交或回滚标记到事务中的每个分区(见5.2步骤)。如果这是事务写入的第一个分区,那么协调者还会开始事务定时器。

4.2 ProduceRequest

生产者通过一个或多个ProduceRequests请求(在生产者send方法内部发出)写入消息到主题中。这些请求包含PID,epoch和序列号,见图中的4.2a。

4.3 AddOffsetCommitsToTxnRequest

生产者有一个新的sendOffsetsToTransaction方法,该方法可以将消息消费和生产结合起来。方法参数包含一个Map<TopicPartitions, OffsetAndMetadata>和一个groupId。

sendOffsetsToTransaction内部发送一个带有groupId的AddOffsetCommitsToTxnRequests请求到事务协调者,事务协调者从内部的__consumer-offsets主题中根据此消费者组获取到相应的主题分区。事务协调者在步骤4.3a中把这个主题分区记录到事务日志中。

4.4 TxnOffsetCommitRequest

生产者发送TxnOffsetCommitRequest请求到消费协调者来在主题__consumer-offsets中持久化位移(见4.4a)。消费协调者通过请求中的PID和生产者epoch来验证生产者是否允许发起该请求。

已消费的位移在提交事务之后才对外可见,此过程在下面来讨论。

5. 提交或回滚事务

消息数据写入之后,使用者需要调用KafkaProducer中的commitTransaction或abortTransaction方法,这两个方法分别为事务的提交和回滚处理方法。

5.1 EndTxnRequest

当生产者结束事务的时候,需要调用KafkaProducer.endTransaction或者KafkaProducer.abortTransaction方法。前者使得步骤4中的数据对下游的消费者可见,后者则从日志中抹除已生产的数据:这些数据不会对用户可见,也就是说下游消费者会读取并丢弃这些回滚消息。

无论调用哪个方法,生产者都是会发起EndTxnRequest请求到事务协调者,然后通过参数来指明事务提交或回滚。接收到此请求后,协调者会:

  1. 写入PREPARE_COMMIT或PREPARE_ABORT消息到事务日志(见5.1a)
  2. 通过WriteTxnMarkerRequest请求写入命令消息(COMMIT或ABORT)到用户日志中(见下面5.2)
  3. 写入COMMITTED或ABORTED消息到事务日志中

5.2 WriteTxnMarkerRequest请求

这个请求是事务协调者发给事务中每个分区的leader的。接收到此请求后,每个broker会写入COMMIT(PID)或ABORT(PID) 控制消息到日志中(步骤5.2a)。

这个消息向消费者指明该PID的消息传递给用户还是丢弃。因此,消费者接收到带有PID的消息后会缓存起来,直到读取到COMMIT或者ABORT消息,然后决定消息是通知用户还是丢弃。

另外,如果事务中涉及到__consumer-offsets主题,那么commit或者abort的标记同样写入到日志中,消费协调者会被告知这些位移是否标记为已消费(事务提交则为已消费,事务回滚则忽略这些位移)。见步骤4.2a。

5.3 写入最后的提交或回滚消息

在commit或abort标记写入到数据日志后,事务协调者写入最终的COMMITTED或ABORTED消息到事务日志,标记该事务已经完成(见图中的步骤5.3)。在这个时候,事务日志中关于这个事务的大部分消息都可以被删除;只需要保留该事务的PID和时间戳,这样可以最终删除关于该生产者的TransactionalId->PID映射,详情可参考PID过期的相关资料。

相关 [kafka 有且仅有 语义] 推荐:

Kafka的有且仅有一次语义与事务消息

- - Dengshenyu
最近看到Kafka官方wiki上有一篇关于有且仅有一次语义与事务消息的文档(见 这里),里面说的非常详细. 对于有且仅有一次语义与事务消息是什么东西,大家可以看我的上一篇博客,或者看Kafka的这篇wiki,这里不做展开. 这篇文章主要整理关于该语义和事务消息的API接口、数据流和配置. 生产者新增了5个新的方法(initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction),并且发送接口也增加了一个新的异常.

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

Kafka优化

- - ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

Kafka Connect简介

- - 鸟窝
Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道. 它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统. Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理.

kafka consumer group offset

- - 开源软件 - ITeye博客
     kafka0.9及以前版本kafka offset 保存在zookeeper, 因频繁读写zookeeper性能不高;从0.10开始,主题分区offset存储于kafka独立主题中.     管理监控kafka主题及分区offset至关重要,原网上很开源流行工具KafkaOffsetMonitor、kafka-manager,旧版offset保存于zookeeper,kafka consumer无相应API,从kafka0.10.1.1以后提供相应API读取主题分区offset(也可以调用KafkaClient API,kafka管理API由scala语言编写).

Kafka设计解析(二):Kafka High Availability (上)

- -
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务. 若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失. 而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.

GitHub - andreas-schroeder/kafka-health-check: Health Check for Kafka Brokers.

- -
At AutoScout24, to keep the OS up to date of our clusters running on AWS, we perform regular in-place rolling updates. As we run immutable servers, we terminate each broker and replace them with fresh EC2 instances (keeping the previous broker ids).

Kafka编程实例

- - CSDN博客云计算推荐文章
    Producer是一个应用程序,它创建消息并发送它们到Kafka broker中. 这些producer在本质上是不同. 比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer. 这些不同的Producer能够使用不同的语言实现,比如java、C和Python.

kafka集群安装

- - 互联网 - ITeye博客
kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目. 在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ. 在这片博文中,作者简单提到了开发kafka而不选择已有MQ系统的原因. Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB).