【Twitter Storm系列】 Storm简单实例讲解

标签: twitter storm 系列 | 发表时间:2013-12-19 03:22 | 作者:WeiJonathan
出处:http://blog.csdn.net

实例来自书籍《Oreilly.Getting.Started.with.Storm.Aug.2012》

先讲下我们这次所需涉及到的概念:Topology、Spout、Blot

Topology:Storm的运行单位,相当于Hadoop中的job,一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来


Spout:消息源,topology里面的消息生产者,一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。

Blot:所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。

一下是实例的流程图


words.txt--将要执行wordcount操作的文件

storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great

1、Topology的创建

 	//Topology definition
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader",new WordReader());
		builder.setBolt("word-normalizer", new WordNormalizer())
			.shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter(),1)
			.fieldsGrouping("word-normalizer", new Fields("word"));
		
        //Configuration
		Config conf = new Config();
		conf.put("wordsFile", args[0]);
		conf.setDebug(false);
        //Topology run
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
		Thread.sleep(1000);
		cluster.shutdown();
从上面代码我们可以看到,Topology是通过TopologyBuilder的createTopology来创建。通过TopologyBuilder对象设置Spout以及Blot。

Config对象用于设置集群计算所需的参数,这里的参数是要执行wordcount操作的文件路径。

例子是以本地集群方式运行。

Topology提交通过cluster对象的submitTopology方法来提交。参数包括任务名、配置、以及Topology对象。

注:我们可以看下这里的TopologyBuilder对象的创建。

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
	.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
	.fieldsGrouping("word-normalizer", new Fields("word"));
可以看出,这里有一个流程的执行关系。也可以说是一定订阅关系。第一个blot设置了shuffleGrouping的名称为Spout的名称,而第二个blot则指定了第一个blot的名称。就如我们上面的流程图执行流程一致。

2、Spout(WordReader)

package spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private boolean completed = false;
	public void ack(Object msgId) {
		System.out.println("OK:"+msgId);
	}
	public void close() {}
	public void fail(Object msgId) {
		System.out.println("FAIL:"+msgId);
	}

	/**
	 * The only thing that the methods will do It is emit each 
	 * file line
	 */
	public void nextTuple() {
		/**
		 * The nextuple it is called forever, so if we have been readed the file
		 * we will wait and then return
		 */
		if(completed){
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				//Do nothing
			}
			return;
		}
		String str;
		//Open the reader
		BufferedReader reader = new BufferedReader(fileReader);
		try{
			//Read all lines
			while((str = reader.readLine()) != null){
				/**
				 * By each line emmit a new value with the line as a their
				 */
				this.collector.emit(new Values(str),str);
			}
		}catch(Exception e){
			throw new RuntimeException("Error reading tuple",e);
		}finally{
			completed = true;
		}
	}

	/**
	 * We will create the file and get the collector object
	 */
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		try {
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
		}
		this.collector = collector;
	}

	/**
	 * Declare the output field "line"
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
}
* declareOutputFields方法用于定义输出字段名

* 首先集群提交Topology之后,会先调用open方法,open内部使用config对象获取文件并获取fileReader对象。

* nextTuple()方法通过BufferReader读取到fileReader数据之后,读取每一行数据,然后通过emit方法发送数据到订阅了数据的blot上。
3、第一个Blot(拆分出所有的wordcount传入下一个blot)

package bolts;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

	public void cleanup() {}

	/**
	 * The bolt will receive the line from the
	 * words file and process it to Normalize this line
	 * 
	 * The normalize will be put the words in lower case
	 * and split the line to get all words in this 
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
	}
	

	/**
	 * The bolt will only emit the field "word" 
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}
这个blot基本上没做什么事情,只是在execute方法中将words切分然后emit到下一个blot

4、第二个blot(执行单词统计)

package bolts;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

	Integer id;
	String name;
	Map<String, Integer> counters;

	/**
	 * At the end of the spout (when the cluster is shutdown
	 * We will show the word counters
	 */
	@Override
	public void cleanup() {
		System.out.println("-- Word Counter ["+name+"-"+id+"] --");
		for(Map.Entry<String, Integer> entry : counters.entrySet()){
			System.out.println(entry.getKey()+": "+entry.getValue());
		}
	}

	/**
	 * On create 
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		this.counters = new HashMap<String, Integer>();
		this.name = context.getThisComponentId();
		this.id = context.getThisTaskId();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {}


	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String str = input.getString(0);
		/**
		 * If the word dosn't exist in the map we will create
		 * this, if not We will add 1 
		 */
		if(!counters.containsKey(str)){
			counters.put(str, 1);
		}else{
			Integer c = counters.get(str) + 1;
			counters.put(str, c);
		}
	}
}
这里定义了一个HashMap counters用于存储单词的统计结果。在execute方法中执行单词统计。


