Kafka编程实例

标签: kafka 编程 实例 | 发表时间:2014-08-29 18:15 | 作者:GreatElite
出处:http://blog.csdn.net

 编程

    Producer是一个应用程序,它创建消息并发送它们到Kafka broker中。这些producer在本质上是不同。比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer。这些不同的Producer能够使用不同的语言实现,比如java、C和Python。下面的这部图表解释了消息producer的Kafka API.


下面将详细介绍如果编写一个简单的Producer和Consumer应用程序。

发送简单消息给Kafka broker,Producer端编写类ClusterProducer。

public classClusterProducer extends Thread {
    private static final Log log =LogFactory.getLog(ClusterProducer.class);
 
    public void sendData() {
        Random rnd = new Random();
        Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
        if (props == null) {
            log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME);
           return;
        }
        //set the producer configurationproperties
        ProducerConfig config = newProducerConfig(props);
        Producer<String, String> producer= new Producer<String, String>(config);
 
        //Send the data
        int count = 1;
        KeyedMessage<String, String>data;
        while (count < 100) {
            String sign = "*";
            String ip = "192.168.2."+ rnd.nextInt(255);
            StringBuffer sb = newStringBuffer();
            for (int i = 0; i < count; i++){
                sb.append(sign);
            }
            log.info("set data:" +sb);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            data = new KeyedMessage<String,String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString());
            producer.send(data);
            count++;
        }
        producer.close();
    }
 
    public void run() {
        sendData();
    }
 
    public static void main(String[] args) {
        new ClusterProducer().sendData();
    }
}


定于Consumer获取端,获取对应topic的数据:

public class Consumerextends Thread {
    private static final Log log =LogFactory.getLog(Consumer.class);
    private final ConsumerConnector consumer;
    private final String topic;
 
    public Consumer(String topic) {
        consumer =kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }
 
    private static ConsumerConfigcreateConsumerConfig() {
        Properties props = new Properties();
       props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id",KafkaProperties.groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");
 
        return new ConsumerConfig(props);
 
    }
 
    public void run() {
        Map<String, Integer>topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, newInteger(1));
        Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]>it = stream.iterator();
        while (it.hasNext()) {
            log.info("+message: " +new String(it.next().message()));
        }
    }
 
    public static void main(String[] args) {
        Consumer client = new Consumer("cluster_statistics_topic");
        client.

     辅助类:

public interface PropertiesSettings {

    final static String CONSUMER_FILE_NAME = "consumer.properties";
    final static String PRODUCER_FILE_NAME = "producer.properties";
    final static String TOPIC_NAME = "cluster_statistics_topic";
    final static String TOPIC_A = "cluster_statistics_topic_A";
}


package com.kafka.utils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * @author JohnLiu
 * @version 0.1.0
 * @date 2014/8/27
 */
public class PropertiesParser {

    private static final Log log = LogFactory.getLog(PropertiesParser.class);
    /* properties file type */
    Properties props = null;

    /* constructor method*/
    public PropertiesParser(Properties props) {
        this.props = props;
    }

    /**
     * Get the trimmed String value of the property with the given
     * <code>name</code>.  If the value the empty String (after
     * trimming), then it returns null.
     */
    public String getStringProperty(String name) {
        return getStringProperty(name, null);
    }

    /**
     * Get the trimmed String value of the property with the given
     * <code>name</code> or the given default value if the value is
     * null or empty after trimming.
     */
    public String getStringProperty(String name, String def) {
        String val = props.getProperty(name, def);
        if (val == null) {
            return def;
        }

        val = val.trim();

        return (val.length() == 0) ? def : val;
    }

    private Properties loadPropertiesFile() {
        Properties props = new Properties();
        InputStream in;
        ClassLoader cl = getClass().getClassLoader();
        if (cl == null)
            cl = findClassloader();
        if (cl == null)
            try {
                throw new ProcessingException("Unable to find a class loader on the current thread or class.");
            } catch (ProcessingException e) {
                e.printStackTrace();
            }
        in = cl.getResourceAsStream(PropertiesSettings.CONSUMER_FILE_NAME);
        try {
            props.load(in);
        } catch (IOException ioe) {
            log.error("can't load " + PropertiesSettings.CONSUMER_FILE_NAME, ioe);
        }
        return props;
    }

    private ClassLoader findClassloader() {
        // work-around set context loader for windows-service started jvms (QUARTZ-748)
        if (Thread.currentThread().getContextClassLoader() == null && getClass().getClassLoader() != null) {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        }
        return Thread.currentThread().getContextClassLoader();
    }

    public static Properties getProperties(final String fileName) {
        Properties props = new Properties();
        InputStream in = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream(fileName);
        try {
            props.load(in);
        } catch (IOException ioe) {
            log.error("can't load " + fileName, ioe);
        }
        return props;
    }
}

      配置参数文件consumer.properties:

zookeeper.connect=bigdata09:2181,bigdata08:2181,bigdata07:2181
group.id=cluster_group
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000



      配置参数文件producer.properties:

metadata.broker.list=bigdata09:9092,bigdata08:9092,bigdata07:9092
serializer.class=kafka.serializer.StringEncoder
#partitioner.class=com.kafka.producer.SimplePartitioner
request.required.acks=1


     分别执行上面的代码,可以发送或者得到对应topic信息。

     Enjoy yourself!(*^__^*) ……

作者:GreatElite 发表于2014-8-29 10:15:40 原文链接
阅读:85 评论:0 查看评论

相关 [kafka 编程 实例] 推荐:

Kafka编程实例

- - CSDN博客云计算推荐文章
    Producer是一个应用程序,它创建消息并发送它们到Kafka broker中. 这些producer在本质上是不同. 比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer. 这些不同的Producer能够使用不同的语言实现,比如java、C和Python.

kafka开发实例

- - 互联网 - ITeye博客
//启动zookeeper server (用&是为了能退出命令行):. //启动kafka server: . 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

(转)Kafka部署与代码实例

- - 开源软件 - ITeye博客
转自: http://shift-alt-ctrl.iteye.com/blog/1930791.   kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它. kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka..

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

epoll网络编程实例

- - CSDN博客推荐文章
       在前面已经经过了PPC、TPC、select之类( TPC就是使用进程处理data,TPC就是使用线程处理 ),前面两个的缺点大家应该都是知道的是吧,对于select( 其实poll和他差不多 ),缺点是能同时连接的fd是在是不多,在linux中一般是1024/2048,对于很大的服务器来说是不够的.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

Kafka优化

- - ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

Kafka Connect简介

- - 鸟窝
Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道. 它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统. Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理.

kafka consumer group offset

- - 开源软件 - ITeye博客
     kafka0.9及以前版本kafka offset 保存在zookeeper, 因频繁读写zookeeper性能不高;从0.10开始,主题分区offset存储于kafka独立主题中.     管理监控kafka主题及分区offset至关重要,原网上很开源流行工具KafkaOffsetMonitor、kafka-manager,旧版offset保存于zookeeper,kafka consumer无相应API,从kafka0.10.1.1以后提供相应API读取主题分区offset(也可以调用KafkaClient API,kafka管理API由scala语言编写).

Kafka设计解析(二):Kafka High Availability (上)

- -
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务. 若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失. 而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.