kafka如何保证消息顺序性?

标签: kafka 消息 | 发表时间:2024-03-04 18:57 | 作者:半亩方塘立身
出处:https://juejin.cn/backend

highlight: androidstudio theme: cyanosis

kafka架构如下:

图片

Kafka 保证消息顺序性的关键在于其分区(Partition)机制。在 Kafka 中,每个主题(Topic)可以被分割成多个分区,消息被追加到每个分区中,并且在每个分区内部,消息是有序的。但是,Kafka 只保证单个分区内的消息顺序,而不保证跨分区的消息顺序。如果需要保证顺序消费,可以采用以下策略:

  1. 分区设计:在 Kafka 主题中根据一定的规则为业务标识分配一个唯一的标识符,并将相同标识符的消息发送到同一个分区中。例如,可以使用组织的ID作为消息的key,这样相同ID的消息会被发送到同一个分区。
  2. 消费者组配置:确保每个消费者组只有一个消费者,这样每个分区只有一个消费者消费消息。这可以确保相同分区的消息只会按照顺序被一个消费者消费。

组织调整如何使用kafka同步下游

当调整组织架构时,确保消息的顺序性尤为重要,因为组织结构的变更可能会影响到多个层级和部门。以下是使用 Kafka 来同步组织架构调整的步骤,我们将通过一个例子来展示如何实现这一过程。

确定分区键

为了保证组织架构调整的顺序性,可以使用组织ID或者根组织ID作为分区键。这样,同一个组织或相关联的组织的所有调整消息都会被发送到同一个分区。

生产者发送消息

生产者在发送组织架构调整消息时,使用组织ID作为键。这样做确保了同一个组织的所有相关消息都会顺序地发送到同一个分区中。

消费者处理消息

消费者从各自的分区读取消息,并按照接收的顺序处理这些组织架构调整的消息。这保证了在单个分区内,组织架构的变更是有序的。

流程图

image.png

  • 生产者(Producer)根据组织ID将组织架构调整消息发送到 Kafka 主题(Topic)。
  • Kafka 根据提供的键(组织ID)将消息路由到相应的分区。
  • 消费者组(Consumer Group)中的消费者按分区消费消息,保证了分区内消息的顺序性。
  • 消费者处理组织架构调整消息并更新数据库。

实现

生产者

生产者将组织架构调整消息发送到Kafka,使用组织ID作为键来保证同一个组织的消息被发送到同一分区。

  public class OrgProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        String topic = "org-structure-changes";
        String orgId = "org123"; // 组织ID作为键
        String message = "Org structure updated for org123";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, orgId, message);

        producer.send(record);
        producer.close();
    }
}

消费者

消费者从Kafka读取组织架构调整的消息,并按顺序处理它们。

  public class OrgConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "org-structure-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        String topic = "org-structure-changes";
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value());
                // 处理组织架构调整消息
            }
        }
    }
}

在这个例子中,生产者使用组织ID作为键发送消息,以确保相同组织的消息被发送到相同的分区。消费者从分区中读取消息并按顺序处理,保证了组织架构调整的顺序性。

相关 [kafka 消息] 推荐:

apache kafka消息服务

- - CSDN博客架构设计推荐文章
apache kafka中国社区QQ群:162272557. apache kafka参考. 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息. 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息. Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费.

kafka发布订阅消息

- - 企业架构 - ITeye博客
① 每个partition会创建3个备份replica,并分配到broker集群中; --replication-factor 3. ② 用zookeeper来管理,consumer、producer、broker的活动状态;. ③ 分配的每个备份replica的id和broker的id保持一致;.

分布式消息系统:Kafka

- - 标点符
Kafka是分布式发布-订阅消息系统. 它最初由LinkedIn公司开发,之后成为Apache项目的一部分. Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转. 传统的企业消息系统并不是非常适合大规模的数据处理.

kafka分布式消息系统

- - CSDN博客云计算推荐文章
Kafka[1]是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态). 当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线).

高性能消息系统——Kafka

- - 互联网 - ITeye博客
引用官方原文: “Kafka is a distributed, partitioned, replicated commit log service.”. 它提供了一个非常特殊的消息机制,不同于传统的mq. 官网:https://kafka.apache.org.     传统的MQ,消息被消化掉后会被mq删除,而kafka中消息被消化后不会被删除,而是到配置的expire时间后,才删除.

Kafka 的消息可靠传递

- - IT瘾-dev
Kafka提供的基础保障可以用来构建可靠的系统, 却无法保证完全可靠. 需要在可靠性和吞吐之间做取舍.. Kafka在分区上提供了消息的顺序保证.. 生产的消息在写入到所有的同步分区上后被认为是. 生产者可以选择在消息提交完成后接收broker的确认, 是写入leader之后, 或者所有的副本. 只要有一个副本存在, 提交的消息就不会丢失.

kafka如何保证消息顺序性?

- - 掘金 后端
Kafka 保证消息顺序性的关键在于其分区(Partition)机制. 在 Kafka 中,每个主题(Topic)可以被分割成多个分区,消息被追加到每个分区中,并且在每个分区内部,消息是有序的. 但是,Kafka 只保证单个分区内的消息顺序,而不保证跨分区的消息顺序. 如果需要保证顺序消费,可以采用以下策略:.

kafka:一个分布式消息系统 - 十九画生

- - 博客园_首页
最近因为工作需要,调研了追求高吞吐的轻量级消息系统Kafka,打算替换掉线上运行的ActiveMQ,主要是因为明年的预算日流量有十亿,而ActiveMQ的分布式实现的很奇怪,所以希望找一个适合分布式的消息系统. 以下是内容是调研过程中总结的一些知识和经验,欢迎拍砖. 首先,我们来看看什么是消息队列,维基百科里的解释翻译过来如下:.

Apache Kafka:下一代分布式消息系统

- - zzm
Apache Kafka是分布式发布-订阅消息系统. 它最初由LinkedIn公司开发,之后成为Apache项目的一部分. Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同:. 它被设计为一个分布式系统,易于向外扩展;.

Kafka无消息丢失配置 - huxihx - 博客园

- -
Kafka到底会不会丢数据(data loss). 通常不会,但有些情况下的确有可能会发生. 下面的参数配置及Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量). 笔者会在该列表之后对列表中的每一项进行讨论,有兴趣的同学可以看下后面的分析. 使用KafkaProducer.send(record, callback).