Spark MLlib中的协同过滤

标签: spark mllib 协同过滤 | 发表时间:2015-04-16 16:00 | 作者:
出处:http://blog.javachen.com/rss.xml

本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程。

协同过滤

协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。Spark MLlib实现了 交替最小二乘法(ALS) 来学习这些隐性语义因子。

在 MLlib 中的实现类为org.apache.spark.mllib.recommendation.ALS.scala,其有如下的参数:

  • numUserBlocks:是用于并行化计算的分块个数 (设置为-1,为自动配置)。
  • numProductBlocks:是用于并行化计算的分块个数 (设置为-1,为自动配置)。
  • rank:是模型中隐语义因子的个数。
  • iterations:是迭代的次数,推荐值:10-20。
  • lambda:惩罚函数的因数,是ALS的正则化参数,推荐值:0.01。
  • implicitPrefs:决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
  • alpha:是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。
  • seed:随机种子

可以调整这些参数,不断优化结果,使均方差变小。比如:iterations越多,lambda较小,均方差会较小,推荐结果较优。

提供以下方法:

  def run(ratings: RDD[Rating]): MatrixFactorizationModel

def train(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int,seed: Long): MatrixFactorizationModel
def train(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int): MatrixFactorizationModel
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel

def trainImplicit(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int,alpha: Double,seed: Long): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int,alpha: Double): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel

以上所有方法需要一个参数Rating,其为一个包括三个元素的 case class:

  case class Rating(user: Int, product: Int, rating: Double)

另外,以上方法均返回MatrixFactorizationModel类型的对象,提供以下方法:

  /** Predict the rating of one user for one product. */
def predict(user: Int, product: Int): Double

/**Predict the rating of many users for many products.*/
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]

// Recommends products to a user.
def recommendProducts(user: Int, num: Int): Array[Rating]

//Recommends users to a product.
def recommendUsers(product: Int, num: Int): Array[Rating]

def save(sc: SparkContext, path: String): Unit

def load(sc: SparkContext, path: String): MatrixFactorizationModel =

隐性反馈 vs 显性反馈

基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。
在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。

代码示例

下面例子来自 http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html,并做了稍许修改。

Scala 示例

为了测试简单,使用Spark本地运行模式进行测试。下面代码可以在spark-shell中运行:

  import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
    Rating(user.toInt, item.toInt, rate.toDouble)
  })

// Build the recommendation model using ALS
val rank = 10
val numIterations = 20
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions = 
  model.predict(usersProducts).map { case Rating(user, product, rate) => 
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) => 
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => 
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "myModelPath")
val sameModel = MatrixFactorizationModel.load(sc, "myModelPath")

Java示例

  import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;

public class JavaALS {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse the data
    String path = "data/mllib/als/test.data";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<Rating> ratings = data.map(
      new Function<String, Rating>() {
        public Rating call(String s) {
          String[] sarray = s.split(",");
          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), 
                            Double.parseDouble(sarray[2]));
        }
      }
    );

    // Build the recommendation model using ALS
    int rank = 10;
    int numIterations = 20;
    float lambda = 0.01;
    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, lambda); 

    // Evaluate the model on rating data
    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
      new Function<Rating, Tuple2<Object, Object>>() {
        public Tuple2<Object, Object> call(Rating r) {
          return new Tuple2<Object, Object>(r.user(), r.product());
        }
      }
    );
    JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
            return new Tuple2<Tuple2<Integer, Integer>, Double>(
              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
          }
        }
    ));
    JavaRDD<Tuple2<Double, Double>> ratesAndPreds = 
      JavaPairRDD.fromJavaRDD(ratings.map(
        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
            return new Tuple2<Tuple2<Integer, Integer>, Double>(
              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
          }
        }
    )).join(predictions).values();
    double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
      new Function<Tuple2<Double, Double>, Object>() {
        public Object call(Tuple2<Double, Double> pair) {
          Double err = pair._1() - pair._2();
          return err * err;
        }
      }
    ).rdd()).mean();
    System.out.println("Mean Squared Error = " + MSE);
  }
}

Python示例

