基于的Map/Reduce的ItemCF

标签: tech itemcf recommendation | 发表时间:2012-11-05 16:37 | 作者:Jia Mi
出处:http://www.mijia.org/blog

ItemCF为基于邻域的方法使用用户共同行为来对Item之间的相似度进行计算,从而利用k-近邻算法使用用户曾经有个行为的Item进行推荐。好处是系统只需要存储Item x Item的相似度矩阵,对于Item数量远小于用户数量的应用来说,具有很高的性价比。

为什么要并行

ItemCF最核心的计算为item之间相似度矩阵的计算,同时还需要能够在短时间内响应Item变化情况(用户有行为之后就会造成相似度矩阵的重新计算,实际中不会全部重新计算而会使用增量计算的方式。)。

可以想象ItemCF的输入为Item x User的行为矩阵(非常稀疏的矩阵,了解到稀疏度能够到1%就已经很不错了),如果每一行表示一个Item的话, 每个Item可以理解为用户行为表示的n维向量(u1, u2, …. un),然后需要对每个Item对进行向量距离计算就可以得到Item到Item的一个距离分布,也就组成了Item x Item的相似度矩阵。

了解到目前大多数公司在几十G数据量级上单机还能够撑住,所以这里的并行并不是指在多计算结点中进行计算,而是希望利用起单机服务器多CPU Core并行计算的能力。之前的实现方式为Python单线程运行,之前的数据需要几个小时的计算时间。需要找到一种可以利用并行计算的方式。 (Python貌似可以利用multiprocessing包,使用mmap进行内存共享)

为什么要Map/Reduce?

Map/Reduce 是一种先进的计算模型,可以在Map阶段将数据分片到不同的计算结点上进行,之后在Reduce阶段对各个结点返回的数据进行加工处理或者收集。 ItemCF使用的Item用户矩阵是一个高度稀疏的矩阵,矩阵的基本操作已经有Map/Reduce版本的实现,但是Item用户矩阵又有自己的特点,可以针对计算目标对过程进行另外的理解。

ItemCF基于Map/Reduce的基本思路有下面几个子过程:

  1. 输入数据为<uid, iid, rates>, 输出为<uid, (iid, iid …, iid)>用户行为列表,还有<iid, (uid, rates), (uid, rates) …>的Item用户向量
  2. 输入数据为<uid, (iid, iid, iid, ….)>, 输出为<iid, iid>的Item对(此处应该尽量优化,输出只有实际计算意义的Item)
  3. 输入数据为<iid, iid>Item对,输出为<iid, iid, iid_vector1, iid_vector2> (这点在实现中略有不同)
  4. 输入数据为<iid, iid, iid_vector1, iid_vector2>,输出为<iid, iid, distance>相似度距离矩阵

选择Spark/Scala计算平台

Spark是 一个基于Map/Reduce的高效计算平台,核心概念为他的RDD数据集概念,这个数据集可以用来方便的分布到计算结点上。Spark提供非常友好的 Map/Reduce封装,会自动将Map/Reduce分割为子任务交给调度然后分配到计算平台上进行计算。此外,他还提供了直接访问HDFS的接口, 可以运行在Mesos平台上,也可以单独进行Master/Worker的部署,同样可以跑在单机使用多进程(Master/Worker部署)和本地多线程(local[n]部署)模式上,从而充分利用单机的多核计算能力。Spark对内存没有特殊的要求,但是要想达到他高效的计算目标(30倍于传统 Hadoop MR任务),内存应该配置高一些。

Spark采用Scala语言编写,Scala是运行在JVM上的一种很有前途的语言,同时支持OOP和FP的编程模式,提供大量语法糖,非常适合用来编写DSL进行领域编程。同时,Scala可以把代码编译为Java的Byte Code,最后可以直接使用Java运行,效率很高。使用Scala实现单线程ItemCF过程,同样的数据集耗时100分钟完成。

代码分析

首先,Spark支持从本地读取文件,然后可以使用Map/Reduce方法对文件进行解析和统计。下面代码段表示读取文件(文件每一行为 用户Id/Item ID/Rate)并进行解析和预处理

   
val sc = new SparkContext("local[2]", "ItemCF")
val lines = sc.textFile("xxx.data")
val data = lines.map { line =>
    val vs = line.split("\t")
    ((vs(0).toLong, vs(1).toLong), (vs(1).toLong, (vs(0).toLong, vs(2).toDouble)))
  }

然后生成用户行为列表(<uid, (iid, iid, iid …)>)以及Item用户向量,其中需要包括具体数值(<iid, (uid, rate), (uid, rate)>)

   
val userVector = data.map(_._1).groupByKey().filter(_._2.length > 1)
val itemVector = sc.broadcast(data.map(_._2).groupByKey().map { p =>
    val squaredSum = p._2.foldLeft(0.0)((acc, x) => acc + x._2 * x._2)
    (p._1, (p._2, squaredSum))
  }.collect().toMap)


其中使用了sc.broadcast函数来在Spark环境中纪录一个全局的广播变量,这个广播变量可以被所有 Map/Reduce任务读取,类似于开辟了一个共享内存,这样所有子任务都可以从这个变量中读取数值。同时,我们在生成Item向量时预先计算了他的平方和,为了方便后面计算欧式距离。

接下来,需要生成所有需要计算的Item对<iid, iid>:

   
val itemPairs = userVector.map { p =>
    val is = for(item <- p._2; other <- p._2
                     if item > other
                ) yield(item, other)
    is
  }.flatMap(p => p)
  .distinct()

