使用Spark ALS模型 + Faiss向量检索实现用户扩量实例

1、通过ALS模型实现用户/商品Embedding的效果,获得其向量表示

准备训练数据, M = (U , I, R) 即 用户集U、商品集I、及评分数据R。

(1)商品集I的选择:可以根据业务目标确定商品候选集,比如TopK热度召回、或者流行度不高但在业务用户中区分度比较高的商品集等。个人建议量级控制在5W内,1W-2W左右比较合适,太大的话,用户产生行为的商品比较少,评分数据会非常的稀疏。

(2)用户集U的选择: 最好是粗召回策略确定的用户范围,因为ALS模型会生成所有U用户的特征向量表示,对于没有见过的用户u,没有其向量表示,其推荐也是冷启动策略。这里可以根据业务需要限制一个大范围,比如4000W-5000W的或大几百万的用户(从计算效率和内存使用上,个人建议500W内比较合适)。比如用户U定义为某些类目下购买人群、或者近期活跃人群等符合业务人群目标的潜在客户群。模型训练完之后,也是在这个用户集U中筛选出TopK相似的用户做推荐或扩量。

(3)评分数据R的选择:我们能采集到的大多是隐式反馈的数据,比如购买行为、浏览行为、收藏行为等。确定了U、I,确定了评分指标类型,就可以统计一段时间内,U对I的反馈数据R。数据量级大约在7亿条-10亿条,在模型参数设置合理的情况下,大约20-30分钟就可以训练完。

from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import expr, isnull""" ALS模型参数解读,和大小设置建议:
:paramrank=10, maxIter=10, regParam=0.1, numUserBlocks=10,numItemBlocks=10, implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",seed=None, ratingCol="rating", nonnegative=False, checkpointInterval=10,intermediateStorageLevel="MEMORY_AND_DISK",finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096NumBlocks分块数:分块是为了并行计算,默认为10。可以根据数据量级适当放大,比如20。 可以对 numUserBlocks\numItemBlocks 单独进行配置并行度 ,也可以通过setNumBlocks(30)一起设置。正则化参数:默认为1。 
秩rank:模型中隐藏因子的个数,默认是10。即特征向量的维度。
implicitPrefs:显式偏好信息-false,隐式偏好信息-true,默认false(显示) 。 电商场景中 购买、点击、分享,都是隐式反馈。
alpha:隐式反馈时的置信度参数,默认是1.0。只用于隐式的偏好数据。
setMaxIter(10):最大迭代次数,设置太大发生java.lang.StackOverflowError。建议范围 10 ~20。 超过20,比较容易失败。
coldStartStrategy: 预测时冷启动策略。默认是nan, 可以选择 drop。
"""ratings = spark.sql("""selectuser_acct, user_id, main_sku_id, item_id, ratingfrom dmb_dev.dmb_dev_als_model_rating_matrix""").repartition(3600)
train_data, test_data = ratings.randomSplit([0.9, 0.1], seed=4226)
train_data.cache()       
als = ALS() \.setImplicitPrefs(True) \.setAlpha(0.7) \.setMaxIter(20) \.setRank(10) \.setRegParam(0.01) \.setNumBlocks(30) \.setUserCol("user_id") \.setItemCol("item_id") \.setRatingCol("rating") \.setColdStartStrategy("drop")
print(als.explainParams())als_model = als.fit(train_data)
als_model.write().overwrite().save(model_save_path)# 训练集合所有用户U的向量表示
candidate_user_factors = als_model.userFactors.withColumnRenamed("id", "user_id")\.join(train_data.select("user_acct", "user_id").dropDuplicates(), ["user_id"])\.withColumn("bin_group", expr("round(rand(),1)"))
candidate_user_factors.cache()
candidate_user_factors.write.format("orc").mode("overwrite")\.saveAsTable("dev.dev_als_model_all_trained_users_factor_result")
train_data.unpersist()# query用户的向量表示
target_user_factors = spark.sql("""selectuser_acct, user_idfrom dev.dev_wdy_als_seed_users_tablegroup by user_acct, user_id""").join(candidate_user_factors, ["user_acct", "user_id"])
target_user_factors.cache()
target_user_factors.write.format("orc").mode("overwrite")\.saveAsTable("dev.dev_als_model_seed_users_factor")# 候选用户向量表示
search_user_factors = candidate_user_factors.join(target_user_factors,candidate_user_factors["user_acct"] == target_user_factors["user_acct"],"left_outer")\.where(isnull(target_user_factors["user_acct"]))\.select(candidate_user_factors["user_acct"], candidate_user_factors["user_id"],candidate_user_factors["features"], candidate_user_factors["bin_group"])
search_user_factors.write.format("orc").mode("overwrite")\.saveAsTable("dev.dev_als_model_candidate_users_factor")
candidate_user_factors.unpersist()
target_user_factors.unpersist()

