【实践】Spark 协同过滤ALS之Item2Item相似度计算优化 - CSDN博客

标签: | 发表时间:2018-08-03 15:29 | 作者:
出处:https://blog.csdn.net

最近项目在做推荐系统中 match策略中的 CF召回优化,自之前第一版自己实现的基于item的协同过滤算法 http://blog.csdn.net/dengxing1234/article/details/76122465,考虑到用户隐型评分的 稀疏性问题,所以尝试用Spark ml包(非mllib)中的ALS算法的中间产物item的隐性向量,进行进一步item到item的余弦相似度计算。

由于item的数据量较大(百万级别),涉及计算每个item与其他所有item的向量计算,尝试过以下几种方法:

  • (1)spark rdd的笛卡尔积
  • (2)spark dataframe的cross join
  • (3)避免集群shuffle,采用driver端的本地计算
  • (4)spark分布式矩阵运算IndexedRowMatrix
效果都不佳,数据量一大几乎任务没法运行,本文从两个方面优化相似度计算过程,第一从计算引擎底层考虑使用Breeze Blas计算包,第二从数据结构算法方面考虑,以下是自己的实现代码

      package model

import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame
import scala.collection.mutable
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import util.BlasSim


object ModelBasedCF {
  case class Rating(userId: Int, movieId: Int, rating: Double)
  /** 基于dt时间获取原始数据源
    *
    * @param spark SparkContext
    * @param hql   hql
    * @return 原始数据的dataFrame
    */
  def getResource(spark: SparkSession, hql: String) = {
    import spark.sql
    val resource = sql(hql)
    resource
  }

  /**
    * 基于item相似度矩阵为user生成topN推荐列表
    *
    * @param resource
    * @param item_sim_bd
    * @param topN
    * @return RDD[(user,List[(item,score)])]
    */
  def recommend(resource: DataFrame, item_sim_bd: Broadcast[scala.collection.Map[String, List[(String, Double)]]], topN: Int = 50) = {
    val user_item_score = resource.rdd.map(
      row => {
        val uid = row.getString(0)
        val aid = row.getString(1)
        val score = row.getDouble(2)
        ((uid, aid), score)
      }
    )
    /*
     * 提取user_item_score为((user,item2),sim * score)
     * RDD[(user,item2),sim * score]
     */
    val user_item_simscore = user_item_score.flatMap(
      f => {
        val items_sim = item_sim_bd.value.getOrElse(f._1._2, List(("0", 0.0)))
        for (w <- items_sim) yield ((f._1._1, w._1), w._2 * f._2)
      })

    /*
     * 聚合user_item_simscore为 ((user,item2),sim1 * score1 + sim2 * score2))
     * 假设user观看过两个item,评分分别为score1和score2,item2是与user观看过的两个item相似的item,相似度分别为sim1,sim2
     * RDD[(user,item2),sim1 * score1 + sim2 * score2))]
     */
    val user_item_rank = user_item_simscore.reduceByKey(_ + _, 1000)
    /*
     * 过滤用户已看过的item,并对user_item_rank基于user聚合
     * RDD[(user,CompactBuffer((item2,rank2),(item3,rank3)...))]
     */
    // val user_items_ranks = user_item_rank.subtractByKey(user_item_score).map(f => (f._1._1, (f._1._2, f._2))).groupByKey(500)
    val user_items_ranks = user_item_rank.map(f => (f._1._1, (f._1._2, f._2))).groupByKey(500)
    /*
     * 对user_items_ranks基于rank降序排序,并提取topN,其中包括用户已观看过的item
     * RDD[(user,ArrayBuffer((item2,rank2),...,(itemN,rankN)))]
     */
    val user_items_ranks_desc = user_items_ranks.map(f => {
      val item_rank_list = f._2.toList
      val item_rank_desc = item_rank_list.sortWith((x, y) => x._2 > y._2)
      (f._1, item_rank_desc.take(topN))
    })
    user_items_ranks_desc
  }

  /**
    * 计算推荐的召回率
    *
    * @param recTopN
    * @param testData
    */
  def getRecall(recTopN: RDD[(String, List[(String, Double)])], testData: DataFrame) = {
    val uid_rec = recTopN.flatMap(r => {
      val uid = r._1
      val itemList = r._2
      for (item <- itemList) yield (uid, item._1)
    })
    val uid_test = testData.rdd.map(row => {
      val uid = row.getString(0)
      val aid = row.getString(1)
      (uid, aid)
    })
    uid_rec.intersection(uid_test).count() / uid_test.count().toDouble
  }