下面代码可以在 pyspark 中运行下面代码:

  from pyspark.mllib.recommendation import ALS
from numpy import array

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 20
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y)/ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

总结

使用Spark MLlib的ALS算法进行协同过滤,首先需要了解推荐的过程,然后需要根据测试不断修改训练测试,建立合理的模型,最后再给用户进行推荐商品,保存推荐结果。

另外,在网上找到一些Spark做推荐的项目:

更多关于推荐相关的资源可以参考 Reading List 2015-03

参考文章

相关 [spark mllib 协同过滤] 推荐:

Spark MLlib中的协同过滤

- - JavaChen Blog
本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程. 协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分. MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素.

基于Spark MLlib平台的协同过滤算法---电影推荐系统

- - zzm
又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用.     说到推荐系统,大家可能立马会想到协同过滤算法. 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用. 基于模型的协同过滤应用---电影推荐.     一、协同过滤算法概述.

[原]Spark MLlib系列(二):基于协同过滤的电影推荐系统

- -
随着大数据时代的到来,数据当中挖取金子的工作越来越有吸引力. 利用Spark在内存迭代运算、机器学习领域强悍性能的优势,使用spark处理数据挖掘问题就显得很有实际价值. 这篇文章给大家分享一个spark MLlib 的推荐实战例子. 我将会分享怎样用spark MLlib做一个电影评分的推荐系统.

使用Mahout实现协同过滤 spark

- - zzm
Mahout使用了Taste来提高协同过滤算法的实现,它是一个基于Java实现的可扩展的,高效的推荐引擎. Taste既实现了最基本的基 于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户可以方便的定义和实现自己的推荐算法. 同时,Taste不仅仅只适用于Java应用程序,它 可以作为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐的逻辑.

如何使用Spark ALS实现协同过滤

- - 鸟窝
转载自 JavaChen Blog,作者: Junez. 本文主要记录最近一段时间学习和实现Spark MLlib中的协同过滤的一些总结,希望对大家熟悉Spark ALS算法有所帮助. 【2016.06.12】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,见 SPARK-3066.

使用Spark MLlib给豆瓣用户推荐电影

- - 鸟窝
推荐算法就是利用用户的一些行为,通过一些数学算法,推测出用户可能喜欢的东西. 随着电子商务规模的不断扩大,商品数量和种类不断增长,用户对于检索和推荐提出了更高的要求. 由于不同用户在兴趣爱好、关注领域、个人经历等方面的不同,以满足不同用户的不同推荐需求为目的、不同人可以获得不同推荐为重要特征的个性化推荐系统应运而生.

协同过滤算法

- - CSDN博客推荐文章
今天要讲的主要内容是 协同过滤,即Collaborative Filtering,简称 CF.    关于协同过滤的一个最经典的例子就是看电影,有时候不知道哪一部电影是我们喜欢的或者评分比较高的,那.    么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐. 在问的时候,都习惯于问跟自己口味差不.

协同过滤 Collaborative Filtering

- - IT技术博客大学习
   协同过滤算法是推荐系统中最古老,也是最简单高效的推荐算法. 简单说协同过滤就是根据以往的用户产生的数据分析,对用户的新行为进行匹配分析来给用户推荐用户最有可能感兴趣的内容.    协同过滤算法是为了解决 长尾现象,也就是说推荐系统是为了解决长尾现象而诞生的. 因为在之前在有限的空间(如:书店的书架、服装店的衣架、商店的货架、网页的展示区域)只能摆有限的物品进行展示,造成大量的非热门物品很难进入人们的视野,也就无法产生任何价值.

协同过滤和推荐引擎

- - 刘思喆@贝吉塔行星
推荐系统在个性化领域有着广泛的应用,从技术上讲涉及概率、抽样、最优化、机器学习、数据挖掘、搜索引擎、自然语言处理等多个领域. 东西太多,我也不准备写连载,今天仅从基本算法这个很小的切入点来聊聊推荐引擎的原理. 推荐引擎(系统)从不同的角度看有不同的划分,比如:. 按照数据的分类:协同过滤、内容过滤、社会化过滤.