Kafka+Storm+HDFS整合实践

标签: kafka storm hdfs | 发表时间:2016-05-09 17:21 | 作者:
出处:http://eric-gcm.iteye.com

原文地址: http://shiyanjun.cn/archives/934.html

 

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的 需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为 了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别 进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合 Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方 式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:

  • 直接使用Storm的Topology对数据进行实时分析处理
  • 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合 Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软 件包如下所示:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

程序配置运行所基于的操作系统为CentOS 5.11。

Kafka安装配置

我们使用3台机器搭建Kafka集群:

1 192.168.4.142   h1
2 192.168.4.143   h2
3 192.168.4.144   h3

在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令:

1 cd /usr/ local /
3 tar xvzf kafka_2.9.2-0.8.1.1.tgz
4 ln -s /usr/ local /kafka_2.9.2-0.8.1.1 /usr/ local /kafka
5 chown -R kafka:kafka /usr/ local /kafka_2.9.2-0.8.1.1 /usr/ local /kafka

修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:

1 broker.id=0
2 zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果 你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:

1 zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:

1 cd /usr/ local /zookeeper
2 bin/zkCli.sh

在ZooKeeper执行如下命令创建chroot路径:

1 create /kafka ''

这样,每次连接Kafka集群的时候(使用 --zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:

1 scp -r /usr/ local /kafka_2.9.2-0.8.1.1/ h2:/usr/ local /
2 scp -r /usr/ local /kafka_2.9.2-0.8.1.1/ h3:/usr/ local /

最后,在h2、h3节点上配置,执行如下命令:

1 cd /usr/ local /
2 ln -s /usr/ local /kafka_2.9.2-0.8.1.1 /usr/ local /kafka
3 chown -R kafka:kafka /usr/ local /kafka_2.9.2-0.8.1.1 /usr/ local /kafka

并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:

1 broker.id=1  # 在h1修改
2  
3 broker.id=2  # 在h2修改

因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:

1 bin/kafka-server-start.sh /usr/ local /kafka/config/server.properties &

可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:

1 bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5

查看创建的Topic,执行如下命令:

1 bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5

结果信息如下所示:

1 Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
2       Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
3       Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
4       Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
5       Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
6       Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1

上面Leader、Replicas、Isr的含义如下:

1 Partition: 分区
2 Leader   : 负责读写指定分区的节点
3 Replicas : 复制该分区log的节点列表
4 Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader

我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:

1 bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5

在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:

1 bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5

可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

Storm安装配置

Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:

1 192.168.4.142   h1
2 192.168.4.143   h2
3 192.168.4.144   h3

首先,在h1节点上,执行如下命令安装:

1 cd /usr/ local /
3 tar xvzf apache-storm-0.9.2-incubating. tar .gz
4 ln -s /usr/ local /apache-storm-0.9.2-incubating /usr/ local /storm
5 chown -R storm:storm /usr/ local /apache-storm-0.9.2-incubating /usr/ local /storm

然后,修改配置文件conf/storm.yaml,内容如下所示:

01 storm.zookeeper.servers:
02       - "h1"
03       - "h2"
04       - "h3"
05 storm.zookeeper.port: 2181
06 #
07 nimbus.host: "h1"
08  
09 supervisor.slots.ports:
10      - 6700
11      - 6701
12      - 6702
13      - 6703
14  
15 storm.local.dir: "/tmp/storm"

将配置好的安装文件,分发到其他节点上:

1 scp -r /usr/ local /apache-storm-0.9.2-incubating/ h2:/usr/ local /
2 scp -r /usr/ local /apache-storm-0.9.2-incubating/ h3:/usr/ local /

最后,在h2、h3节点上配置,执行如下命令:

1 cd /usr/ local /
2 ln -s /usr/ local /apache-storm-0.9.2-incubating /usr/ local /storm
3 chown -R storm:storm /usr/ local /apache-storm-0.9.2-incubating /usr/ local /storm

Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:

1 bin/storm nimbus &
2 bin/storm supervisor &

为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:

1 bin/storm ui &

这样可以通过访问 http://h2:8080/来查看Topology的运行状况。

整合Kafka+Storm

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序 Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际 上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm- kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:

01 < dependency >
02       < groupId >org.apache.storm</ groupId >
03       < artifactId >storm-core</ artifactId >
04       < version >0.9.2-incubating</ version >
05       < scope >provided</ scope >
06 </ dependency >
07 < dependency >
08       < groupId >org.apache.storm</ groupId >
09       < artifactId >storm-kafka</ artifactId >
10       < version >0.9.2-incubating</ version >
11 </ dependency >
12 < dependency >
13       < groupId >org.apache.kafka</ groupId >
14       < artifactId >kafka_2.9.2</ artifactId >
15       < version >0.8.1.1</ version >
16       < exclusions >
17            < exclusion >
18                 < groupId >org.apache.zookeeper</ groupId >
19                 < artifactId >zookeeper</ artifactId >
20            </ exclusion >
21            < exclusion >
22                 < groupId >log4j</ groupId >
23                 < artifactId >log4j</ artifactId >
24            </ exclusion >
25       </ exclusions >
26 </ dependency >

下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:

001 package org.shirdrn.storm.examples;
002  
003 import java.util.Arrays;
004 import java.util.HashMap;
005 import java.util.Iterator;
006 import java.util.Map;
007 import java.util.Map.Entry;
008 import java.util.concurrent.atomic.AtomicInteger;
009  
010 import org.apache.commons.logging.Log;
011 import org.apache.commons.logging.LogFactory;
012  
013 import storm.kafka.BrokerHosts;
014 import storm.kafka.KafkaSpout;
015 import storm.kafka.SpoutConfig;
016 import storm.kafka.StringScheme;
017 import storm.kafka.ZkHosts;
018 import backtype.storm.Config;
019 import backtype.storm.LocalCluster;
020 import backtype.storm.StormSubmitter;
021 import backtype.storm.generated.AlreadyAliveException;
022 import backtype.storm.generated.InvalidTopologyException;
023 import backtype.storm.spout.SchemeAsMultiScheme;
024 import backtype.storm.task.OutputCollector;
025 import backtype.storm.task.TopologyContext;
026 import backtype.storm.topology.OutputFieldsDeclarer;
027 import backtype.storm.topology.TopologyBuilder;
028 import backtype.storm.topology.base.BaseRichBolt;
029 import backtype.storm.tuple.Fields;
030 import backtype.storm.tuple.Tuple;
031 import backtype.storm.tuple.Values;
032  
033 public class MyKafkaTopology {
034  
035       public static class KafkaWordSplitter extends BaseRichBolt {
036  
037            private static final Log LOG = LogFactory.getLog(KafkaWordSplitter. class );
038            private static final long serialVersionUID = 886149197481637894L;
039            private OutputCollector collector;
040           
041            @Override
042            public void prepare(Map stormConf, TopologyContext context,
043                      OutputCollector collector) {
044                 this .collector = collector;             
045            }
046  
047            @Override
048            public void execute(Tuple input) {
049                 String line = input.getString( 0 );
050                 LOG.info( "RECV[kafka -> splitter] " + line);
051                 String[] words = line.split( "\\s+" );
052                 for (String word : words) {
053                      LOG.info( "EMIT[splitter -> counter] " + word);
054                      collector.emit(input, new Values(word, 1 ));
055                 }
056                 collector.ack(input);
057            }
058  
059            @Override
060            public void declareOutputFields(OutputFieldsDeclarer declarer) {
061                 declarer.declare( new Fields( "word" , "count" ));        
062            }
063           
064       }
065      
066       public static class WordCounter extends BaseRichBolt {
067  
068            private static final Log LOG = LogFactory.getLog(WordCounter. class );
069            private static final long serialVersionUID = 886149197481637894L;
070            private OutputCollector collector;
071            private Map<String, AtomicInteger> counterMap;
072           
073            @Override
074            public void prepare(Map stormConf, TopologyContext context,
075                      OutputCollector collector) {
076                 this .collector = collector;   
077                 this .counterMap = new HashMap<String, AtomicInteger>();
078            }
079  
080            @Override
081            public void execute(Tuple input) {
082                 String word = input.getString( 0 );
083                 int count = input.getInteger( 1 );
084                 LOG.info( "RECV[splitter -> counter] " + word + " : " + count);
085                 AtomicInteger ai = this .counterMap.get(word);
086                 if (ai == null ) {
087                      ai = new AtomicInteger();
088                      this .counterMap.put(word, ai);
089                 }
090                 ai.addAndGet(count);
091                 collector.ack(input);
092                 LOG.info( "CHECK statistics map: " + this .counterMap);
093            }
094  
095            @Override
096            public void cleanup() {
097                 LOG.info( "The final result:" );
098                 Iterator<Entry<String, AtomicInteger>> iter = this .counterMap.entrySet().iterator();
099                 while (iter.hasNext()) {
100                      Entry<String, AtomicInteger> entry = iter.next();
101                      LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
102                 }
103                
104            }
105  
106            @Override
107            public void declareOutputFields(OutputFieldsDeclarer declarer) {
108                 declarer.declare( new Fields( "word" , "count" ));        
109            }
110       }
111      
112       public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
113            String zks = "h1:2181,h2:2181,h3:2181" ;
114            String topic = "my-replicated-topic5" ;
115            String zkRoot = "/storm" ; // default zookeeper root configuration for storm
116            String id = "word" ;
117           
118            BrokerHosts brokerHosts = new ZkHosts(zks);
119            SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
120            spoutConf.scheme = new SchemeAsMultiScheme( new StringScheme());
121            spoutConf.forceFromStart = false ;
122            spoutConf.zkServers = Arrays.asList( new String[] { "h1" , "h2" , "h3" });
123            spoutConf.zkPort = 2181 ;
124           
125            TopologyBuilder builder = new TopologyBuilder();
126            builder.setSpout( "kafka-reader" , new KafkaSpout(spoutConf), 5 ); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
127            builder.setBolt( "word-splitter" , new KafkaWordSplitter(), 2 ).shuffleGrouping( "kafka-reader" );
128            builder.setBolt( "word-counter" , new WordCounter()).fieldsGrouping( "word-splitter" , new Fields( "word" ));
129           
130            Config conf = new Config();
131           
132            String name = MyKafkaTopology. class .getSimpleName();
133            if (args != null && args.length > 0 ) {
134                 // Nimbus host name passed from command line
135                 conf.put(Config.NIMBUS_HOST, args[ 0 ]);
136                 conf.setNumWorkers( 3 );
137                 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
138            } else {
139                 conf.setMaxTaskParallelism( 3 );
140                 LocalCluster cluster = new LocalCluster();
141                 cluster.submitTopology(name, conf, builder.createTopology());
142                 Thread.sleep( 60000 );
143                 cluster.shutdown();
144            }
145       }
146 }

