[原]Spark MLlib系列(二):基于协同过滤的电影推荐系统

标签: | 发表时间:2015-02-13 16:36 | 作者:shifenglov
出处:http://blog.csdn.net/shifenglov

前言

随着大数据时代的到来,数据当中挖取金子的工作越来越有吸引力。利用Spark在内存迭代运算、机器学习领域强悍性能的优势,使用spark处理数据挖掘问题就显得很有实际价值。这篇文章给大家分享一个spark MLlib 的推荐实战例子。我将会分享怎样用spark MLlib做一个电影评分的推荐系统。使用到的算法是user-based协同过滤。如果对Spark MLlib不太了解的,请阅读我的上一篇博客。


推荐系统的对比



应该说,自从Amazone公布了协同过滤算法后,在推荐系统领域,它就占据了很重要的地位。不像传统的内容推荐,协同过滤不需要考虑物品的属性问题,用户的行为,行业问题等,只需要建立用户与物品的关联关系即可,可以物品之间更多的内在关系,类似于经典的啤酒与尿不湿的营销案例。所以,讲到推荐必须要首先分享协同过滤。

Spark MLlib中的协同过滤

协同过滤常被应用于推荐系统。这些技术旨在补充用户-商品关联矩阵中所缺失的部分。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。为此,我们实现了 交替最小二乘法(ALS) 来学习这些隐性语义因子。在 MLlib 中的实现有如下的参数:

numBlocks 是用于并行化计算的分块个数 (设置为-1为自动配置)。
rank 是模型中隐语义因子的个数。
iterations 是迭代的次数。
lambda 是ALS的正则化参数。
implicitPrefs 决定了是用 显性反馈ALS的版本还是用适用 隐性反馈数据集的版本。
alpha 是一个针对于 隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的 基准

隐性反馈 vs 显性反馈

基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。

在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献:  Collaborative Filtering for Implicit Feedback Datasets。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。

目前可用的协同过滤的算法:


数据准备

数据准备, MoiveLens的数据集,有100k到10m的数据都有。我们这里选择100k的数据。
对下载的数据解压之后,会出现很多文件,我们需要使用u.data和u.user文件。详细的数据说明可以参见README。

u.data是用户对电影评分的数据,也是训练集。数据分别表示userId,moiveId,评分rate,时间戳。如下图所示

u.user是用户的个人信息数据,用以推荐使用,分别表示userId,age,sex,job,zip code。我们只使用userId即可。如下图所示

实现的功能

这里有10w条用户对电影的评分,从1-5分,1分表示差劲,5分表示非常好看。根据用户对电影的喜好,给用户推荐可能感兴趣的电影。

实现思路

代码实现如下:
1、加载u.data数据到rating RDD中
2、对rating RDD的数据进行分解,只需要userId,moiveId,rating
3、使用rating RDD训练ALS模型
4、使用ALS模型为u.user中的用户进行电影推荐,数据保存到HBase中  
5、评估模型的均方差

代码

package com.ml.recommender

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.{ PairRDDFunctions, RDD }
import org.apache.spark.SparkContext
import scala.collection.mutable.HashMap
import java.util.List
import java.util.ArrayList
import scopt.OptionParser

import com.ml.util.HbaseUtil

/**
 * moivelens 电影推荐
 *
 */
object MoiveRecommender {

  val numRecommender = 10

  case class Params(
    input: String = null,
    numIterations: Int = 20,
    lambda: Double = 1.0,
    rank: Int = 10,
    numUserBlocks: Int = -1,
    numProductBlocks: Int = -1,
    implicitPrefs: Boolean = false,
    userDataInput: String = null)

  def main(args: Array[String]) {

    val defaultParams = Params()

    val parser = new OptionParser[Params]("MoiveRecommender") {
      head("MoiveRecommender: an example app for ALS on MovieLens data.")
      opt[Int]("rank")
        .text(s"rank, default: ${defaultParams.rank}}")
        .action((x, c) => c.copy(rank = x))
      opt[Int]("numIterations")
        .text(s"number of iterations, default: ${defaultParams.numIterations}")
        .action((x, c) => c.copy(numIterations = x))
      opt[Double]("lambda")
        .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
        .action((x, c) => c.copy(lambda = x))
      opt[Int]("numUserBlocks")
        .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
        .action((x, c) => c.copy(numUserBlocks = x))
      opt[Int]("numProductBlocks")
        .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
        .action((x, c) => c.copy(numProductBlocks = x))
      opt[Unit]("implicitPrefs")
        .text("use implicit preference")
        .action((_, c) => c.copy(implicitPrefs = true))
      opt[String]("userDataInput")
        .required()
        .text("use data input path")
        .action((x, c) => c.copy(userDataInput = x))
      arg[String]("<input>")
        .required()
        .text("input paths to a MovieLens dataset of ratings")
        .action((x, c) => c.copy(input = x))
      note(
        """
          |For example, the following command runs this app on a synthetic dataset:
          |
          | bin/spark-submit --class com.zachary.ml.MoiveRecommender \
          |  examples/target/scala-*/spark-examples-*.jar \
          |  --rank 5 --numIterations 20 --lambda 1.0 \
          |  data/mllib/u.data
        """.stripMargin)
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      System.exit(1)
    }

  }

