kafka发布订阅消息

标签: kafka 消息 | 发表时间:2015-02-28 12:54 | 作者:hyy044101331
出处:http://www.iteye.com

1. kafka原理

 

1-1 基本构成


 

1-2 工作原理



① 每个partition会创建3个备份replica,并分配到broker集群中; --replication-factor 3

② 用zookeeper来管理,consumer、producer、broker的活动状态;

③ 分配的每个备份replica的id和broker的id保持一致;

④ 对每个partition,会选择一个broker作为集群的leader; 

 

1-3 使用场景

① 站点用户活动追踪:页面浏览,搜索,点击;

② 用户/管理员网站操作的监控;

③ 日志处理;

 

2. kafka发布/订阅消息 

2-1 启动zookeeper

 

sh zk-jiqun-start.sh

 

2-2 启动kafka

sh jiqun-start.sh

 

  1 #!/bin/bash
  2 
  3 ###
  4 #  start 3 broker kafka cluster
  5 ###
  6 
  7 sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_1.properties&
  8 sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_2.properties&
  9 sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_3.properties

 

2-3  添加maven依赖

<!-- *** kafka *** -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.0</version>
			<scope>compile</scope>
			<exclusions>
				<exclusion>
					<artifactId>jmxri</artifactId>
					<groupId>com.sun.jmx</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jms</artifactId>
					<groupId>javax.jms</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jmxtools</artifactId>
					<groupId>com.sun.jdmk</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
			<version>2.10.3</version>
		</dependency>
		<!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> 
			<version>0.8.2.0</version> </dependency> -->
		<dependency>
			<groupId>com.yammer.metrics</groupId>
			<artifactId>metrics-core</artifactId>
			<version>2.1.2</version>
		</dependency>
		<!-- *************** -->

 

2-4 启动consumer

import com.mengka.kafka.KafkaConstant;

public class Taa {

	public static void main(String[] args) throws Exception{
		
		ConsumerGroup consumerGroup = new ConsumerGroup(KafkaConstant.KAFKA_ZOOKEEPER_CONNECT,"pv",KafkaConstant.KAFKA_TOPIC);
		consumerGroup.consumer();
		
		Thread.sleep(10000);
		consumerGroup.shutdown();
	}

}

 

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * 配置consumer, <br>
 * <br>
 * 》》kafka提供的consumer API,高级api里面已经做了以下处理:
 * <ul>
 * <li>维护consumer状态;</li>
 * <li>负载均衡;</li>
 * </ul>
 * 
 * @author mengka.hyy
 * 
 */
public class ConsumerGroup {

	private static final Log log = LogFactory.getLog(ConsumerGroup.class);

	private final ConsumerConnector consumer;

	private final String topic;

	public ConsumerGroup(String zk, String groupId, String topic) {
		log.info("---------, init consumer zk = " + zk + " , groupId = "
				+ groupId + " , topic = " + topic);
		consumer = kafka.consumer.Consumer
				.createJavaConsumerConnector(createConsumerConfig(zk, groupId));
		this.topic = topic;
	}

	/**
	 *  消费消息,String类型
	 * 
	 */
	public void consumer_String() {
		Map<String, Integer> topicMap = new HashMap<String, Integer>();
		topicMap.put(topic, new Integer(1));

		StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
		StringDecoder valueDecoder = new StringDecoder(
				new VerifiableProperties());

		Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
				.createMessageStreams(topicMap, keyDecoder, valueDecoder);
		KafkaStream<String, String> stream = consumerMap.get(topic).get(0);

		ConsumerIterator<String, String> it = stream.iterator();
		while (it.hasNext()){
			log.info("----------------, receive message = "+it.next().message());
		}		
	}

	/**
	 *  消费消息,byte类型
	 * 
	 */
	public void consumer() {
		Map<String, Integer> topicMap = new HashMap<String, Integer>();
		topicMap.put(topic, new Integer(1));

		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
				.createMessageStreams(topicMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

		KafkaStream<byte[], byte[]> stream = streams.get(0);
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		while (it.hasNext()){
			String message = new String(it.next().message());
			log.info("----------------, receive message = "+message);
		}	

	}

	/**
	 * consumer配置
	 * <ul>
	 * <li>
	 * "zookeeper.session.timeout.ms": <br>
	 * zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉,当消费者挂掉,
	 * 其他消费者要等该指定时间才能检查到并且触发重新负载均衡;</li>
	 * <li>
	 * "group.id":<br>
	 * 指定消费组;</li>
	 * <li>
	 * "zookeeper.sync.time.ms":<br>
	 * 当consumer reblance时,重试失败时时间间隔;</li>
	 * <li>
	 * "auto.commit.interval.ms":<br>
	 * 自动更新时间。默认60 * 1000;</li>
	 * </ul>
	 * 
	 * @param a_zookeeper
	 * @param a_groupId
	 * @return
	 */
	private static ConsumerConfig createConsumerConfig(String a_zookeeper,
			String a_groupId) {
		Properties props = new Properties();
		props.put("auto.offset.reset", "smallest"); // 必须要加,如果要读旧数据
		props.put("zookeeper.connect", a_zookeeper);
		props.put("group.id", a_groupId);
		props.put("zookeeper.session.timeout.ms", "10000");
		// props.put("zookeeper.connection.timeout.ms", "10000");
		props.put("zookeeper.sync.time.ms", "2000");
		props.put("auto.commit.interval.ms", "1000");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		ConsumerConfig config = new ConsumerConfig(props);
		return config;
	}

	/**
	 * 释放资源
	 */
	public void shutdown() {
		if (consumer != null)
			consumer.shutdown();
	}
}

 

public class KafkaConstant {