上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程 序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:

1 cp /usr/ local /kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/ local /storm/lib/
2 cp /usr/ local /kafka/libs/scala-library-2.9.2.jar /usr/ local /storm/lib/
3 cp /usr/ local /kafka/libs/metrics-core-2.2.0.jar /usr/ local /storm/lib/
4 cp /usr/ local /kafka/libs/snappy-java-1.0.5.jar /usr/ local /storm/lib/
5 cp /usr/ local /kafka/libs/zkclient-0.3.jar /usr/ local /storm/lib/
6 cp /usr/ local /kafka/libs/log4j-1.2.15.jar /usr/ local /storm/lib/
7 cp /usr/ local /kafka/libs/slf4j-api-1.7.2.jar /usr/ local /storm/lib/
8 cp /usr/ local /kafka/libs/jopt-simple-3.2.jar /usr/ local /storm/lib/

然后,就可以提交我们开发的Topology程序了:

1 bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:

1 spoutConf.forceFromStart = false ;

该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读 取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的 Topic数据不被重复处理,是在数据源的位置进行状态记录。

整合Storm+HDFS

Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:

001 package org.shirdrn.storm.examples;
002  
003 import java.text.DateFormat;
004 import java.text.SimpleDateFormat;
005 import java.util.Date;
006 import java.util.Map;
007 import java.util.Random;
008  
009 import org.apache.commons.logging.Log;
010 import org.apache.commons.logging.LogFactory;
011 import org.apache.storm.hdfs.bolt.HdfsBolt;
012 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
013 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
014 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
015 import org.apache.storm.hdfs.bolt.format.RecordFormat;
016 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
017 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
018 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
019 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
020 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
021  
022 import backtype.storm.Config;
023 import backtype.storm.LocalCluster;
024 import backtype.storm.StormSubmitter;
025 import backtype.storm.generated.AlreadyAliveException;
026 import backtype.storm.generated.InvalidTopologyException;
027 import backtype.storm.spout.SpoutOutputCollector;
028 import backtype.storm.task.TopologyContext;
029 import backtype.storm.topology.OutputFieldsDeclarer;
030 import backtype.storm.topology.TopologyBuilder;
031 import backtype.storm.topology.base.BaseRichSpout;
032 import backtype.storm.tuple.Fields;
033 import backtype.storm.tuple.Values;
034 import backtype.storm.utils.Utils;
035  
036 public class StormToHDFSTopology {
037  
038       public static class EventSpout extends BaseRichSpout {
039  
040            private static final Log LOG = LogFactory.getLog(EventSpout. class );
041            private static final long serialVersionUID = 886149197481637894L;
042            private SpoutOutputCollector collector;
043            private Random rand;
044            private String[] records;
045           
046            @Override
047            public void open(Map conf, TopologyContext context,
048                      SpoutOutputCollector collector) {
049                 this .collector = collector;   
050                 rand = new Random();
051                 records = new String[] {
052                           "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35" ,
053                           "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02" ,
054                           "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
055                 };
056            }
057  
058  
059            @Override
060            public void nextTuple() {
061                 Utils.sleep( 1000 );
062                 DateFormat df = new SimpleDateFormat( "yyyy-MM-dd_HH-mm-ss" );
063                 Date d = new Date(System.currentTimeMillis());
064                 String minute = df.format(d);
065                 String record = records[rand.nextInt(records.length)];
066                 LOG.info( "EMIT[spout -> hdfs] " + minute + " : " + record);
067                 collector.emit( new Values(minute, record));
068            }
069  
070            @Override
071            public void declareOutputFields(OutputFieldsDeclarer declarer) {
072                 declarer.declare( new Fields( "minute" , "record" ));        
073            }
074  
075  
076       }
077      
078       public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
079            // use "|" instead of "," for field delimiter
080            RecordFormat format = new DelimitedRecordFormat()
081                    .withFieldDelimiter( " : " );
082  
083            // sync the filesystem after every 1k tuples
084            SyncPolicy syncPolicy = new CountSyncPolicy( 1000 );
085  
086            // rotate files
087            FileRotationPolicy rotationPolicy = new TimedRotationPolicy( 1 .0f, TimeUnit.MINUTES);
088  
089            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
090                    .withPath( "/storm/" ).withPrefix( "app_" ).withExtension( ".log" );
091  
092            HdfsBolt hdfsBolt = new HdfsBolt()
093                    .withFsUrl( " hdfs://h1:8020" )
094                    .withFileNameFormat(fileNameFormat)
095                    .withRecordFormat(format)
096                    .withRotationPolicy(rotationPolicy)
097                    .withSyncPolicy(syncPolicy);
098           
099            TopologyBuilder builder = new TopologyBuilder();
100            builder.setSpout( "event-spout" , new EventSpout(), 3 );
101            builder.setBolt( "hdfs-bolt" , hdfsBolt, 2 ).fieldsGrouping( "event-spout" , new Fields( "minute" ));
102           
103            Config conf = new Config();
104           
105            String name = StormToHDFSTopology. class .getSimpleName();
106            if (args != null && args.length > 0 ) {
107                 conf.put(Config.NIMBUS_HOST, args[ 0 ]);
108                 conf.setNumWorkers( 3 );
109                 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
110            } else {
111                 conf.setMaxTaskParallelism( 3 );
112                 LocalCluster cluster = new LocalCluster();
113                 cluster.submitTopology(name, conf, builder.createTopology());
114                 Thread.sleep( 60000 );
115                 cluster.shutdown();
116            }
117       }
118  
119 }

