Kafka cluster broker node expansion plan (watermelonbig's column, CSDN blog)

Posted: 2020-02-03 10:35
Source: https://blog.csdn.net
The existing Kafka cluster has 3 nodes (host1, host2, host3); we now need to expand to 6 broker nodes to provide higher data-processing capacity.
I. Rack additional physical servers to provide more resources
Three new broker nodes are added: host4, host5, host6.

II. Deploy the Kafka application on the three new nodes
This is not the focus of this article, so the details are skipped; the new brokers only need to join the existing cluster, and the reassignment below takes care of moving data onto them.
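For completeness, the settings that matter most when a new broker joins an existing cluster are a unique broker.id and the same ZooKeeper connection string as the old brokers. A minimal sketch for host4 (the broker.id value, listener address and log directory are assumptions, adjust to your environment):

cat << EOF > config/server.properties
# Unique id for this broker; host5 and host6 would use 5 and 6 (assumed numbering)
broker.id=4
# Where this broker stores partition data (path is an assumption)
log.dirs=/data/kafka-logs
# How clients and other brokers reach this node (hostname/port are assumptions)
listeners=PLAINTEXT://host4:9092
# Must point at the same ZooKeeper ensemble as the existing brokers
zookeeper.connect=192.168.1.92:2181
EOF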

III. Redistribute the existing topic partitions

1. List all topics currently available in the cluster
./kafka-topics.sh --list --zookeeper ip:port
lcf-201612201649
test-for-sys-monitor
2. Inspect the details of a specific topic
./kafka-topics.sh --describe --zookeeper 192.168.1.92:2181 --topic lcf-201612201649
Topic:lcf-201612201649    PartitionCount:24    ReplicationFactor:3    Configs:
    Topic: lcf-201612201649    Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 1    Leader: 2    Replicas: 2,1,3    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 2    Leader: 3    Replicas: 3,2,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 3    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 4    Leader: 2    Replicas: 2,3,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 5    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 6    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 7    Leader: 2    Replicas: 2,1,3    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 8    Leader: 3    Replicas: 3,2,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 9    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 10    Leader: 2    Replicas: 2,3,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 11    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 12    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 13    Leader: 2    Replicas: 2,1,3    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 14    Leader: 3    Replicas: 3,2,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 15    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 16    Leader: 2    Replicas: 2,3,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 17    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 18    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 19    Leader: 2    Replicas: 2,1,3    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 20    Leader: 3    Replicas: 3,2,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 21    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2
    Topic: lcf-201612201649    Partition: 22    Leader: 2    Replicas: 2,3,1    Isr: 3,1,2
    Topic: lcf-201612201649    Partition: 23    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2

3. Migrate partition logs within the cluster
Our goal is to take the topic that is currently spread over 3 nodes (24 partitions, 3 replicas) and redistribute all of its partitions across all 6 nodes.

(1) First, create the topic-to-move.json file
cat << EOF > topic-to-move.json
{"topics": [{"topic": "lcf-201612201649"}],
"version":1
}
EOF
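The "topics" field is a list, so one plan file can cover several topics. A sketch that also includes the other topic returned by --list above:

cat << EOF > topic-to-move.json
{"topics": [
  {"topic": "lcf-201612201649"},
  {"topic": "test-for-sys-monitor"}
],
"version":1
}
EOF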

(2) Use --generate to produce a reassignment plan
[testuser@c4 bin]$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 --topics-to-move-json-file  ./plans/topic-to-move.json  --broker-list "1,2,3,4,5,6" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"lcf-201612201649","partition":1,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":8,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":19,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":15,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":18,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":13,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":0,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":10,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":5,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":12,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":17,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":9,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":7,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":23,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":3,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":2,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":4,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":11,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":6,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":14,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":22,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":16,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":21,"replicas":[1,2,3]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"lcf-201612201649","partition":1,"replicas":[4,1,2]},{"topic":"lcf-201612201649","partition":15,"replicas":[6,5,1]},{"topic":"lcf-201612201649","partition":8,"replicas":[5,3,4]},{"topic":"lcf-201612201649","partition":19,"replicas":[4,5,6]},{"topic":"lcf-201612201649","partition":13,"replicas":[4,3,5]},{"topic":"lcf-201612201649","partition":18,"replicas":[3,4,5]},{"topic":"lcf-201612201649","partition":0,"replicas":[3,6,1]},{"topic":"lcf-201612201649","partition":10,"replicas":[1,5,6]},{"topic":"lcf-201612201649","partition":5,"replicas":[2,5,6]},{"topic":"lcf-201612201649","partition":12,"replicas":[3,2,4]},{"topic":"lcf-201612201649","partition":9,"replicas":[6,4,5]},{"topic":"lcf-201612201649","partition":17,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":7,"replicas":[4,2,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[5,6,1]},{"topic":"lcf-201612201649","partition":23,"replicas":[2,3,4]},{"topic":"lcf-201612201649","partition":3,"replicas":[6,3,4]},{"topic":"lcf-201612201649","partition":4,"replicas":[1,4,5]},{"topic":"lcf-201612201649","partition":2,"replicas":[5,2,3]},{"topic":"lcf-201612201649","partition":11,"replicas":[2,6,1]},{"topic":"lcf-201612201649","partition":6,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":22,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":14,"replicas":[5,4,6]},{"topic":"lcf-201612201649","partition":16,"replicas":[1,6,2]},{"topic":"lcf-201612201649","partition":21,"replicas":[6,1,2]}]}
[testuser@c4 bin]$
Note: this generates a plan for moving the specified topic onto brokers 1,2,3,4,5,6. The output contains both the current replica assignment and the proposed assignment after the change.
Save both JSON blocks from the output to files: keep the current assignment as backup.json (it is what you would feed back to --reassignment-json-file for a rollback) and save the proposed assignment as the reassignment plan, e.g. expand-cluster-reassignment.json; in the execute step below this file is named reassignment-lcf-201612201649.json.
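One way to capture the two blocks without copy-pasting is to redirect the generate output and take the line that follows each header (a sketch; the file names match the ones used in this article):

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 \
  --topics-to-move-json-file ./plans/topic-to-move.json \
  --broker-list "1,2,3,4,5,6" --generate > ./plans/generate-output.txt
# The JSON line after "Current partition replica assignment" is the rollback copy
grep -A1 "Current partition replica assignment" ./plans/generate-output.txt | tail -n 1 > ./plans/backup.json
# The JSON line after "Proposed partition reassignment configuration" is the new plan
grep -A1 "Proposed partition reassignment configuration" ./plans/generate-output.txt | tail -n 1 > ./plans/reassignment-lcf-201612201649.json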

(3) Use --execute to carry out the reassignment plan
[testuser@c4 kafka]$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 --reassignment-json-file ./plans/reassignment-lcf-201612201649.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"lcf-201612201649","partition":1,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":8,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":19,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":15,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":18,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":13,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":0,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":10,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":5,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":12,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":17,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":9,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":7,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":23,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":3,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":2,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":4,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":11,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":6,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":14,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":22,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":16,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":21,"replicas":[1,2,3]}]}
 
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"lcf-201612201649","partition":12,"replicas":[3,2,4]},{"topic":"lcf-201612201649","partition":17,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":8,"replicas":[5,3,4]},{"topic":"lcf-201612201649","partition":7,"replicas":[4,2,3]},{"topic":"lcf-201612201649","partition":23,"replicas":[2,3,4]},{"topic":"lcf-201612201649","partition":4,"replicas":[1,4,5]},{"topic":"lcf-201612201649","partition":19,"replicas":[4,5,6]},{"topic":"lcf-201612201649","partition":2,"replicas":[5,2,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[5,6,1]},{"topic":"lcf-201612201649","partition":11,"replicas":[2,6,1]},{"topic":"lcf-201612201649","partition":16,"replicas":[1,6,2]},{"topic":"lcf-201612201649","partition":5,"replicas":[2,5,6]},{"topic":"lcf-201612201649","partition":14,"replicas":[5,4,6]},{"topic":"lcf-201612201649","partition":10,"replicas":[1,5,6]},{"topic":"lcf-201612201649","partition":18,"replicas":[3,4,5]},{"topic":"lcf-201612201649","partition":22,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":15,"replicas":[6,5,1]},{"topic":"lcf-201612201649","partition":1,"replicas":[4,1,2]},{"topic":"lcf-201612201649","partition":6,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":21,"replicas":[6,1,2]},{"topic":"lcf-201612201649","partition":13,"replicas":[4,3,5]},{"topic":"lcf-201612201649","partition":0,"replicas":[3,6,1]},{"topic":"lcf-201612201649","partition":9,"replicas":[6,4,5]},{"topic":"lcf-201612201649","partition":3,"replicas":[6,3,4]}]}
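To limit the impact of the data copy on normal production and consumption, newer Kafka versions also accept a replication throttle on the execute step (a sketch; the 50 MB/s value is an assumption, size it for your network):

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 \
  --reassignment-json-file ./plans/reassignment-lcf-201612201649.json \
  --execute --throttle 50000000
# Run --verify after the reassignment finishes; besides checking the result it also removes the throttle.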

(4) Use --verify to check the result of the reassignment
[testuser@c4 kafka]$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 --reassignment-json-file ./plans/reassignment-lcf-201612201649.json --verify

Check the redistribution result:
./bin/kafka-topics.sh --describe --zookeeper 192.168.1.92:2181 --topic lcf-201612201649
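In the describe output, the Leader, Replicas and Isr columns should now also contain the new broker ids 4, 5 and 6. A rough check, counting how many partitions are already led by one of the new brokers:

./bin/kafka-topics.sh --describe --zookeeper 192.168.1.92:2181 --topic lcf-201612201649 | grep -c "Leader: [456]"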

If the migration leaves the Kafka leader distribution out of line with the preferred-replica recommendation, you can trigger a leader rebalance manually.
Note: when migrating partitions, it is best to keep a replica of each partition on its original disk so that normal consumption and production are not disturbed; migrating only part of the partitions at a time also keeps consumption and production working normally.

(5) Maintaining the leader-balancing mechanism of the Kafka cluster
./kafka-preferred-replica-election.sh --zookeeper ip:port
Or set the following parameter in the broker configuration file:
auto.leader.rebalance.enable=true
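If you only want to trigger the preferred-replica election for specific partitions rather than for the whole cluster, the tool also accepts a JSON file (a sketch for the topic used in this article; check that the --path-to-json-file option exists in your Kafka version):

cat << EOF > ./plans/preferred-election.json
{"partitions": [
  {"topic": "lcf-201612201649", "partition": 0},
  {"topic": "lcf-201612201649", "partition": 1}
]}
EOF
./bin/kafka-preferred-replica-election.sh --zookeeper 192.168.1.92:2181 --path-to-json-file ./plans/preferred-election.json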