2、通过Faiss快速实现向量TopK相似检索

如果没有装faiss,可以选择安装CPU/GPU版本, pip install faiss-cpu

关于faiss的使用说明,可以参考向量数据库入坑指南:聊聊来自元宇宙大厂 Meta 的相似度检索技术 Faiss - 知乎

 faiss来自facebook 开源 Meta Research · GitHub的github库为:GitHub - facebookresearch/faiss: A library for efficient similarity search and clustering of dense vectors.

根据业务需求的查询速度、精准度要求来选择合适的Faiss TopK向量查询方法。


# 判断 npy文件是否存在,不存在则执行以下操作;否则跳过此步骤,直接读取文件。
user_embedding = spark.sql("""select features[0],features[1],features[2],features[3],features[4],features[5],features[6],features[7],features[8],features[9] from dev.dev_als_model_candidate_users_factorwhere bin_group=0.1""").toPandas()# 量级500W内执行顺利,再大的量级容易内存溢出失败。
np.save("user_embedding_01.npy", np.array(user_embedding, order='C'))user_embedding = np.load("user_embedding_01.npy")
print("user_embedding data sample:", user_embedding[:3])
print("user embedding shape", user_embedding.shape)
dimension = user_embedding.shape[1]
nums_user = user_embedding.shape[0]faiss.normalize_L2(user_embedding)
index = faiss.IndexFlatIP(dimension)
index.add(user_embedding)
print("index is trained:", index.is_trained)
print("index n total:", index.ntotal)# 判断文件是否存在,如果存在则直接读取,否则先下载保存到本地。
## 这里k=30 或更大时,查询易失败。 k=20, 查询耗时久,但会成功,大约3小时。 k=10时,
k = 5
query1 = spark.sql("""select features[0],features[1],features[2],features[3],features[4],features[5],features[6],features[7],features[8],features[9] from dev.dev_als_model_seed_users_factor""").toPandas()
np.save("query.npy", np.array(query1, order='C'))
query = np.load("query.npy")
print("query shape:", query.shape)# 查询
t0 = time.time()
Deg, Ind = index.search(query, k)
t1 = time.time()
print("平均耗时 %7.3f min" % ((t1 - t0)/60))# 保存索引
faiss.write_index(index, "faiss_01.index")
np.save("Ind_01.npy", Ind)
np.save("Deg_01.npy", Deg)res = []
for i in range(query.shape[0]):q_vector = query[i]r_list = Ind[i]for j in range(len(r_list)):r_vector = user_embedding[r_list][j]sim = Deg[i][j]res.append(([float(v) for v in r_vector], float(sim)))res = spark.createDataFrame(res, ["recommend_vector", "similarity"]).repartition(10)
res.cache()res.write.format("orc").mode("overwrite")\.saveAsTable("dev.dev_als_model_recommend_vector_result")user_embedding = spark.sql("""select *from dev.dev_als_model_candidate_users_factorwhere bin_group=0.1""")
res.join(user_embedding, res["recommend_vector"] == user_embedding["features"])\.write.format("orc").mode("overwrite")\.saveAsTable("dev.dev_als_model_recommend_user_pin_result")