上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、 FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大 小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:

01 < plugin >
02      < groupId >org.apache.maven.plugins</ groupId >
03      < artifactId >maven-shade-plugin</ artifactId >
04      < version >1.4</ version >
05      < configuration >
06          < createDependencyReducedPom >true</ createDependencyReducedPom >
07      </ configuration >
08      < executions >
09          < execution >
10              < phase >package</ phase >
11              < goals >
12                  < goal >shade</ goal >
13              </ goals >
14              < configuration >
15                  < transformers >
16                      < transformer
17                              implementation = "org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
18                      < transformer
19                              implementation = "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" >
20                          < mainClass ></ mainClass >
21                      </ transformer >
22                  </ transformers >
23              </ configuration >
24          </ execution >
25      </ executions >
26 </ plugin >

整合Kafka+Storm+HDFS

上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在 Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消 费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:

001 package org.shirdrn.storm.examples;
002  
003 import java.util.Arrays;
004 import java.util.Map;
005  
006 import org.apache.commons.logging.Log;
007 import org.apache.commons.logging.LogFactory;
008 import org.apache.storm.hdfs.bolt.HdfsBolt;
009 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
010 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
011 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
012 import org.apache.storm.hdfs.bolt.format.RecordFormat;
013 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
014 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
015 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
016 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
017 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
018  
019 import storm.kafka.BrokerHosts;
020 import storm.kafka.KafkaSpout;
021 import storm.kafka.SpoutConfig;
022 import storm.kafka.StringScheme;
023 import storm.kafka.ZkHosts;
024 import backtype.storm.Config;
025 import backtype.storm.LocalCluster;
026 import backtype.storm.StormSubmitter;
027 import backtype.storm.generated.AlreadyAliveException;
028 import backtype.storm.generated.InvalidTopologyException;
029 import backtype.storm.spout.SchemeAsMultiScheme;
030 import backtype.storm.task.OutputCollector;
031 import backtype.storm.task.TopologyContext;
032 import backtype.storm.topology.OutputFieldsDeclarer;
033 import backtype.storm.topology.TopologyBuilder;
034 import backtype.storm.topology.base.BaseRichBolt;
035 import backtype.storm.tuple.Fields;
036 import backtype.storm.tuple.Tuple;
037 import backtype.storm.tuple.Values;
038  
039 public class DistributeWordTopology {
040      
041       public static class KafkaWordToUpperCase extends BaseRichBolt {
042  
043            private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase. class );
044            private static final long serialVersionUID = -5207232012035109026L;
045            private OutputCollector collector;
046           
047            @Override
048            public void prepare(Map stormConf, TopologyContext context,
049                      OutputCollector collector) {
050                 this .collector = collector;             
051            }
052  
053            @Override
054            public void execute(Tuple input) {
055                 String line = input.getString( 0 ).trim();
056                 LOG.info( "RECV[kafka -> splitter] " + line);
057                 if (!line.isEmpty()) {
058                      String upperLine = line.toUpperCase();
059                      LOG.info( "EMIT[splitter -> counter] " + upperLine);
060                      collector.emit(input, new Values(upperLine, upperLine.length()));
061                 }
062                 collector.ack(input);
063            }
064  
065            @Override
066            public void declareOutputFields(OutputFieldsDeclarer declarer) {
067                 declarer.declare( new Fields( "line" , "len" ));        
068            }
069           
070       }
071      
072       public static class RealtimeBolt extends BaseRichBolt {
073  
074            private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase. class );
075            private static final long serialVersionUID = -4115132557403913367L;
076            private OutputCollector collector;
077           
078            @Override
079            public void prepare(Map stormConf, TopologyContext context,
080                      OutputCollector collector) {
081                 this .collector = collector;             
082            }
083  
084            @Override
085            public void execute(Tuple input) {
086                 String line = input.getString( 0 ).trim();
087                 LOG.info( "REALTIME: " + line);
088                 collector.ack(input);
089            }
090  
091            @Override
092            public void declareOutputFields(OutputFieldsDeclarer declarer) {
093                
094            }
095  
096       }
097  
098       public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
099  
100            // Configure Kafka
101            String zks = "h1:2181,h2:2181,h3:2181" ;
102            String topic = "my-replicated-topic5" ;
103            String zkRoot = "/storm" ; // default zookeeper root configuration for storm
104            String id = "word" ;
105            BrokerHosts brokerHosts = new ZkHosts(zks);
106            SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
107            spoutConf.scheme = new SchemeAsMultiScheme( new StringScheme());
108            spoutConf.forceFromStart = false ;
109            spoutConf.zkServers = Arrays.asList( new String[] { "h1" , "h2" , "h3" });
110            spoutConf.zkPort = 2181 ;
111           
112            // Configure HDFS bolt
113            RecordFormat format = new DelimitedRecordFormat()
114                    .withFieldDelimiter( "\t" ); // use "\t" instead of "," for field delimiter
115            SyncPolicy syncPolicy = new CountSyncPolicy( 1000 ); // sync the filesystem after every 1k tuples
116            FileRotationPolicy rotationPolicy = new TimedRotationPolicy( 1 .0f, TimeUnit.MINUTES); // rotate files
117            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
118                    .withPath( "/storm/" ).withPrefix( "app_" ).withExtension( ".log" ); // set file name format
119            HdfsBolt hdfsBolt = new HdfsBolt()
120                    .withFsUrl( " hdfs://h1:8020" )
121                    .withFileNameFormat(fileNameFormat)
122                    .withRecordFormat(format)
123                    .withRotationPolicy(rotationPolicy)
124                    .withSyncPolicy(syncPolicy);
125           
126            // configure & build topology
127            TopologyBuilder builder = new TopologyBuilder();
128            builder.setSpout( "kafka-reader" , new KafkaSpout(spoutConf), 5 );
129            builder.setBolt( "to-upper" , new KafkaWordToUpperCase(), 3 ).shuffleGrouping( "kafka-reader" );
130            builder.setBolt( "hdfs-bolt" , hdfsBolt, 2 ).shuffleGrouping( "to-upper" );
131            builder.setBolt( "realtime" , new RealtimeBolt(), 2 ).shuffleGrouping( "to-upper" );
132           
133            // submit topology
134            Config conf = new Config();
135            String name = DistributeWordTopology. class .getSimpleName();
136            if (args != null && args.length > 0 ) {
137                 String nimbus = args[ 0 ];
138                 conf.put(Config.NIMBUS_HOST, nimbus);
139                 conf.setNumWorkers( 3 );
140                 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
141            } else {
142                 conf.setMaxTaskParallelism( 3 );
143                 LocalCluster cluster = new LocalCluster();
144                 cluster.submitTopology(name, conf, builder.createTopology());
145                 Thread.sleep( 60000 );
146                 cluster.shutdown();
147            }
148       }
149  
150 }

