storm高并发UV统计
- - 企业架构 - ITeye博客 统计高并发UV可行的方案(类似WordCount的计算去重word总数):bolt1通过fieldsGrouping进行多线程局部汇总,下一级bolt2进行单线程保存session_id和count数到Map且进行遍历,可以得到:PV、UV、访问深度(按每个session_id的浏览数)。
public class SourceSpout implements IRichSpout{ /** * 数据源Spout */ private static final long serialVersionUID = 1L; Queue<String> queue = new ConcurrentLinkedQueue<String>(); SpoutOutputCollector collector = null; String str = null; public void nextTuple() { if (queue.size() >= 0) { collector.emit(new Values(queue.poll())); } try { Thread.sleep(500) ; } catch (InterruptedException e) { e.printStackTrace(); } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (int i = 0; i < 20; i++) { queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } catch (Exception e) { e.printStackTrace(); } } public void close() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void ack(Object msgId) { // TODO Auto-generated method stub System.out.println("spout ack:"+msgId.toString()); } public void activate() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub System.out.println("spout fail:"+msgId.toString()); } }
public class FmtLogBolt implements IBasicBolt{ /** * 格式化日期 */ private static final long serialVersionUID = 1L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date","session_id")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } String eachLog = null; public void execute(Tuple input, BasicOutputCollector collector) { eachLog=input.getStringByField("log"); if (eachLog != null && eachLog.length() > 0 ) { collector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2],DateFmt.date_short),eachLog.split("\t")[1])) ;// 日期, session_id } } public void cleanup() { // TODO Auto-generated method stub } }
public class DeepVisitBolt implements IBasicBolt{ /** * 多线程局部汇总深度数据 */ private static final long serialVersionUID = 1L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_session_id","count")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple input, BasicOutputCollector collector) { String dateString =input.getStringByField("date"); String session_id = input.getStringByField("session_id"); Integer count = counts.get(dateString+"_"+session_id); if (count == null) { count = 0; } count ++ ; counts.put(dateString+"_"+session_id,count) ; collector.emit(new Values(dateString+"_"+session_id,count)) ; } public void cleanup() { // TODO Auto-generated method stub }; }
public class UVSumBolt implements IBasicBolt{ /** * 单线程汇总数据 */ private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short); } long beginTime = System.currentTimeMillis() ; long endTime = 0; String cur_date = null; public void execute(Tuple input, BasicOutputCollector collector) { try { endTime = System.currentTimeMillis() ; long PV = 0;// 总数 long UV = 0; // 个数,去重后 String dateSession_id = input.getString(0); Integer count = input.getInteger(1); //清空不是当天的数据 if (!dateSession_id.startsWith(cur_date) && DateFmt.parseDate(dateSession_id.split("_")[0]).after( DateFmt.parseDate(cur_date))) { cur_date = dateSession_id.split("_")[0]; counts.clear(); } counts.put(dateSession_id, count); if (endTime - beginTime >= 2000) {//两秒输出一次 // 获取word去重个数,遍历counts 的keySet,取count Iterator<String> i2 = counts.keySet().iterator(); while (i2.hasNext()) { String key = i2.next(); if (key != null) { if (key.startsWith(cur_date)) { UV++; PV += counts.get(key); } } } System.err.println("PV=" + PV + "; UV="+ UV); } } catch (Exception e) { throw new FailedException("SumBolt fail!"); } } public void cleanup() { // TODO Auto-generated method stub } }
public class UVTopo { /** * topoly类 */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SourceSpout(), 1); builder.setBolt("FmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout"); // Fields Grouping:按Field分组,比如按word来分组, 具有同样word的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 builder.setBolt("sumBolt", new DeepVisitBolt(),4).fieldsGrouping("FmtLogBolt", new Fields("date","session_id")); builder.setBolt("UvSum", new UVSumBolt(), 1).shuffleGrouping("sumBolt") ; Config conf = new Config() ; conf.setDebug(true); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
/**
 * Date helper: formats date strings to a target pattern and parses {@code yyyy-MM-dd} dates.
 */
public class DateFmt {

    /** Pattern for a full timestamp, e.g. {@code 2014-01-07 08:40:50}. */
    public static final String date_long = "yyyy-MM-dd HH:mm:ss";

    /** Pattern for a calendar day, e.g. {@code 2014-01-07}. */
    public static final String date_short = "yyyy-MM-dd";

    /**
     * Shared formatter for {@link #date_short}, kept only so existing references keep compiling.
     * {@code SimpleDateFormat} is not safe for concurrent use, so this class no longer reads it;
     * external users must synchronize access themselves.
     */
    public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);

    /**
     * Re-formats a date string using the given pattern.
     *
     * <p>The same pattern is used both to parse {@code date} and to format the result, so a
     * longer input such as {@code "2014-01-07 08:40:50"} with {@link #date_short} yields
     * {@code "2014-01-07"}.
     *
     * @param date   text to parse; when {@code null} or unparseable, the current time is used
     *               instead (a parse failure additionally prints its stack trace)
     * @param patton {@code SimpleDateFormat} pattern used for parsing and formatting
     * @return the parsed date, or the current time as fallback, formatted with {@code patton}
     */
    public static String getCountDate(String date, String patton) {
        // A fresh formatter per call keeps this method safe to call from several threads.
        SimpleDateFormat formatter = new SimpleDateFormat(patton);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(formatter.parse(date));
            } catch (ParseException e) {
                // Fall back to the current time, as before; the failure is only reported.
                e.printStackTrace();
            }
        }
        return formatter.format(cal.getTime());
    }

    /**
     * Parses a date in {@link #date_short} format.
     *
     * <p>Uses its own formatter instance instead of the shared {@link #sdf}, because concurrent
     * {@code parse} calls on one {@code SimpleDateFormat} can return wrong dates or throw.
     *
     * @param dateStr text starting with a {@code yyyy-MM-dd} date
     * @return the parsed date
     * @throws Exception if {@code dateStr} cannot be parsed
     */
    public static Date parseDate(String dateStr) throws Exception {
        return new SimpleDateFormat(date_short).parse(dateStr);
    }

    /** Small manual check: prints whether 2014-05-02 is after 2014-05-01. */
    public static void main(String[] args) throws Exception {
        System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
    }
}