storm准实时应用

标签: storm 实时 应用 | 发表时间:2014-12-30 06:33 | 作者:lili72
出处:http://blog.csdn.net

1 应用背景: 需要实时统计用户的登陆数,在线人数,活跃时间,下载等指标的数据,或者清洗后移到hdfs上。

 

2 设计架构:

        1) 客户端产生数据---

        2) kafka-生产者实时采集数据(保留7天)-----

        3) storm实时消费数据,处理数据

        4)把实时数据统计结果缓存到memcached 中

        5) 把数据保存到mysql

 

3 组件之间的通信.

3.1  客户端发送数据---Nginx接收 分布式放在多台服务器上。 

3.2  (flume读取接 收集文件信息传给kafka)-kafka生产者直接收集文件信息。 

3.3  kafka与storm 通过插件storm-kafka 通信

3.4  storm 与缓存 memcached   java程序 读取mysql的结果缓存到 memcached

3.5   zookeeper 用工具 curator-client,锁同步机制。

        (对应的插件可以在github上找到 https://github.com/)

4  场景在现:即席查询用户注册数,用户登录数,当前在线人数

   4.1   Storm 处理:

          4.1.1 数据清理阶段:

         Storm从kafka得到对应的topic数据,然后对数据进行清洗。Storm获取实时JSON数据,然后通过解析JSON数据,格式化之后利用storm-hdfs把数据传到HDFS上。或者实时统计数据存放到关系型数据库中。

      

     

package com.ks.topology;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

import com.google.common.collect.ImmutableList;
import com.ks.bolt.ConJsonToData;
import com.ks.bolt.CounterBolt;

/**
 * @author root
 *
 */
public class CountUserLogin {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		try{
			String kafkaZookeeper = "192.168.119.131:2181,192.168.119.132:2181,192.168.119.133:2181";
			BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
			SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "userlogin", "/userlogin", "id");
	        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
	        kafkaConfig.zkServers =  ImmutableList.of("192.168.119.131","192.168.119.132","192.168.119.133");
	        kafkaConfig.zkPort = 2181;
			
	        //kafkaConfig.forceFromStart = true;
			
	        TopologyBuilder builder = new TopologyBuilder();
	        builder.setSpout("spout", new KafkaSpout(kafkaConfig), 2);
	        builder.setBolt("counter", new CounterBolt(),1).shuffleGrouping("spout");
	       builder.setBolt("ConJsonToData", new ConJsonToData(),1).shuffleGrouping("counter");
	        
	        Config config = new Config();
	        config.setDebug(true);
	        
	        if(args!=null && args.length > 0) {
	            config.setNumWorkers(2);
	            
	            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
	        } else {        
	            config.setMaxTaskParallelism(3);
	
	            LocalCluster cluster = new LocalCluster();
	            cluster.submitTopology("CountUserLogin-topology", config, builder.createTopology());
	        
	            Thread.sleep(500000);
	
	            cluster.shutdown();
	        }
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

}

package com.ks.bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CounterBolt extends BaseBasicBolt {

	/**
	 * 
	 */
	private static final long serialVersionUID = -5508421065181891596L;
	
	private static long counter = 0;
	
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		
		System.out.println("msg = "+tuple.getString(0)+" -------------counter = "+(counter++));
		collector.emit(new Values(tuple));
		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("userloginStr"));

	}

}

package com.ks.bolt;

import java.io.IOException;

import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import com.ks.model.UserModel;

public class ConJsonToData  extends BaseBasicBolt{

	private static final ObjectMapper mapper = new ObjectMapper();
	private static final long serialVersionUID = 5596476183440049414L;

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String  str =tuple .getString(0);
		System.out.println("str------------"  +str+"  str------------");
		UserModel bean =null;
        if(str!=null){
       	 try {
			bean = mapper.readValue(str, UserModel.class);
			 System.out.println(bean.toString());
		} catch (JsonParseException e) {
			e.printStackTrace();
		} catch (JsonMappingException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
       
        }
		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		
	}

}
model

package com.ks.model;

public class UserLog {
	
//{"serverid":"1001","time":"2014-12-11 00:00:51","userid":12345678,"appid":8888,"client_ip":"192.136.20.210"}
	
	private String   serverid="";
	
	private String  time="";
	
	private  String  userid="";
	
	private  Integer  appid=0;
	
	private  String  client_ip="";
	
	
	public UserLog(){
		
	}
	
	public UserLog(String serverid, String time, String userid, Integer appid,
			String client_ip) {
		this.serverid = serverid;
		this.time = time;
		this.userid = userid;
		this.appid = appid;
		this.client_ip = client_ip;
	}

	public String getServerid() {
		return serverid;
	}

	public void setServerid(String serverid) {
		this.serverid = serverid;
	}
	

	public String getTime() {
		return time;
	}

