
Content Recommendation with Spark-MLlib

In many real-world scenarios we only have access to implicit feedback (e.g. views, clicks, purchases, likes, shares). The approach MLlib uses to handle this kind of data comes from the paper Collaborative Filtering for Implicit Feedback Datasets. In essence, instead of modeling the rating matrix directly, the method treats the data as a combination of binary preferences and confidence values. The numbers being fit are therefore tied to the observed strength of a user's interactions with an item rather than to explicit ratings. The model then looks for latent factors that can be used to predict a user's preference for an item.
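As a quick sketch of what "binary preference plus confidence" means (following the cited paper; the α below corresponds to the alpha argument of ALS.trainImplicit, and the notation here is my own summary rather than anything taken from MLlib):

p_{ui} = \begin{cases} 1, & r_{ui} > 0 \\ 0, & r_{ui} = 0 \end{cases}
\qquad
c_{ui} = 1 + \alpha\, r_{ui}

\min_{x,\,y}\ \sum_{u,i} c_{ui}\bigl(p_{ui} - x_u^{\top} y_i\bigr)^2
 + \lambda\Bigl(\sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2\Bigr)

That is, each observed interaction strength r_{ui} is turned into a binary preference p_{ui} with a confidence weight c_{ui}, and the latent factors x_u, y_i are fit by weighted least squares.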

 

package org.apache.spark.examples.mllib;

// $example on$
import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;
// $example off$

public class JavaRecommendationExample {
  public static void main(String[] args) {
    // $example on$
    SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Load and parse the data
    String path = "../data/mllib/als/test.data";
    JavaRDD<String> data = jsc.textFile(path);
    JavaRDD<Rating> ratings = data.map(
      new Function<String, Rating>() {
        public Rating call(String s) {
          String[] sarray = s.split(",");
          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
            Double.parseDouble(sarray[2]));
        }
      }
    );

    // Build the recommendation model using ALS
    int rank = 10;
    int numIterations = 10;
    // Train on the explicit rating values
    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
    // To treat the data as implicit feedback (ignoring the explicit rating values), train with:
    // MatrixFactorizationModel model = ALS.trainImplicit(JavaRDD.toRDD(ratings), rank, numIterations, 0.01, 0.01);

    // Evaluate the model on rating data
    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
      new Function<Rating, Tuple2<Object, Object>>() {
        public Tuple2<Object, Object> call(Rating r) {
          return new Tuple2<Object, Object>(r.user(), r.product());
        }
      }
    );
    JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
            return new Tuple2<Tuple2<Integer, Integer>, Double>(
              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
          }
        }
      ));
    JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
      JavaPairRDD.fromJavaRDD(ratings.map(
        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
            return new Tuple2<Tuple2<Integer, Integer>, Double>(
              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
          }
        }
      )).join(predictions).values();
    double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
      new Function<Tuple2<Double, Double>, Object>() {
        public Object call(Tuple2<Double, Double> pair) {
          Double err = pair._1() - pair._2();
          return err * err;
        }
      }
    ).rdd()).mean();
    System.out.println("Mean Squared Error = " + MSE);

    // Save and load model
    model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
    MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
      "target/tmp/myCollaborativeFilter");

    // Use the loaded model to recommend the top 3 products for user 1
    Rating[] recommendations = sameModel.recommendProducts(1, 3);
    for (int i = 0; i < recommendations.length; i++) {
      System.out.println("Recommended product: " + recommendations[i].product());
    }
    // $example off$

    jsc.stop();
  }
}
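A couple of notes on using the example. The parser above simply splits each line on commas, so test.data is expected to contain user,product,rating triples, one per line; an illustrative file (the values below are made up, not the actual contents of Spark's bundled test.data) could look like:

1,1,5.0
1,2,1.0
2,1,1.0
2,2,5.0

Besides recommendProducts, MatrixFactorizationModel can also score a single (user, product) pair or recommend users for a given product. A small sketch (the ids 1 and 2 are arbitrary examples):

    // Predicted rating of product 2 by user 1
    double score = sameModel.predict(1, 2);
    // Top 3 users most likely to be interested in product 2
    Rating[] usersForProduct = sameModel.recommendUsers(2, 3);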

 

 



