Spark MLlib中的协同过滤

标签: spark mllib 协同过滤 | 发表时间:2015-04-17 00:00 | 作者:
出处:http://blog.javachen.com/rss.xml

本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程。

协同过滤

协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。Spark MLlib实现了 交替最小二乘法(ALS) 来学习这些隐性语义因子。

在 MLlib 中的实现类为org.apache.spark.mllib.recommendation.ALS.scala,其有如下的参数:

  • numUserBlocks:是用于并行化计算的分块个数 (设置为-1,为自动配置)。
  • numProductBlocks:是用于并行化计算的分块个数 (设置为-1,为自动配置)。
  • rank:是模型中隐语义因子的个数。
  • iterations:是迭代的次数,推荐值:10-20。
  • lambda:惩罚函数的因数,是ALS的正则化参数,推荐值:0.01。
  • implicitPrefs:决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
  • alpha:是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。
  • seed:随机种子

可以调整这些参数,不断优化结果,使均方差变小。比如:iterations越多,lambda较小,均方差会较小,推荐结果较优。

提供以下方法:

  def run(ratings: RDD[Rating]): MatrixFactorizationModel

def train(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int,seed: Long): MatrixFactorizationModel
def train(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int): MatrixFactorizationModel
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double): MatrixFactorizationModel
def train(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel

def trainImplicit(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int,alpha: Double,seed: Long): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating],rank: Int,iterations: Int,lambda: Double,blocks: Int,alpha: Double): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double): MatrixFactorizationModel
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel

以上所有方法需要一个参数Rating,其为一个包括三个元素的 case class:

  case class Rating(user: Int, product: Int, rating: Double)

另外,以上方法均返回MatrixFactorizationModel类型的对象,提供以下方法:

  /** Predict the rating of one user for one product. */
def predict(user: Int, product: Int): Double

/**Predict the rating of many users for many products.*/
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]

// Recommends products to a user.
def recommendProducts(user: Int, num: Int): Array[Rating]

//Recommends users to a product.
def recommendUsers(product: Int, num: Int): Array[Rating]

def save(sc: SparkContext, path: String): Unit

def load(sc: SparkContext, path: String): MatrixFactorizationModel =

隐性反馈 vs 显性反馈

基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。
在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。

代码示例

下面例子来自 http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html,并做了稍许修改。

Scala 示例

为了测试简单,使用Spark本地运行模式进行测试。下面代码可以在spark-shell中运行:

  import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
    Rating(user.toInt, item.toInt, rate.toDouble)
  })

// Build the recommendation model using ALS
val rank = 10
val numIterations = 20
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions = 
  model.predict(usersProducts).map { case Rating(user, product, rate) => 
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) => 
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => 
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "myModelPath")
val sameModel = MatrixFactorizationModel.load(sc, "myModelPath")

Java示例

  import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;

public class JavaALS {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse the data
    String path = "data/mllib/als/test.data";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<Rating> ratings = data.map(
      new Function<String, Rating>() {
        public Rating call(String s) {
          String[] sarray = s.split(",");
          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), 
                            Double.parseDouble(sarray[2]));
        }
      }
    );

    // Build the recommendation model using ALS
    int rank = 10;
    int numIterations = 20;
    float lambda = 0.01;
    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, lambda); 

    // Evaluate the model on rating data
    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
      new Function<Rating, Tuple2<Object, Object>>() {
        public Tuple2<Object, Object> call(Rating r) {
          return new Tuple2<Object, Object>(r.user(), r.product());
        }
      }
    );
    JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
            return new Tuple2<Tuple2<Integer, Integer>, Double>(
              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
          }
        }
    ));
    JavaRDD<Tuple2<Double, Double>> ratesAndPreds = 
      JavaPairRDD.fromJavaRDD(ratings.map(
        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
            return new Tuple2<Tuple2<Integer, Integer>, Double>(
              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
          }
        }
    )).join(predictions).values();
    double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
      new Function<Tuple2<Double, Double>, Object>() {
        public Object call(Tuple2<Double, Double> pair) {
          Double err = pair._1() - pair._2();
          return err * err;
        }
      }
    ).rdd()).mean();
    System.out.println("Mean Squared Error = " + MSE);
  }
}

Python示例

