推03,最最最简单的推荐系统是什么样的 | 附Spark实践案例
接前面这篇《 推02,就算是非技术人员也都有必要了解的一些推荐系统常识》,之前的开篇01/02,其实都是以理论、场景化,概念进行铺垫的,让大伙儿大概知道推荐系统是怎么回事,从这篇开始,照顾一下技术的童鞋,我们开始回归到技术层面,并且 附上代码案例(见后面部分)。当然,依然是入门级,高高高手可以绕路。
01 什么是最最最简单的推荐机制
如标题,既然是“最最最简单”的推荐系统,其实也不能说是推荐系统,之前也说了,系统是一个复合的完整系统,所以这里说推荐机制可能会更恰当些。结合之前大致陈述的一些推荐机制,最最最简单的推荐机制,无疑是基于主体属性相似或者相关推荐了,连个性化都说不上,铁定最最最简单了。
说到这,说不定有些人不愿意干了,既然如此简陋的推荐机制,不看也罢。BUT,真的不要小看基于内容相似的推荐,存在即合理。我们在进行信息获取时,其实本身就是具有一定识别能力的,这意味着我们最终选择查看的信息都是经过我们大脑大致思考的结论,这意味着这些信息是有参考价值的。
并且,在很多时候,我们是需要同类信息进行我们获取到的信息进行补全的,完善我们对目标信息的获取程度。基于这个考量,基于内容属性的推荐其实是说的过去的。
别不服,我们来上例子,还是以前面文章那个腾讯视频的推荐场景图为例。
图是在使用腾讯视频观看视频时,我亲手截推荐栏位的内容,补充一下背景(很重要,请注意):
1 我当时观看的是应该是《蓝色星球第二季》纪录片
2 我经常在腾讯视频上看的一般是大片,并且一般是国外的
3 由于是VIP账号,梓尘兄也经常用这个账号看动画片,诸如《小猪佩奇》之类的
4 在腾讯视频上,纪录片中,我只看过《地球脉动》和《蓝色星球》,并且,我真不是纪录片的爱好者,只是喜欢这两部而已
基于上面我提供的个人行为数据,再结合看这批推荐列表,不难发现,上面有很多的纪录片,你觉得跟我们当时正在浏览的内容有没有关系?或者你认为我行为记录中很多纪录片的记录?又或者是我是纪录片的狂热者,导致了腾讯视频给我猛推纪录片。
所以,连腾讯视频都会考虑基于当前浏览内容的属性进行推荐(并且是大范围),你还觉得这种做法十分之LOW吗?当然你也可以认为腾讯视频推的不准,瞎J吧推,也是可以的,我也认同,不是非常准(哈哈,《地球脉动》所有我都看过了,还给我瞎推,上面给推的没几个有欲望去点的,给腾讯视频推荐的开发兄弟们打脸了,不好意思)。
我只想表达的是,这种简单的推荐机制,在整个推荐系统中真的是不可缺少的部分,很多时候简单并不代表无效,类似上面这种情况,我可以举出太多有名有姓的实际案例来,说多了没意义,所以,咱继续。
02 其实并没有这么简单
从直接的推荐机制来看,整个实现流程看着真的很简单,但是在实际的操作过程中,还是有一些东西值得探讨以及注意的。
第一、首先是,相似计算的过程
之前文章有大致提到过,相似或者相关计算还是有很多可以选择的,他们每一种都有各自的特点以及适应性。以相似计算中使用最多的欧式距离与余弦相似为例,专业点的说法就是余弦夹角可以有效规避个体相同认知中不同程度的差异表现,更注重维度之间的差异,而不注重数值上的差异,而欧式距离则是对个体异常数值会比较敏感。
这意味着,在我们需要区分异常样本时,使用距离计算会更恰当,聚个栗子,比如电商领域中高价值与低价值用户的区分,其实我们核心是想把他们的差异性拉大的,得以体现出对比,这个时候使用余弦就是不合理的。
在回归到距离上说,市面上除了欧式距离,还有好几种距离度量,诸如马氏、曼哈顿距离等等,其实其度量侧重都是不一样的,我们需要结合实际的场景去使用。还有更偏向于相关度量的皮尔森相关系数等。
第二、需要解决相似计算中,计算矩阵过大的问题
按照标准流程,假设有1万个物品,则对于每个物品来说,需要与其他物品计算与其的相似度或者相关度,然后再排个序,取TopN形成自身的待推荐列表。那么,简单的数学题来了10000*10000=10000万次计算,这显然是不合理的。
所以,优化这个过程是必然的,关键是如何优化。核心思想其实就是初筛嘛!把那些完全没啥多大鸟关系的直接干掉,省掉计算相似的过程,节省资源。如何筛选?一个比较常见的做法是,寻找核心关键影响因素,保证关键因素的相关性。
比如,在实际的生产操作过程中,很多时候会通过关键属性是否基本匹配作为判断依据,或者直接通过搜索构建进行检索初筛,把大致相关的先过滤一遍,通过这种简单而粗暴的方式,其实已经能把大部分相关度很低的候选集给过滤掉,对于整体计算量级来说,计算复杂度直接下降。
第三、多个因子如何权衡影响权重
基于属性计算相似,从整体上来看,其实一般主体都不止一个属性,那么计算相关的时候到底看那个属性呢?或者说哪些属性应该占有更高的权重,哪些因素是次要因素。
还是以上面的腾讯视频的推荐为例,从结果上来反推相似推荐的部分(当然,实际情况不详哈,只是推断而已),显然当前视频的类别占了很大的权重(记录片),除此之外包括导演啊,一些其他特征属性应该也会参考在内的。
回到常规问题,如何确定影响权重是个操作难题。最简单并且实际上还挺有效的一种方式就是专家评判法,即通过权威经验来划定影响因子的权重,还有就是通过标注的样本进行反向拟合每种因素的占比权重。除此之外还有一些其他学术上的方法,包括什么主成分分析法,层次分析法,还有什么熵权法,其实都是找因子影响能力的主次关系。
最终确定好了影响因素,在实际上线回收到数据之后,依然是需要逐步的进行权重影响调整的,我们可以通过结果的样本数据,进行LR的回归拟合,寻找最合适的权重配比。
03 说实践案例比较实在
说了这么多理论,不能光说不练,标题上写着“附Spark案例”,很多人都是冲着这来的呢,前面BB了这么多屁话,还不见代码。来,我们这就上正文。
不过不用期待过多,毕竟这只是一个简单的相似计算的过程而已,所以权当属性实验数据以及Spark开发了,高手可以略过了。
一、实验数据简介
其实看到这三部分数据的简介,一些老手估计已经知道是什么数据了,是的,就是那份有名的电影数据集(MovieLens开放数据),并且取的是完全版的那份,简直成了推荐系统的标配实验数据了。
三个文件,其中电影数据集共1万多个电影基础数据,评分数据集最大共100万条评分数据,以及10万条的用户对电影的打标签数据,总大小约为几百兆,不大,但是用来做实验玩玩那是相当足够了。
二、推荐机制逻辑
我们的核心计算逻辑还是内容属性上的相似嘛,所以核心是看看围绕电影,有哪些属性是可以抽取出来的,并且参与计算的。
第一,电影的类别,基于上面腾讯视频的考虑,其实这个显然很重要,而电影的类别信息存储于电影数据集中,并且是一对多的关系,即一个电影可以对应多个类目,我们需要进行切割,由于计算这个维度相似的时候,是多对多的关系,天然的计算相似或者相关的特征。
第二、电影的播放年份,电影的年份其实有种潜在的关联关系,举个例子可以说明,比如说零几年的电影与现状的电影风格是不同的,当时间跨度有一定差距时,这个还是蛮明显的。关于电影的年份数据,从数据样本可以知道,它隐藏在电影的名字中,写个正则过滤出来即可。至于说如何计算这个维度的相关,我们可以用两者差距来算相关,比如年份绝对值越远,这个值越小,越近甚至是重叠就越大。
第三,电影的标签,电影本身是没有标签属性的,但它有用户对他打的标签信息,所以我们需要进一步处理,把它变成电影的属性,需要清洗、规整以及处理。标签本身也是多对多的关系,同样可以计算相似度,比如欧式或者余弦。
第四、电影的名称,名称上进行寻找关联性,听上去很扯,但其实有一定的逻辑在里头,比如我在视频网站搜索“三国”,显然我期望从名称上寻找三国相关题材的视频,他们就是在名称上建立起关联关系的,所以,名称从某种程度上来说,可以体现相关性。在计算相似或者相关方式上,我们可以进行分词,去除停词,然后再以词维度进行余弦计算。
第五、候选集电影的评分,对于做推荐来说,首先需要保证的推荐的候选集一定是优质的,从这个维度上说,抛开其他因素,那么就是整体评分高的电影是相对优质的电影。在处理的过程中,由于一个电影对应多个评分,所以,我们需要进行进行归一计算,最简单的做法就是计算整体评分的平均值,作为电影的评分数据,评分过低的数据直接从候选集中干掉,又大大的降低了计算次数。
三、代码逻辑
Spark2.0之后,不用再构建sparkcontext了,以创建一个复合多功能的SparkSession替代,可以正常的从HDFS读取文件,也可以从Hive中获取DataFrame等等。
val sparkSession = SparkSession
.builder()
.appName("base-content-Recommand") //spark任务名称
.enableHiveSupport()
.getOrCreate()
那三个表可以先load到Hive中,然后spark直接从Hive中读取,形成DataFrame。
//从hive中,获取rating评分数据集,最终形成如下格式数据(movie,avg_rate)
val movieAvgRate = sparkSession.sql("select movieid,round(avg(rate),1) as avg_rate from tx.tx_ratings group by movieid").rdd.map{
f=>
(f.get(0),f.get(1))
}
//获取电影的基本属性数据,包括电影id,名称,以及genre类别
val moviesData = sparkSession.sql("select movieid,title,genre from tx.tx_movies").rdd
//获取电影tags数据,这里取到所有的电影tag
val tagsData = sparkSession.sql("select movieid,tag from tx.tx_tags").rdd
先对tag进行处理,很多tag其实说的是同一个东西,我们需要进行一定程度上的合并,这样才能让tag更加的合理(有朋友有意见了,就一个实验案例而已,搞这么复杂),举个简单例子,blood、bloods、bloody其实都是想说这个电影很血腥暴力,但是不同的人使用的词不同的(这点大伙儿可以自由查看实验数据),所以我们需要进行一定程度上的合并。
val tagsStandardizeTmp = tagsStandardize.collect()
val tagsSimi = tagsStandardize.map{
f=>
var retTag = f._2
if (f._2.toString.split(" ").size == 1) {
var simiTmp = ""
val tagsTmpStand = tagsStandardizeTmp
.filter(_._2.toString.split(" ").size != 1 )
.filter(f._2.toString.size < _._2.toString.size)
.sortBy(_._2.toString.size)
var x = 0
val loop = new Breaks
tagsTmpStand.map{
tagTmp=>
val flag = getEditSize(f._2.toString,tagTmp._2.toString)
if (flag == 1){
retTag = tagTmp._2
loop.break()
}
}
((f._1,retTag),1)
} else {
((f._1,f._2),1)
}
}
其中getEditSize是求取,两个词的编辑距离的,编辑距离在一定时候,进行合并,具体逻辑见代码了,不复杂。
def getEditSize(str1:String,str2:String): Int ={
if (str2.size > str1.size){
0
} else {
//计数器
var count = 0
val loop = new Breaks
//以较短的str2为中心,进行遍历,并逐个比较字符
val lengthStr2 = str2.getBytes().length
var i = 0
for ( i <- 1 to lengthStr2 ){
if (str2.getBytes()(i) == str1.getBytes()(i)) {
//逐个匹配字节,相等则计数器+1
count += 1
} else {
//一旦出现前缀不一致则中断循环,开始计算重叠度
loop.break()
}
}
//计算重叠度,当前缀重叠度大于等于2/7时,进行字符串合并,从长的往短的合并
if (count.asInstanceOf[Double]/str1.getBytes().size.asInstanceOf[Double] >= (1-0.286)){
1
}else{
0
}
}
}
继续对tag进行处理,统计tag频度,取TopN个作为电影对应的tag属性。
val movieTag = tagsSimi.reduceByKey(_+_).groupBy(k=>k._1._1).map{
f=>
(f._1,f._2.map{
ff=>
(ff._1._2,ff._2)
}.toList.sortBy(_._2).reverse.take(10).toMap)
}
接下来处理年龄、年份和名称,这个会简单点,进行分词处理的话,怎么简单怎么来了,直接使用第三方的HanLP进行关键词抽取作为分词结果,直接屏蔽了停用词。
val moviesGenresTitleYear = moviesData.map{
f=>
val movieid = f.get(0)
val title = f.get(1)
val genres = f.get(2).toString.split("|").toList.take(10)
val titleWorlds = HanLP.extractKeyword(title.toString, 10).toList
val year = movieYearRegex.movieYearReg(title.toString)
(movieid,(genres,titleWorlds,year))
}
取年份的正则函数如下,是个Java写的精通工具类(Scala和Java混写,简直无比美妙)。
package utils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Desc: 抽取年份公式
*/
public class movieYearRegex {
private static String moduleType = ".* \\(([1-9][0-9][0-9][0-9])\\).*";
public static void main(String[] args){
System.out.println(movieYearReg("GoldenEye (1995)"));
}
public static int movieYearReg(String str){
int retYear = 1994;
Pattern patternType = Pattern.compile(moduleType);
Matcher matcherType = patternType.matcher(str);
while (matcherType.find()) {
retYear = Integer.parseInt(matcherType.group(1));
}
return retYear;
}
}
通过join进行数据合并,生成一个以电影id为核心的属性集合。
val movieContent = movieTag.join(movieAvgRate).join(moviesGenresTitleYear).map{
f=>
//(movie,tagList,titleList,year,genreList,rate)
(f._1,f._2._1._1,f._2._2._2,f._2._2._3,f._2._2._1,f._2._1._2)
}
相似计算开始之前,还记得我们之前说的吗,可以进行候选集阉割,我们先根据一些规则裁剪一下候选集。
val movieConetentTmp = movieContent.filter(f=>f._6.asInstanceOf[java.math.BigDecimal].doubleValue() < 3.5).collect()
然后真正的开始计算相似,使用余弦相似度计算,取排序之后的Top20作为推荐列表。
val movieContentBase = movieContent.map{
f=>
val currentMoiveId = f._1
val currentTagList = f._2 //[(tag,score)]
val currentTitleWorldList = f._3
val currentYear = f._4
val currentGenreList = f._5
val currentRate = f._6.asInstanceOf[java.math.BigDecimal].doubleValue()
val recommandMovies = movieConetentTmp.map{
ff=>
val tagSimi = getCosTags(currentTagList,ff._2)
val titleSimi = getCosList(currentTitleWorldList,ff._3)
val genreSimi = getCosList(currentGenreList,ff._5)
val yearSimi = getYearSimi(currentYear,ff._4)
val rateSimi = getRateSimi(ff._6.asInstanceOf[java.math.BigDecimal].doubleValue())
val score = 0.4*genreSimi + 0.25*tagSimi + 0.1*yearSimi + 0.05*titleSimi + 0.2*rateSimi
(ff._1,score)
}.toList.sortBy(k=>k._2).reverse.take(20)
(currentMoiveId,recommandMovies)
}.flatMap(f=>f._2.map(k=>(f._1,k._1,k._2))).map(f=>Row(f._1,f._2,f._3))
最后,将结果存入Hive中,Hive中提前建好结果表。
//我们先进行DataFrame格式化申明
val schemaString2 = "movieid movieid_recommand score"
val schemaContentBase = StructType(schemaString2.split(" ")
.map(fieldName=>StructField(fieldName,if (fieldName.equals("score")) DoubleType else StringType,true)))
val movieContentBaseDataFrame = sparkSession.createDataFrame(movieContentBase,schemaContentBase)
//将结果存入hive,需要先进行临时表创建
val userTagTmpTableName = "mite_content_base_tmp"
val userTagTableName = "mite8.mite_content_base_reco"
movieContentBaseDataFrame.registerTempTable(userTagTmpTableName)
sparkSession.sql("insert into table " + userTagTableName + " select * from " + userTagTmpTableName)
到这里,基本大的代码逻辑就完了,可能还有一些边边角角的代码遗漏了,但不妨碍主干了。
04 写在最后
写到这里,一篇有业务、有理论、还有代码的硬文章就出来了,不过在文章中嵌代码总是很难搞的,想要看整体的代码,还是得看工程。
想要进一步研究代码逻辑以及实际跑一跑这个Spark实验案例的,可以加我,从我这要实验数据以及完整的代码文件,不收钱。
当然,如果你是初学者,想要向我了解更多推荐相关的东西,以及代码逻辑进行详细的讲解,甚至是跑到Spark环境中的整体流程,这个是要收咨询费的,哈哈,老老实实给我打100大洋的红包,包服务到家(没办法穷),甚至后续的《基于用户画像的推荐》、《基于协同的推荐》相关的实践工程都一并讲了,联系微信见下。
关于我:
大数据行业半个老鸟,我家梓尘兄的超级小弟,会敲代码、会写文章,还会泡奶粉哄小屁孩。
想和我交流的,可以加我个人微信mute88,可以拉你入交流群,但请注明身份and来意~
系列文章:
《推03,最最最简单的推荐系统是什么样的 | 附Spark实践案例》(本文)