storm高并发UV统计

标签: storm 并发 uv | 发表时间:2017-05-14 22:05 | 作者:javafu
出处:http://www.iteye.com
统计高并发UV可行的方案(类似WordCount的计算去重word总数):
bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到:
Pv、UV、访问深度(按每个session_id 的浏览数)
2014-05-01     UV数(按日期统计)


既然去重,必须持久化。两种持久化数据:

1、内存(适用中小型数据)
数据结构Map

2、no-sql 分布式数据库,如Hbase(适用大型数据)





1、数据源
public class SourceSpout implements IRichSpout{

	/**
	 * 数据源Spout
	 */
	private static final long serialVersionUID = 1L;
	
	Queue<String> queue = new ConcurrentLinkedQueue<String>();
	
	SpoutOutputCollector collector = null;
	
	String str = null;

	public void nextTuple() {
		if (queue.size() >= 0) {
			collector.emit(new Values(queue.poll()));
		}
		try {
			Thread.sleep(500) ;
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	
	}
	
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		try {
			this.collector = collector;
			
			Random random = new Random();
			String[] hosts = { "www.taobao.com" };
			String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
					"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
			String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
					"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
			
			for (int i = 0; i < 20; i++) {
				queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void close() {
		// TODO Auto-generated method stub
	}
	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("log"));
	}

	
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}
	
	public void ack(Object msgId) {
		// TODO Auto-generated method stub
		System.out.println("spout ack:"+msgId.toString());
	}

	
	public void activate() {
		// TODO Auto-generated method stub
		
	}



	
	public void deactivate() {
		// TODO Auto-generated method stub
		
	}

	
	public void fail(Object msgId) {
		// TODO Auto-generated method stub
		System.out.println("spout fail:"+msgId.toString());
	}

}


2、日期格式化处理类

public class FmtLogBolt implements IBasicBolt{

	/**
	 * 格式化日期
	 */
	private static final long serialVersionUID = 1L;

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("date","session_id"));
		
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		// TODO Auto-generated method stub
		
	}

	String eachLog = null;
	public void execute(Tuple input, BasicOutputCollector collector) {
		eachLog=input.getStringByField("log");
		if (eachLog != null && eachLog.length() > 0 ) {
			collector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2],DateFmt.date_short),eachLog.split("\t")[1])) ;// 日期, session_id
		}
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	}

}


3、多线程局部汇总深度数据

public class DeepVisitBolt implements IBasicBolt{

	/**
	 * 多线程局部汇总深度数据
	 */
	private static final long serialVersionUID = 1L;

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("date_session_id","count"));
		
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		// TODO Auto-generated method stub
		
	}

	Map<String, Integer> counts = new HashMap<String, Integer>();
	public void execute(Tuple input, BasicOutputCollector collector) {
		String dateString =input.getStringByField("date");
		String session_id = input.getStringByField("session_id");
		Integer count = counts.get(dateString+"_"+session_id);
		if (count == null) {
			count = 0;
		}
		count ++ ;
		
		counts.put(dateString+"_"+session_id,count) ;
		collector.emit(new Values(dateString+"_"+session_id,count)) ;
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	};

}


4、单线程汇总数据
public class UVSumBolt implements IBasicBolt{

	/**
	 * 单线程汇总数据
	 */
	private static final long serialVersionUID = 1L;
	Map<String, Integer> counts = new HashMap<String, Integer>();

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short);
		
	}
	
	long beginTime = System.currentTimeMillis() ;
	long endTime = 0;
	String cur_date = null;
	public void execute(Tuple input, BasicOutputCollector collector) {
		try {
			endTime = System.currentTimeMillis() ;
			long PV = 0;// 总数
			long UV = 0; // 个数,去重后

			String dateSession_id = input.getString(0);
			Integer count = input.getInteger(1);

			//清空不是当天的数据
			if (!dateSession_id.startsWith(cur_date)
					&& DateFmt.parseDate(dateSession_id.split("_")[0]).after(
							DateFmt.parseDate(cur_date))) {
				cur_date = dateSession_id.split("_")[0];
				counts.clear();
			}

			counts.put(dateSession_id, count);

			if (endTime - beginTime >= 2000) {//两秒输出一次
				// 获取word去重个数,遍历counts 的keySet,取count
				Iterator<String> i2 = counts.keySet().iterator();
				while (i2.hasNext()) {
					String key = i2.next();
					if (key != null) {
						if (key.startsWith(cur_date)) {
							UV++;
							PV += counts.get(key);
						}
					}
				}
				System.err.println("PV=" + PV + ";  UV="+ UV);
			}

		} catch (Exception e) {
			throw new FailedException("SumBolt fail!");
		}
		
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	}

}


5、topoly类

public class UVTopo {