3、通过I2I2U 或者 I2U2U来获得用户扩量结果

上述实现的是U2U的扩量方法,使用的是User-Factor向量表示。第一个U来自于业务营销目标I下的历史已购人群。即这是一个I2U2U的扩量方法。 I (目标商品)---> U(历史购买) ---> U(TopK相似) 。

当然也可以通过使用Item-Factor向量表示,实现 I2I2U,即 I (目标商品)---> I(TopK相似) ---> U(历史购买) ,这样来做商品相似召回,实现用户的扩量。

基于实验效果,或历史数据的验证来选择使用哪种方法投产。

4、算法设计框架总结

可以看到,这个算法设计框架其实是 Embedding + Faiss ,即用户/商品的向量表示 + Faiss快速向量相似检索 的设计模式。

那么第一部分的ALS模型当然可以替换成任何一种可以效果更好的Embedding算法模型,比如BERT 、Transformer等深度学习模型。而第二部分Faiss的查询可以保持不动,只要替换查询数据源就可以了。当然也可以将其优化成GPU的,或更快速的查询方式,以满足线上业务的需求。

但整体的算法设计框架是不变的,Embedding向量化 + Faiss相似检索。

Done.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/31348.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Bytebase 2.5.0 - VCS 集成支持 Azure DevOps,支持达梦数据库

🚀 新功能 VCS 集成支持 Azure DevOps。研发版本支持达梦数据库。允许用户设置需要重新登录的频率。支持选择并导出数据库变更历史。新增 MySQL Schema 设计器。支持字段模板库。 🎄 改进 在 SQL 编辑器中,优化 MongoDB 的查询结果。优化 …

SQL-每日一题【1251. 平均售价】

题目 Table: Prices Table: UnitsSold 编写SQL查询以查找每种产品的平均售价。average_price 应该四舍五入到小数点后两位。 查询结果格式如下例所示: 解题思路 1.题目要求查询每种产品的平均售价。给出了两个表,我们用聚合查询来解决此问题。 2.首先我…

Samba(二)

问题 Rocky Linux使用smbclient访问win11的共享文件时提示 Error NT_STATUS_IO_TIMEOUT 分析 通过测试,发现关闭windows公用网络防火墙时,可正常显示服务器端所分享出来的所有资源;进一步发现单独放行防火墙进站规则中的文件和打印机共享&a…

20、stm32使用FMC驱动SDRAM(IS42S32800G-6BLI)

