Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用 - CSDN博客

标签: | 发表时间:2018-07-24 11:16 | 作者:
出处:https://blog.csdn.net

  KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便。源项目Github地址为: https://github.com/quantifind/KafkaOffsetMonitor。 
  最简单的使用方式是从Github上下载一个最新的 KafkaOffsetMonitor-assembly-0.2.1.jar,上传到某服务器上,然后执行一句命令就可以运行起来。但是在使用过程中有可能会发现页面反应缓慢或者无法显示相应内容的情况。据说这是由于jar包中的某些js等文件需要连接到网络,或者需要翻墙导致的。网上找的一个修改版的KafkaOffsetMonitor对应jar包,可以完全在本地运行,经过测试效果不错。下载地址是: http://pan.baidu.com/s/1ntzIUPN,在此感谢一下贡献该修改版的原作者。链接失效的话,可以博客下方留言联系我。

一、KafkaOffsetMonitor的使用

  因为完全没有安装配置的过程,所以直接从KafkaOffsetMonitor的使用开始。 
  将KafkaOffsetMonitor-assembly-0.2.0.jar上传到服务器后,可以新建一个脚本用于启动该应用。脚本内容如下:

    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
    com.quantifind.kafka.offsetapp.OffsetGetterWeb \
    --zk m000:2181,m001:2181,m002:2181 \
    --port 8088 \
    --refresh 10.seconds \
    --retain 2.days    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

  各参数的作用可以参考一下Github上的描述:

  • offsetStorage valid options are ”zookeeper”, ”kafka” or ”storm”. Anything else falls back to ”zookeeper”
  • zk the ZooKeeper hosts
  • port on what port will the app be available
  • refresh how often should the app refresh and store a point in the DB
  • retain how long should points be kept in the DB
  • dbName where to store the history (default ‘offsetapp’)
  • kafkaOffsetForceFromStart only applies to ”kafka” format. Force KafkaOffsetMonitor to scan the commit messages from start (see notes below)
  • stormZKOffsetBase only applies to ”storm” format. Change the offset storage base in zookeeper, default to ”/stormconsumers” (see notes below)
  • pluginsArgs additional arguments used by extensions (see below)

      启动后,访问 m000:8088端口,可以看到如下页面: 
       这里写图片描述
      在这个页面上,可以看到当前Kafka集群中现有的Consumer Groups。

    在上图中有一个Visualizations选项卡,点击其中的Cluster Overview可以查看当前Kafka集群的Broker情况 
    这里写图片描述

      接下来将继续上一篇Kafka相关的文章 Kafka系列之-自定义Producer,在最后对Producer进行包装的基础上,分别实现一个简单的往随机Partition写messge,以及自定义Partitioner的Producer,对KafkaOffsetMonitor其他页面进行展示。

二、简单的Producer

1、新建一个Topic

  首先为本次试验新建一个Topic,命令如下:

    bin/kafka-topics.sh \
    --create \
    --zookeeper m000:2181 \
    --replication-factor 3 \
    --partition 3 \
    --topic kafkamonitor-simpleproducer    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2、新建SimpleProducer代码

  在上一篇文章中提到的Producer封装Github代码的基础上,写了一个往kafkamonitor-simpleproducer发送message的java代码。

    import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;

/**
 * Created by ckm on 2016/8/30.
 */
public class SimpleProducer {
    public static void main(String[] args) {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
        int i = 0;
        String message = "";
        while (true) {
            message = "test-simple-producer : " + i ++;
            kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message);
        }
    }
}    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

  程序运行效果: 
   这里写图片描述

3、ConsoleConsumer消费该topic

  用kafka自带的ConsoleConsumer消费kafkamonitor-simpleproducer中的message。

    bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer    
  • 1

  消费截图如下: 
   这里写图片描述

4、KafkaOffsetMonitor页面

(1)在Topic List选项卡中,我们可以看到刚才新建的 kafkamonitor-simpleproducer 
   这里写图片描述
(2)点开后,能看到有一个console-consumer正在消费该topic 
   这里写图片描述
(3)继续进入该Consumer,可以查看该Consumer当前的消费状况 
   这里写图片描述
  这张图片的左上角显示了当前Topic的生产速率,右上角显示了当前Consumer的消费速率。 
  图片中还有三种颜色的线条,蓝色的表示当前Topic中的Message数目,灰色的表示当前Consumer消费的offset位置,红色的表示蓝色灰色的差值,即当前Consumer滞后于Producer的message数目。 
