基于storm的在线关联规则

标签: storm 在线 | 发表时间:2014-08-08 17:34 | 作者:huilixiang
分享到:
出处:http://blog.csdn.net

    基于storm的在线视频推荐算法, 算法依据youtube的推荐算法  算法相对简单,可以认为是关联规则只挖掘频繁二项集。下面给出与storm的结合实现在线实时算法 , 关于storm见这里。首先给出数据流图(不同颜色的线条代表不同的数据流。在storm里面bolt也是可以声明数据流的。)

    

    关联规则挖掘数据项的时候,有事务的概念,这里的事务的定义为:给定时间窗口内用户看过的视频集。所以,我们需要这样一个bolt,根据实时日志收集每个用户看过的视频集----user_videos aggregate bolt。 我们怎样挖掘频繁二项集呢?其实就是视频对共同出现的次数,当视频a和b被共同观看的次数(用户看了视频a又看了视频b)大于某个阈值的时候,{a , b}就是一个频繁二项集。

所以我们定时的输出a:b这样的视频对,然后对其计数即可。这个任务是由video_pair counter bolt完成的。这样频繁项挖掘基本完了,如果对于推荐可能需要再走一步:对于看了a的人推荐b 的可信度有多高?如果为a推荐了b,那么对于b的曝光来说提升度是多少呢(可以这样理解,b本身很热门,你再把b推荐出来对于b本身曝光量没有多大作用,这也叫打压热门)? 所以我们需要一个计数器,里面有每个视频被观看的次数---video_counter_bolt。这样,我们就有了youtube算法公式所需要的所有值。

     storm本身是流式的,我们这里需要用到统计用户看过的视频集,所以得有一个池子,不停的收集用户看过的视频,定时的放水(定时放水的任务就有timed_notifier_spout完成)。所以整体的流程如下描述:

1、rt-log spout按user分组,将数据流推给uva-bolt.

2、tn-spout 会定期向下游推送时间窗口关闭的通知

3、uva-bolt里面维护一个map , 里面是用户到其观看过的视频集的映射。它第接收到一条日志就会更新这个map , 同时向计数器vc-bolt发送一条播放数据.当收到tn-spout的通知时,便会将map里面的数据构建成视频对,分组后推送给相关的vp-bolt.

4、vp-bolt 也会维护一个map , 用以视频对的计数。 当收到tn-spout的通知时向vc-bolt发送这些统计信息,并清空这个map.

3、vc-bolt内容也维护一个map , 里面是视频到其它被观看次数的映射 。它每接收到一条日志都会分析日志的类型, 如果是计数类型的就会更新这个map .如果收到vp-bolt的数据,便会计算两两视频的相似度(youtube的公式)。

               整个topology结构代码:

      

 <span style="white-space:pre">		</span>TopologyBuilder builder = new TopologyBuilder();
	        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(conf.getString("zk.server")),
                                    conf.getString("topic"),conf.getString("zk.path"),conf.getString("myid"));
	        spoutConfig.scheme = new NginxLogScheme();
                builder.setSpout("nt-spout" , new NotifierSpout(900) , 1);
	        builder.setSpout("log-spout", new KafkaSpout(spoutConfig), 3);
	        builder.setBolt("uv-bolt", new UserVideoAggregationBolt(), conf.getInt("blot.threads"))
                    .fieldsGrouping("log-spout" , new Fields("cookie")).allGrouping("nt-spout" , "nt");
	        builder.setBolt("vp-bolt", new VideoPairBolt(), 3).fieldsGrouping("uv-bolt" , "vp" , new Fields("vidPair"))
                    .allGrouping("nt-spout" , "nt");
	        builder.setBolt("vc-bolt", new VideoCountBolt(), 3).allGrouping("uv-bolt" , "vc")
	        	.fieldsGrouping("vp-bolt" , "vc" , new Fields("vidPair"))
                .allGrouping("nt-spout" , "nt").addConfiguration("mysql.host", conf.getString("mysql.host"))
	        	.addConfiguration("mysql.usr",conf.getString("mysql.usr"))
	            .addConfiguration("mysql.pass",conf.getString("mysql.pass"))
	            .addConfiguration("mysql.port",conf.getInt("mysql.port"))
	            .addConfiguration("mysql.schema",conf.getString("mysql.schema"));
                builder.setBolt("rec-redis-bolt" , new RedisRecBolt() , 1).allGrouping("nt-spout" , "nt")
                    .addConfiguration("mysql.host", conf.getString("mysql.host"))
                    .addConfiguration("mysql.usr",conf.getString("mysql.usr"))
                    .addConfiguration("mysql.pass",conf.getString("mysql.pass"))
                    .addConfiguration("mysql.port",conf.getInt("mysql.port"))
                    .addConfiguration("mysql.schema",conf.getString("mysql.schema"));

