Storm系列之最基本的例子

标签: storm 系列 | 发表时间:2013-12-31 03:41 | 作者:xeseo
出处:http://blog.csdn.net

前言:

        本文会从如何写一个Storm的topology开始,来对Storm实现的细节进行阐述。避免干巴巴的讲理论。


1. 建立Maven项目

我们用Maven来管理项目,方便lib依赖的引用和版本控制。

建立最基本的pom.xml如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.edi.storm</groupId>
<artifactId>storm-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>


<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>


<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>


<build>
<finalName>storm-samples</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>


<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>


</plugin>
</plugins>
</build>


<dependencies>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0-rc2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
这里我额外添加了两个build 插件:

maven-compiler-plugin : 为了方便指定编译时jdk。Storm的依赖包里面某些是jdk1.5的.

和 

maven-assembly-plugin: 为了把所有依赖包最后打到一个jar包去,方便测试和部署。后面会提到如果不想打到一个jar该怎么做。


2. 建立Spout

前文提到过,Storm中的spout负责发射数据。

我们来实现这样一个spout:

它会随机发射一系列的句子,句子的格式是 谁:说的话

代码如下:

public class RandomSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	private Random rand;
	
	private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
	
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		this.rand = new Random();
	}

	@Override
	public void nextTuple() {
		String toSay = sentences[rand.nextInt(sentences.length)];
		this.collector.emit(new Values(toSay));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

}

这里要先理解Tuple的概念。

Storm中,基本元数据是靠Tuple才承载的。或者说,Tuple是数据的一个大抽象。它要求实现类必须能序列化。


该Spout代码里面最核心的部分有两个:

a. 用collector.emit()方法发射tuple。我们不用自己实现tuple,我们只需要定义tuple的value,Storm会帮我们生成tuple。Values对象接受变长参数。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,...)的参数的index,例如我们emit(new Values("v1", "v2")), 那么Tuple的属性即为:{ [ "v1" ], [ "V2" ] }

b. declarer.declare方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。


3. 建立Bolt

既然有了源,那么我们就来建立节点处理源流出来的数据。怎么处理呢?为了演示,我们来做些无聊的事情:末尾添加"!",然后打印。

两个功能,两个Bolt。

先看添加"!"的Bolt

public class ExclaimBasicBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		//String sentence = tuple.getString(0);
		String sentence = (String) tuple.getValue(0);
		String out = sentence + "!";
		collector.emit(new Values(out));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("excl_sentence"));
	}

}

在RandomSpout中,我们发射的Tuple具有这样的属性 { [ "edi:I'm Happy" ] }, 所以tuple的value list中第0个值,肯定是个String。我们用tuple.getvalue(0)取到。

Storm为tuple封装了一些方法方便我们取一些基本类型,例如String,我们可以直接用getString(int N) 。

取到以后,我们在末尾添加"!"后,仍然发射一个Tuple,定义其唯一的value的field 名字为"excl_sentence"


打印Bolt

public class PrintBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String rec = tuple.getString(0);
		System.err.println("String recieved: " + rec);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// do nothing
	}

}

仍然是取第一个,因为我们并没有定义过第二个value


4. 建立Topology

现在我们建立拓扑结构的主要组件都有了,可以创建topology了。

public class ExclaimBasicTopo {

	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		
		builder.setSpout("spout", new RandomSpout());
		builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");
		builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");

		Config conf = new Config();
		conf.setDebug(false);

		if (args != null && args.length > 0) {
			conf.setNumWorkers(3);

			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		} else {

			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("test", conf, builder.createTopology());
			Utils.sleep(100000);
			cluster.killTopology("test");
			cluster.shutdown();
		}
	}
}

很简单,对吧。

其中,

builder.setSpout("spout", new RandomSpout());
定义一个spout,id为"spout"
builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout"); 
定义了一个id为"exclaim"的bolt,并且按照随机分组获得"spout"发射的tuple

builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");
定义了一个id为"print"的bolt,并且按照随机分组获得"exclaim”发射出来的tuple

.shuffleGrouping
是指明Storm按照何种策略将tuple分配到后续的bolt去。

可以看到,如果我们运行时不带参数,是把topology提交到了LocalCluster的,即所有的task都在一个本地JVM去执行。可以用LocalCluster来调试。如果后面带一个参数,即为该topology的名字,那么就把该topology提交到集群上去了。

把项目用M2E插件导入Eclipse直接运行试试

String recieved: marry:I'm angry!
String recieved: edi:I'm happy!
String recieved: john:I'm sad!
String recieved: edi:I'm happy!
String recieved: ted:I'm excited!
String recieved: laden:I'm dangerous!
String recieved: edi:I'm happy!
String recieved: edi:I'm happy!

这里我们并没有指定并行,那么其实是每个spout、bolt仅有一个线程对应去执行。

