spark基于HNSW向量检索

参考文档:https://talks.anghami.com/blazing-fast-approximate-nearest-neighbour-search-on-apache-spark-using-hnsw/
HNSW参数调优文档:https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md

spark 运行HNSW向量检索分为以下三步
1 创建HNSW索引,并存储到磁盘
2 将存储的索引分发到每个executor
3 进行向量检索
使用HHSW构建索引,并使用spark进行分布式向量检索,1200万向量构建索引40分钟,向量检索10分钟完成(时间取决于m和ef的大小,本人m=30,ef=1000,不然总是报错m或者ef太小)如m=30,ef=1000 1200万构建索引20分钟,向量检索还是10分钟。

1 创建HNSW索引

输入为spark dataset格式数据,有id和features组成,features为Array[Float]形式向量


import com.stepstone.search.hnswlib.jna.{Index, SpaceName}
import org.apache.spark.SparkFiles
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import java.nio.file.Paths
import scala.reflect.runtime.universe.TypeTag
class annUtilsHnsw {/*** Builds an hnsw index.** Default HNSW parameters are found to be good enough.** HNSW index requires integer based object ids, so the builder re-indexes the original objects keys into integer* keys.** For information on HNSW parameter tuning, [[https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md]]** @param vectorSize features vector size* @param features objects features to build an index for* @param m a parameter for construction HNSW index* @param efConstruction a parameter for construction HNSW index* @tparam Key type of the object id in features objects* @return*/def buildHnswIndex[Key : TypeTag : Encoder](spark:SparkSession,vectorSize: Int,features: Dataset[(Key, Array[Float])],m: Int = 100,efConstruction: Int = 200): HnswIndex[Key] = {// map objects keys to integer based index to be used in the HNSW index as it only accepts integer keyimport spark.implicits._val featuresReindexed = features.rdd.zipWithIndex().map(x=>{(x._1._1,x._1._2,x._2.toInt)})     .toDF("id", "features","index_id").select("index_id", "id", "features").cache()// collect feature vectorsval featuresList = featuresReindexed.select($"index_id", $"features".cast("array<float>")).as[(Int, Array[Float])].collect()val objectIDsMap = featuresReindexed.select("index_id", "id").as[(Int, Key)].repartition(100)// build indexval index = new Index(SpaceName.COSINE, vectorSize)index.initialize(featuresList.length, m, efConstruction, (System.currentTimeMillis() / 1000).toInt)//    index.initialize(indexLength, 16, 200, (System.currentTimeMillis() / 1000).toInt)println("featuresList length",featuresList.length)// add vectors in parallel using .parfeaturesList.par.foreach {case (id: Int, vector: Array[Float]) =>index.addItem(vector, id)}// return wrapped indexnew HnswIndex(vectorSize, index, objectIDsMap)}}

2 索引存储及查找

存储索引,加载索引并分发到每个executor.然后进行ANN查找


import com.stepstone.search.hnswlib.jna.{Index, SpaceName}
import org.apache.spark.SparkFiles
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import java.nio.file.Paths
import scala.reflect.runtime.universe.TypeTagclass HnswIndex[DstKey : TypeTag : Encoder](vectorSize: Int,index: Index,objectIDsMap: Dataset[(Int, DstKey)]) {/*** Executres KNN query using an HNSW index.** @param queryFeatures      features to generates recs for* @param minScoreThreshold  Minimum similarity/distance.* @param topK               number of top recommendations to generate per instance* @param ef                 HNSW search time parameter* @param queryNumPartitions number of partitions for query vectors* @return*/def knnQuery[SrcKey: TypeTag : Encoder](spark: SparkSession, queryFeatures: Dataset[(SrcKey, Array[Float])],minScoreThreshold: Double,topK: Int,ef: Int,queryNumPartitions: Int = 200, indexSavePath: String, m: Int, efConstruction: Int): Dataset[(SrcKey, DstKey, Double)] = {import spark.implicits._// init tmp directoryval indexLength = index.getLengthval saveLocalPath = "index"val indexLocalLocation = Paths.get(saveLocalPath)val indexFileName = indexLocalLocation.getFileName.toStringprintln("indexFileName", indexFileName)// saving index locallyindex.save(indexLocalLocation)println(index.getData(0).get().mkString(","))val saveAbsoluteLocalPath = saveLocalPathprintln("local path", indexLocalLocation.toAbsolutePath.toString)println("absolute path: ", saveAbsoluteLocalPath)// add file to spark context to be sent to running nodesspark.sparkContext.addFile(indexFileName, true)//    spark.sparkContext.addFile(indexSavePath,true)println("context path: ", SparkFiles.getRootDirectory + "/" + indexFileName)// The current interface to HNSW misses the functionality of setting the ef query time// parameter, but it's lower bounded by topK as per https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md#search-parameters,// so set a large value of k as max(ef, topK) to get high recall, then cut off after getting the nearest neighbor.val k = math.max(topK, ef)// local scope vectorSizeval vectorSizeLocal = vectorSize// execute queryingqueryFeatures.repartition(queryNumPartitions).toDF("id", "features").withColumn("features", $"features".cast("array<float>")).as[(SrcKey, Array[Float])].mapPartitions((it: Iterator[(SrcKey, Array[Float])]) => {// load indexval index = new Index(SpaceName.COSINE, vectorSizeLocal)index.initialize(indexLength, m, efConstruction, (System.currentTimeMillis() / 1000).toInt)index.load(Paths.get(SparkFiles.getRootDirectory + "/" + indexFileName), indexLength)it.flatMap(x => {val idx = x._1val vector = x._2val queryTuple = index.knnQuery(vector, k)val result = queryTuple.getIds//            queryTuple.getLabels.zip(queryTuple.getCoefficients).map(qt => (idx, qt._1, 1.0 - qt._2.toDouble)).filter(_._3 >= minScoreThreshold).sortBy(_._3).reverse.slice(0, topK)result})}).as[(Int, Int, Double)].toDF("src_id", "index_id", "score").join(objectIDsMap.toDF("index_id", "dst_id"), Seq("index_id")).select("src_id", "dst_id", "score").repartition(400).as[(SrcKey, DstKey, Double)]}
}

3 word2vec向量检索实例

  • 训练word2vec模型
  • 将模型的向量取出,调用上面buildHnswIndex 构建索引
  • 分布式进行knnQuery 向量检索
import org.apache.spark.ml.feature.Word2VecModel
import org.apache.spark.ml.linalg.DenseVectorobject exampleWord2Vec {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().getOrCreate()val GraphInputModel =  "graph/model/word2vecmodel"val indexPath =  "graph/model/index"spark.udf.register("denseVec2Array",(vec:DenseVector ) => vec.toArray.map(_.toFloat))spark.udf.register("vectorSplit",(a:String)=>(a.split(',').map(_.toFloat)))import spark.implicits._val word2vec = Word2VecModel.load(GraphInputModel )println(word2vec .getVectors.schema)word2vec .getVectors.show(10)println(word2vec .getVectors.count())val itemEmbeddings = word2vec .getVectors.selectExpr("cast(word as Int) as word", "denseVec2Array(vector) features").as[(Int,Array[Float])]itemEmbeddings.show()println(itemEmbeddings.schema)val vectorsize=itemEmbeddings.take(1)(0)._2.lengthval hnswIndex = new annUtilsHnsw().buildHnswIndex(spark, vectorsize, itemEmbeddings, 20)val queryDF=hnswIndex.knnQuery[Int](spark,itemEmbeddings.limit(20),0.3,20,200,160,indexPath,20,200)queryDF .write.mode("overwrite").save(savePathMl + "graph/muiscEmbedding")}}

4 HNSW pom依赖文件

hnswlib-jna