  def main(args: Array[String]) {
    //屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val spark = SparkSession
      .builder
      .appName("ModelBased CF")
      .enableHiveSupport()
      .getOrCreate()

    val trainHql = args(0)
    val testHql = args(1)

    val trainDataFrame = getResource(spark, trainHql).repartition(100).cache()
    val testDataRrame = getResource(spark, testHql).repartition(100)

    val trainRdd = trainDataFrame.rdd.map(row => {
      val uid = row.getString(0)
      val aid = row.getString(1)
      val score = row.getDouble(2)
      (uid, (aid, score))
    }).cache()

    val userIndex = trainRdd.map(x => x._1).distinct().zipWithIndex().map(x => (x._1, x._2.toInt)).cache()
    val itemIndex = trainRdd.map(x => x._2._1).distinct().zipWithIndex().map(x => (x._1, x._2.toInt)).cache()

    import spark.sqlContext.implicits._
    val trainRatings = trainRdd
      .join(userIndex, 500)
      .map(x => (x._2._1._1, (x._2._2, x._2._1._2)))
      .join(itemIndex, 500)
      .map(x => Rating(x._2._1._1.toInt, x._2._2.toInt, x._2._1._2.toDouble)).toDF()


    val rank = 200

    val als = new ALS()
      .setMaxIter(10)
      .setRank(rank)
      .setNumBlocks(100)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
      .setImplicitPrefs(true)

    val model = als.fit(trainRatings)
    val itemFeature = model.itemFactors.rdd.map(
      row => {
        val item = row.getInt(0)
        val vec = row.get(1).asInstanceOf[mutable.WrappedArray[Float]]
        (item, vec)
      }
    ).sortByKey().collect()



    val numItems = itemIndex.count().toInt
    val itemVectors = itemFeature.flatMap(x => x._2)
    val itemIndex_tmp = itemIndex.collectAsMap()

    val blasSim = new BlasSim(numItems, rank, itemVectors, itemIndex_tmp)
    val itemString = itemIndex.map(x => (x._2, x._1)).repartition(100)

    val item_sim_rdd = itemString.map(x => {
      (x._2, blasSim.getCosinSimilarity((itemFeature(x._1)._2.toVector), 50, None).toList)
    })


    // 广播相似度矩阵
    val item_sim_map = item_sim_rdd.collectAsMap()
    val item_sim_bd: Broadcast[scala.collection.Map[String, List[(String, Double)]]] = spark.sparkContext.broadcast(item_sim_map)
    // 为用户生成推荐列表
    val recTopN = recommend(trainDataFrame, item_sim_bd, 50)
    // 计算召回率
    println(getRecall(recTopN, testDataRrame))


    spark.stop()
  }
}

      package util

import com.github.fommil.netlib.BLAS.{getInstance => blas}
import java.io.Serializable
import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.generic.Growable


class BlasSim(val numItems: Int, val vectorSize: Int, val itemVectors: Array[Float], val itemIndex: Map[java.lang.String, Int])extends Serializable{

  val itemList = {
    val (wl, _) = itemIndex.toSeq.sortBy(_._2).unzip
    wl.toArray
  }

  val wordVecNorms: Array[Double] = {
    val wordVecNorms = new Array[Double](numItems)
    var i = 0
    while (i < numItems) {
      val vec = itemVectors.slice(i * vectorSize, i * vectorSize + vectorSize)
      wordVecNorms(i) = blas.snrm2(vectorSize, vec, 1)
      i += 1
    }
    wordVecNorms
  }

  def getCosinSimilarity(vector:Vector[Float], num: Int, wordOpt: Option[String]): Array[(String, Double)] = {
    require(num > 0, "Number of similar words should > 0")
    val fVector = vector.toArray.map(_.toFloat)
    val cosineVec = Array.fill[Float](numItems)(0)
    val alpha: Float = 1
    val beta: Float = 0
    // 归一化输入向量
    val vecNorm = blas.snrm2(vectorSize, fVector, 1)
    if (vecNorm != 0.0f) {
      blas.sscal(vectorSize, 1 / vecNorm, fVector, 0, 1)
    }
    blas.sgemv("T", vectorSize, numItems, alpha, itemVectors, vectorSize, fVector, 1, beta, cosineVec, 1)
    val cosVec = cosineVec.map(_.toDouble)
    var ind = 0
    while (ind < numItems) {
      val norm = wordVecNorms(ind)
      if (norm == 0.0) {
        cosVec(ind) = 0.0
      } else {
        cosVec(ind) /= norm
      }
      ind += 1
    }
    val pq = new BoundedPriorityQueue[(String, Double)](num + 1)(Ordering.by(_._2))
    for (i <- cosVec.indices) {
      pq += Tuple2(itemList(i), cosVec(i))
    }

    val scored = pq.toSeq.sortBy(-_._2)

    val filtered = wordOpt match {
      case Some(w) => scored.filter(tup => w != tup._1)
      case None => scored
    }
    filtered.take(num).toArray
  }
}