(4)看一眼各partition中的message消费情况 
   这里写图片描述
  从上图可以看到,当前有3个Partition,每个Partition中的message数目分布很不均匀。这里可以与接下来的自定义Producer的情况进行一个对比。

三、自定义Partitioner的Producer

1、新建一个Topic

    bin/kafka-topics.sh \
    --create \
    --zookeeper m000:2181 \
    --replication-factor 3 \
    --partition 3 \
    --topic kafkamonitor-partitionedproducer    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2、Partitioner代码

  逻辑很简单,循环依次往各Partition中发送message。

    import kafka.producer.Partitioner;

/**
 * Created by ckm on 2016/8/30.
 */
public class TestPartitioner implements Partitioner {
    public TestPartitioner() {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        int intKey = (int) key;
        return intKey % numPartitions;
    }
}
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

3、Producer代码

  将自定义的Partitioner设置到Producer,其他调用过程和二中类似。

    import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;

/**
 * Created by ckm on 2016/8/30.
 */
public class PartitionedProducer {
    public static void main(String[] args) {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
        kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner");
        int i = 0;
        String message = "";
        while (true) {
            message = "test-partitioner-producer : " + i;
            System.out.println(message);
            kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message);
            i ++;
        }
    }
}    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

  代码运行效果如下图: 
   这里写图片描述

4、ConsoleConsumer消费Message

    bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-partitionedproducer    
  • 1

  消费效果如下图: 
   这里写图片描述

5、KafkaOffsetMonitor页面

  其他页面与上面的类似,这里只观察一下每个partition中的message数目与第二节中的对比。可以看到这里每个Partition中message分别是很均匀的。 
   这里写图片描述

注意事项: 
  注意这里有一个坑,默认情况下Producer往一个不存在的Topic发送message时会自动创建这个Topic。由于在这个封装中,有同时传递message和topic的情况,如果调用方法时传入的参数反了,将会在Kafka集群中自动创建Topic。在正常情况下,应该是先把Topic根据需要创建好,然后Producer往该Topic发送Message,最好把Kafka这个默认自动创建Topic的功能关掉。 
  那么,假设真的不小心创建了多余的Topic,在删除时,会出现“marked for deletion”提示,只是将该topic标记为删除,使用list命令仍然能看到。如果需要调整这两个功能的话,在server.properties中配置如下两个参数:

参数 默认值 作用
auto.create.topics.enable true Enable auto creation of topic on the server
delete.topic.enable false Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off

相关 [kafka 系列 kafka] 推荐:

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用 - CSDN博客

- -
  KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便.   最简单的使用方式是从Github上下载一个最新的. KafkaOffsetMonitor-assembly-0.2.1.jar,上传到某服务器上,然后执行一句命令就可以运行起来.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

Kafka优化

- - ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

Kafka Connect简介

- - 鸟窝
Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道. 它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统. Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理.

kafka consumer group offset

- - 开源软件 - ITeye博客
     kafka0.9及以前版本kafka offset 保存在zookeeper, 因频繁读写zookeeper性能不高;从0.10开始,主题分区offset存储于kafka独立主题中.     管理监控kafka主题及分区offset至关重要,原网上很开源流行工具KafkaOffsetMonitor、kafka-manager,旧版offset保存于zookeeper,kafka consumer无相应API,从kafka0.10.1.1以后提供相应API读取主题分区offset(也可以调用KafkaClient API,kafka管理API由scala语言编写).

Kafka系列(八)跨集群数据镜像

- - Dengshenyu
本系列文章为对《Kafka:The Definitive Guide》的学习整理,希望能够帮助到大家. 在之前系列文章中,我们讨论了一个Kafka集群的搭建、维护和使用,而在实际情况中我们往往拥有多个Kafka集群,而且这些Kafka集群很可能是相互隔离的. 一般来说,这些集群之间不需要进行数据交流,但如果在某些情况下这些集群之间存在数据依赖,那么我们可能需要持续的将数据从一个集群复制到另一个集群.

Kafka设计解析(二):Kafka High Availability (上)

- -
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务. 若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失. 而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.

GitHub - andreas-schroeder/kafka-health-check: Health Check for Kafka Brokers.

- -
At AutoScout24, to keep the OS up to date of our clusters running on AWS, we perform regular in-place rolling updates. As we run immutable servers, we terminate each broker and replace them with fresh EC2 instances (keeping the previous broker ids).

Kafka编程实例

- - CSDN博客云计算推荐文章
    Producer是一个应用程序,它创建消息并发送它们到Kafka broker中. 这些producer在本质上是不同. 比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer. 这些不同的Producer能够使用不同的语言实现,比如java、C和Python.