(Repost) Implementing the ItemBase Recommendation Algorithm with a Co-occurrence Matrix on MapReduce (Part 1)

Reposted from: http://zengzhaozheng.blog.51cto.com/8219051/1557054

1. Overview

Over the past two months I have been working on our company's data-mining system, evaluating user similarity from users' tag profiles, which involves some recommendation-algorithm knowledge. During this time I went through 《推荐引擎实践》 and 《Mahout in Action》, and in this post I implement a distributed ItemBase recommendation algorithm based on ideas from those two books plus my own understanding. The work has two parts: the first derives user recommendations in a simple way from a co-occurrence matrix, and the second implements the ItemBase algorithm with the traditional item-similarity matrix. This post covers the first part and its MapReduce implementation; the next post will cover the second part.

2. Algorithm principles

Collaborative filtering, one of the many recommendation algorithms, is already widely used. It comes in two main kinds: the first is user-based collaborative filtering, the second is item-based collaborative filtering.

Put simply, the itemBase algorithm works like this: user A likes item X1 and user B likes item X2; if X1 and X2 are similar, then the items A liked before are recommended to B, and the items B liked before are recommended to A. The algorithm relies entirely on the users' historical item preferences. The userBase algorithm, in plain words, is: user A likes item X1 and user B likes item X2; if users A and B are similar, then X1 is recommended to B and X2 is recommended to A. A simple schematic:

[Figures: schematics of ItemBase and UserBase collaborative filtering]

Which one to choose depends on your situation: if there are far more users than item types, use the ItemBase collaborative-filtering algorithm; if there are far fewer users than item types, use the UserBase one. One reason for choosing this way is to keep the item-similarity matrix, user-similarity matrix, or co-occurrence matrix as small as possible.

3. Data modeling

The basic algorithm was sketched above. For any algorithm, modeling the data so that the algorithm can actually be applied to it is both the key part and the hard part. This section discusses it based on my own project experience and some ideas from 《推荐引擎实践》, in two parts: first, recommendation via the co-occurrence matrix; second, recommendation via a similarity computation.

(1) The co-occurrence matrix approach:

Step 1: convert the data into user vectors

1[102:0.1,103:0.2,104:0.3]: the list of items liked by user 1, with the corresponding preference scores.

2[101:0.1,102:0.7,105:0.9]: the list of items liked by user 2, with the corresponding preference scores.

3[102:0.1,103:0.7,104:0.2]: the list of items liked by user 3, with the corresponding preference scores.

Step 2: compute the co-occurrence matrix

Simply put, for every pair of items x1 and x2, count the number of users who like both, and arrange these counts into a matrix.

[Figure: item co-occurrence matrix]
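To make the counting concrete, here is a small in-memory sketch in plain Java (the class name CooccurrenceSketch and the use of ordinary collections are my own illustration and are not part of the original code; section 4 below does the same counting with a mapper and a reducer). For every user, every pair of items in that user's list, including the (x, x) self pairs on the diagonal, is counted once:

import java.util.*;

public class CooccurrenceSketch {
    public static void main(String[] args) {
        // the three user preference lists from the example above
        Map<Integer, List<Integer>> userItems = new HashMap<>();
        userItems.put(1, Arrays.asList(102, 103, 104));
        userItems.put(2, Arrays.asList(101, 102, 105));
        userItems.put(3, Arrays.asList(102, 103, 104));

        // count, for every (item, item) pair, how many users like both
        Map<String, Integer> cooc = new HashMap<>();
        for (List<Integer> items : userItems.values()) {
            for (int a : items) {
                for (int b : items) {
                    cooc.merge(a + "," + b, 1, Integer::sum);
                }
            }
        }
        System.out.println(cooc); // e.g. "101,102" -> 1, because only user 2 likes both 101 and 102
    }
}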

Step 3: build the user-item rating matrix

[Figure: user-item rating matrix]

Step 4: multiply the item co-occurrence matrix by the user-item rating matrix to obtain the recommendation results

[Figure: co-occurrence matrix multiplied by the rating matrix]

As an example, here is how user 1's recommendation list is computed:

User 1's total score for item 101:

1*0+1*0.1+0*0.2+0*0.3+1*0=0.1

User 1's total score for item 102:

1*0+3*0.1+1*0.2+2*0.3+2*0=1.1

User 1's total score for item 103:

0*0+1*0.1+1*0.2+1*0.3+0*0=0.6

User 1's total score for item 104:

0*0+2*0.1+1*0.2+2*0.3+1*0=1.0

User 1's total score for item 105:

1*0+2*0.1+0*0.2+1*0.3+2*0=0.5

This gives user 1's recommendation vector 1[101:0.1,102:1.1,103:0.6,104:1.0,105:0.5], which after sorting becomes the final recommendation list 1[102:1.1,104:1.0,103:0.6,105:0.5,101:0.1].
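In general form, each entry of the recommendation vector computed above is the dot product between one row of the co-occurrence matrix C and the user's rating vector p_u, written here in LaTeX:

\[
  \mathrm{score}(u, i) \;=\; \sum_{j} C_{ij} \, p_{u,j}
\]

For instance, user 1's score for item 104 above is 0*0 + 2*0.1 + 1*0.2 + 2*0.3 + 1*0 = 1.0.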

(2) Computing the user recommendation vector from item similarities.

Computing the recommendation vector from item similarities works much like the co-occurrence approach above: the item-similarity matrix simply takes the place of the co-occurrence matrix, is multiplied by the user-item rating matrix, and the recommendation vector is then computed from the product.

Computing the similarity matrix:

Before doing the computation, let us first go over the common ways of measuring item similarity.

There are many algorithms for computing item similarity, and the choice depends on your data model: Pearson correlation, Euclidean distance (essentially the distance between two points), cosine similarity, Spearman rank correlation, the Tanimoto coefficient, and the log-likelihood ratio. The Tanimoto coefficient and the log-likelihood ratio are mainly used for data models without explicit preference values, i.e., what Mahout calls the Boolean data model. Below I mainly introduce two of them: Euclidean distance and cosine similarity.

[Figure: similarity formulas]

The key question now is how to turn the existing data into the corresponding vector-space model so that these measures can be applied; this is the crucial point. Below I take Euclidean distance as an example to show how to build the model.
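Since the original formula figure is not reproduced here, for reference these are the standard forms of the two measures; the Euclidean-based similarity is the form whose values appear in the worked example further down:

\[
  d(x, y) = \sqrt{\sum_{k} (x_k - y_k)^2}, \qquad
  \mathrm{sim}_{euclid}(x, y) = \frac{1}{1 + d(x, y)}
\]

\[
  \mathrm{sim}_{cos}(x, y) = \frac{x \cdot y}{\lVert x \rVert \, \lVert y \rVert}
\]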

Step 1: convert the user vectors into item vectors

User vectors:

1[102:0.1,103:0.2,104:0.3]

2[101:0.1,102:0.7,105:0.9]

3[102:0.1,103:0.7,104:0.2]

Converted into item vectors:

101[2:0.1]

102[1:0.1,2:0.7,3:0.1]

103[1:0.2,3:0.7]

104[1:0.3,3:0.2]

105[2:0.9]

Step 2: the item similarities are then computed as:

[Figure: item similarity computation]
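As a concrete check of the Euclidean-based similarity, take item 102 [1:0.1, 2:0.7, 3:0.1] and item 103 [1:0.2, 3:0.7], treating missing dimensions as 0:

\[
  d(102, 103) = \sqrt{(0.1-0.2)^2 + (0.7-0)^2 + (0.1-0.7)^2} = \sqrt{0.86} \approx 0.9274
\]

\[
  \mathrm{sim}(102, 103) = \frac{1}{1 + 0.9274} \approx 0.5188
\]

which matches the 0.5188439 used for the 102/103 entry in the multiplication below.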

Step 3: the resulting item similarity matrix is as follows (the meaningless self-similarities are omitted):

[Figure: item similarity matrix]

Step 4: multiply the item similarity matrix by the user-item rating matrix to obtain the recommendation results:

[Figure: similarity matrix multiplied by the rating matrix]

As an example, here is how user 1's recommendation list is computed in the same way:

User 1's total score for item 101:

1*0+1*0.6186429+0*0.6964322+0*0.7277142+1*0.55555556=1.174198

User 1's total score for item 102:

1*0.6186429+3*0+1*0.5188439+2*0.5764197+2*0.8032458=3.896818

User 1's total score for item 103:

0*0.6964322+1*0.5188439+1*0+1*0.662294+0*0.463481=1.181138

User 1's total score for item 104:

0*0.7277142+2*0.5764197+1*0.662294+2*0+1*0.5077338=2.322867

User 1's total score for item 105:

1*0.55555556+2*0.8032458+0*0.463481+1*0.5077338+2*0=2.669780

4. MapReduce implementation of the co-occurrence matrix approach

This section implements the co-occurrence-matrix recommendation approach with MapReduce combined with some of Mahout's data types; the similarity-matrix approach will be written up in the next post. A Boolean data model is used here, i.e., users have not given explicit ratings for the items they like, so the program defaults every preference value to 1.

First, the data-flow diagram of the whole MapReduce pipeline:

[Figure: MapReduce data-flow diagram]

Concrete code. The shared utility class, HadoopUtil:

package com.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class HadoopUtil {

    private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);

    private HadoopUtil() { }

    // Prepares a map-only job.
    public static Job prepareJob(String jobName, String[] inputPath, String outputPath,
            Class<? extends InputFormat> inputFormat,
            Class<? extends Mapper> mapper,
            Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue,
            Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException {
        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();
        if (mapper.equals(Mapper.class)) {
            throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
        }
        job.setJarByClass(mapper);

        job.setInputFormatClass(inputFormat);
        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        job.setMapOutputKeyClass(mapperKey);
        job.setMapOutputValueClass(mapperValue);
        job.setOutputKeyClass(mapperKey);
        job.setOutputValueClass(mapperValue);
        jobConf.setBoolean("mapred.compress.map.output", true);
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);
        return job;
    }

    // Prepares a map + reduce job.
    public static Job prepareJob(String jobName, String[] inputPath, String outputPath,
            Class<? extends InputFormat> inputFormat,
            Class<? extends Mapper> mapper,
            Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue,
            Class<? extends Reducer> reducer,
            Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue,
            Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException {
        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();
        if (reducer.equals(Reducer.class)) {
            if (mapper.equals(Mapper.class)) {
                throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
            }
            job.setJarByClass(mapper);
        } else {
            job.setJarByClass(reducer);
        }

        job.setInputFormatClass(inputFormat);
        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        if (mapperKey != null) {
            job.setMapOutputKeyClass(mapperKey);
        }
        if (mapperValue != null) {
            job.setMapOutputValueClass(mapperValue);
        }
        jobConf.setBoolean("mapred.compress.map.output", true);

        job.setReducerClass(reducer);
        job.setOutputKeyClass(reducerKey);
        job.setOutputValueClass(reducerValue);
        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);
        return job;
    }

    // Prepares a map + combiner + reduce job.
    public static Job prepareJob(String jobName, String[] inputPath, String outputPath,
            Class<? extends InputFormat> inputFormat,
            Class<? extends Mapper> mapper,
            Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue,
            Class<? extends Reducer> combiner,
            Class<? extends Reducer> reducer,
            Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue,
            Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException {
        Job job = new Job(new Configuration(conf));
        job.setJobName(jobName);
        Configuration jobConf = job.getConfiguration();
        if (reducer.equals(Reducer.class)) {
            if (mapper.equals(Mapper.class)) {
                throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
            }
            job.setJarByClass(mapper);
        } else {
            job.setJarByClass(reducer);
        }

        job.setInputFormatClass(inputFormat);
        StringBuilder inputPathsStringBuilder = new StringBuilder();
        for (String p : inputPath) {
            inputPathsStringBuilder.append(",").append(p);
        }
        inputPathsStringBuilder.deleteCharAt(0);
        jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());

        job.setMapperClass(mapper);
        if (mapperKey != null) {
            job.setMapOutputKeyClass(mapperKey);
        }
        if (mapperValue != null) {
            job.setMapOutputValueClass(mapperValue);
        }
        jobConf.setBoolean("mapred.compress.map.output", true);

        job.setCombinerClass(combiner);
        job.setReducerClass(reducer);
        job.setOutputKeyClass(reducerKey);
        job.setOutputValueClass(reducerValue);
        job.setOutputFormatClass(outputFormat);
        jobConf.set("mapred.output.dir", outputPath);
        return job;
    }

    public static String getCustomJobName(String className, JobContext job,
            Class<? extends Mapper> mapper, Class<? extends Reducer> reducer) {
        StringBuilder name = new StringBuilder(100);
        String customJobName = job.getJobName();
        if (customJobName == null || customJobName.trim().isEmpty()) {
            name.append(className);
        } else {
            name.append(customJobName);
        }
        name.append('-').append(mapper.getSimpleName());
        name.append('-').append(reducer.getSimpleName());
        return name.toString();
    }

    public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
        if (conf == null) {
            conf = new Configuration();
        }
        for (Path path : paths) {
            FileSystem fs = path.getFileSystem(conf);
            if (fs.exists(path)) {
                log.info("Deleting {}", path);
                fs.delete(path, true);
            }
        }
    }

    public static void delete(Configuration conf, Path... paths) throws IOException {
        delete(conf, Arrays.asList(paths));
    }

    public static long countRecords(Path path, Configuration conf) throws IOException {
        long count = 0;
        Iterator<?> iterator = new SequenceFileValueIterator<Writable>(path, true, conf);
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }

    public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException {
        long count = 0;
        Iterator<?> iterator = new SequenceFileDirValueIterator<Writable>(path, pt, filter, null, true, conf);
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }
}

That is the shared utility class. Now the individual jobs, step by step.

Step 1: process the raw input data

The mapper of the SourceDataToItemPrefsJob job, which processes the raw data: SourceDataToItemPrefsMapper

package com.mapper;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

/**
 * Mapper input format: userID:itemID1 itemID2 itemID3....
 * Mapper output format: <userID,itemID>
 * @author 曾昭正
 */
public class SourceDataToItemPrefsMapper extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
    //private static final Logger logger = LoggerFactory.getLogger(SourceDataToItemPrefsMapper.class);
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
    private String line = null;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        line = value.toString();
        if (line == null) return;
        // logger.info("line:" + line);
        Matcher matcher = NUMBERS.matcher(line);
        matcher.find(); // the first match is the userID
        // VarLongWritable is Mahout's own writable type
        VarLongWritable userID = new VarLongWritable(Long.parseLong(matcher.group()));
        VarLongWritable itemID = new VarLongWritable();
        while (matcher.find()) { // every following match is an itemID
            itemID.set(Long.parseLong(matcher.group()));
            // logger.info(userID + " " + itemID);
            context.write(userID, itemID);
        }
    }
}
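Before moving on to the reducer, here is a minimal standalone sketch of how the regex in this mapper splits one input line; the class name ParsePreview and the sample line are hypothetical and only for illustration:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParsePreview {
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    public static void main(String[] args) {
        String line = "1:102 103 104";           // hypothetical input line "userID:itemID itemID ..."
        Matcher matcher = NUMBERS.matcher(line);
        matcher.find();                          // the first match is the userID
        long userID = Long.parseLong(matcher.group());
        System.out.println("userID=" + userID);
        while (matcher.find()) {                 // every following match is an itemID
            System.out.println("  itemID=" + Long.parseLong(matcher.group()));
        }
    }
}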

The reducer of the SourceDataToItemPrefsJob job: SourceDataToUserVectorReducer

package com.reducer;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer input: <userID,Iterable<itemID>>
 * Reducer output: <userID,VectorWritable<index=itemID,value=pref>....>
 * @author 曾昭正
 */
public class SourceDataToUserVectorReducer extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(SourceDataToUserVectorReducer.class);

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        /*
         * DenseVector is backed by a double array and stores every dimension of the vector; it suits dense vectors.
         * RandomAccessSparseVector is backed by a HashMap with int keys and double values; it stores only the
         * non-empty entries and supports random access.
         * SequentialAccessVector is implemented as parallel int and double arrays; it also stores only non-empty
         * entries but supports only sequential access.
         * Pick the implementation that matches the algorithm's access pattern: for lots of random access use
         * DenseVector or RandomAccessSparseVector; for mostly sequential access SequentialAccessVector works better.
         * With the vector types introduced, the next question is how to turn the existing data into vectors
         * ("how to vectorize the data") so that Mahout's algorithms can be applied.
         */
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            // RandomAccessSparseVector.set(index, value); with a Boolean preference model every preference defaults to 1.0f
            userVector.set((int) itemPref.get(), 1.0f);
        }
        logger.info(userID + " " + new VectorWritable(userVector));
        context.write(userID, new VectorWritable(userVector));
    }
}

Step 2: build the co-occurrence matrix from the reduce output of SourceDataToItemPrefsJob

The mapper of the UserVectorToCooccurrenceJob job: UserVectorToCooccurrenceMapper

package com.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Mapper input: <userID,VectorWritable<index=itemID,value=pref>....>
 * Mapper output: <itemID,itemID> (pairs of co-occurring item IDs)
 * @author 曾昭正
 */
public class UserVectorToCooccurrenceMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {
    @Override
    protected void map(VarLongWritable userID, VectorWritable userVector, Context context)
            throws IOException, InterruptedException {
        Iterator<Vector.Element> it = userVector.get().nonZeroes().iterator(); // keep only the non-zero elements
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().nonZeroes().iterator();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1), new IntWritable(index2));
            }
        }
    }
}

The reducer of the UserVectorToCooccurrenceJob job: UserVectorToCoocurrenceReducer

package com.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer input: <itemID,Iterable<itemIDs>>
 * Reducer output: <mainItemID,Vector<coocItemID,coocTime(co-occurrence count)>....>
 * @author 曾昭正
 */
public class UserVectorToCoocurrenceReducer extends Reducer<IntWritable, IntWritable, IntWritable, VectorOrPrefWritable> {
    private static final Logger logger = LoggerFactory.getLogger(UserVectorToCoocurrenceReducer.class);

    @Override
    protected void reduce(IntWritable mainItemID, Iterable<IntWritable> coocItemIDs, Context context)
            throws IOException, InterruptedException {
        Vector coocItemIDVectorRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (IntWritable coocItem : coocItemIDs) {
            int itemCoocTime = coocItem.get(); // index of the co-occurring item
            coocItemIDVectorRow.set(itemCoocTime, coocItemIDVectorRow.get(itemCoocTime) + 1.0); // accumulate the co-occurrence count
        }
        logger.info(mainItemID + " " + new VectorOrPrefWritable(coocItemIDVectorRow));
        context.write(mainItemID, new VectorOrPrefWritable(coocItemIDVectorRow)); // the complete co-occurrence row for mainItemID
    }
}

Step 3: split the reduce output of SourceDataToItemPrefsJob

The mapper of the userVecotrSplitJob job: UserVecotrSplitMapper

package com.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splits the user vectors so they can later be joined with the item co-occurrence vectors.
 * Mapper input: <userID,Vector<itemIDIndex,preferenceValue>....>
 * Mapper output: <itemID,Vector<userID,preferenceValue>....>
 * @author 曾昭正
 */
public class UserVecotrSplitMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
    private static final Logger logger = LoggerFactory.getLogger(UserVecotrSplitMapper.class);

    @Override
    protected void map(VarLongWritable userIDWritable, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        IntWritable itemIDIndex = new IntWritable();
        long userID = userIDWritable.get();
        Vector userVector = value.get();
        Iterator<Element> it = userVector.nonZeroes().iterator(); // only the non-zero entries of the user vector
        while (it.hasNext()) {
            Element e = it.next();
            int itemID = e.index();
            itemIDIndex.set(itemID);
            float preferenceValuce = (float) e.get();
            logger.info(itemIDIndex + " " + new VectorOrPrefWritable(userID, preferenceValuce));
            context.write(itemIDIndex, new VectorOrPrefWritable(userID, preferenceValuce));
        }
    }
}

Step 4: join the outputs of userVecotrSplitJob and UserVectorToCooccurrenceJob

The mapper of the combineUserVectorAndCoocMatrixJob job: CombineUserVectorAndCoocMatrixMapper

package com.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

/**
 * Joins the co-occurrence matrix with the split user vectors so that the partial recommendation vectors can be computed.
 * This mapper has no real processing logic; it simply forwards its input in the output format.
 * Note: the mapper's input consists of the two output directories produced by the co-occurrence job and the
 * user-vector-splitting job.
 * Mapper input: <itemID,Vector<userID,preferenceValue>> or <itemID,Vector<coocItemID,coocTimes>>
 * Mapper output: <itemID,Vector<userID,preferenceValue>/Vector<coocItemID,coocTimes>>
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixMapper extends Mapper<IntWritable, VectorOrPrefWritable, IntWritable, VectorOrPrefWritable> {
    @Override
    protected void map(IntWritable itemID, VectorOrPrefWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(itemID, value);
    }
}

The reducer of the combineUserVectorAndCoocMatrixJob job: CombineUserVectorAndCoocMatrixReducer

package com.reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Joins the co-occurrence matrix with the split user vectors so that the partial recommendation vectors can be computed.
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixReducer extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorAndPrefsWritable> {
    private static final Logger logger = LoggerFactory.getLogger(CombineUserVectorAndCoocMatrixReducer.class);

    @Override
    protected void reduce(IntWritable itemID, Iterable<VectorOrPrefWritable> values, Context context)
            throws IOException, InterruptedException {
        VectorAndPrefsWritable vectorAndPrefsWritable = new VectorAndPrefsWritable();
        List<Long> userIDs = new ArrayList<Long>();
        List<Float> preferenceValues = new ArrayList<Float>();
        Vector coocVector = null;
        Vector coocVectorTemp = null;
        Iterator<VectorOrPrefWritable> it = values.iterator();
        while (it.hasNext()) {
            VectorOrPrefWritable e = it.next();
            coocVectorTemp = e.getVector();
            if (coocVectorTemp == null) {
                userIDs.add(e.getUserID());
                preferenceValues.add(e.getValue());
            } else {
                coocVector = coocVectorTemp;
            }
        }
        if (coocVector != null) {
            // Note: after the co-occurrence job's reduce aggregation, each reduce group here contains exactly one
            // co-occurrence vector (one row/column of the co-occurrence matrix; rows and columns are the same here).
            vectorAndPrefsWritable.set(coocVector, userIDs, preferenceValues);
            logger.info(itemID + " " + vectorAndPrefsWritable);
            context.write(itemID, vectorAndPrefsWritable);
        }
    }
}

Step 5: generate the recommendation lists from the output of combineUserVectorAndCoocMatrixJob

The mapper of the caclPartialRecomUserVectorJob job: CaclPartialRecomUserVectorMapper

package com.mapper;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes the partial user recommendation vectors.
 * @author 曾昭正
 */
public class CaclPartialRecomUserVectorMapper extends Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(CaclPartialRecomUserVectorMapper.class);

    @Override
    protected void map(IntWritable itemID, VectorAndPrefsWritable values, Context context)
            throws IOException, InterruptedException {
        Vector coocVectorColumn = values.getVector();
        List<Long> userIDs = values.getUserIDs();
        List<Float> preferenceValues = values.getValues();
        for (int i = 0; i < userIDs.size(); i++) {
            long userID = userIDs.get(i);
            float preferenceValue = preferenceValues.get(i);
            logger.info("userID:" + userID);
            logger.info("preferenceValue:" + preferenceValue);
            // Multiply this column of the co-occurrence matrix by the user's preference value,
            // which yields part of that user's recommendation scores.
            Vector preferenceParScores = coocVectorColumn.times(preferenceValue);
            context.write(new VarLongWritable(userID), new VectorWritable(preferenceParScores));
        }
    }
}
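To connect this mapper back to the hand calculation of section 3, here is a rough standalone sketch, assuming mahout-math is on the classpath; the class name PartialScoreSketch and the two co-occurrence columns are hypothetical values, not taken from the article's figures. It shows how times() produces the partial scores emitted here and how plus() (applied later by the combiner and reducer) sums them into the final recommendation vector:

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class PartialScoreSketch {
    public static void main(String[] args) {
        // hypothetical co-occurrence columns of two items the user has a preference for (indices are item IDs)
        Vector colA = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        colA.set(101, 1.0); colA.set(102, 3.0); colA.set(103, 2.0);
        Vector colB = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        colB.set(102, 2.0); colB.set(104, 1.0);

        double prefA = 1.0; // Boolean data model: every preference defaults to 1.0
        double prefB = 1.0;

        // what CaclPartialRecomUserVectorMapper emits for each (item, user) pair
        Vector partialA = colA.times(prefA);
        Vector partialB = colB.times(prefB);

        // what ParRecomUserVectorCombiner / MergeAndGenerateRecommendReducer then do with the partials
        Vector recommendation = partialA.plus(partialB);
        System.out.println(recommendation); // the per-item scores for this user
    }
}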

The combiner of the caclPartialRecomUserVectorJob job: ParRecomUserVectorCombiner

package com.reducer;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the partial recommendation vectors by summing, for each userID, the scores of its co-occurrence vectors.
 * (Note: a combiner only merges the output of a single map task, so the result here is still a partial result.)
 * @author 曾昭正
 */
public class ParRecomUserVectorCombiner extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {
    private static final Logger logger = LoggerFactory.getLogger(ParRecomUserVectorCombiner.class);

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VectorWritable> coocVectorColunms, Context context)
            throws IOException, InterruptedException {
        Vector vectorColunms = null;
        for (VectorWritable coocVectorColunm : coocVectorColunms) {
            vectorColunms = vectorColunms == null ? coocVectorColunm.get() : vectorColunms.plus(coocVectorColunm.get());
        }
        logger.info(userID + " " + new VectorWritable(vectorColunms));
        context.write(userID, new VectorWritable(vectorColunms));
    }
}

The reducer of the caclPartialRecomUserVectorJob job: MergeAndGenerateRecommendReducer

package com.reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges all the scored co-occurrence vectors and produces the final recommendations.
 * @author 曾昭正
 */
public class MergeAndGenerateRecommendReducer extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable> {
    private static final Logger logger = LoggerFactory.getLogger(MergeAndGenerateRecommendReducer.class);
    private int recommendationsPerUser;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        recommendationsPerUser = context.getConfiguration().getInt("recomandItems.recommendationsPerUser", 5);
    }

    @Override
    protected void reduce(VarLongWritable userID, Iterable<VectorWritable> cooVectorColumn, Context context)
            throws IOException, InterruptedException {
        // Sum up the partial scores.
        Vector recommdVector = null;
        for (VectorWritable vector : cooVectorColumn) {
            recommdVector = recommdVector == null ? vector.get() : recommdVector.plus(vector.get());
        }
        // Sort the recommendation vector and keep the top M items per userID (5 by default);
        // the queue orders items by their score.
        // Note: the head of a PriorityQueue is always its smallest element; the capacity is one larger than the
        // number of recommendations to leave temporary room when a larger element is added.
        Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
                ByValueRecommendedItemComparator.getInstance());
        Iterator<Element> it = recommdVector.nonZeroes().iterator();
        while (it.hasNext()) {
            Element e = it.next();
            int itemID = e.index();
            float preValue = (float) e.get();
            if (topItems.size() < recommendationsPerUser) {
                // While the queue holds fewer than recommendationsPerUser entries, just add the item and its score.
                topItems.add(new GenericRecommendedItem(itemID, preValue));
            } else if (preValue > topItems.peek().getValue()) {
                // The current item's score is larger than the smallest score in the queue:
                // add the item and evict the head (the minimum element).
                topItems.add(new GenericRecommendedItem(itemID, preValue));
                topItems.poll(); // remove the head (minimum) element
            }
        }
        // Re-sort the queue's elements into a list.
        List<RecommendedItem> recommdations = new ArrayList<RecommendedItem>(topItems.size());
        recommdations.addAll(topItems); // copy all queued elements into the list to be sorted
        Collections.sort(recommdations, ByValueRecommendedItemComparator.getInstance()); // sort by score
        // Emit the recommendation vector.
        logger.info(userID + " " + new RecommendedItemsWritable(recommdations));
        context.write(userID, new RecommendedItemsWritable(recommdations));
    }
}
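The top-N selection in this reducer is a standard min-heap pattern. Here is a standalone sketch of just that pattern with plain java.util types; the class name TopNSketch is my own, and the hard-coded scores are the user-1 scores from the co-occurrence example in section 3:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopNSketch {
    public static void main(String[] args) {
        int n = 3; // keep the 3 best items
        double[][] itemScores = {{101, 0.1}, {102, 1.1}, {103, 0.6}, {104, 1.0}, {105, 0.5}};
        // the head of the queue is always the entry with the smallest score
        PriorityQueue<double[]> top = new PriorityQueue<>(n + 1, Comparator.comparingDouble((double[] a) -> a[1]));
        for (double[] is : itemScores) {
            if (top.size() < n) {
                top.add(is);                    // queue not full yet: just add
            } else if (is[1] > top.peek()[1]) {
                top.add(is);                    // better than the current minimum:
                top.poll();                     // add it and evict the head (minimum)
            }
        }
        List<double[]> result = new ArrayList<>(top);
        result.sort((a, b) -> Double.compare(b[1], a[1])); // highest score first
        result.forEach(is -> System.out.println((int) is[0] + ":" + is[1]));
    }
}

With these inputs it prints 102:1.1, 104:1.0 and 103:0.6, matching the head of the sorted recommendation list in section 3.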

Step 6: wire the jobs together

The driver class: PackageRecomendJob

package com.mapreduceMain;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

import com.mapper.CaclPartialRecomUserVectorMapper;
import com.mapper.CombineUserVectorAndCoocMatrixMapper;
import com.mapper.UserVecotrSplitMapper;
import com.mapper.UserVectorToCooccurrenceMapper;
import com.mapper.SourceDataToItemPrefsMapper;
import com.reducer.CombineUserVectorAndCoocMatrixReducer;
import com.reducer.MergeAndGenerateRecommendReducer;
import com.reducer.ParRecomUserVectorCombiner;
import com.reducer.UserVectorToCoocurrenceReducer;
import com.reducer.SourceDataToUserVectorReducer;
import com.util.HadoopUtil;

/**
 * Assembles the individual job components into the complete recommendation pipeline.
 * @author 曾昭正
 */
public class PackageRecomendJob extends Configured implements Tool {
    String[] dataSourceInputPath = {"/user/hadoop/z.zeng/distruteItemCF/dataSourceInput"};
    String[] uesrVectorOutput = {"/user/hadoop/z.zeng/distruteItemCF/uesrVectorOutput/"};
    String[] userVectorSpliltOutPut = {"/user/hadoop/z.zeng/distruteItemCF/userVectorSpliltOutPut"};
    String[] cooccurrenceMatrixOuptPath = {"/user/hadoop/z.zeng/distruteItemCF/CooccurrenceMatrixOuptPath"};
    String[] combineUserVectorAndCoocMatrixOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/combineUserVectorAndCoocMatrixOutPutPath"};
    String[] caclPartialRecomUserVectorOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/CaclPartialRecomUserVectorOutPutPath"};

    protected void setup(Configuration configuration) throws IOException, InterruptedException {
        // Clean up the output directories of a previous run.
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://cluster-master"), configuration);
        Path p1 = new Path(uesrVectorOutput[0]);
        Path p2 = new Path(userVectorSpliltOutPut[0]);
        Path p3 = new Path(cooccurrenceMatrixOuptPath[0]);
        Path p4 = new Path(combineUserVectorAndCoocMatrixOutPutPath[0]);
        Path p5 = new Path(caclPartialRecomUserVectorOutPutPath[0]);
        if (hdfs.exists(p1)) { hdfs.delete(p1, true); }
        if (hdfs.exists(p2)) { hdfs.delete(p2, true); }
        if (hdfs.exists(p3)) { hdfs.delete(p3, true); }
        if (hdfs.exists(p4)) { hdfs.delete(p4, true); }
        if (hdfs.exists(p5)) { hdfs.delete(p5, true); }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // the configuration object
        setup(conf);
        // DistributedCache.addArchiveToClassPath(new Path("/user/hadoop/z.zeng/distruteItemCF/lib"), conf);

        // Job that computes the user vectors.
        Job wikipediaToItemPrefsJob = HadoopUtil.prepareJob("WikipediaToItemPrefsJob",
                dataSourceInputPath, uesrVectorOutput[0], TextInputFormat.class,
                SourceDataToItemPrefsMapper.class, VarLongWritable.class, VarLongWritable.class,
                SourceDataToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
                SequenceFileOutputFormat.class, conf);

        // Job that computes the co-occurrence vectors.
        Job userVectorToCooccurrenceJob = HadoopUtil.prepareJob("UserVectorToCooccurrenceJob",
                uesrVectorOutput, cooccurrenceMatrixOuptPath[0], SequenceFileInputFormat.class,
                UserVectorToCooccurrenceMapper.class, IntWritable.class, IntWritable.class,
                UserVectorToCoocurrenceReducer.class, IntWritable.class, VectorOrPrefWritable.class,
                SequenceFileOutputFormat.class, conf);

        // Job that splits the user vectors.
        Job userVecotrSplitJob = HadoopUtil.prepareJob("userVecotrSplitJob",
                uesrVectorOutput, userVectorSpliltOutPut[0], SequenceFileInputFormat.class,
                UserVecotrSplitMapper.class, IntWritable.class, VectorOrPrefWritable.class,
                SequenceFileOutputFormat.class, conf);

        // Job that joins the co-occurrence vectors with the split user vectors.
        // Note that the outputs of the splitting job and of the co-occurrence job are used together as input.
        String[] combineUserVectorAndCoocMatrixIutPutPath = {cooccurrenceMatrixOuptPath[0], userVectorSpliltOutPut[0]};
        Job combineUserVectorAndCoocMatrixJob = HadoopUtil.prepareJob("combineUserVectorAndCoocMatrixJob",
                combineUserVectorAndCoocMatrixIutPutPath, combineUserVectorAndCoocMatrixOutPutPath[0],
                SequenceFileInputFormat.class,
                CombineUserVectorAndCoocMatrixMapper.class, IntWritable.class, VectorOrPrefWritable.class,
                CombineUserVectorAndCoocMatrixReducer.class, IntWritable.class, VectorAndPrefsWritable.class,
                SequenceFileOutputFormat.class, conf);

        // Job that computes the user recommendation vectors.
        Job caclPartialRecomUserVectorJob = HadoopUtil.prepareJob("caclPartialRecomUserVectorJob",
                combineUserVectorAndCoocMatrixOutPutPath, caclPartialRecomUserVectorOutPutPath[0],
                SequenceFileInputFormat.class,
                CaclPartialRecomUserVectorMapper.class, VarLongWritable.class, VectorWritable.class,
                ParRecomUserVectorCombiner.class, // a combiner on the map side to reduce network IO
                MergeAndGenerateRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
                TextOutputFormat.class, conf);

        // Chain the jobs together.
        if (wikipediaToItemPrefsJob.waitForCompletion(true)) {
            if (userVectorToCooccurrenceJob.waitForCompletion(true)) {
                if (userVecotrSplitJob.waitForCompletion(true)) {
                    if (combineUserVectorAndCoocMatrixJob.waitForCompletion(true)) {
                        int rs = caclPartialRecomUserVectorJob.waitForCompletion(true) ? 1 : 0;
                        return rs;
                    } else {
                        throw new Exception("The job joining the co-occurrence vectors and the split user vectors failed!");
                    }
                } else {
                    throw new Exception("The user-vector splitting job failed!");
                }
            } else {
                throw new Exception("The co-occurrence vector job failed!");
            }
        } else {
            throw new Exception("The user vector job failed!");
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        try {
            int returnCode = ToolRunner.run(new PackageRecomendJob(), args);
            System.exit(returnCode);
        } catch (Exception e) {
        }
    }
}

5. Summary

This post covered the basic ideas behind the itemBase recommendation algorithm and how to model existing data for it, and implemented the co-occurrence-matrix approach with MapReduce and Mahout's built-in data types. Having finished the post and the implementation, I find that although the MapReduce programming model is very simple, with only two phases (scattering and merging data), combining custom data types within those two phases makes it possible to build quite complex functionality.

References: 《Mahout in Action》, 《推荐引擎实践》

 

Source: <http://www.tuicool.com/articles/BZVBRz>

 




Reposted via: https://www.cnblogs.com/baixl/p/4165712.html