我们修改下代码,指定并行数

		builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
		builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");

由于我们并没有多指定task数目,所以默认,会有两个exectuor去执行两个exclaimBasicBolt的task,3个executor去执行3个PrintBolt的task。

为了方便体现确实是并行,我们修改PrintBolt代码如下:

public class PrintBolt extends BaseBasicBolt {

    private int indexId;
	
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		this.indexId = context.getThisTaskIndex();
    }

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String rec = tuple.getString(0);
		System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// do nothing
	}

}
这里从上下文中拿到该Bolt的TaskIndex,我们指定了3的并发度,所以理论上有3个task,那么该值应该为[1,2,3]。

运行下看看:

Bolt[0] String recieved: marry:I'm angry!
Bolt[2] String recieved: john:I'm sad!
Bolt[2] String recieved: ted:I'm excited!
Bolt[2] String recieved: john:I'm sad!
Bolt[2] String recieved: john:I'm sad!

证实确实是并发了。

本地测试通过了,我们用 mvn clean install 命令编译,然后把target目录下生成的 storm-samples-jar-with-dependencies.jar 拷到nimbus机器上,执行

./storm jar storm-samples-jar-with-dependencies.jar com.edi.storm.topos.ExclaimBasicTopo test
在StormUI里面,点进 test


看到spout 已然已经emit了 11347280个tuple了…… 而id为exclaim的bolt也已经接受了2906920个tuple了。print没有输出,所以emit为0。



截止到这里,一个简单的Storm的topology已经完成了。

但是,这里依然有些问题:

1. 什么是acker?

2. Bolt为什么有两个继承类和接口?

3. Topology的提交方式到底有几种?

4. 除了随机分组,还有哪些分组策略?

5. Storm是如何保证tuple不被丢失的?

6. 我看到spout发送数据比bolt处理的速度快太多了,我能不能在spout里面sleep?

7. 并发数要如何指定呢?

要知后事如何,且听下回分解~


作者:xeseo 发表于2013-12-30 19:41:11 原文链接
阅读:76 评论:0 查看评论

相关 [storm 系列] 推荐:

【Twitter Storm系列】 Storm简单实例讲解

- - CSDN博客云计算推荐文章
实例来自书籍《Oreilly.Getting.Started.with.Storm.Aug.2012》. 先讲下我们这次所需涉及到的概念:Topology、Spout、Blot. Topology:Storm的运行单位,相当于Hadoop中的job,一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来.

Storm系列之最基本的例子

- - CSDN博客云计算推荐文章
        本文会从如何写一个Storm的topology开始,来对Storm实现的细节进行阐述. 我们用Maven来管理项目,方便lib依赖的引用和版本控制. 建立最基本的pom.xml如下:. 这里我额外添加了两个build 插件:. maven-compiler-plugin : 为了方便指定编译时jdk.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

Storm实战之WordCount

- - 编程语言 - ITeye博客
 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式.

storm常见问题解答

- - BlogJava-庄周梦蝶
    最近有朋友给我邮件问一些storm的问题,集中解答在这里. 一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算. 你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算. 怎么实现spout可以参考官方的kestrel spout实现:.

Storm 实时性分析

- - CSDN博客架构设计推荐文章
都说Storm是一个实时流处理系统,但Storm的实时性体现在什么方面呢. 首先有一个前提:这里的实时性和我们通常所说的实时系统(芯片+汇编或C编写的实时处理软件)的实时性肯定是没法比的,也不是同一个概念. 这里的实时性应该是一个相对的实时性(相对于Hadoop之类 ). 总结一下,Storm的实时性可能主要体现在:.

那些storm的坑坑

- - 开源软件 - ITeye博客
转载请声明出处:http://blackwing.iteye.com/blog/2147633. 在使用storm的过程中,感觉它还是不如hadoop那么成熟. 当然,它的流式处理能力挺让人眼前一亮,以前做的个性化推荐都是离线计算,现在总算把实时部分也加上了. 总结一下storm使用的些心得:. 1.尽量把大量数据处理行为分拆成多个处理component.

storm准实时应用

- - CSDN博客推荐文章
1 应用背景: 需要实时统计用户的登陆数,在线人数,活跃时间,下载等指标的数据,或者清洗后移到hdfs上.         1) 客户端产生数据---.         2) kafka-生产者实时采集数据(保留7天)-----.         3) storm实时消费数据,处理数据.         4)把实时数据统计结果缓存到memcached 中.

Storm核心概念剖析

- - 互联网 - ITeye博客
最近团队中有分析的场景,用到了JStorm来做数据的实时分析,于是花时间对于一些概念做了了解. 这个的话出来应该有几年时间了,阿里巴巴也重写了一套JStorm,核心的类名都是服用的Storm的,他是一套实时数据处理系统,容错行好,然后足够稳定,目前很多数据实时分析的场景,选择Storm的越来越多了.