Background overview
In Kafka 0.9 and earlier, consumer offsets were stored in ZooKeeper, and frequent ZooKeeper reads and writes do not perform well; starting with 0.10, topic-partition offsets are stored in an internal Kafka topic (__consumer_offsets).
Managing and monitoring topic and partition offsets is therefore essential. The popular open-source tools KafkaOffsetMonitor and kafka-manager read the old-style offsets kept in ZooKeeper, and the Kafka consumer originally exposed no corresponding API; since Kafka 0.10.1.1 the consumer provides APIs for reading topic-partition offsets (the KafkaClient API can also be called; Kafka's admin API is written in Scala).
Against a cluster configured with Kafka SASL authentication, the open-source tools above require code changes and recompilation; a stock download simply cannot connect to the cluster. KafkaOffsetMonitor and kafka-manager are both written in Scala, whereas the programs in this section are mostly written in Java, with a small portion calling Scala interface methods from Java. If necessary, the next section will upgrade KafkaOffsetMonitor to add Kafka security support.
Kafka SASL security configuration itself is not covered in this section.
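The unit tests later in this section do, however, point java.security.auth.login.config at a kafka_client_jaas.conf file on the classpath. For orientation only, a minimal JAAS file for a SASL/PLAIN client typically looks like the sketch below; the username and password are placeholders of mine and must match the broker's own configuration:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="demo-user"
password="demo-password";
};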
KafkaConsumer interface description
KafkaConsumer interface:
1) Map<String, List<PartitionInfo>> listTopics() returns partition metadata for all topics
2) List<PartitionInfo> partitionsFor(String topic) returns the partition list of the given topic
3) Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) returns the beginning offset of every given partition
4) Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) returns the end offset of every given partition (i.e. the LogEndOffset)
KafkaConsumer is not thread-safe, so a lock is taken in the close() method below; the class below uses these interface methods to obtain topic and partition offset information.
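For orientation before the full helper class, here is a minimal self-contained sketch of these four calls; the class name OffsetProbe, the broker address and the topic name are placeholders of mine and not part of the original project:
package com.sunshine.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
public class OffsetProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-probe");            // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
            System.out.println(c.listTopics().keySet());                // 1) all topics
            List<TopicPartition> tps = new ArrayList<>();
            for (PartitionInfo pi : c.partitionsFor("test")) {          // 2) partitions of topic "test"
                tps.add(new TopicPartition(pi.topic(), pi.partition()));
            }
            Map<TopicPartition, Long> begin = c.beginningOffsets(tps);  // 3) first offset per partition
            Map<TopicPartition, Long> end = c.endOffsets(tps);          // 4) LogEndOffset per partition
            for (TopicPartition tp : tps) {
                System.out.println(tp + " holds " + (end.get(tp) - begin.get(tp)) + " messages");
            }
        }
    }
}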
package com.sunshine.kafka;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import com.sunshine.boot.MainBootApplication;
/**
* Kafka metadata helper based on KafkaConsumer
* @author oy
*
*/
public class KafkaConsumeMeta {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private ReentrantLock lock = new ReentrantLock();
public KafkaConsumeMeta(String servers) {
this(null, servers);
}
public Properties getDefaultProperties(){
Properties props = new Properties();
// kafka server
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.178:9092");
// group id
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000");
//props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "670000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// load props
Map<String, String> map = MainBootApplication.getAllPropertiesByPrefix("kafka");
for(Map.Entry<String, String> entry : map.entrySet()){
props.put(entry.getKey(), entry.getValue());
}
return props;
}
public KafkaConsumeMeta(String topic, String servers) {
Properties props = getDefaultProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
public KafkaConsumeMeta(String servers, boolean security) {
this(null, servers, security);
}
public KafkaConsumeMeta(String topic, String servers, boolean security) {
Properties props = getDefaultProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
// SASL security settings
if(security){
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
}
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
/**
* List partition metadata for all topics:
* key=topic
* value=List<PartitionInfo>
* @return
*/
public Map<String, List<PartitionInfo>> getAllTopicPartiions(){
Map<String, List<PartitionInfo>> map = consumer.listTopics();
return map;
}
/**
* List partition metadata for the given topic
* @param topic
* @return
*/
public List<PartitionInfo> getPartitionInfo(String topic){
return consumer.partitionsFor(topic);
}
/**
* Convert a PartitionInfo into a TopicPartition
* @param info
* @return
*/
public TopicPartition transforTopicPartition(PartitionInfo info){
return new TopicPartition(info.topic(),info.partition());
}
/**
* Convert a list of PartitionInfo into TopicPartitions
* @param list
* @return
*/
public List<TopicPartition> transforTopicPartition(List<PartitionInfo> list){
List<TopicPartition> result = new ArrayList<TopicPartition>();
for(PartitionInfo info : list){
result.add(transforTopicPartition(info));
}
return result;
}
/**
* Get the beginning offset of each given partition
* @param topicPartition
* @return
*/
public Map<TopicPartition, Long> getBeginningOffsets(List<TopicPartition> topicPartition){
return consumer.beginningOffsets(topicPartition);
}
/**
* Get the end offset (LogEndOffset) of each given partition
* @param topicPartition
* @return
*/
public Map<TopicPartition, Long> getEndOffsets(List<TopicPartition> topicPartition){
return consumer.endOffsets(topicPartition);
}
/**
* @return the consumer
*/
public KafkaConsumer<Integer, String> getConsumer() {
return consumer;
}
public OffsetAndMetadata getCurrentOffsetAndMetadata(TopicPartition tp){
return consumer.committed(tp);
}
public void close(){
// KafkaConsumer is not thread-safe; serialize close() with a lock
lock.lock();
try {
if(consumer!=null)
consumer.close();
} catch (Exception e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
}
Topic consumer-group offset information
For management and monitoring we need to know, for a given topic and consumer group, the consumed position on each partition, in order to gauge how well the consumers keep up.
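The key figure per partition is the lag: lag = logEndOffset - currentConsumedOffset. Before the AdminClient-based class below, here is a minimal sketch of that arithmetic with the plain consumer API; printLag is a hypothetical helper of mine (assuming the imports used by KafkaConsumeMeta above, and a consumer created with the target group.id):
public static void printLag(KafkaConsumer<String, String> consumer, String topic) {
    for (PartitionInfo pi : consumer.partitionsFor(topic)) {
        TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
        long end = consumer.endOffsets(java.util.Collections.singletonList(tp)).get(tp); // LogEndOffset
        OffsetAndMetadata committed = consumer.committed(tp); // null if this group never committed
        long current = (committed == null) ? 0 : committed.offset();
        System.out.println(tp + " current=" + current + " end=" + end + " lag=" + (end - current));
    }
}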
package com.sunshine.kafka;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sunshine.boot.MainBootApplication;
import kafka.admin.AdminClient;
import kafka.admin.AdminClient.ConsumerSummary;
import kafka.common.TopicAndPartition;
import kafka.coordinator.GroupOverview;
import scala.Option;
import scala.collection.JavaConverters;
public class KafkaConsumeGroupMetaByAdminClient {
private final AdminClient client ;
private final String servers;
private final boolean security;
private final String groupId;
private final String topic;
private KafkaConsumer<String, String> consumer;
private Logger log = LoggerFactory.getLogger(getClass());
private ReentrantLock lock = new ReentrantLock();
public KafkaConsumeGroupMetaByAdminClient(String servers, String topic, String groupId, boolean security){
Properties properties = new Properties();
String deserializer = new StringDeserializer().getClass().getName();
// kafka servers
properties.put(ExtConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
properties.put(ExtConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ExtConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ExtConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500000");
properties.put(ExtConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
properties.put(ExtConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
if (security) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
}
this.security = security;
this.servers = servers;
this.groupId = groupId;
this.topic = topic;
this.client = AdminClient.create(properties);
}
public Set<GroupOverview> listAllConsumerGroups() {
scala.collection.immutable.List<GroupOverview> list = client.listAllConsumerGroupsFlattened();
List<GroupOverview> result = JavaConverters.asJavaListConverter(list).asJava();
return new HashSet<GroupOverview>(result);
}
/**
* Use Kafka's built-in (Scala) AdminClient to describe a consumer group:
* all topics and partitions being consumed under the given group name
* @return
*/
public List<CounsumeGroupMode> getConsumeGroup() {
lock.lock();
Option<scala.collection.immutable.List<ConsumerSummary>> option = client.describeConsumerGroup(groupId);
try{
if(option.isEmpty()){
log.error("Consumer group "+groupId+" is rebalancing.");
} else {
List<ConsumerSummary> consumers = JavaConverters.asJavaListConverter(option.get()).asJava();
KafkaConsumer<String, String> consumer = getConsumer();
List<CounsumeGroupMode> result = new ArrayList<>();
for(ConsumerSummary consumerSummary : consumers){
List<TopicPartition> tps = JavaConverters.asJavaListConverter(consumerSummary.assignment()).asJava();
List<TopicAndPartition> taps = new ArrayList<>();
Map<TopicAndPartition, Long> partitionOffsets = new HashMap<TopicAndPartition, Long>();
for(TopicPartition tp : tps){
TopicAndPartition topicAndPartition = new TopicAndPartition(tp.topic(), tp.partition());
taps.add(topicAndPartition);
OffsetAndMetadata offsetAndMetadata = consumer.committed(new TopicPartition(tp.topic(),tp.partition()));
partitionOffsets.put(topicAndPartition, (offsetAndMetadata == null)?0:offsetAndMetadata.offset());
}
List<CounsumeGroupMode> t = describeTopicPartition(groupId, taps, partitionOffsets,consumerSummary.clientId()+"_"+consumerSummary.clientHost());
result.addAll(t);
}
return result;
}
} catch(Exception e){
e.printStackTrace();
} finally{
lock.unlock();
}
return Collections.emptyList();
}
/**
* For every partition of the configured topic, get the beginning offset, the log end offset and the group's current consumed offset
* @return
*/
public List<ExtPartitionInfo> getTopicPartitionsAllOffset(){
List<ExtPartitionInfo> result = new ArrayList<ExtPartitionInfo>();
try{
lock.lock();
KafkaConsumer<String, String> consumer = getConsumer();
List<PartitionInfo> pis = consumer.partitionsFor(this.topic);
List<TopicPartition> tps = transforTopicPartition(pis);
Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps); //1
for (PartitionInfo pi : pis) {
TopicPartition _tp = new TopicPartition(pi.topic(), pi.partition());
// way 1 to read the group's current position for this partition:
//OffsetAndMetadata offsetAndMetadata = consumer.committed(_tp);
//long currentOffset = (offsetAndMetadata == null)?0:offsetAndMetadata.offset();
// way 2:
long currentOffset = getCurrentOffset(pi.topic(), pi.partition());
// uniformly use the endOffsets value fetched at marker 1 as the log end offset;
// marker 1 executes first, so a value fetched at marker 2 would be >= the marker 1 value
//long logendOffset = getLogEndOffset(pi.topic(), pi.partition()); //2
ExtPartitionInfo epi = new ExtPartitionInfo(pi.topic(), pi.partition(), pi.leader(), pi.replicas(),
pi.inSyncReplicas(), beginOffsets.get(_tp), endOffsets.get(_tp), currentOffset, endOffsets.get(_tp));
result.add(epi);
}
Collections.sort(result);
} catch(Exception e){
e.printStackTrace();
} finally{
lock.unlock();
}
return result;
}
private List<TopicPartition> transforTopicPartition(List<PartitionInfo> list){
List<TopicPartition> result = new ArrayList<TopicPartition>();
for(PartitionInfo info : list){
result.add(transforTopicPartition(info));
}
return result;
}
private TopicPartition transforTopicPartition(PartitionInfo info){
return new TopicPartition(info.topic(),info.partition());
}
private List<CounsumeGroupMode> describeTopicPartition(String group, List<TopicAndPartition> topicPartitions,
Map<TopicAndPartition, Long> partitionOffsets,String owner){
List<CounsumeGroupMode> rs = new ArrayList<>();
for(TopicAndPartition tap : topicPartitions){
long logEndOffset = getLogEndOffset(tap.topic(), tap.partition());
long currentOffset = partitionOffsets.get(tap);
CounsumeGroupMode cgm = new CounsumeGroupMode(group, tap.topic(), tap.partition(),
currentOffset, logEndOffset, owner);
rs.add(cgm);
}
return rs;
}
/**
* Read the group's current consumed offset for one topic partition
* @param topic
* @param partition
* @return
*/
private long getCurrentOffset(String topic, int partition){
KafkaConsumer<String, String> consumer = getConsumer();
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Arrays.asList(topicPartition));
// without seeking, position() returns the committed offset (or the auto.offset.reset position)
long currentOffset = consumer.position(topicPartition);
return currentOffset;
}
private long getLogEndOffset(String topic, int partition){
KafkaConsumer<String, String> consumer = getConsumer();
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Arrays.asList(topicPartition));
// seekToEnd + position() yields the LogEndOffset
consumer.seekToEnd(Arrays.asList(topicPartition));
long logEndOffset = consumer.position(topicPartition);
return logEndOffset;
}
public KafkaConsumer<String, String> getConsumer(){
if (consumer == null)
consumer = createNewConsumer();
return consumer;
}
public List<Node> listAllBrokers() {
return JavaConverters.asJavaListConverter(client.bootstrapBrokers()).asJava();
}
public void close() {
lock.lock();
try {
client.close();
if (consumer != null)
consumer.close();
} catch (Exception e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
private KafkaConsumer<String, String> createNewConsumer() {
Properties properties = new Properties();
String deserializer = new StringDeserializer().getClass().getName();
properties.put(ExtConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
if(groupId != null)
properties.put(ExtConsumerConfig.GROUP_ID_CONFIG, this.groupId);
properties.put(ExtConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ExtConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// New consumer API
// to probe positions with a brand-new group id, auto.offset.reset should be "earliest"; the default is "latest"
properties.put(ExtConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(ExtConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500000");
properties.put(ExtConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
properties.put(ExtConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
if (security) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
}
return new KafkaConsumer<String, String>(properties);
}
public class CounsumeGroupMode implements Comparable<CounsumeGroupMode>{
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getPartition() {
return partition;
}
public void setPartition(int partition) {
this.partition = partition;
}
public long getCurrent_offset() {
return current_offset;
}
public void setCurrent_offset(long current_offset) {
this.current_offset = current_offset;
}
public long getLog_eng_offset() {
return log_eng_offset;
}
public void setLog_eng_offset(long log_eng_offset) {
this.log_eng_offset = log_eng_offset;
}
public long getLAG() {
return LAG;
}
public void setLAG(long lAG) {
LAG = lAG;
}
public String getOwner() {
return owner;
}
public void setOwner(String owner) {
this.owner = owner;
}
private String group;
private String topic;
private int partition;
private long current_offset;
private long log_eng_offset;
private long LAG;
private String owner;
public CounsumeGroupMode(String group, String topic, int partition,
long current_offset, long log_eng_offset, String owner) {
this.group = group;
this.topic = topic;
this.partition = partition;
this.current_offset = current_offset;
this.log_eng_offset = log_eng_offset;
LAG = (log_eng_offset-current_offset);
this.owner = owner;
}
@Override
public int compareTo(CounsumeGroupMode o) {
return Integer.compare(this.partition, o.partition);
}
@Override
public int hashCode() {
int code = 10;
return group.hashCode()*code +
topic.hashCode()*code +
partition*code +
owner.hashCode();
}
}
}
PartitionInfo is extended below with per-partition statistics: the total message count of the partition, the amount already consumed, and the amount remaining to be consumed (lag). For example, with beginOffset 0, endOffset 10000 and a committed offset of 9500, the partition holds 10000 messages and the lag is 500.
package com.sunshine.kafka;
import org.apache.kafka.common.Node;
/**
* PartitionInfo extended with offset, message-count and lag statistics
* @author oy
*
*/
public class ExtPartitionInfo implements Comparable<ExtPartitionInfo>{
//======== topic partition ===========
private final String topic;
private final int partition;
//======== message count statistics ============
private final long beginOffsets;
private final long endOffsets;
private final long pcounts;
//======= replica and ISR node info ====
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final String leaderStr;
private final String relicasStr;
private final String inSyncReplicasStr;
//======== consumption statistics ==========
private long currentOffset;// the group's current consumed offset
private long logendOffset; // the partition's log end offset
private long lag;// messages remaining to be consumed
public ExtPartitionInfo(String topic, int partition, Node leader,
Node[] replicas, Node[] inSyncReplicas, long beginOffsets,
long endOffsets) {
this.topic = topic;
this.partition = partition;
this.leader = leader;
this.replicas = replicas;
this.inSyncReplicas = inSyncReplicas;
this.beginOffsets = beginOffsets;
this.endOffsets = endOffsets;
//this.pcounts = (endOffsets-beginOffsets);
this.pcounts = endOffsets;
this.leaderStr = leader.toString();
this.relicasStr = fmtNodeIds(replicas);
this.inSyncReplicasStr = fmtNodeIds(inSyncReplicas);
}
public ExtPartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas,
long beginOffsets, long endOffsets, long currentOffset, long logendOffset) {
super();
this.topic = topic;
this.partition = partition;
this.leader = leader;
this.replicas = replicas;
this.inSyncReplicas = inSyncReplicas;
this.beginOffsets = beginOffsets;
this.endOffsets = endOffsets;
//this.pcounts = (endOffsets-beginOffsets);
this.pcounts = endOffsets;
this.leaderStr = leader.toString();
this.relicasStr = fmtNodeIds(replicas);
this.inSyncReplicasStr = fmtNodeIds(inSyncReplicas);
this.currentOffset = currentOffset;
this.logendOffset = logendOffset;
this.lag = (logendOffset-currentOffset);
}
@Override
public String toString() {
return String.format("Partition(topic = %s, partition = %d, beginOffset=%d, endOffset=%d, counts=%d, leader = %s, replicas = %s, isr = %s)",
topic,
partition,
beginOffsets,
endOffsets,
pcounts,
leader == null ? "none" : leader.id(),
fmtNodeIds(replicas),
fmtNodeIds(inSyncReplicas));
}
/* Extract the node ids from each item in the array and format for display */
private String fmtNodeIds(Node[] nodes) {
StringBuilder b = new StringBuilder("[");
for (int i = 0; i < nodes.length - 1; i++) {
b.append(Integer.toString(nodes[i].id()));
b.append(',');
}
if (nodes.length > 0) {
b.append(Integer.toString(nodes[nodes.length - 1].id()));
}
b.append("]");
return b.toString();
}
@Override
public int compareTo(ExtPartitionInfo arg0) {
return Integer.compare(this.partition, arg0.partition);
}
/**
* @return the pcounts
*/
public long getPcounts() {
return pcounts;
}
/**
* @return the topic
*/
public String getTopic() {
return topic;
}
/**
* @return the partition
*/
public int getPartition() {
return partition;
}
/**
* @return the leader
*/
public Node getLeader() {
return leader;
}
/**
* @return the replicas
*/
public Node[] getReplicas() {
return replicas;
}
/**
* @return the inSyncReplicas
*/
public Node[] getInSyncReplicas() {
return inSyncReplicas;
}
/**
* @return the beginOffsets
*/
public long getBeginOffsets() {
return beginOffsets;
}
/**
* @return the endOffsets
*/
public long getEndOffsets() {
return endOffsets;
}
public String getLeaderStr() {
return leaderStr;
}
public String getRelicasStr() {
return relicasStr;
}
public String getInSyncReplicasStr() {
return inSyncReplicasStr;
}
public long getCurrentOffset() {
return currentOffset;
}
public long getLogendOffset() {
return logendOffset;
}
public long getLag() {
return lag;
}
}
The unit tests are as follows:
package com.sunshine.kafka.test;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.sunshine.boot.MainBootApplication;
import com.sunshine.kafka.KafkaConsumeMeta;
import com.sunshine.kafka.KafkaJavaClient;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:/spring/applicationContext.xml")
//@SpringBootTest(classes=MainBootApplication.class)
public class KafkaMetaTest {
public static final String HTTP = "http://127.0.0.1:8080/kmeta/api/kafka/meta/topic";
public HttpClient client = HttpClients.createDefault();
private String servers = "192.168.2.178:9092";
@Before
public void init(){
URL url = MainBootApplication.class.getClassLoader().getResource("kafka_client_jaas.conf");
System.setProperty("java.security.auth.login.config", url.getPath());
}
@Test
public void testKafkaMetaList()throws Exception{
long btime = System.currentTimeMillis();
//String httpURL = HTTP + "/430100-passrec";
String httpURL = HTTP + "/test";
HttpPost post = new HttpPost(httpURL);
List<NameValuePair> params=new ArrayList<NameValuePair>();
params.add(new BasicNameValuePair("servers",servers));
params.add(new BasicNameValuePair("security","true"));
post.setEntity(new UrlEncodedFormEntity(params));
HttpResponse response = client.execute(post);
String sendPostResult = EntityUtils.toString(response.getEntity());
long etime = System.currentTimeMillis();
long ttime = (etime - btime);
System.out.println("查询时间:" + ttime + "--- sendPostResult:" + sendPostResult);
}
@Test
public void testTopicOffset()throws Exception{
long btime = System.currentTimeMillis();
KafkaConsumeMeta kafkaMeta = new KafkaConsumeMeta(servers, true);
List<PartitionInfo> list = kafkaMeta.getPartitionInfo("topic1");
for(PartitionInfo p : list){
System.out.println("主题:"+p.topic() + "-->" + p.toString());
}
Map<TopicPartition, Long> offsets = kafkaMeta.getEndOffsets(kafkaMeta.transforTopicPartition(list));
for(Entry<TopicPartition, Long> entry : offsets.entrySet()){
TopicPartition tp = entry.getKey() ;
System.out.println(tp.topic() + "-" + tp.partition() + "---" + entry.getValue());
}
long etime = System.currentTimeMillis();
long ttime = (etime - btime);
System.out.println("查询时间:" + ttime);
}
@Test
public void testTopicPartition()throws Exception{
long btime = System.currentTimeMillis();
KafkaConsumeMeta kafkaMeta = new KafkaConsumeMeta(servers, true);
List<PartitionInfo> list = kafkaMeta.getPartitionInfo("topic1");
for(PartitionInfo p : list){
System.out.println("主题:"+p.topic() + "-->" + p.toString());
}
long etime = System.currentTimeMillis();
long ttime = (etime - btime);
System.out.println("查询时间:" + ttime);
}
@Test
public void testConsumGroup()throws Exception{
try {
KafkaJavaClient kcg = new KafkaJavaClient(servers,true);
//Node node = new Node(0,"68.28.6.104",9094);
List<ListGroupsResponse.Group> result = kcg.listGroups(kcg.getLoadedNode());
//List<ListGroupsResponse.Group> result = kcg.listGroups(node);
for(ListGroupsResponse.Group group : result){
System.out.println(group.groupId() + "--" + group.protocolType());
}
for(Node node : kcg.findAllBrokers()){
System.out.println(node.toString());
}
System.out.println("coordinator:" + kcg.findCoordinator("sc_veh_group3").toString());
System.out.println("consumer groups:" + kcg.listAllConsumerGroups());
System.out.println("describe group:" + kcg.describeGroup("sc_veh_group3"));
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
package com.sunshine.kafka.test;
import java.net.URL;
import java.util.List;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.sunshine.boot.MainBootApplication;
import com.sunshine.kafka.ExtPartitionInfo;
import com.sunshine.kafka.KafkaConsumeGroupMetaByAdminClient;
import com.sunshine.kafka.KafkaConsumeGroupMetaByAdminClient.CounsumeGroupMode;
import kafka.coordinator.GroupOverview;
@RunWith(SpringJUnit4ClassRunner.class)
public class KafkaConsumeGroupMetaByAdminClientTest {
private String servers = "192.168.121.200:9092";
@Before
public void init(){
URL url = MainBootApplication.class.getClassLoader().getResource("kafka_client_jaas.conf");
System.setProperty("java.security.auth.login.config", url.getPath());
}
@Test
public void testListAllConsumeGroup(){
KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers,"","", true);
Set<GroupOverview> set = client.listAllConsumerGroups();
for (GroupOverview go : set) {
System.out.println(go.groupId() + "-" + go.protocolType());
}
}
@Test
public void testGetConsumeGroup(){
KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers,"","g1", true);
List<CounsumeGroupMode> list = client.getConsumeGroup();
for(CounsumeGroupMode cgm : list){
System.out.println("GROUP:"+cgm.getGroup() + "-TOPIC:" + cgm.getTopic() + "-PARTITION:" + cgm.getPartition() +
"-CURRENTOFFSET:" + cgm.getCurrent_offset() + "-LOGENDOFFSET:" + cgm.getLog_eng_offset() + "-LAG:" +
cgm.getLAG());
}
}
@Test
public void testGetTopicPartitionOffset(){
long btime = System.currentTimeMillis();
KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers,"440100_PASS","group1", true);
List<ExtPartitionInfo> list = client.getTopicPartitionsAllOffset();
for(ExtPartitionInfo epi : list){
System.out.println("-TOPIC:" + epi.getTopic() + "-PARTITION:" + epi.getPartition() +
"-CURRENTOFFSET:" + epi.getCurrentOffset() + "-LOGENDOFFSET:" + epi.getLogendOffset() + "-LAG:" +
epi.getLag() + "-BEGINOFFSET:" + epi.getBeginOffsets() + "-ENDOFFSET:" + epi.getEndOffsets());
}
long etime = System.currentTimeMillis();
long ttime = (etime - btime);
System.out.println("查询时间:" + ttime);
}
}
The complete code is available for download as an attachment.