Apache Kafka message service
A message producer sends messages to a queue, and a consumer takes messages out of the queue and consumes them. Once a message has been consumed it is no longer stored in the queue, so no consumer can consume a message that has already been consumed. A queue supports multiple consumers, but each individual message is consumed by exactly one of them.
How replicas are laid out across the broker cluster:

① Each partition gets 3 replicas, which are distributed across the broker cluster (--replication-factor 3; see the topic-creation example after this list);
② ZooKeeper tracks the activity state of consumers, producers, and brokers;
③ Each replica's id matches the id of the broker it is assigned to;
④ For each partition, one broker is chosen as its leader.
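With these settings, the topic used throughout the post can be created with the kafka-topics.sh tool shipped in the 0.8.2 distribution. The replication factor and topic name come from this post; the partition count is an assumption, since the post never states it:

sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic mengka-broker-3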
Typical use cases:

① Site user-activity tracking: page views, searches, clicks;
② Monitoring of user/administrator operations on the site;
③ Log processing;
Start the cluster, ZooKeeper first:

sh zk-jiqun-start.sh
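zk-jiqun-start.sh is the author's wrapper script and its contents are not shown in the post. A minimal sketch of what it might contain, assuming the single ZooKeeper node at localhost:2181 that KafkaConstant points to and the stock zookeeper.properties:

#!/bin/bash

###
# sketch only: start the ZooKeeper node that brokers and clients connect to
###

sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/zookeeper-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/zookeeper.properties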
Then start the three Kafka brokers:

sh jiqun-start.sh

jiqun-start.sh launches one broker per configuration file:
#!/bin/bash

###
# start 3 broker kafka cluster
###

sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_1.properties &
sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_2.properties &
sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_3.properties
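The three server_N.properties files are not shown in the post. Judging from KAFKA_BROKER_LIST below (localhost:9093,localhost:9094,localhost:9095), the copies presumably differ only in broker id, port, and log directory; a sketch of what server_1.properties might look like:

# sketch only: per-broker settings inferred from KafkaConstant
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181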
<!-- *** kafka *** -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.0</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.3</version>
</dependency>
<!--
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>
-->
<dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.1.2</version>
</dependency>
<!-- *************** -->
Taa starts a ConsumerGroup (defined next), lets it consume for 10 seconds, then shuts it down:

import com.mengka.kafka.KafkaConstant;

public class Taa {

    public static void main(String[] args) throws Exception {
        ConsumerGroup consumerGroup = new ConsumerGroup(
                KafkaConstant.KAFKA_ZOOKEEPER_CONNECT, "pv",
                KafkaConstant.KAFKA_TOPIC);
        consumerGroup.consumer();

        // let the consumer run for a while before shutting down
        Thread.sleep(10000);
        consumerGroup.shutdown();
    }
}
ConsumerGroup wraps the high-level consumer API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Consumer setup. <br>
 * <br>
 * The high-level consumer API provided by Kafka already handles:
 * <ul>
 * <li>maintaining consumer state;</li>
 * <li>load balancing;</li>
 * </ul>
 *
 * @author mengka.hyy
 */
public class ConsumerGroup {
    private static final Log log = LogFactory.getLog(ConsumerGroup.class);
    private final ConsumerConnector consumer;
    private final String topic;

    public ConsumerGroup(String zk, String groupId, String topic) {
        log.info("---------, init consumer zk = " + zk + " , groupId = "
                + groupId + " , topic = " + topic);
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig(zk, groupId));
        this.topic = topic;
    }

    /**
     * Consume messages as Strings.
     */
    public void consumer_String() {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                .createMessageStreams(topicMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            log.info("----------------, receive message = " + it.next().message());
        }
    }

    /**
     * Consume messages as raw bytes.
     */
    public void consumer() {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, new Integer(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        KafkaStream<byte[], byte[]> stream = streams.get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message());
            log.info("----------------, receive message = " + message);
        }
    }

    /**
     * Consumer configuration.
     * <ul>
     * <li>"zookeeper.session.timeout.ms": <br>
     * ZooKeeper session timeout, 5000 ms by default. Used to detect dead
     * consumers; when a consumer dies, the other consumers wait this long
     * before noticing and triggering a rebalance;</li>
     * <li>"group.id": <br>
     * the consumer group;</li>
     * <li>"zookeeper.sync.time.ms": <br>
     * retry interval when a consumer rebalance fails;</li>
     * <li>"auto.commit.interval.ms": <br>
     * offset auto-commit interval, 60 * 1000 by default;</li>
     * </ul>
     *
     * @param a_zookeeper
     * @param a_groupId
     * @return
     */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
            String a_groupId) {
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest"); // required if old data should be read
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "10000");
        // props.put("zookeeper.connection.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        return config;
    }

    /**
     * Release resources.
     */
    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
    }
}
Shared constants:

public class KafkaConstant {

    /**
     * Kafka cluster:
     * --broker-list localhost:9093,localhost:9094,localhost:9095
     */
    public static final String KAFKA_BROKER_LIST = "localhost:9093,localhost:9094,localhost:9095";

    /**
     * Topic name:
     * --topic mengka-broker-3
     */
    public static final String KAFKA_TOPIC = "mengka-broker-3";

    /**
     * ZooKeeper:
     * --zookeeper localhost:2181
     */
    public static final String KAFKA_ZOOKEEPER_CONNECT = "localhost:2181";

    public static final String KAFKA_CONSUMER_ZOOKEEPER_CONNECT = "localhost:2181/config/mobile/mq/mafka";
}
The producer:

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.mengka.kafka.KafkaConstant;

// TimeUtil is a date-formatting helper from the author's project; its package is not shown in the post.

public class TaaProducer {

    public static void main(String[] args) {
        /**
         * step01: configure the producer
         */
        Properties props = new Properties();
        props.put("metadata.broker.list", KafkaConstant.KAFKA_BROKER_LIST);
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // value serializer
        props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // key serializer
        props.put("partitioner.class", "com.mengka.kafka.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        /**
         * step02: send kafka messages
         */
        KeyedMessage<String, String> data1 = new KeyedMessage<String, String>(
                KafkaConstant.KAFKA_TOPIC, "12.13.14.15",
                "baicai AAA.." + TimeUtil.toDate(new Date(), TimeUtil.format_1));
        KeyedMessage<String, String> data2 = new KeyedMessage<String, String>(
                KafkaConstant.KAFKA_TOPIC, "12.13.14.16",
                "baicai BBB.." + TimeUtil.toDate(new Date(), TimeUtil.format_1));

        Producer<String, String> producer = new Producer<String, String>(config);
        producer.send(data1);
        producer.send(data2);

        /**
         * step03: release resources
         */
        producer.close();
    }
}
The custom partitioner referenced by "partitioner.class" above routes each message by its key:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    public SimplePartitioner(VerifiableProperties props) {
    }

    /**
     * Route by the last dot-separated segment of an IP-style key,
     * e.g. "12.13.14.15" -> 15 % a_numPartitions.
     */
    @Override
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}
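For illustration (this demo is not part of the original post), here is how the two keys TaaProducer sends would be routed, assuming the topic has 3 partitions, which the post does not state:

public class PartitionerDemo {

    public static void main(String[] args) {
        // the SimplePartitioner constructor ignores its argument, so null is fine here
        SimplePartitioner p = new SimplePartitioner(null);

        // "12.13.14.15" -> last octet 15 -> 15 % 3 = 0
        System.out.println(p.partition("12.13.14.15", 3));

        // "12.13.14.16" -> last octet 16 -> 16 % 3 = 1
        System.out.println(p.partition("12.13.14.16", 3));
    }
}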