上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:

1 bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。

参考链接



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [kafka storm hdfs] 推荐:

Kafka+Storm+HDFS整合实践

- -
原文地址: http://shiyanjun.cn/archives/934.html. 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的 需求Hive就不合适了. 实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.

大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

- - 行业应用 - ITeye博客
大数据我们都知道hadoop,但并不都是hadoop.我们该如何构建大数据库项目. 对于离线处理,hadoop还是比较适合的,但是对于实时性比较强的,数据量比较大的,我们可以采用Storm,那么Storm和什么技术搭配,才能够做一个适合自己的项目. 可以带着下面问题来阅读本文章:. 1.一个好的项目架构应该具备什么特点.

storm笔记 -- 与kafka的集成

- - 开源软件 - ITeye博客
   storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景. 因此,storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供storm应用使用. 这里结合自己的应用做个简单总结.   由于storm已经提供了storm-kafka,因此可以直接使用,使用kafka的低级api读取数据.

Flume + kafka + HDFS构建日志采集系统

- - 企业架构 - ITeye博客
    Flume是一个非常优秀日志采集组件,类似于logstash,我们通常将Flume作为agent部署在application server上,用于收集本地的日志文件,并将日志转存到HDFS、kafka等数据平台中;关于Flume的原理和特性,我们稍后详解,本文只简述如何构建使用Flume + kafka + HDFS构建一套日志采集系统.

