Storm Trident 学习

标签: storm trident 学习 | 发表时间:2015-10-05 00:00 | 作者:
分享到:
出处:http://yangxikun.github.io

Storm支持的三种语义:

  1. 至少一次
  2. 至多一次
  3. 有且仅有一次

至少一次语义的Topology写法


参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:

  • 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误。 _collector.emit(new Values("field1", "field2", 3) , msgId);
  • 在Bolt emit时进行锚定。 _collector.emit(tuple, new Values(word));

Spout例子:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RandomSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private Random _rand;
    private ConcurrentHashMap<UUID, Values> _pending;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        _collector = spoutOutputCollector;
        _rand = new Random();
        _pending = new ConcurrentHashMap<UUID, Values>();
    }

    public void nextTuple() {
        Utils.sleep(1000);
        String[] sentences = new String[] {
                "I write php",
                "I learning java",
                "I want to learn swool and tfs"
        };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        Values v = new Values(sentence);
        UUID msgId = UUID.randomUUID();
        this._pending.put(msgId, v);
        _collector.emit(v, msgId);//发射tuple时,添加msgId

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("world"));
    }

    public void ack(Object msgId) {
        this._pending.remove(msgId);//当消息被完整性地处理了

    }

    public void fail(Object msgId) {
        this._collector.emit(this._pending.get(msgId), msgId);//当消息处理失败了,重新发射,当然也可以做其他的逻辑处理

    }
}

Bolt例子:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" "))
            _collector.emit(tuple, new Values(word));//发射tuple时进行锚定

        _collector.ack(tuple);//对处理完的tuple进行确认

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

至多一次的语义


如果只需要至多一次的语义,那么只需要在编程的时候不用Storm的Acker机制,可以通过如下方法达到:

  • 把Config.TOPOLOGY_ACKERS设置成0。在这种情况下,Storm会在Spout发射一个Tuple之后马上调用Spout的ack方法。也就是说这个Tuple树不会被跟踪。
  • 在Tuple层面去掉可靠性。 你可以在发射Tuple的时候不指定MessageID来达到不不跟踪这个Tuple的目的。
  • 如果你对于一个Tuple树里面的某一部分到底成不成功不是很关心,那么可以在Bolt发射这些Tuple的时候不锚定它们。这样这些Tuple就不在Tuple树里面,也就不会被跟踪了。

需要注意的点:每个待处理的 tuple 都必须显式地应答(ack)或者失效(fail)。因为 Storm 是使用内存来跟踪每个 tuple 的,所以,如果你不对每个 tuple 进行应答或者失效,那么负责跟踪的任务很快就会发生内存溢出。

有且仅有一次的语义


参考资料: Trident State Trident中的三种Spout,根据Spout代码情况可判断属于哪个: 事务型Spout的特点:

  • 每个 batch 的 txid 永远不会改变。对于某个特定的 txid,batch 在执行重新处理操作时所处理的 tuple 集和它的第一次处理操作完全相同。
  • 不同 batch 中的 tuple 不会出现重复的情况(某个 tuple 只会出现在一个 batch 中,而不会同时出现在多个 batch 中)。
  • 每个 tuple 都会放入一个 batch 中(处理操作不会遗漏任何的 tuple)。

模糊型Spout的特点:

  • 不同 batch 中的 tuple 不会出现重复的情况(某个 tuple 只会出现在一个 batch 中,而不会同时出现在多个 batch 中)。
  • 每个 tuple 都会放入一个 batch 中(处理操作不会遗漏任何的 tuple)。
  • 每个 tuple 都会通过某个 batch 处理完成。不过,在 tuple 处理失败的时候,tuple 有可能继续在另一个 batch 中完成处理,而不一定是在原先的 batch 中完成处理。

非事务型Spout的特点:

  • 非事务型 spout 有可能提供一种“至多一次”的处理模型,在这种情况下 batch 处理失败后 tuple 并不会重新处理。
  • 也有可能提供一种“至少一次”的处理模型,在这种情况下可能会有多个 batch 分别处理某个 tuple。

