随着移动互联网的快速发展,UGC标签系统受到越来越多推荐应用,标签不但能反映用户的兴趣又能描述物品的本身特征。现有的标签推荐算法没有考虑用户的连续行为所产生的影响,于是人们提出了一种基于标签的个性化推荐算法。该算法将〈用户-标签-物品〉的三维关系拆分为〈用户-标签〉和〈标签-物品〉两个二维关系。 通过推荐标签集来匹配与其相对应的物品,为了提高推荐的精准率,该算法利用标签之间的影响,并基于匹配物品中所含标签间存在的关联关系对物品进行满意度建模,该模型是一种概率模型。在计算用户-标签和用户-物品之间的兴趣度和满意度时使用了协同过滤的思想来补全稀疏值。在公开的数据集中,与现有算法相比,该算法在精准率、召回率上均有明显提高。
因此,本文将主要探讨如何利用用户打标签的行为为其推荐物品。一个用户标签行为的数据集可以由一个三元组集合表示,其中每条样本(u,i,b)表示用户u为物品i打上了标签b。
1、标签作为特征/类别划分
我们可以将标签看做是LFM中的隐类(这里是显式的),用户u对物品i的兴趣度可表示为:
p ( u , i ) = ∑ b n u b n b i p(u,i)=\sum_{b}n_{ub}n_{bi} p(u,i)=b∑nubnbi
n u b n_{ub} nub:用户u打过标签b的次数,可以看做是用户u对b类物品的喜好程度;
n b i n_{bi} nbi:物品i被打过b标签的次数,可以看做物品i属于b类的”概率“值
仔细研究以上公式我们可以发现如下缺点:
- 热门标签给 n u b n_{ub} nub的权重很大;
- 热门物品给 n b i n_{bi} nbi的权重也很大;
借鉴TF-IDF的思路,可以通过惩罚用户或物品中热门标签的方式来优化以上公式:
p ( u , i ) = ∑ b n u b l o g ( 1 + n b u ) n b i l o g ( 1 + n i u ) p(u,i)=\sum_{b}\frac{n_{ub}}{log(1+n_b^u)}\frac{n_{bi}}{log(1+n_i^u)} p(u,i)=b∑log(1+nbu)nublog(1+niu)nbi
- 1 + n b u 1+n_b^u 1+nbu:标签b被多少个用户使用过;
- n i u n_i^u niu:物品i被多少个用户打过标签;
标签的稀疏性:对于新用户或新物品,集合B(u)⋂B(i)中的标签数量很少。
1.1 标签扩展
为提高推荐准确率,需要对标签集合进行进一步扩展,如果用户使用过某个标签,我们可以将与这个标签相似的其他标签也加入到用户标签集合中去。扩展标签的方式有很多,常用的有话题模型,这里介绍一种基于邻域的方法,核心是计算标签之间的相似度。可以用两个标签下物品的重合度来计算它们之间的相似度
2、 适用场景:
- 商城类:用户偏好商品UGC标签推荐;
- 社交类:用户喜欢UGC标签推荐;
3、冷启动问题解决方案:
- 客户注册时,勾选喜欢的标签,如是商城勾选喜欢的商品标签,如是社交内网站勾选喜欢社交类别;
- 热门商品填充;
4、基于商品标签推荐代码
/**商品数据*/
case class Product(productId:Int, name:String, imageUrl:String, categories:String, tags:String)
//mongodb 配置
case class MongoConfig(uri:String, db:String)
//定义标准推荐对象
case class Recommendation(productId:Int, score:Double)
//定义商品相似度列表
case class ProductRecs(productId:Int, recs:Seq[Recommendation])object ContentRecommender {//定义数据存储mongodb存储的表名val MONGODB_PRODUCT_COLLECTION = "product"//定义一个基于内容推荐商品列表val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"//总特征数量:本身词汇比较多,不设置会按照很大特征值(消耗内存高), 特征数量其实Hash桶数量,设置少会导致hash碰撞var NUM_FEATURES = 800//推荐商品数,通过商品最小相似度值控制val PRODUCT_SIMILAR_RATE = 0.4def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://localhost:27017/spark-recommender","mongo.db" -> "spark-recommender")val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")val spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))//1、加载商品数据val productTagsDF = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_PRODUCT_COLLECTION).format("com.mongodb.spark.sql").load().as[Product].map {item =>//通过对tags的|替换为 ' '空格,后面分词器默认按照空格分割val tags = item.tags.replace('|', ' ')(item.productId, item.name, tags)}.toDF("productId", "name", "tags").cache()//2、计算文档总分词数val dsTags:Dataset[Array[String]] = productTagsDF.select("tags").map(x => x.toString.split(" "))//使用flatMap合并Array[String]所有项val all_terms: Array[String] = dsTags.flatMap(iterArr => iterArr).distinct().collect()//所有词典(总词)特征数量NUM_FEATURES = all_terms.length//3: 用TF-IDF提取商品特征向量val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")val wordsDataDF = tokenizer.transform(productTagsDF)//4、计算TF:定义一个HashingTF工具,计算频次val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(NUM_FEATURES)val featurizedDataDF = hashingTF.transform(wordsDataDF)//5、定义一个IDF工具,计算TF-IDFval idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")//计算逆文件频率: 训练一个idf的模型val idfModel = idf.fit(featurizedDataDF)//将词频(TF)转换为TF-IDF向量: 得到增加新列feature的DF,(最终特征向量)val rescaledTfIDF = idfModel.transform(featurizedDataDF)//6、对数据进行转换,得到RDD的featuresval productFeatures = rescaledTfIDF.map {row => (row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray)}.rdd.map {case (productId, features) => (productId, new DoubleMatrix(features))}//两两配对做笛卡尔积,计算余弦相似度val productRecsDF = productFeatures.cartesian(productFeatures).filter {case (a, b) => a._1 != b._1 }.map {case (a, b) =>val simScore = cosineSim(a._2, b._2) (a._1, (b._1, simScore))}.filter(_._2._2 > PRODUCT_SIMILAR_RATE) .groupByKey().map {case (productId, recs) =>val list = recs.toListProductRecs(productId, list.map(x => Recommendation(x._1, x._2)))}.toDF()storeDFInMongoDB(productRecsDF, CONTENT_PRODUCT_RECS)spark.stop()}/*** 存储信息到mongodb数据库** @param df* @param collectionName* @param mongoConfig*/def storeDFInMongoDB(df: DataFrame, collectionName: String)(implicit mongoConfig: MongoConfig): Unit = {df.write.option("uri", mongoConfig.uri).option("collection", collectionName).mode(SaveMode.Overwrite).format("com.mongodb.spark.sql").save()}/*** 计算两个向量集合的余弦相似度** @param product1 商品1的矩阵* @param product2 商品2的矩阵*/def cosineSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {product1.dot(product2) / (product1.norm2() * product2.norm2())}
}