注意事项:

1、bolt的outputcollector对于并发可能报错,需要一个定制的线程安全的outputcollector 。

2、这种实现方式属于试验性,不知其是否科学

3、storm会自动重启bolt , 理由是worker heartbeat timeout , 引起这个的问题可能是worker gc的问题。因为我这里有很多的内存缓存,所以会出现频繁full gc

                              以至于超时。这种频繁的full gc很可能是由于定期向下游放水时短时间内生成大量对象造成的。

4、以上代码仅限结构参考,没有整理。我们用到了kafka.

                


    

作者:huilixiang 发表于2014-8-8 17:34:46 原文链接
阅读:57 评论:0 查看评论

相关 [storm 在线] 推荐:

基于storm的在线关联规则

- - CSDN博客互联网推荐文章
    基于storm的在线视频推荐算法, 算法依据youtube的推荐算法  算法相对简单,可以认为是关联规则只挖掘频繁二项集. 下面给出与storm的结合实现在线实时算法 ,. 首先给出数据流图(不同颜色的线条代表不同的数据流. 在storm里面bolt也是可以声明数据流的.     关联规则挖掘数据项的时候,有事务的概念,这里的事务的定义为:给定时间窗口内用户看过的视频集.

storm简介

- - 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.

Storm Trident 学习

- - 小火箭
Storm支持的三种语义:. 至少一次语义的Topology写法. 参考资料: Storm消息的可靠性保障 Storm提供了Acker的机制来保证数据至少被处理一次,是由编程人员决定是否使用这一特性,要使用这一特性需要:. 在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误.

Storm实战之WordCount

- - 编程语言 - ITeye博客
 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式.

storm常见问题解答

- - BlogJava-庄周梦蝶
    最近有朋友给我邮件问一些storm的问题,集中解答在这里. 一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算. 你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算. 怎么实现spout可以参考官方的kestrel spout实现:.

Storm 实时性分析

- - CSDN博客架构设计推荐文章
都说Storm是一个实时流处理系统,但Storm的实时性体现在什么方面呢. 首先有一个前提:这里的实时性和我们通常所说的实时系统(芯片+汇编或C编写的实时处理软件)的实时性肯定是没法比的,也不是同一个概念. 这里的实时性应该是一个相对的实时性(相对于Hadoop之类 ). 总结一下,Storm的实时性可能主要体现在:.

那些storm的坑坑

- - 开源软件 - ITeye博客
转载请声明出处:http://blackwing.iteye.com/blog/2147633. 在使用storm的过程中,感觉它还是不如hadoop那么成熟. 当然,它的流式处理能力挺让人眼前一亮,以前做的个性化推荐都是离线计算,现在总算把实时部分也加上了. 总结一下storm使用的些心得:. 1.尽量把大量数据处理行为分拆成多个处理component.

storm准实时应用

- - CSDN博客推荐文章
1 应用背景: 需要实时统计用户的登陆数,在线人数,活跃时间,下载等指标的数据,或者清洗后移到hdfs上.         1) 客户端产生数据---.         2) kafka-生产者实时采集数据(保留7天)-----.         3) storm实时消费数据,处理数据.         4)把实时数据统计结果缓存到memcached 中.

Storm核心概念剖析

- - 互联网 - ITeye博客
最近团队中有分析的场景,用到了JStorm来做数据的实时分析,于是花时间对于一些概念做了了解. 这个的话出来应该有几年时间了,阿里巴巴也重写了一套JStorm,核心的类名都是服用的Storm的,他是一套实时数据处理系统,容错行好,然后足够稳定,目前很多数据实时分析的场景,选择Storm的越来越多了.

Kafka+Storm+HDFS整合实践

- -
原文地址: http://shiyanjun.cn/archives/934.html. 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的 需求Hive就不合适了. 实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.