  def run(params: Params) {

    //本地运行模式,读取本地的spark主目录
    var conf = new SparkConf().setAppName("Moive Recommendation")
      .setSparkHome("D:\\work\\hadoop_lib\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    conf.setMaster("local[*]")

    //集群运行模式,读取spark集群的环境变量
    //var conf = new SparkConf().setAppName("Moive Recommendation")

    val context = new SparkContext(conf)

    //加载数据
    val data = context.textFile(params.input)

    /**
     * *MovieLens ratings are on a scale of 1-5:
     * 5: Must see
     * 4: Will enjoy
     * 3: It's okay
     * 2: Fairly bad
     * 1: Awful
     */
    val ratings = data.map(_.split("\t") match {
      case Array(user, item, rate, time) => Rating(user.toInt, item.toInt, rate.toDouble)
    })

    //使用ALS建立推荐模型    
    //也可以使用简单模式    val model = ALS.train(ratings, ranking, numIterations)
    val model = new ALS()
      .setRank(params.rank)
      .setIterations(params.numIterations)
      .setLambda(params.lambda)
      .setImplicitPrefs(params.implicitPrefs)
      .setUserBlocks(params.numUserBlocks)
      .setProductBlocks(params.numProductBlocks)
      .run(ratings)

    predictMoive(params, context, model)

    evaluateMode(ratings, model)

    //clean up
    context.stop()

  }

  /**
   * 模型评估
   */
  private def evaluateMode(ratings: RDD[Rating], model: MatrixFactorizationModel) {

    //使用训练数据训练模型
    val usersProducets = ratings.map(r => r match {
      case Rating(user, product, rate) => (user, product)
    })

    //预测数据
    val predictions = model.predict(usersProducets).map(u => u match {
      case Rating(user, product, rate) => ((user, product), rate)
    })

    //将真实分数与预测分数进行合并
    val ratesAndPreds = ratings.map(r => r match {
      case Rating(user, product, rate) =>
        ((user, product), rate)
    }).join(predictions)

    //计算均方差
    val MSE = ratesAndPreds.map(r => r match {
      case ((user, product), (r1, r2)) =>
        var err = (r1 - r2)
        err * err
    }).mean()

    //打印出均方差值
    println("Mean Squared Error = " + MSE)
  }

  /**
   * 预测数据并保存到HBase中
   */
  private def predictMoive(params: Params, context: SparkContext, model: MatrixFactorizationModel) {

    var recommenders = new ArrayList[java.util.Map[String, String]]();

    //读取需要进行电影推荐的用户数据
    val userData = context.textFile(params.userDataInput)

    userData.map(_.split("\\|") match {
      case Array(id, age, sex, job, x) => (id)
    }).collect().foreach(id => {
      //为用户推荐电影 
      var rs = model.recommendProducts(id.toInt, numRecommender)
      var value = ""
      var key = 0

      //保存推荐数据到hbase中
      rs.foreach(r => {
        key = r.user
        value = value + r.product + ":" + r.rating + ","
      })

      //成功,则封装put对象,等待插入到Hbase中
      if (!value.equals("")) {
        var put = new java.util.HashMap[String, String]()
        put.put("rowKey", key.toString)
        put.put("t:info", value)
        recommenders.add(put)
      }
    })

    //保存到到HBase的[recommender]表中
    //recommenders是返回的java的ArrayList,可以自己用Java或者Scala写HBase的操作工具类,这里我就不给出具体的代码了,应该可以很快的写出
    HbaseUtil.saveListMap("recommender", recommenders)
  }
}



运行

1、在scala IDE(或者eclipse安装scala插件)运行:
设置工程名,main类等


设置运行参数

--rank 10 --numIterations 40 --lambda 0.01 --userDataInput D:\\ml_data\\data_col\\ml-100k\\ml-100k\\u.user D:\\ml_data\\data_col\\ml-100k\\ml-100k\\u.data

2、在集群中运行如下:
/bin/spark-submit --jars hbase-client-0.98.0.2.1.5.0-695-hadoop2.jar,hbase-common-0.98.0.2.1.5.0-695-hadoop2.jar,hbase-protocol-0.98.0.2.1.5.0-695-hadoop2.jar,htrace-core-2.04.jar,protobuf-java-2.5.0.jar --master yarn-cluster --class com.ml.recommender.MoiveRecommender moive.jar
--rank 10 --numIterations 40 --lambda 0.01 --userDataInput hdfs:/spark_test/u.user hdfs:/spark_test/u.data

注意:
--jars表示项目需要的依赖包
moive.jar表示项目打包的名称

运行结果

均方差如下所示



HBase中推荐数据如下所示



比如 939 用户的推荐电影(格式 moivedID:rating):516:7.574462241760971,1056:6.979575106203245,1278:6.918614235693566,1268:6.914693317049802,1169:6.881813878580957,1316:6.681612000425281,564:6.622223206958775,909:6.597412586878512,51:6.539969654136097,1385:6.503960660826889

优化

1、可以调整这些参数,不断优化结果,使均方差变小。比如 iterations越多,lambda较小,均方差会较小,推荐结果较优

numBlocks 是用于并行化计算的分块个数 (设置为-1为自动配置)。
rank 是模型中隐语义因子的个数。
iterations 是迭代的次数。
lambda 是ALS的正则化参数。
implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。

2、可以写一个程序去读取HBase的推荐数据,对外暴露一个rest接口,这样可以更方便展示。


欢迎有不同意见者来拍砖!谢谢。
转载请标明出处。

作者:shifenglov 发表于2015/2/13 16:36:00 原文链接
阅读:15676 评论:8 查看评论

相关 [spark mllib 系列] 推荐:

[原]Spark MLlib系列(二):基于协同过滤的电影推荐系统

- -
随着大数据时代的到来,数据当中挖取金子的工作越来越有吸引力. 利用Spark在内存迭代运算、机器学习领域强悍性能的优势,使用spark处理数据挖掘问题就显得很有实际价值. 这篇文章给大家分享一个spark MLlib 的推荐实战例子. 我将会分享怎样用spark MLlib做一个电影评分的推荐系统.

Spark MLlib中的协同过滤

- - JavaChen Blog
本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程. 协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分. MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素.

使用Spark MLlib给豆瓣用户推荐电影

- - 鸟窝
推荐算法就是利用用户的一些行为,通过一些数学算法,推测出用户可能喜欢的东西. 随着电子商务规模的不断扩大,商品数量和种类不断增长,用户对于检索和推荐提出了更高的要求. 由于不同用户在兴趣爱好、关注领域、个人经历等方面的不同,以满足不同用户的不同推荐需求为目的、不同人可以获得不同推荐为重要特征的个性化推荐系统应运而生.

基于Spark MLlib平台的协同过滤算法---电影推荐系统

- - zzm
又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用.     说到推荐系统,大家可能立马会想到协同过滤算法. 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用. 基于模型的协同过滤应用---电影推荐.     一、协同过滤算法概述.

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用 - shishanyuan - 博客园

- -
【注】该系列文章以及使用到安装包/测试数据 可以在《. 倾情大奉送--Spark入门实战系列》获取. l 主机操作系统:Windows 64位,双核4线程,主频2.2G,10G内存. l 虚拟软件:VMware® Workstation 9.0.0 build-812388. l 虚拟机操作系统:CentOS 64位,单核.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.

Spark 优化

- - CSDN博客推荐文章
提到Spark与Hadoop的区别,基本最常说的就是Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等. 效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存.

Mllib机器学习工具包在Hadoop上的使用

- 小丑鱼 - 搜索技术博客-淘宝
         Hadoop是基于Java的数据计算平台,在我们公司得到了广泛应用. 同样mllib也是我们算法组同学经常使用的一个机器学习工具包. 但目前mllib工具包只提供了供C++程序调用的so链接库,没有可供java程序调用的jar包. 由于这个需求有一定的普遍性,笔者将mllib做了进一步封装,并通过jni的方式把其封装成了可供java程序调用的接口.