最后就可以对这个Item对进行距离计算,Spark会把他们分散到不同的计算结点(CPU Core)上

   
itemPairs.map { p =>
    val iv1 = itemVector.value(p._1)
    val iv2 = itemVector.value(p._2)
    var distance = iv1._2 + iv2._2
    for (a <- iv1._1; b <- iv2._1
          if (a._1 == b._1)) {
      distance += (a._2 - b._2) * (a._2 - b._2)
      distance -= a._2 * a._2 + b._2 * b._2
    }
    (p._1, p._2, sqrt(distance))
}


上面代码返回的结果就是每个Item对的距离向量,然后就可以保存数据以便之后的推荐时计算。采用上述过程,在本地四个Core上运行,之前同样数据量的计算耗时24分钟完成。因为Spark中提供了一个可以供所有子任务读取的广播变量,所以可以把itemVector作为广播变量让所有任务获得,在第三步的计算中就不需要代入对应的Item向量,貌似Mahout的实现中还需要代入这个向量的信息然后再分布计算。

相关 [map reduce itemcf] 推荐:

基于的Map/Reduce的ItemCF

- - M.J.
ItemCF为基于邻域的方法使用用户共同行为来对Item之间的相似度进行计算,从而利用k-近邻算法使用用户曾经有个行为的Item进行推荐. 好处是系统只需要存储Item x Item的相似度矩阵,对于Item数量远小于用户数量的应用来说,具有很高的性价比. ItemCF最核心的计算为item之间相似度矩阵的计算,同时还需要能够在短时间内响应Item变化情况(用户有行为之后就会造成相似度矩阵的重新计算,实际中不会全部重新计算而会使用增量计算的方式.

hadoop学习(七)WordCount+Block+Split+Shuffle+Map+Reduce技术详解

- - CSDN博客数据库推荐文章
纯干活:通过WourdCount程序示例:详细讲解MapReduce之Block+Split+Shuffle+Map+Reduce的区别及数据处理流程.        Shuffle过程是MapReduce的核心,集中了MR过程最关键的部分. 要想了解MR,Shuffle是必须要理解的. 了解Shuffle的过程,更有利于我们在对MapReduce job性能调优的工作,以及对MR内部机理有更深一步的了解.

map-reduce自定义分组自定义排序

- - 行业应用 - ITeye博客
1 * @author zm * * 当第一列相同时,求出第二列的最小值---> 由要求分析如下: * 1 必然以 row1来进行分组. * 2 必然也是以 row1,row2作为一个整体来进行比较才能有 当第一列相同时,在比较第二列的状态发生 * 3 mr中,执行流程是 -->-->--> *.

第一个完整的Map/Reduce小程序

- - ITeye博客
          从在自己的win7下面装好虚拟机,然后在虚拟机上面安装hadoop,然后再安装hadoop-eclipse插件,过去好像有一个星期了,之前装虚拟机和hadoop都没成功,上个星期解除了封印,一口气把hadoop学习前期的所有的东西都搞定了,接下来就是遥遥无期的hadoop之路.          今天按着别人的思路在win7下面的eclipse里面敲了算是处女作的Map/Reduce程序,虽然很简单,但是自己还是一步一步的走通了,因为hadoop是安装在虚拟机上的,但是eclipse是在win7下面,所以在中间运行的时候会有一系列的错误,昨天晚上把遇到的问题百度的百度,问神的问神,烧香的烧香,基本上都解决了,现在能把程序跑起来,感觉自己的熬夜什么的都没有白费.

map和reduce 个数的设定 (Hive优化)经典

- - 研发管理 - ITeye博客
一、    控制hive任务中的map数: . 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);.

JavaScript Source Map 详解

- - 阮一峰的网络日志
上周, jQuery 1.9发布. 这是2.0版之前的最后一个新版本,有很多新功能,其中一个就是支持Source Map. 访问 http://ajax.googleapis.com/ajax/libs/jquery/1.9.0/jquery.min.js,打开压缩后的版本,滚动到底部,你可以看到最后一行是这样的:.

mapreduce实例-Join连接 (reduce Side Join)

- - CSDN博客云计算推荐文章
//根据连接类型做不同处理. //设置不同map处理不同输入. 外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接. 作者:liuzhoulong 发表于2013-9-5 21:35:16 原文链接. 阅读:83 评论:0 查看评论.

Hibernate调优之select new map()

- - CSDN博客架构设计推荐文章
        Hibernate调优不只是设置一下lazy,调整一下由谁来维护这个字段而已.         这次要说的是对查询语句进行优化——select new map().         select new map语句结果说明.         语句一:.         结果list中,每条记录对应一个object数组,object[]中每个元素为hql语句中列的序号(从0开始).

【转载】在Google Map上玩LEGO

- - HTML5研究小组
Google又放出了很帅又充满了Google式小清新风格的HTML5在线游戏,这次和LEGO合作——在Google Map上砌LEGO积木:. 这个游戏不知道是哪个和我一样买不起房的屌丝想出来的,不知道梦见几次在地球上某个有待开荒的土地上占一个山头盖属于自己的房子之后用满腔的热血把它做出来了. 不过貌似只能选择在大洋洲范围内的土地,估计开发者是那边的穷矮矬.

开发基于 Google Map 的 Android 应用

- - 博客 - 伯乐在线
简介: 随着移动互联网应用的迅速发展,利用智能手机提供的实时地理位置信息服务功能扩展出众多 LBS(Location Based Service) 应用,将实时地理位置信息与手机的便捷、移动特性结合,为人们提供多种多样的应用场景,比如实时定位、导航、搜索周围好友、基于地理位置的信息推荐等. 本文通过实例介绍如何开发基于 Google Map 的 Android 应用.