使用Spark-MLlib进行内容推荐
在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。
package org.apache.spark.examples.mllib;
// $example on$
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;
// $example off$
public class JavaRecommendationExample {
public static void main(String args[]) {
// $example on$
SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);
// Load and parse the data
String path = "../data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(
new Function<String, Rating>() {
public Rating call(String s) {
String[] sarray = s.split(",");
return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
Double.parseDouble(sarray[2]));
}
}
);
// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
//使用具体评分数进行训练
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
//忽略评分数据进行模型训练
//MatrixFactorizationModel model = ALS.trainImplicit(JavaRDD.toRDD(ratings), rank, numIterations, 0.01, 0.01);
// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
new Function<Rating, Tuple2<Object, Object>>() {
public Tuple2<Object, Object> call(Rating r) {
return new Tuple2<Object, Object>(r.user(), r.product());
}
}
);
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
return new Tuple2<Tuple2<Integer, Integer>, Double>(
new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
}
}
));
JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
JavaPairRDD.fromJavaRDD(ratings.map(
new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
return new Tuple2<Tuple2<Integer, Integer>, Double>(
new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
}
}
)).join(predictions).values();
double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
new Function<Tuple2<Double, Double>, Object>() {
public Object call(Tuple2<Double, Double> pair) {
Double err = pair._1() - pair._2();
return err * err;
}
}
).rdd()).mean();
System.out.println("Mean Squared Error = " + MSE);
// Save and load model
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
"target/tmp/myCollaborativeFilter");
//使用模型为用户推荐内容
Rating[] recommendations =sameModel.recommendProducts(1, 3);
for(int i=0;i<recommendations.length;i++){
System.out.println("推荐的产品:"+recommendations[i].product());
}
// $example off$
}
}