storm、hbase、kafka整合过程中遇到的log4j冲突问题

- - 行业应用 - ITeye博客
storm、hbase、kafka整合过程中遇到的log4j冲突问题. log4j-over-slf4j.jar AND slf4j-log4j12.jar 循环调用冲突了,再进一步原因是kafka、hbase中用的是log4j. * 方案一:把storm中的log4j-over-slf4j 依赖排除;.

HDFS-压缩

- - Java - 编程语言 - ITeye博客
文件压缩带来了两大益处1)减少存贮空间2)加速网络(磁盘)传输. 基于大数据的传输,都需要经过压缩处理. 压缩格式 工具 算法 文件扩展名 可分块. Java代码 复制代码 收藏代码. 24.        // io.compression.codecs 定义列表中的一个 . Native gzip 库减少解压缩时间在50%,压缩时间在10%(同java实现的压缩算法).

HDFS架构

- - 数据库 - ITeye博客
       在阅读了GFS的论文之后,对GFS的框架有了基本的了解,进一步学习自然是对HDFS的解析,不得不说,之前对GFS的一些了解,对理解HDFS还是很有帮助的,毕竟后者是建立在前者之上的分布式文件系统,二者在框架上可以找到很多的共同点,建议初次接触HFDS的技术人员可以先把GFS的那篇论文啃个两三遍,毕竟磨刀不砍柴工.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

Hadoop剖析之HDFS

- - CSDN博客数据库推荐文章
Hadoop的分布式文件系统(HDFS)是Hadoop的很重要的一部分,本文先简单介绍HDFS的几个特点,然后再分析背后的原理,即怎样实现这种特点的. 这是HDFS最核心的特性了,把大量数据部署在便宜的硬件上,即使其中某些磁盘出现故障,HDFS也能很快恢复丢失的数据. 这个的意思是HDFS适合一次写入,多次读取的程序,文件写入后,就不需要修改了.