以个性化新闻推荐为例,整个过程分成两个mapreduce阶段,由于hadoop流不支持多个mapreduce过程的自动化,所以所有mapreduce过程命令必须人工一个一个的执行。
1、首先需要将原始数据处理成如下形式的两个文件
文件一:Item_user_score.txt
格式:物品—用户—分数
如下图中第一行,物品100655565被用户1634974浏览过,则将分数记为1
文件二:Item_Item_number.txt
格式:物品—物品—相似度
如下图中第二行,物品100654360与物品100650498同时被两个用户浏览过
2、矩阵乘法
其实文件一和文件二分别保存着一个矩阵,第二步就是要做矩阵乘法。
step1mapper.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.strip().split(separator,1)
def main(separator='\t'):
# input comes from STDIN (standard input)
datas = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
# current_word - string containing a word (the key)
# group - iterator yielding all ["<current_word>", "<count>"] items
#print data
for data in datas:
print "%s%s%s" % (data[0], separator, data[1])
if __name__ == "__main__":
main()
step1mapper.py的主要目的是将两个文件中的键值对取出,然后相同的key输入到同一个reducer。
step1reducer.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.split()
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
# current_word - string containing a word (the key)
# group - iterator yielding all ["<current_word>", "<count>"] items
#print data
datanew={}
for key in data:
if key[0] not in datanew.keys():
datanew[key[0]]=[]
datanew[key[0]].append([key[1],key[2]])
for line in datanew.keys():
user_score=[]
item_number=[]
for temp in datanew[line]:
if int(temp[0])/1000000 < 99:
user_score.append(temp)
else :
item_number.append(temp)
for attr1 in user_score:
for attr2 in item_number:
print "%s%s%s" % (attr1[0], separator, attr2[0]+separator+str(float(attr1[1])*int(attr2[1])))
if __name__ == "__main__":
main()
step1reducer.py的输出形式为:1634974 10065436034.0
可以理解为用户1634974对新闻100654360的喜爱程度为34.
执行命令为:
~/hadoop-2.3.0/bin/hadoop jar hadoop-streaming-2.3.0.jar -mapper step1mapper.py -reducer step1reducer.py -input /input/test.txt -output /step1out.txt -file step1mapper.py -file step1reducer.py
3、结果融合
将上一个过程reducer的输出作为该过程mapper阶段的输入。
step2mapper.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.strip().split(separator,1)
def main(separator='\t'):
# input comes from STDIN (standard input)
datas = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
# current_word - string containing a word (the key)
# group - iterator yielding all ["<current_word>", "<count>"] items
#print data
for data in datas:
print "%s%s%s" % (data[0], separator, data[1])
if __name__ == "__main__":
main()
step2reducer.py
推荐最可能的5个
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
import operator
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.split()
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
# current_word - string containing a word (the key)
# group - iterator yielding all ["<current_word>", "<count>"] items
#print data
datanew={}
for key in data:
# print "\n--------------------------"
if key[0] not in datanew.keys():
datanew[key[0]]=[]
datanew[key[0]].append([key[1],key[2]])
for key in datanew.keys():
result={}
for data in datanew[key]:
if data[0] not in result.keys():
result[data[0]]=0.0
result[data[0]] += float(data[1])
for rec_item in sorted(result.iteritems(),key=operator.itemgetter(1),reverse=True)[:5]:
print "%s\t%s\t%s" % (key, rec_item[0], rec_item[1])
if __name__ == "__main__":
main()
执行命令为:
~/hadoop-2.3.0/bin/hadoop jar hadoop-streaming-2.3.0.jar -mapper step2mapper.py -reducer step2reducer.py -input/output/step1out.txt -output /step2out.txt -file step2mapper.py -file step2reducer.py
输出形式为:
PS:
其实两个mapreduce阶段的mapper都是冗余的,完全可以在执行命令的时候加入参数:
-D stream.map.input.field.separator='\t'
//输入文件中的数据每行都以tab作为分隔符
-D stream.num.map.input.key.fields=1
//每行第一个tab之前的数据作为key,其余的作为value
这样两个mapreduce过程就可以合并成一个mapreduce过程执行。
作者:keepreder 发表于2015/8/7 17:44:38
原文链接