	/**
	 * topoly类
	 */
	public static void main(String[] args) {

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("spout", new SourceSpout(), 1);
		builder.setBolt("FmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout");
		// Fields Grouping:按Field分组,比如按word来分组, 具有同样word的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 
		builder.setBolt("sumBolt", new DeepVisitBolt(),4).fieldsGrouping("FmtLogBolt", new Fields("date","session_id"));
		builder.setBolt("UvSum", new UVSumBolt(), 1).shuffleGrouping("sumBolt") ;
		
		Config conf = new Config() ;
		conf.setDebug(true);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			} catch (AuthorizationException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.createTopology());
		}

	}

}


6、pom.xml文件
引用

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>StormMavenProject</artifactId>
  <packaging>jar</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>StormMavenProject</name>
  <url>http://maven.apache.org</url>
  <dependencies>
   
   <dependency>
    <groupId>org.ow2.asm</groupId>
    <artifactId>asm</artifactId>
    <version>5.0.3</version>
   </dependency>
<dependency>
    <groupId>org.clojure</groupId>
    <artifactId>clojure</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.6.6</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>minlog</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>reflectasm</artifactId>
    <version>1.10.1</version>
</dependency>

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-rename-hack</artifactId>
    <version>1.1.0</version>
</dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

<dependency>
    <groupId>ring-cors</groupId>
    <artifactId>ring-cors</artifactId>
    <version>0.1.5</version>
</dependency>


<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>



  </dependencies>
  <build>
    <finalName>StormMavenProject</finalName>
  </build>
</project>


7、日期处理类
public class DateFmt {
	/*
	 * 日期处理类
	 */
	public static final String date_long = "yyyy-MM-dd HH:mm:ss" ;
	public static final String date_short = "yyyy-MM-dd" ;
	
	public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);
	
	public static String getCountDate(String date,String patton)
	{
		SimpleDateFormat sdf = new SimpleDateFormat(patton);
		Calendar cal = Calendar.getInstance(); 
		if (date != null) {
			try {
				cal.setTime(sdf.parse(date)) ;
			} catch (ParseException e) {
				e.printStackTrace();
			}
		}
		return sdf.format(cal.getTime());
	}
	
	public static Date parseDate(String dateStr) throws Exception
	{
		return sdf.parse(dateStr);
	}
	
	public static void main(String[] args) throws Exception{

//		System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));
		System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
	}

}


8、测试结果






已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [storm 并发 uv] 推荐:

storm高并发UV统计

- - 企业架构 - ITeye博客
统计高并发UV可行的方案(类似WordCount的计算去重word总数):. bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到:. Pv、UV、访问深度(按每个session_id 的浏览数).

关于UV

- 木頭 - 我叫吴玮
把100款UV镜按好坏等分成两堆,每对衔接在一起,悬于canon大长焦镜头上,拍摄远处的景物特写图. 对比得出结果:不装UV镜最好,装50片好的UV会有很大影响,装50片相对差的UV会更恶化镜头成像. 我很想知道实验者最后想得出什么结论. 从上到下:无UV、优质UV和劣质UV,同一拍摄对象下的对比 .

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

Storm实战之WordCount

- - 编程语言 - ITeye博客
 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式.

瞎扯三个热点: UV / 移动 / 数据

- - 数据战略 Data strategy
瞎扯三个热点: UV / 移动 / 数据. 品觉每天都看大量的数据,以及坚持早上阅读各种关于电商额报道,今天来跟大家瞎扯几个观点. 和前两年相比,今年电商热度急剧下降. 基本每家都说生意不好做了,甚至一些过去很风光的电商都传出资金链断流等着被收购. 这其中的原因很多,各种分析都有人写过. 而我从数据的角度看到了一个有意思的现象,今年电子商务的UV增长跟业务增长是不匹配的.

storm常见问题解答

- - BlogJava-庄周梦蝶
    最近有朋友给我邮件问一些storm的问题,集中解答在这里. 一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算. 你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算. 怎么实现spout可以参考官方的kestrel spout实现:.

Storm 实时性分析

- - CSDN博客架构设计推荐文章
都说Storm是一个实时流处理系统,但Storm的实时性体现在什么方面呢. 首先有一个前提:这里的实时性和我们通常所说的实时系统(芯片+汇编或C编写的实时处理软件)的实时性肯定是没法比的,也不是同一个概念. 这里的实时性应该是一个相对的实时性(相对于Hadoop之类 ). 总结一下,Storm的实时性可能主要体现在:.

那些storm的坑坑

- - 开源软件 - ITeye博客
转载请声明出处:http://blackwing.iteye.com/blog/2147633. 在使用storm的过程中,感觉它还是不如hadoop那么成熟. 当然,它的流式处理能力挺让人眼前一亮,以前做的个性化推荐都是离线计算,现在总算把实时部分也加上了. 总结一下storm使用的些心得:. 1.尽量把大量数据处理行为分拆成多个处理component.

storm准实时应用

- - CSDN博客推荐文章
1 应用背景: 需要实时统计用户的登陆数,在线人数,活跃时间,下载等指标的数据,或者清洗后移到hdfs上.         1) 客户端产生数据---.         2) kafka-生产者实时采集数据(保留7天)-----.         3) storm实时消费数据,处理数据.         4)把实时数据统计结果缓存到memcached 中.