	/**
	 * kafka集群配置: 
	 *   --broker-list localhost:9093,localhost:9094,localhost:9095
	 */
	public static final String KAFKA_BROKER_LIST = "localhost:9093,localhost:9094,localhost:9095";
	
	/**
	 *  topic名称
	 *   --topic mengka-broker-3
	 */
	public static final String KAFKA_TOPIC = "mengka-broker-3";
	
	/**
	 * zookeeper配置:
	 *   --zookeeper localhost:2181
	 */
	public static final String KAFKA_ZOOKEEPER_CONNECT = "localhost:2181";
	
	public static final String KAFKA_CONSUMER_ZOOKEEPER_CONNECT = "localhost:2181/config/mobile/mq/mafka";
}

 

2-5 启动producer

public class TaaProducer {

	public static void main(String[] args) {

		/**
		 * step01: 配置producer
		 */
		Properties props = new Properties();
		props.put("metadata.broker.list", KafkaConstant.KAFKA_BROKER_LIST);
		props.put("serializer.class", "kafka.serializer.StringEncoder");//配置value的序列化类
		props.put("key.serializer.class", "kafka.serializer.StringEncoder"); //配置key的序列化类
		props.put("partitioner.class",
				"com.mengka.kafka.producer.SimplePartitioner");
		props.put("request.required.acks", "1");
		ProducerConfig config = new ProducerConfig(props);

		/**
		 * step02: 发送kafka消息
		 */
		KeyedMessage<String, String> data1 = new KeyedMessage<String, String>(
				KafkaConstant.KAFKA_TOPIC, "12.13.14.15", "baicai AAA.."
						+ TimeUtil.toDate(new Date(), TimeUtil.format_1));
		KeyedMessage<String, String> data2 = new KeyedMessage<String, String>(
				KafkaConstant.KAFKA_TOPIC, "12.13.14.16", "baicai BBB.."
						+ TimeUtil.toDate(new Date(), TimeUtil.format_1));

		Producer<String, String> producer = new Producer<String, String>(config);
		producer.send(data1);
		producer.send(data2);

		/**
		 * step03: 释放资源
		 */
		producer.close();
	}

}
public class SimplePartitioner implements Partitioner {

	public SimplePartitioner(VerifiableProperties props) {
	}

	@Override
	public int partition(Object key, int a_numPartitions) {
		int partition = 0;
		String stringKey = (String) key;
		int offset = stringKey.lastIndexOf('.');
		if (offset > 0) {
			partition = Integer.parseInt(stringKey.substring(offset + 1))
					% a_numPartitions;
		}
		return partition;
	}
}

  

2-6 结果


 

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [kafka 消息] 推荐:

apache kafka消息服务

- - CSDN博客架构设计推荐文章
apache kafka中国社区QQ群:162272557. apache kafka参考. 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息. 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息. Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费.

kafka发布订阅消息

- - 企业架构 - ITeye博客
① 每个partition会创建3个备份replica,并分配到broker集群中; --replication-factor 3. ② 用zookeeper来管理,consumer、producer、broker的活动状态;. ③ 分配的每个备份replica的id和broker的id保持一致;.

分布式消息系统:Kafka

- - 标点符
Kafka是分布式发布-订阅消息系统. 它最初由LinkedIn公司开发,之后成为Apache项目的一部分. Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转. 传统的企业消息系统并不是非常适合大规模的数据处理.

kafka分布式消息系统

- - CSDN博客云计算推荐文章
Kafka[1]是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态). 当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线).

高性能消息系统——Kafka

- - 互联网 - ITeye博客
引用官方原文: “Kafka is a distributed, partitioned, replicated commit log service.”. 它提供了一个非常特殊的消息机制,不同于传统的mq. 官网:https://kafka.apache.org.     传统的MQ,消息被消化掉后会被mq删除,而kafka中消息被消化后不会被删除,而是到配置的expire时间后,才删除.

Kafka 的消息可靠传递

- - IT瘾-dev
Kafka提供的基础保障可以用来构建可靠的系统, 却无法保证完全可靠. 需要在可靠性和吞吐之间做取舍.. Kafka在分区上提供了消息的顺序保证.. 生产的消息在写入到所有的同步分区上后被认为是. 生产者可以选择在消息提交完成后接收broker的确认, 是写入leader之后, 或者所有的副本. 只要有一个副本存在, 提交的消息就不会丢失.

kafka:一个分布式消息系统 - 十九画生

- - 博客园_首页
最近因为工作需要,调研了追求高吞吐的轻量级消息系统Kafka,打算替换掉线上运行的ActiveMQ,主要是因为明年的预算日流量有十亿,而ActiveMQ的分布式实现的很奇怪,所以希望找一个适合分布式的消息系统. 以下是内容是调研过程中总结的一些知识和经验,欢迎拍砖. 首先,我们来看看什么是消息队列,维基百科里的解释翻译过来如下:.

Apache Kafka:下一代分布式消息系统

- - zzm
Apache Kafka是分布式发布-订阅消息系统. 它最初由LinkedIn公司开发,之后成为Apache项目的一部分. Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同:. 它被设计为一个分布式系统,易于向外扩展;.

linkedin高吞吐量分布式消息系统kafka使用手记

- - 五四陈科学院-坚信科学,分享技术
以下内容由 [五四陈科学院]提供. kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:. 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息. 支持通过kafka服务器和消费机集群来分区消息.

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.