转载请注明来源地址:http://blog.csdn.net/weijonathan/article/details/17399077



作者:WeiJonathan 发表于2013-12-18 19:22:39 原文链接
阅读:84 评论:0 查看评论

相关 [twitter storm 系列] 推荐:

【Twitter Storm系列】 Storm简单实例讲解

- - CSDN博客云计算推荐文章
实例来自书籍《Oreilly.Getting.Started.with.Storm.Aug.2012》. 先讲下我们这次所需涉及到的概念:Topology、Spout、Blot. Topology:Storm的运行单位,相当于Hadoop中的job,一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来.

Twitter Storm安装配置(单机版)

- - 开源软件 - ITeye博客
本篇幅讲述Twitter Storm安装配置,也作为自己的笔记. storm的官方安装说明(e文):. storm的安装分为单机版和集群版,只是配置稍微有点区别,大致一样. 要使用storm首先要安装以下工具:. 第一步,安装Python2.7.2. 追加/usr/local/lib/. 第二步,安装zookeeper   .

Storm系列之最基本的例子

- - CSDN博客云计算推荐文章
        本文会从如何写一个Storm的topology开始,来对Storm实现的细节进行阐述. 我们用Maven来管理项目,方便lib依赖的引用和版本控制. 建立最基本的pom.xml如下:. 这里我额外添加了两个build 插件:. maven-compiler-plugin : 为了方便指定编译时jdk.

Twitter即将开源即时数据处理工具Storm

- Andy - 36氪
开发者的好消息:Twitter刚刚在博客上宣布将在9月19日的Strange Loop大会上公布Storm的代码. 这个类似于Hadoop的即时数据处理工具是BackType开发的,后来被Twitter收购用于Twitter. Twitter列举了Storm的三大类应用:. 信息流处理{Stream processing}: Storm可用来实时处理新数据和更新数据库,兼具容错性和可扩展性.

Storm :twitter的实时数据处理工具

- d0ngd0ng - yiihsia[互联网后端技术]_yiihsia[互联网后端技术]
昨天在家里一直发不出文章,于是干脆先发到了iteye上. Twitter在9月19日的Strange Loop大会上公布Storm的代码. 这个类似于Hadoop的即时数据处理工具是BackType开发的,后来被Twitter收购用于Twitter. Twitter列举了Storm的三大类应用:. 1. 信息流处理{Stream processing}.

开放实时数据处理平台 Twitter Storm

- We_Get - 开源中国社区最新软件
Storm 代码来自于Twitter上月收购的BackType,似乎是Twitter为方便用户解析数据的努力. 现在Storm的势头相当强劲,Twitter开发的使其完美的工具,已经变得非常强大. 类似于Hadoop,另一个开源数据操作平台,Storm也可能成为一项大业务. 据报道,雅虎正在考虑分拆Hadoop,打造一个规模达数十亿美元的业务.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

Storm实战之WordCount

- - 编程语言 - ITeye博客
 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式.

storm常见问题解答

- - BlogJava-庄周梦蝶
    最近有朋友给我邮件问一些storm的问题,集中解答在这里. 一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算. 你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算. 怎么实现spout可以参考官方的kestrel spout实现:.