基于的Map/Reduce的ItemCF
ItemCF为基于邻域的方法使用用户共同行为来对Item之间的相似度进行计算,从而利用k-近邻算法使用用户曾经有个行为的Item进行推荐。好处是系统只需要存储Item x Item的相似度矩阵,对于Item数量远小于用户数量的应用来说,具有很高的性价比。
为什么要并行
ItemCF最核心的计算为item之间相似度矩阵的计算,同时还需要能够在短时间内响应Item变化情况(用户有行为之后就会造成相似度矩阵的重新计算,实际中不会全部重新计算而会使用增量计算的方式。)。
可以想象ItemCF的输入为Item x User的行为矩阵(非常稀疏的矩阵,了解到稀疏度能够到1%就已经很不错了),如果每一行表示一个Item的话, 每个Item可以理解为用户行为表示的n维向量(u1, u2, …. un),然后需要对每个Item对进行向量距离计算就可以得到Item到Item的一个距离分布,也就组成了Item x Item的相似度矩阵。
了解到目前大多数公司在几十G数据量级上单机还能够撑住,所以这里的并行并不是指在多计算结点中进行计算,而是希望利用起单机服务器多CPU Core并行计算的能力。之前的实现方式为Python单线程运行,之前的数据需要几个小时的计算时间。需要找到一种可以利用并行计算的方式。 (Python貌似可以利用multiprocessing包,使用mmap进行内存共享)
为什么要Map/Reduce?
Map/Reduce 是一种先进的计算模型,可以在Map阶段将数据分片到不同的计算结点上进行,之后在Reduce阶段对各个结点返回的数据进行加工处理或者收集。 ItemCF使用的Item用户矩阵是一个高度稀疏的矩阵,矩阵的基本操作已经有Map/Reduce版本的实现,但是Item用户矩阵又有自己的特点,可以针对计算目标对过程进行另外的理解。
ItemCF基于Map/Reduce的基本思路有下面几个子过程:
- 输入数据为<uid, iid, rates>, 输出为<uid, (iid, iid …, iid)>用户行为列表,还有<iid, (uid, rates), (uid, rates) …>的Item用户向量
- 输入数据为<uid, (iid, iid, iid, ….)>, 输出为<iid, iid>的Item对(此处应该尽量优化,输出只有实际计算意义的Item)
- 输入数据为<iid, iid>Item对,输出为<iid, iid, iid_vector1, iid_vector2> (这点在实现中略有不同)
- 输入数据为<iid, iid, iid_vector1, iid_vector2>,输出为<iid, iid, distance>相似度距离矩阵
选择Spark/Scala计算平台
Spark是 一个基于Map/Reduce的高效计算平台,核心概念为他的RDD数据集概念,这个数据集可以用来方便的分布到计算结点上。Spark提供非常友好的 Map/Reduce封装,会自动将Map/Reduce分割为子任务交给调度然后分配到计算平台上进行计算。此外,他还提供了直接访问HDFS的接口, 可以运行在Mesos平台上,也可以单独进行Master/Worker的部署,同样可以跑在单机使用多进程(Master/Worker部署)和本地多线程(local[n]部署)模式上,从而充分利用单机的多核计算能力。Spark对内存没有特殊的要求,但是要想达到他高效的计算目标(30倍于传统 Hadoop MR任务),内存应该配置高一些。
Spark采用Scala语言编写,Scala是运行在JVM上的一种很有前途的语言,同时支持OOP和FP的编程模式,提供大量语法糖,非常适合用来编写DSL进行领域编程。同时,Scala可以把代码编译为Java的Byte Code,最后可以直接使用Java运行,效率很高。使用Scala实现单线程ItemCF过程,同样的数据集耗时100分钟完成。
代码分析
首先,Spark支持从本地读取文件,然后可以使用Map/Reduce方法对文件进行解析和统计。下面代码段表示读取文件(文件每一行为 用户Id/Item ID/Rate)并进行解析和预处理
val sc = new SparkContext("local[2]", "ItemCF")
val lines = sc.textFile("xxx.data")
val data = lines.map { line =>
val vs = line.split("\t")
((vs(0).toLong, vs(1).toLong), (vs(1).toLong, (vs(0).toLong, vs(2).toDouble)))
}
然后生成用户行为列表(<uid, (iid, iid, iid …)>)以及Item用户向量,其中需要包括具体数值(<iid, (uid, rate), (uid, rate)>)
val userVector = data.map(_._1).groupByKey().filter(_._2.length > 1)
val itemVector = sc.broadcast(data.map(_._2).groupByKey().map { p =>
val squaredSum = p._2.foldLeft(0.0)((acc, x) => acc + x._2 * x._2)
(p._1, (p._2, squaredSum))
}.collect().toMap)
其中使用了sc.broadcast函数来在Spark环境中纪录一个全局的广播变量,这个广播变量可以被所有 Map/Reduce任务读取,类似于开辟了一个共享内存,这样所有子任务都可以从这个变量中读取数值。同时,我们在生成Item向量时预先计算了他的平方和,为了方便后面计算欧式距离。
接下来,需要生成所有需要计算的Item对<iid, iid>:
val itemPairs = userVector.map { p =>
val is = for(item <- p._2; other <- p._2
if item > other
) yield(item, other)
is
}.flatMap(p => p)
.distinct()
最后就可以对这个Item对进行距离计算,Spark会把他们分散到不同的计算结点(CPU Core)上
itemPairs.map { p =>
val iv1 = itemVector.value(p._1)
val iv2 = itemVector.value(p._2)
var distance = iv1._2 + iv2._2
for (a <- iv1._1; b <- iv2._1
if (a._1 == b._1)) {
distance += (a._2 - b._2) * (a._2 - b._2)
distance -= a._2 * a._2 + b._2 * b._2
}
(p._1, p._2, sqrt(distance))
}
上面代码返回的结果就是每个Item对的距离向量,然后就可以保存数据以便之后的推荐时计算。采用上述过程,在本地四个Core上运行,之前同样数据量的计算耗时24分钟完成。因为Spark中提供了一个可以供所有子任务读取的广播变量,所以可以把itemVector作为广播变量让所有任务获得,在第三步的计算中就不需要代入对应的Item向量,貌似Mahout的实现中还需要代入这个向量的信息然后再分布计算。