假如你的拓扑需要计算单词数,而且你准备将计数结果存入一个 K-V 型存储系统中。这里的 key 就是单词,value 对应于单词数。如果仅仅存储计数结果是无法确定某个batch中的tuple是否已经被处理过的,例如一个batch已经处理到最后一步并成功写入到了存储系统中,正准备Ack时挂了,那么重新发射这个batch后,被完整地处理了,batch中的tuple就被处理了多次,为了解决这个问题需要State的帮助,即存储的时候做一些处理。 Trident中的三种State,Storm已经提供了多中存储系统State的实现,如MemoryMapState、MemcachedState: 事务型State

  • 特点:对于特定的 txid 永远只与同一组 tuple 相关联。
  • 实现方式:将单词、txid与计数值一起存入数据库,每次更新时,比较txid是否相同,如果不相同则更新,否则不更新。

模糊型State

  • 特点:为了配合模糊型Spout达到有且仅有一次的语义而设计。
  • 实现方式:将单词、前一个结果值(preValue)、计数值和txid一起存入数据库,每次更新时,比较txid,若不同,则记录更新为 (单词, 存储的计数值作为preValue, 存储的计数值+batch计算出来的值作为计数值, batch的txid);若相同,则记录更新为 (单词, preValue, preValue加上batch计算出来的值作为计数值, txid)

非事务型State

  • 特点:不能保证有且仅有一次的语义
  • 实现方式:数据库只记录了单词,计数值

不同的Spout类型与不同的State类型的组合是否支持有且仅有一次的消息处理语义:

上图中的Yes表示组合支持有且仅有一次的语义。

相关 [storm trident 学习] 推荐:

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm实战之WordCount

- - 编程语言 - ITeye博客
 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式.

storm常见问题解答

- - BlogJava-庄周梦蝶
    最近有朋友给我邮件问一些storm的问题,集中解答在这里. 一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算. 你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算. 怎么实现spout可以参考官方的kestrel spout实现:.

Storm 实时性分析

- - CSDN博客架构设计推荐文章
都说Storm是一个实时流处理系统,但Storm的实时性体现在什么方面呢. 首先有一个前提:这里的实时性和我们通常所说的实时系统(芯片+汇编或C编写的实时处理软件)的实时性肯定是没法比的,也不是同一个概念. 这里的实时性应该是一个相对的实时性(相对于Hadoop之类 ). 总结一下,Storm的实时性可能主要体现在:.

那些storm的坑坑

- - 开源软件 - ITeye博客
转载请声明出处:http://blackwing.iteye.com/blog/2147633. 在使用storm的过程中,感觉它还是不如hadoop那么成熟. 当然,它的流式处理能力挺让人眼前一亮,以前做的个性化推荐都是离线计算,现在总算把实时部分也加上了. 总结一下storm使用的些心得:. 1.尽量把大量数据处理行为分拆成多个处理component.

storm准实时应用

- - CSDN博客推荐文章
1 应用背景: 需要实时统计用户的登陆数,在线人数,活跃时间,下载等指标的数据,或者清洗后移到hdfs上.         1) 客户端产生数据---.         2) kafka-生产者实时采集数据(保留7天)-----.         3) storm实时消费数据,处理数据.         4)把实时数据统计结果缓存到memcached 中.

Storm核心概念剖析

- - 互联网 - ITeye博客
最近团队中有分析的场景,用到了JStorm来做数据的实时分析,于是花时间对于一些概念做了了解. 这个的话出来应该有几年时间了,阿里巴巴也重写了一套JStorm,核心的类名都是服用的Storm的,他是一套实时数据处理系统,容错行好,然后足够稳定,目前很多数据实时分析的场景,选择Storm的越来越多了.

Kafka+Storm+HDFS整合实践

- -
原文地址: http://shiyanjun.cn/archives/934.html. 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的 需求Hive就不合适了. 实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.

storm高并发UV统计

- - 企业架构 - ITeye博客
统计高并发UV可行的方案(类似WordCount的计算去重word总数):. bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到:. Pv、UV、访问深度(按每个session_id 的浏览数).