	public void setTime(String time) {
		this.time = time;
	}

	public String getUserid() {
		return userid;
	}

	public void setUserid(String userid) {
		this.userid = userid;
	}

	public Integer getAppid() {
		return appid;
	}

	public void setAppid(Integer appid) {
		this.appid = appid;
	}

	public String getClient_ip() {
		return client_ip;
	}

	public void setClient_ip(String client_ip) {
		this.client_ip = client_ip;
	}

	
	@Override
		public String toString() {
			return  serverid+"|" + userid+"|" +appid+"|"+time+"|"+client_ip;
		}
	
}

package com.ks.model;

public class UserModel {
	
	private  UserLog  data;
	
	private String  type="" ;
	
	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public  UserModel(){
	}

	public UserLog getData() {
		return data;
	}

	public void setData(UserLog data) {
		this.data = data;
	}
	
	@Override
	public String toString() {
		
		return data.toString()+"|"+type;
	}

}



作者:lili72 发表于2014-12-29 22:33:16 原文链接
阅读:0 评论:0 查看评论

相关 [storm 实时 应用] 推荐:

storm准实时应用

- - CSDN博客推荐文章
1 应用背景: 需要实时统计用户的登陆数,在线人数,活跃时间,下载等指标的数据,或者清洗后移到hdfs上.         1) 客户端产生数据---.         2) kafka-生产者实时采集数据(保留7天)-----.         3) storm实时消费数据,处理数据.         4)把实时数据统计结果缓存到memcached 中.

Storm 实时性分析

- - CSDN博客架构设计推荐文章
都说Storm是一个实时流处理系统,但Storm的实时性体现在什么方面呢. 首先有一个前提:这里的实时性和我们通常所说的实时系统(芯片+汇编或C编写的实时处理软件)的实时性肯定是没法比的,也不是同一个概念. 这里的实时性应该是一个相对的实时性(相对于Hadoop之类 ). 总结一下,Storm的实时性可能主要体现在:.

Storm :twitter的实时数据处理工具

- d0ngd0ng - yiihsia[互联网后端技术]_yiihsia[互联网后端技术]
昨天在家里一直发不出文章,于是干脆先发到了iteye上. Twitter在9月19日的Strange Loop大会上公布Storm的代码. 这个类似于Hadoop的即时数据处理工具是BackType开发的,后来被Twitter收购用于Twitter. Twitter列举了Storm的三大类应用:. 1. 信息流处理{Stream processing}.

开放实时数据处理平台 Twitter Storm

- We_Get - 开源中国社区最新软件
Storm 代码来自于Twitter上月收购的BackType,似乎是Twitter为方便用户解析数据的努力. 现在Storm的势头相当强劲,Twitter开发的使其完美的工具,已经变得非常强大. 类似于Hadoop,另一个开源数据操作平台,Storm也可能成为一项大业务. 据报道,雅虎正在考虑分拆Hadoop,打造一个规模达数十亿美元的业务.

Storm实时计算:流操作入门编程实践

- - 简单之美
Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易. 下面,简单介绍编程实践过程中需要理解的Storm中的几个概念:. 一个Topology运行以后就不能停止,它会无限地运行下去,除非手动干预(显式执行bin/storm kill )或意外故障(如停机、整个Storm集群挂掉)让它终止.

使用Storm实现实时大数据分析

- - 开源软件 - ITeye博客
摘要:随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战. Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析. 简单和明了,Storm让大数据分析变得轻松加愉快. 当今世界,公司的日常运营经常会生成TB级别的数据.

基于Storm的Nginx log实时监控系统

- - UC技术博客
UAE(UC App Engine)是一个UC内部的PaaS平台,总体架构有点类似CloudFoundry,包括:. 快速部署:支持Node.js、Play!、PHP等框架. 信息透明:运维过程、系统状态、业务状况. 灰度试错:IP灰度、地域灰度. 基础服务:key-value存储、MySQL高可用、图片平台等.

大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

- - 行业应用 - ITeye博客
大数据我们都知道hadoop,但并不都是hadoop.我们该如何构建大数据库项目. 对于离线处理,hadoop还是比较适合的,但是对于实时性比较强的,数据量比较大的,我们可以采用Storm,那么Storm和什么技术搭配,才能够做一个适合自己的项目. 可以带着下面问题来阅读本文章:. 1.一个好的项目架构应该具备什么特点.

实时收集Storm日志到ELK集群

- - 编程语言 - ITeye博客
我们的storm实时流计算项目已经上线几个月了,由于各种原因迟迟没有进行监控,每次出现问题都要登录好几台机器,然后使用sed,shell,awk,vi等各种命令来查询原因,效率非常低下,而且有些统计是没法做的,所以很有必要对storm本身相关的日志以及我们运行在storm上面的任务的日志做一个统一的日志收集,分析,查询,统计平台.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.