用Hadoop流实现mapreduce版推荐系统基于物品的协同过滤算法

标签: hadoop mapreduce 推荐系统 | 发表时间:2015-08-07 17:44 | 作者:keepreder
出处:http://blog.csdn.net

       以个性化新闻推荐为例,整个过程分成两个mapreduce阶段,由于hadoop流不支持多个mapreduce过程的自动化,所以所有mapreduce过程命令必须人工一个一个的执行。

1、首先需要将原始数据处理成如下形式的两个文件

文件一:Item_user_score.txt

格式:物品—用户—分数

如下图中第一行,物品100655565被用户1634974浏览过,则将分数记为1


文件二:Item_Item_number.txt

格式:物品—物品—相似度

如下图中第二行,物品100654360与物品100650498同时被两个用户浏览过


2、矩阵乘法

其实文件一和文件二分别保存着一个矩阵,第二步就是要做矩阵乘法。

step1mapper.py

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
	for line in file:
		yield line.strip().split(separator,1)

def main(separator='\t'):
	# input comes from STDIN (standard input)
	datas = read_mapper_output(sys.stdin, separator=separator)
	# groupby groups multiple word-count pairs by word,
	# and creates an iterator that returns consecutive keys and their group:
	#   current_word - string containing a word (the key)
	#   group - iterator yielding all ["<current_word>", "<count>"] items
	#print data
	for data in datas:
		print "%s%s%s" % (data[0], separator, data[1])
if __name__ == "__main__":
	main()


step1mapper.py的主要目的是将两个文件中的键值对取出,然后相同的key输入到同一个reducer。



step1reducer.py

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
	for line in file:
		yield line.split()

def main(separator='\t'):
	# input comes from STDIN (standard input)
	data = read_mapper_output(sys.stdin, separator=separator)
	# groupby groups multiple word-count pairs by word,
	# and creates an iterator that returns consecutive keys and their group:
	#   current_word - string containing a word (the key)
	#   group - iterator yielding all ["<current_word>", "<count>"] items
	#print data
	datanew={}
	for key in data:
		if key[0] not in datanew.keys():
			datanew[key[0]]=[]
		datanew[key[0]].append([key[1],key[2]])
	for line in datanew.keys():
		user_score=[]
		item_number=[]
		for temp in datanew[line]:
			if int(temp[0])/1000000 < 99:
				user_score.append(temp)
			else :
				item_number.append(temp)
		for attr1 in user_score:
			for attr2 in item_number:
				print "%s%s%s" % (attr1[0], separator, attr2[0]+separator+str(float(attr1[1])*int(attr2[1])))
if __name__ == "__main__":
	main()


step1reducer.py的输出形式为:1634974 10065436034.0

可以理解为用户1634974对新闻100654360的喜爱程度为34.


执行命令为:

  ~/hadoop-2.3.0/bin/hadoop jar hadoop-streaming-2.3.0.jar -mapper step1mapper.py -reducer step1reducer.py -input /input/test.txt -output /step1out.txt -file step1mapper.py -file step1reducer.py  


3、结果融合

将上一个过程reducer的输出作为该过程mapper阶段的输入。

step2mapper.py

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
	for line in file:
		yield line.strip().split(separator,1)

def main(separator='\t'):
	# input comes from STDIN (standard input)
	datas = read_mapper_output(sys.stdin, separator=separator)
	# groupby groups multiple word-count pairs by word,
	# and creates an iterator that returns consecutive keys and their group:
	#   current_word - string containing a word (the key)
	#   group - iterator yielding all ["<current_word>", "<count>"] items
	#print data
	for data in datas:
		print "%s%s%s" % (data[0], separator, data[1])
if __name__ == "__main__":
	main()

step2reducer.py

推荐最可能的5个

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
import operator
from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
	for line in file:
		yield line.split()

def main(separator='\t'):
	# input comes from STDIN (standard input)
	data = read_mapper_output(sys.stdin, separator=separator)
	# groupby groups multiple word-count pairs by word,
	# and creates an iterator that returns consecutive keys and their group:
	#   current_word - string containing a word (the key)
	#   group - iterator yielding all ["<current_word>", "<count>"] items
	#print data
	datanew={}
	for key in data:
#		print "\n--------------------------"
		if key[0] not in datanew.keys():
			datanew[key[0]]=[]
		datanew[key[0]].append([key[1],key[2]])
	for key in datanew.keys():
		result={}
		for data in datanew[key]:
			if data[0] not in result.keys():
				result[data[0]]=0.0
			result[data[0]] += float(data[1])
		for rec_item in sorted(result.iteritems(),key=operator.itemgetter(1),reverse=True)[:5]:
			print "%s\t%s\t%s" % (key, rec_item[0], rec_item[1])
if __name__ == "__main__":
	main()


执行命令为:

  ~/hadoop-2.3.0/bin/hadoop jar hadoop-streaming-2.3.0.jar -mapper step2mapper.py -reducer step2reducer.py -input/output/step1out.txt -output /step2out.txt -file step2mapper.py -file step2reducer.py  


输出形式为:


PS:

其实两个mapreduce阶段的mapper都是冗余的,完全可以在执行命令的时候加入参数:

-D stream.map.input.field.separator='\t'   //输入文件中的数据每行都以tab作为分隔符

-D stream.num.map.input.key.fields=1       //每行第一个tab之前的数据作为key,其余的作为value

这样两个mapreduce过程就可以合并成一个mapreduce过程执行。
作者:keepreder 发表于2015/8/7 17:44:38 原文链接
阅读:18 评论:0 查看评论

相关 [hadoop mapreduce 推荐系统] 推荐:

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

用Hadoop流实现mapreduce版推荐系统基于物品的协同过滤算法

- - CSDN博客推荐文章
       以个性化新闻推荐为例,整个过程分成两个mapreduce阶段,由于hadoop流不支持多个mapreduce过程的自动化,所以所有mapreduce过程命令必须人工一个一个的执行. 1、首先需要将原始数据处理成如下形式的两个文件. 文件一:Item_user_score.txt. 如下图中第一行,物品100655565被用户1634974浏览过,则将分数记为1.

下一代Hadoop MapReduce

- Jia - NoSQLFan
本文来自Hadoop Summit大会的一个演讲稿,主讲是Hadoop核心开发团队的Arun C Murthy (@acmurthy),同时他也是Yahoo!刚刚剥离的Hadoop独立公司Hortonworks的 Founder和架构师. 演讲中他讲述了现在的Hadoop存在的一些问题和集群上限,并展望了下一代Hadoop和其MapReduce将会得到的巨大提升.

"Hadoop/MapReduce/HBase"分享总结

- - ITeye博客
此分享是关于hadoop生态系统的简单介绍包括起源到相对应用. Hadoop和HBase.pdf (2.1 MB). 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Hadoop之MapReduce单元测试

- - ITeye博客
通常情况下,我们需要用小数据集来单元测试我们写好的map函数和reduce函数. 而一般我们可以使用Mockito框架来模拟OutputCollector对象(Hadoop版本号小于0.20.0)和Context对象(大于等于0.20.0). 下面是一个简单的WordCount例子:(使用的是新API).

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

[转]基于mapreduce的Hadoop join实现

- -
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现. 我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:.

【Hadoop】MapReduce使用combiner优化性能

- - CSDN博客云计算推荐文章
当MapReduce模型中,reduce执行的任务为统计分类类型的值总量或去重后的数量,或最大值最小值时,可以考虑在Map输出后进行combine操作;这样可以减少网络传输带来的开销,同时减轻了reduce任务的负担. Combine操作是运行在每个节点上的,只会影响本地Map的输出结果;Combine的输入为本地map的输出结果(一般是数据在溢出到磁盘之前,可以减少IO开销),其输出则作为reduce的输入.

hadoop的IO和MapReduce优化参数

- - CSDN博客系统运维推荐文章
           在MapReduce执行过程中,特别是Shuffle阶段,尽量使用内存缓冲区存储数据,减少磁盘溢写次数;同时在作业执行过程中增加并行度,都能够显著提高系统性能,这也是配置优化的一个重要依据.            下面分别介绍I/O属性和MapReduce属性这两个类的部分属性,并指明其优化方向.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).