kafka开发实例

标签: kafka 开发 实例 | 发表时间:2014-07-16 20:59 | 作者:liyonghui160com
出处:http://www.iteye.com

 

 

1.启动kafka。

//启动zookeeper server (用&是为了能退出命令行):
bin/zookeeper-server-start.sh config/zookeeper.properties  &
//启动kafka server: 
bin/kafka-server-start.sh config/server.properties  &

2.新建一个生产者例子

import java.util.Properties;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class KafkaTest {
    public static void main(String[] args) { 
        Properties props = new Properties(); 
        props.put("zk.connect", "10.103.22.47:2181"); 
        props.put("serializer.class", "kafka.serializer.StringEncoder"); 
        props.put("metadata.broker.list", "10.103.22.47:9092");
        props.put("request.required.acks", "1");
        //props.put("partitioner.class", "com.xq.SimplePartitioner");
        ProducerConfig config = new ProducerConfig(props); 
        Producer<String, String> producer = new Producer<String, String>(config); 
        String ip = "192.168.2.3";
        String msg ="this is a messageuuu!";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip,msg); 
        producer.send(data);
        producer.close(); 
    } 
 
}

3.新建一个消费者例子

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
 
 
public class ConsumerSample {
 
    public static void main(String[] args) { 
        // specify some consumer properties 
        Properties props = new Properties(); 
        props.put("zookeeper.connect", "10.103.22.47:2181"); 
        props.put("zookeeper.connectiontimeout.ms", "1000000"); 
        props.put("group.id", "test_group"); 
 
            // Create the connection to the cluster 
        ConsumerConfig consumerConfig = new ConsumerConfig(props); 
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig); 
 
 
        Map<String,Integer> topics = new HashMap<String,Integer>(); 
        topics.put("test", 2); 
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics); 
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");
        ExecutorService threadPool = Executors.newFixedThreadPool(2); 
        for (final KafkaStream<byte[], byte[]> stream : streams) { 
            threadPool.submit(new Runnable() { 
                public void run() { 
                    for (MessageAndMetadata msgAndMetadata : stream) { 
                        // process message (msgAndMetadata.message()) 
                        System.out.println("topic: " + msgAndMetadata.topic()); 
                        Message message = (Message) msgAndMetadata.message(); 
                        ByteBuffer buffer = message.payload(); 
                        byte[] bytes = new byte[message.payloadSize()]; 
                        buffer.get(bytes); 
                        String tmp = new String(bytes); 
                        System.out.println("message content: " + tmp); 
                    } 
                } 
            }); 
        }   
    }
}



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [kafka 开发 实例] 推荐:

kafka开发实例

- - 互联网 - ITeye博客
//启动zookeeper server (用&是为了能退出命令行):. //启动kafka server: . 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Kafka编程实例

- - CSDN博客云计算推荐文章
    Producer是一个应用程序,它创建消息并发送它们到Kafka broker中. 这些producer在本质上是不同. 比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer. 这些不同的Producer能够使用不同的语言实现,比如java、C和Python.

(转)Kafka部署与代码实例

- - 开源软件 - ITeye博客
转自: http://shift-alt-ctrl.iteye.com/blog/1930791.   kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它. kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka..

Apache Kafka开发入门指南之2

- - CSDN博客云计算推荐文章
Apache Kafka开发入门指南之2. 作者:chszs,转载需注明. 博客主页: http://blog.csdn.net/chszs. Apache Kafka目标是统一离线和在线处理,与Flume和Scribe相比较,Kafka在处理活动流数据方面更具优势. 但是从架构的视野来看,Kafka与传统的消息系统(例如ActiveMQ或RabbitMQ)更相似一些.

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

[译] 每个 Apache Kafka 开发者都应该知道的 5 件事

- - IT瘾-dev
Apache Kafka 是一个开源流处理平台,如今有超过30%的财富500强企业使用该平台. Kafka 有很多特性使其成为事件流平台(event streaming platform)的事实上的标准. 在这篇博文中,我将介绍每个 Kafka 开发者都应该知道的五件事,这样在使用 Kafka 就可以避免很多问题.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

Kafka优化

- - ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

Kafka Connect简介

- - 鸟窝
Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道. 它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统. Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理.

kafka consumer group offset

- - 开源软件 - ITeye博客
     kafka0.9及以前版本kafka offset 保存在zookeeper, 因频繁读写zookeeper性能不高;从0.10开始,主题分区offset存储于kafka独立主题中.     管理监控kafka主题及分区offset至关重要,原网上很开源流行工具KafkaOffsetMonitor、kafka-manager,旧版offset保存于zookeeper,kafka consumer无相应API,从kafka0.10.1.1以后提供相应API读取主题分区offset(也可以调用KafkaClient API,kafka管理API由scala语言编写).