        <dependency><groupId>com.stepstone.search.hnswlib.jna</groupId><artifactId>hnswlib-jna</artifactId><version>1.4.2</version></dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/50206.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VUE初识

vue是一个用于构建用户界面的渐进是框架 优点&#xff0c;大大提高开发效率 缺点&#xff0c;需要记忆语法规则 构建用户界面 创建vue实例&#xff0c;初始化渲染 1&#xff1a;准备容启 2&#xff1a;引包&#xff0c;开发版本和生产版本 3.创建vue实例 new Vue&#x…

mybatis plus 控制台和日志文件中打印sql配置

1 控制台输出sql 配置mybatis-plus的日志实现类为StdOutImpl&#xff0c;该实现类中打印日志是通过System.out.println(s)的方式来打印日志的 mybatis-plus:configuration:log-impl: org.apache.imbatis.logging.stdout.StdOutImpl2 日志文件中写入sql 日志文件中输入sql需要…

分布式事务(4):两阶段提交协议与三阶段提交区别

1 两阶段提交协议 两阶段提交方案应用非常广泛&#xff0c;几乎所有商业OLTP数据库都支持XA协议。但是两阶段提交方案锁定资源时间长&#xff0c;对性能影响很大&#xff0c;基本不适合解决微服务事务问题。 缺点&#xff1a; 如果协调者宕机&#xff0c;参与者没有协调者指…

Solidity 合约安全,常见漏洞 (下篇)

Solidity 合约安全&#xff0c;常见漏洞 &#xff08;下篇&#xff09; Solidity 合约安全&#xff0c;常见漏洞 &#xff08;上篇&#xff09; 不安全的随机数 目前不可能用区块链上的单一交易安全地产生随机数。区块链需要是完全确定的&#xff0c;否则分布式节点将无法达…

Grafana 安装配置教程

Grafana 安装配置教程 一、介绍二、Grafana 安装及配置2.1 下载2.2 安装2.2.1 windows安装 - 图形界面2.2.2 linux安装 - 安装脚本 三、Grafana的基本配置3.1 登录3.2 Grafana设置中文 四、grafana基本使用 一、介绍 Grafana是一个通用的可视化工具。对于Grafana而言&#xff0…

华为OD-分积木/分苹果

题目描述 哥哥弟弟分一堆积木&#xff0c;每块积木重量不同。弟弟要求平分两组&#xff0c;每组数量可以不同但总重量必须相等。 然而弟弟只会二进制并且加法不进位。例如三块积木 3,5,6 分成两组 [3] 和 [5,6] 弟弟认为 5&#xff08;二进制1001&#xff09;加上6&#xff08…

Linux常用命令——dhcrelay命令

在线Linux命令查询工具 dhcrelay 使用dhcrelay命令可以提供中继DHCP和BOOTP请求 补充说明 dhcrelay命令使用dhcrelay命令可以提供中继DHCP和BOOTP请求&#xff0c;从一个没有DHCP服务器的子网直接连接到其它子网内的一个或多个DHCP服务器。该命令在DHCP中继服务器上使用&am…

Docker容器与虚拟化技术:Gitlab账户注册

目录 一、实验 1.gitlab 一、实验 1.gitlab (1) 概念 GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c;并在此基础上搭建起来的Web服务。 &#xff08;2&#xff09;官网 The DevSecOps Platform | GitLab &#xff08;3&#…

基于swing的火车站订票系统java jsp车票购票管理mysql源代码

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 基于swing的火车站订票系统 系统有2权限&#xff1a;…

软考高级系统架构设计师系列论文七十:论信息系统的安全体系

软考高级系统架构设计师系列论文七十:论信息系统的安全体系 一、信息系统相关知识点二、摘要三、正文四、总结一、信息系统相关知识点 软考高级信息系统项目管理师系列之四十三:信息系统安全管理

LeetCode算法递归类—二叉树的右视图

目录 199. 二叉树的右视图 题解&#xff1a; 目标&#xff1a; 思路&#xff1a; 过程&#xff1a; 代码&#xff1a; 运行结果&#xff1a; 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所…

构建 NodeJS 影院预订微服务并使用 docker 部署(03/4)

一、说明 构建一个微服务的电影网站&#xff0c;需要Docker、NodeJS、MongoDB&#xff0c;这样的案例您见过吗&#xff1f;如果对此有兴趣&#xff0c;您就继续往下看吧。 你好社区&#xff0c;这是&#x1f3f0;“构建 NodeJS 影院微服务”系列的第三篇文章。本系列文章演示了…

MySql 环境搭建

目录 MySql 在 CentOS 7 环境下安装。 说明&#xff1a; 1.卸载不要的环境 2.配置 mysql 官方 yum 源 3.开始安装 4.启动 mysql 5.mysql 登录 6.配置 mysql 7. 设置开机启动 MySql 在 CentOS 7 环境下安装。 说明&#xff1a; 在安装与卸载中&#xff0c;用户切换成 r…

Java请求webservice踩过的坑

最近项目对接过程中&#xff0c;因为对方系统比较旧&#xff0c;我们和对方进行交互使用webservice方式进行&#xff0c;对方给出相关文档&#xff0c; 接口地址&#xff1a;http://ip:port/abc/def/xxxService?wsdl 接口名称&#xff1a;methodA 1-springboot配合CXF使用 …

PostgreSQL+SSL链路测试

SSL一个各种证书在此就不详细介绍了&#xff0c;PostgreSQL要支持SSL的前提需要打开openssl选项&#xff0c;包括客户端和服务器端。 测试过程。 1. 生成私钥 root用户&#xff1a; mkdir -p /opt/ssl/private mkdir -p /opt/ssl/share/ca-certificateschmod 755 -R /opt/ss…

java判断ip是否为指定网段

具体网络知识原理请看这个博文 /**** param address servletRequest.getRemoteAddr();* param host servletRequest.getRemoteHost();* return* Description 检验IP是否符合安全限定*/private boolean ipIsInNet(String address, String host){Set<String> iPset allow…

缓存优化--使用Redis将项目进行优化

缓存优化–使用Redis 文章目录 缓存优化--使用Redis1、环境搭建2、缓存短信验证码2.1、实现思路2.2、代码改造 3、缓存菜品数据3.1、实现思路3.2、代码改造 问题描述&#xff1a; 用户数量多&#xff0c;系统访问量大的时候&#xff0c;用户发送大量的请求&#xff0c;导致频繁…

Laravel 框架构造器的查询表达式构造器的 Where 派生查询 ⑥

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; THINK PHP &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &#x1f44…

中文医学知识语言模型:BenTsao

介绍 BenTsao&#xff1a;[原名&#xff1a;华驼(HuaTuo)]: 基于中文医学知识的大语言模型指令微调 本项目开源了经过中文医学指令精调/指令微调(Instruction-tuning) 的大语言模型集&#xff0c;包括LLaMA、Alpaca-Chinese、Bloom、活字模型等。 我们基于医学知识图谱以及医…

ansible-playbook yml 查看进程

- name: 查看 sshd 进程hosts: your_hoststasks:- name: 运行 pgrep 命令查找 sshd 进程shell: pgrep sshdregister: command_result- name: 打印进程输出debug:var: command_result.stdout_linesansible-playbook process.yml stdout_lines 是变量的一个属性&#xff0c;变量结…