storm笔记 -- 与kafka的集成

标签: storm 笔记 kafka | 发表时间:2014-10-21 21:17 | 作者:yaochitc
出处:http://www.iteye.com

   storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景。因此,storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供storm应用使用。这里结合自己的应用做个简单总结。

  由于storm已经提供了storm-kafka,因此可以直接使用,使用kafka的低级api读取数据。如果有需要的话,自己实现也并不困难。使用方法如下:

  

// 设置kafka的zookeeper集群
BrokerHosts hosts = new ZkHosts("10.1.80.249:2181,10.1.80.250:2181,10.1.80.251:2181/kafka");
// 初始化配置信息
SpoutConfig conf = new SpoutConfig(hosts, "topic", "/zkroot","topo");
// 在topology中设置spout
builder.setSpout("kafka-spout", new KafkaSpout(conf));

   

  这里需要注意的是,spout会根据config的后面两个参数在zookeeper上为每个kafka分区创建保存读取偏移的节点,如:/zkroot/topo/partition_0。默认情况下,spout下会发射域名为bytes的binary数据,如果有需要,可以通过设置schema进行修改。

 

  如上面所示,使用起来还是很简单的,下面简单的分析一下实现细节。

  (1) 初始化:

/**
    KafkaSpout.open
**/
// 初始化用于读写zookeeper的客户端对象_state 
Map stateConf = new HashMap(conf);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);

// 初始化用于读取kafka数据coordinator,真正数据读取使用的是内部的PartitionManager
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
 

  (2) 读取数据:

/**
    KafkaSpout.nextTuple
**/
// 通过各个分区对应的PartitionManager读取数据 
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
           // 调用manager的next方法读取数据并emit
           EmitState state = managers.get(_currPartitionIndex).next(_collector);
}

// 提交读取到的位置到zookeeper
long now = System.currentTimeMillis();
if((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
    commit();
}

 

 (3) ack和fail:

/**
    KafkaSpout.ack
**/
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
    //调用PartitionManager的ack
    m.ack(id.offset);
}

/**
    KafkaSpout.fail
**/
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
    //调用PartitionManager的fail
    m.fail(id.offset);
}
 
   可以看出,主要的逻辑都在PartitionManager这个类中。下面对它做个简单的分析:
  (1) 构造:
//从zookeeper中读取上一次的偏移
Map<Object, Object> json = _state.readJSON(path);
//根据当前时间获取一个偏移
Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);

//maxOffsetBehind为两个偏移的最大范围,如果超过这个范围,则用最新偏移覆盖读取偏移,两个偏移间的数据会被丢弃。如果不希望这样,应该将它设置成一个较大的值或者MAX_VALUE
if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
    _committedTo = currentOffset;
}

//初始化当前偏移
_emittedToOffset = _committedTo;
   
  (2) next和fill:
/**
    PartitionManager.next
**/
//调用fill填充待发送队列
if (_waitingToEmit.isEmpty()) {
    fill();
}

//发送数据
while (true) {
    MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
    Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
    if (tups != null) {
        for (List<Object> tup : tups) {
            collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
        }
        break;
    } else {
        ack(toEmit.offset);
    }
}

/**
    PartitionManager.fill
**/
//初始化当前偏移,读取消息
if (had_failed) {
    //先处理失败的偏移
    offset = failed.first();
} else {
    offset = _emittedToOffset;
}
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
for (MessageAndOffset msg : msgs) {
    final Long cur_offset = msg.offset();
    if (cur_offset < offset) {
        // Skip any old offsets.
        continue;
    }
    if (!had_failed || failed.contains(cur_offset)) {
        numMessages += 1;
        //将偏移添加到pending中
        _pending.add(cur_offset);
        //将消息添加到待发送中
        _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
        _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
        if (had_failed) {
            failed.remove(cur_offset);
        }
    }
}
   
  (3) ack和fail
/**
    PartitionManager.ack
**/
//从_pending中移除该偏移,如果该偏移与当前偏移的差大于maxOffsetBehind,则清空pending
if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
    // Too many things pending!
    _pending.headSet(offset).clear();
} else {
    _pending.remove(offset);
}
numberAcked++;

/**
  PartitionManager.fail
**/
//将偏移添加到失败队列中
failed.add(offset);
numberFailed++;
 
   最后,加上一张图做个总结:

 


已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [storm 笔记 kafka] 推荐:

storm笔记 -- 与kafka的集成

- - 开源软件 - ITeye博客
   storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景. 因此,storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供storm应用使用. 这里结合自己的应用做个简单总结.   由于storm已经提供了storm-kafka,因此可以直接使用,使用kafka的低级api读取数据.

Kafka+Storm+HDFS整合实践

- -
原文地址: http://shiyanjun.cn/archives/934.html. 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的 需求Hive就不合适了. 实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.

大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

- - 行业应用 - ITeye博客
大数据我们都知道hadoop,但并不都是hadoop.我们该如何构建大数据库项目. 对于离线处理,hadoop还是比较适合的,但是对于实时性比较强的,数据量比较大的,我们可以采用Storm,那么Storm和什么技术搭配,才能够做一个适合自己的项目. 可以带着下面问题来阅读本文章:. 1.一个好的项目架构应该具备什么特点.

storm、hbase、kafka整合过程中遇到的log4j冲突问题

- - 行业应用 - ITeye博客
storm、hbase、kafka整合过程中遇到的log4j冲突问题. log4j-over-slf4j.jar AND slf4j-log4j12.jar 循环调用冲突了,再进一步原因是kafka、hbase中用的是log4j. * 方案一:把storm中的log4j-over-slf4j 依赖排除;.

Kafka笔记—可靠性、幂等性和事务 - luozhiyun - 博客园

- -
这几天很忙,但是我现在给我的要求是一周至少要出一篇文章,所以先拿这篇笔记来做开胃菜,源码分析估计明后两天应该能写一篇. Kafka只对“已提交”的消息(committed message)做有限度的持久化保证. 当Kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

Storm实战之WordCount

- - 编程语言 - ITeye博客
 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式.

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.