下面代码可以在 pyspark 中运行下面代码:

  from pyspark.mllib.recommendation import ALS
from numpy import array

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 20
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y)/ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

总结

使用Spark MLlib的ALS算法进行协同过滤,首先需要了解推荐的过程,然后需要根据测试不断修改训练测试,建立合理的模型,最后再给用户进行推荐商品,保存推荐结果。

另外,在网上找到一些Spark做推荐的项目:

更多关于推荐相关的资源可以参考 Reading List 2015-03

参考文章

相关 [spark mllib 协同过滤] 推荐:

Spark MLlib中的协同过滤

- - JavaChen Blog
本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程. 协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分. MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素.

基于Spark MLlib平台的协同过滤算法---电影推荐系统

- - zzm
又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用.     说到推荐系统,大家可能立马会想到协同过滤算法. 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用. 基于模型的协同过滤应用---电影推荐.     一、协同过滤算法概述.

[原]Spark MLlib系列(二):基于协同过滤的电影推荐系统

- -
随着大数据时代的到来,数据当中挖取金子的工作越来越有吸引力. 利用Spark在内存迭代运算、机器学习领域强悍性能的优势,使用spark处理数据挖掘问题就显得很有实际价值. 这篇文章给大家分享一个spark MLlib 的推荐实战例子. 我将会分享怎样用spark MLlib做一个电影评分的推荐系统.

使用Mahout实现协同过滤 spark

- - zzm
Mahout使用了Taste来提高协同过滤算法的实现,它是一个基于Java实现的可扩展的,高效的推荐引擎. Taste既实现了最基本的基 于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户可以方便的定义和实现自己的推荐算法. 同时,Taste不仅仅只适用于Java应用程序,它 可以作为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐的逻辑.

Spark机器学习案例 spark-example: spark mllib example

- -
#这是一个Spark MLlib实例 . ##1 K-means基础实例 . 命名为kmeans_data.txt,且上传到hdfs的/spark/mllib/data/路径中. 在Intellij中,点击file->选择project structure->选择Artifact->添加jar->把乱七八糟的依赖移除->勾选Build on make.

GitHub - allwefantasy/streamingpro: Build Spark Batch/Streaming/MLlib Application by SQL

- -
StreamingPro 中文文档. 应用模式:写json配置文件,StreamingPro启动后执行该文件,可以作为批处理或者流式程序. 服务模式:启动一个StreamingPro Server作为常驻程序,然后通过http接口发送MLSQL脚本进行交互. 我们强烈推荐使用第二种模式,第一种模式现在已经不太更新了,现在迅速迭代的是第二种模式,并且第二种模式可以构建AI平台.

如何使用Spark ALS实现协同过滤

- - 鸟窝
转载自 JavaChen Blog,作者: Junez. 本文主要记录最近一段时间学习和实现Spark MLlib中的协同过滤的一些总结,希望对大家熟悉Spark ALS算法有所帮助. 【2016.06.12】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,见 SPARK-3066.

【实践】Spark 协同过滤ALS之Item2Item相似度计算优化 - CSDN博客

- -
CF召回优化,自之前第一版自己实现的基于item的协同过滤算法. http://blog.csdn.net/dengxing1234/article/details/76122465,考虑到用户隐型评分的. 稀疏性问题,所以尝试用Spark ml包(非mllib)中的ALS算法的中间产物item的隐性向量,进行进一步item到item的余弦相似度计算.

使用Spark MLlib给豆瓣用户推荐电影

- - 鸟窝
推荐算法就是利用用户的一些行为,通过一些数学算法,推测出用户可能喜欢的东西. 随着电子商务规模的不断扩大,商品数量和种类不断增长,用户对于检索和推荐提出了更高的要求. 由于不同用户在兴趣爱好、关注领域、个人经历等方面的不同,以满足不同用户的不同推荐需求为目的、不同人可以获得不同推荐为重要特征的个性化推荐系统应运而生.

Spark-mllib 文本特征提取算法 - CSDN博客

- -
Spark MLlib 提供三种文本特征提取方法,分别为TF-IDF、Word2Vec以及CountVectorizer,. 词频-逆向文件频率(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,它可以体现一个文档中词语在语料库中的重要程度. 词语由t表示,文档由d表示,语料库由D表示. 词频TF(t,,d)是词语t在文档d中出现的次数.