本文将使用安富莱的STM32H743XIH板子驱动SDRAM 引脚连接情况 一、CubeMx配置工程 1、开启调试口 2、开启外部高速时钟 配置时钟树 3、开启串口1 4、配置MPU 按照安富莱的例程配置: /* ********************************************************************…

什么是POP3协议?

POP3(Post Office Protocol Version 3)是一个用于从电子邮件服务器获取邮件的应用层协议。以下是关于POP3的详细解释: 基本操作:使用POP3,电子邮件客户端可以从邮件服务器上下载电子邮件,并将其保存在本地。…

【JPCS出版】第五届能源、电力与电网国际学术会议(ICEPG 2023)

第五届能源、电力与电网国际学术会议(ICEPG 2023) 2023 5th International Conference on Energy, Power and Grid 最近几年,不少代表委员把目光投向能源电力领域,对促进新能源发电产业健康发展、电力绿色低碳发展,提…

笙默考试管理系统-MyExamTest----codemirror(2)

笙默考试管理系统-MyExamTest----codemirror(2) 目录 一、 笙默考试管理系统-MyExamTest----codemirror 二、 笙默考试管理系统-MyExamTest----codemirror 三、 笙默考试管理系统-MyExamTest----codemirror 四、 笙默考试管理系统-MyExamTest---…

什么是 XSS 攻击?

概念 XSS 攻击指的是跨站脚本攻击,是一种代码注入攻击。攻击者通过在网站注入恶意脚本,使之在用户的浏览器上运行,从而盗取用户的信息如 cookie 等。 XSS 的本质是因为网站没有对恶意代码进行过滤,与正常的代码混合在一起了&…

cpu的架构

明天继续搞一下cache,还有后面的, 下面是cpu框架图 开始解释cpu 1.控制器 控制器又称为控制单元(Control Unit,简称CU),下面是控制器的组成 1.指令寄存器IR:是用来存放当前正在执行的的一条指令。当一条指令需要被执行时,先按…

【C语言】指针的进阶2

指针进阶 函数指针数组指向函数指针数组的指针回调函数指针和数组经典题目的解析 函数指针数组 数组是一个存放相同类型数据的存储空间,那我们已经学习了指针数组, 比如: int* arr[10];//数组的每个元素是int*那要把函数的地址存到一个数组…

无涯教程-Perl - getpwnam函数

描述 此函数基于EXPR指定的用户名,从/etc/passwd文件提取的列表context中返回字段列表。通常这样使用- ($name,$passwd,$uid,$gid,$quota,$comment,$gcos,$dir,$shell) getpwnam($user); 在标量context中,返回数字用户ID。如果尝试访问整个/etc/passwd文件,则应使用getpwent…

Lecoode有序数组的平方977

题目建议: 本题关键在于理解双指针思想 题目链接:力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 文章讲解:代码随想录 视频讲解: 双指针法经典题目 | LeetCode:977.有序数组的平方_哔哩…

什么是 CSRF 攻击?

概念 CSRF 攻击指的是跨站请求伪造攻击,攻击者诱导用户进入一个第三方网站,然后该网站向被攻击网站发送跨站请求。如果用户在被攻击网站中保存了登录状态,那么攻击者就可以利用这个登录状态,绕过后台的用户验证,冒充用…

【Linux】内核宏定义解释postcore_initcall,arch_initcall,subsys_initcall

postcore_initcall postcore_initcall(pcibus_class_init) 是一个宏,用于在Linux内核初始化过程中注册一个后期初始化函数。 这个宏的含义如下: postcore_initcall 是一个宏定义,用于指定注册的函数在内核初始化的哪个阶段执行。 pcibus_cl…

Spring Gateway+Security+OAuth2+RBAC 实现SSO统一认证平台

背景:新项目准备用SSO来整合之前多个项目的登录和权限,同时引入网关来做后续的服务限流之类的操作,所以搭建了下面这个系统雏形。 关键词:Spring Gateway, Spring Security, JWT, OAuth2, Nacos, Redis, Danymic datasource, Jav…

nginx代理服务、网关配置

一、nginx安装在服务器,本机运行服务,如何使用远程nginx代理本机服务? 打开nginx配置文件,位置:/usr/local/nginx/conf/nginx.conf,在http模块中添加以下server代码段: http {server {listen …

HTTP 请求方法详解

HTTP 请求方法详解 请求方法 请求方法(Request Methods)是在 HTTP 请求中用于指定对目标资源执行的操作类型。每个请求都需要指定一个请求方法,以告知服务器要执行的操作。 以下是一些常见的 HTTP 请求方法及其主要用途: GET&…

Stable Diffusion - 人物坐姿 (Sitting) 的提示词组合 与 LoRA 和 Embeddings 配置

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/132201960 拍摄人物坐姿时,需要注意: 选择一个舒适和自然的坐姿,符合个性和心情。可以坐在椅子、沙发、长凳、…

vue-pc端Message 消息提示防抖处理-短时间内只触发一次

前言 element提供的的message消息提示用确实方便直接代码就搞定。但是在特定的场景并不适用 点击某一个点位提示用户点击了或者websocket推送提示用户来信息了(这种提示用户场景) 如果有很多推送信息来,或者用户一直点击。这是屏幕会一直弹…

Java各个版本的switch表达式

文章目录 传统switch声明语句的弊端:JDK12中预览特性:JDK13**JDK17的预览特性:switch的模式匹配** 传统switch声明语句的弊端: 匹配是自上而下的,如果忘记写break,后面的case语句不论匹配与否都会执行&…