class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
  extends Iterable[A] with Growable[A] with Serializable {

  private val underlying = new JPriorityQueue[A](maxSize, ord)

  override def iterator: Iterator[A] = underlying.iterator.asScala

  override def size: Int = underlying.size

  override def ++=(xs: TraversableOnce[A]): this.type = {
    xs.foreach {
      this += _
    }
    this
  }

  override def +=(elem: A): this.type = {
    if (size < maxSize) {
      underlying.offer(elem)
    } else {
      maybeReplaceLowest(elem)
    }
    this
  }

  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
    this += elem1 += elem2 ++= elems
  }

  override def clear() {
    underlying.clear()
  }

  private def maybeReplaceLowest(a: A): Boolean = {
    val head = underlying.peek()
    if (head != null && ord.gt(a, head)) {
      underlying.poll()
      underlying.offer(a)
    } else {
      false
    }
  }
}



相关 [实践 spark 协同过滤] 推荐:

【实践】Spark 协同过滤ALS之Item2Item相似度计算优化 - CSDN博客

- -
CF召回优化,自之前第一版自己实现的基于item的协同过滤算法. http://blog.csdn.net/dengxing1234/article/details/76122465,考虑到用户隐型评分的. 稀疏性问题,所以尝试用Spark ml包(非mllib)中的ALS算法的中间产物item的隐性向量,进行进一步item到item的余弦相似度计算.

Spark MLlib中的协同过滤

- - JavaChen Blog
本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程. 协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分. MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素.

使用Mahout实现协同过滤 spark

- - zzm
Mahout使用了Taste来提高协同过滤算法的实现,它是一个基于Java实现的可扩展的,高效的推荐引擎. Taste既实现了最基本的基 于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户可以方便的定义和实现自己的推荐算法. 同时,Taste不仅仅只适用于Java应用程序,它 可以作为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐的逻辑.

如何使用Spark ALS实现协同过滤

- - 鸟窝
转载自 JavaChen Blog,作者: Junez. 本文主要记录最近一段时间学习和实现Spark MLlib中的协同过滤的一些总结,希望对大家熟悉Spark ALS算法有所帮助. 【2016.06.12】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,见 SPARK-3066.

基于Spark MLlib平台的协同过滤算法---电影推荐系统

- - zzm
又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用.     说到推荐系统,大家可能立马会想到协同过滤算法. 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用. 基于模型的协同过滤应用---电影推荐.     一、协同过滤算法概述.

[原]Spark MLlib系列(二):基于协同过滤的电影推荐系统

- -
随着大数据时代的到来,数据当中挖取金子的工作越来越有吸引力. 利用Spark在内存迭代运算、机器学习领域强悍性能的优势,使用spark处理数据挖掘问题就显得很有实际价值. 这篇文章给大家分享一个spark MLlib 的推荐实战例子. 我将会分享怎样用spark MLlib做一个电影评分的推荐系统.

RHadoop实践系列之三 R实现MapReduce的协同过滤算法

- - 统计之都
Author:张丹(Conan). @晒粉丝 http://www.fens.me. @每日中国天气 http://apps.weibo.com/chinaweatherapp. RHadoop实践系列文章. RHadoop实践系列文章,包含了R语言与Hadoop结合进行海量数据分析. Hadoop主要用来存储海量数据,R语言完成MapReduce 算法,用来替代Java的MapReduce实现.

Spark在美团的实践

- - 美团点评技术团队
本文已发表在《程序员》杂志2016年4月期. 美团是数据驱动的互联网服务,用户每天在美团上的点击、浏览、下单支付行为都会产生海量的日志,这些日志数据将被汇总处理、分析、挖掘与学习,为美团的各种推荐、搜索系统甚至公司战略目标制定提供数据支持. 大数据处理渗透到了美团各业务线的各种应用场景,选择合适、高效的数据处理引擎能够大大提高数据生产的效率,进而间接或直接提升相关团队的工作效率.

Spark Streaming 调优实践

- - IT瘾-dev
分享嘉宾:肖力涛 拼多多 资深算法工程师. 注:欢迎转载,转载请注明出处. 在使用 Spark 和 Spark Streaming 时,当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能. 有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源.

协同过滤算法

- - CSDN博客推荐文章
今天要讲的主要内容是 协同过滤,即Collaborative Filtering,简称 CF.    关于协同过滤的一个最经典的例子就是看电影,有时候不知道哪一部电影是我们喜欢的或者评分比较高的,那.    么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐. 在问的时候,都习惯于问跟自己口味差不.