Elasticsearch:Retrievers 介绍 - Python Jupyter notebook

在今天的文章里,我是继上一篇文章 “Elasticsearch:介绍 retrievers - 搜索一切事物” 来使用一个可以在本地设置的 Elasticsearch 集群来展示 Retrievers 的使用。在本篇文章中,你将学到如下的内容:

  • 从 Kaggle 下载 IMDB 数据集
  • 创建两个推理服务
  • 部署 ELSER
  • 部署 e5-small
  • 创建摄取管道
  • 创建映射
  • 摄取 IMDB 数据,在摄取过程中创建嵌入
  • 缩小查询负载模型
  • 运行示例检索器

安装

 Elasticsearch 及 Kibana

 如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的链接来进行安装:

  • 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch
  • Kibana:如何在 Linux,MacOS 及 Windows上安装 Elastic 栈中的 Kibana

在安装的时候,我们选择 Elastic Stack 8.x 来进行安装。在首次启动 Elasticsearch 的时候,我们可以看到如下的输出:

在上面,我们可以看到 elastic 超级用户的密码。我们记下它,并将在下面的代码中进行使用。

我们还可以在安装 Elasticsearch 目录中找到 Elasticsearch 的访问证书:

$ pwd
/Users/liuxg/elastic/elasticsearch-8.14.1/config/certs
$ ls
http.p12      http_ca.crt   transport.p12

在上面,http_ca.crt 是我们需要用来访问 Elasticsearch 的证书。

 我们首先克隆已经写好的代码:

git clone https://github.com/liu-xiao-guo/elasticsearch-labs

我们然后进入到该项目的根目录下:

$ pwd
/Users/liuxg/python/elasticsearch-labs/supporting-blog-content/introducing-retrievers
$ ls
retrievers_intro_notebook.ipynb

如上所示,retrievers_intro_notebook.ipynb 就是我们今天想要工作的 notebook。

我们通过如下的命令来拷贝所需要的证书:

$ pwd
/Users/liuxg/python/elasticsearch-labs/supporting-blog-content/introducing-retrievers
$ cp ~/elastic/elasticsearch-8.14.1/config/certs/http_ca.crt .
$ ls
http_ca.crt                     retrievers_intro_notebook.ipynb

安装所需要的 python 依赖包

pip3 install -qqq pandas elasticsearch python-dotenv

我们可以使用如下的方法来查看 elasticsearch 的版本:

$ pip3 list | grep elasticsearch
elasticsearch                           8.14.0

 创建环境变量

为了能够使得下面的应用顺利执行,在项目当前的目录下运行如下的命令:

export ES_ENDPOINT="localhost"
export ES_USER="elastic"
export ES_PASSWORD="uK+7WbkeXMzwk9YvP-H3"

你需要根据自己的 Elasticsearch 设置进行相应的修改。

下载数据集

我们去到地址 IMDB movies dataset | Kaggle 下载数据集并解压缩。

$ pwd
/Users/liuxg/python/elasticsearch-labs/supporting-blog-content/introducing-retrievers
$ ls
archive (13).zip                http_ca.crt                     retrievers_intro_notebook.ipynb
$ unzip archive\ \(13\).zip 
Archive:  archive (13).zipinflating: imdb_movies.csv         
$ mkdir -p content
$ mv imdb_movies.csv content/
$ tree -L 2
.
├── archive\ (13).zip
├── content
│   └── imdb_movies.csv
├── http_ca.crt
└── retrievers_intro_notebook.ipynb

如上所示,我们吧 imdb_movies.csv 文件置于当前工作目录下的 content 目录下。

代码展示

我们在当前项目的根目录下打入如下的命令:

设置

import os
import zipfile
import pandas as pd
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ConnectionTimeout
from elastic_transport import ConnectionError
from time import sleep
import time
import logging# Get the logger for 'elastic_transport.node_pool'
logger = logging.getLogger("elastic_transport.node_pool")# Set its level to ERROR
logger.setLevel(logging.ERROR)# Suppress warnings from the elastic_transport module
logging.getLogger("elastic_transport").setLevel(logging.ERROR)

连接到 Elasticsearch

from dotenv import load_dotenvload_dotenv()ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")
COHERE_API_KEY = os.getenv("COHERE_API_KEY")url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
print(url)es = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
print(es.info())

如上所示,我们的客户端连接到 Elasticsearch 是成功的。

部署 ELSER 及 E5

下面的两个代码块将部署嵌入模型并自动扩展 ML 容量。

部署及启动 ELSER

from elasticsearch.exceptions import BadRequestErrortry:resp = es.options(request_timeout=5).inference.put_model(task_type="sparse_embedding",inference_id="my-elser-model",body={"service": "elser","service_settings": {"num_allocations": 64, "num_threads": 1},},)
except ConnectionTimeout:pass
except BadRequestError as e:print(e)

如果你之前已经部署过 ELSER,你可能会得到一个 resource already exists 这样的错误。你可以使用如下的命令来删除之前的 inference_id。

DELETE /_inference/my-elser-model

在运行完上面的命令后,需要经过一定的时间下载 ELSER 模型。这个依赖于你的网络速度。我们可以在 Kibana 中进行查看:

部署及启动 es-small

try:resp = es.inference.put_model(task_type="text_embedding",inference_id="my-e5-model",body={"service": "elasticsearch","service_settings": {"num_allocations": 8,"num_threads": 1,"model_id": ".multilingual-e5-small",},},)
except ConnectionTimeout:pass
except BadRequestError as e:print(e)

在运行完上面的代码后,我们可以在 Kibana 界面中:

点击上面的 "Add trained model" 来安装 .multilingual-e5-small 模型。

我们到最后能看到这个:

整个下载及部署需要很长的时间,需要你耐心等待!

提示:如果你的机器是在 x86 架构的机器上运行的话,那么你在上面可以选择 .multilingual-e5-small_linux-x86_64 作为其 model_id

检查模型部署状态

这将循环检查,直到 ELSER 和 e5 都已完全部署。如果你在上面已经等了足够久的话,那么下面的代码讲很快地执行。

如果需要分配额外容量来运行模型,这可能需要几分钟

from time import sleep
from elasticsearch.exceptions import ConnectionTimeoutdef wait_for_models_to_start(es, models):model_status_map = {model: False for model in models}while not all(model_status_map.values()):try:model_status = es.ml.get_trained_models_stats()except ConnectionTimeout:print("A connection timeout error occurred.")continuefor x in model_status["trained_model_stats"]:model_id = x["model_id"]# Skip this model if it's not in our list or it has already startedif model_id not in models or model_status_map[model_id]:continueif "deployment_stats" in x:if ("nodes" in x["deployment_stats"]and len(x["deployment_stats"]["nodes"]) > 0):if (x["deployment_stats"]["nodes"][0]["routing_state"]["routing_state"]== "started"):print(f"{model_id} model deployed and started")model_status_map[model_id] = Trueif not all(model_status_map.values()):sleep(0.5)models = [".elser_model_2", ".multilingual-e5-small"]
wait_for_models_to_start(es, models)
.elser_model_2 model deployed and started
.multilingual-e5-small model deployed and started

创建索引模板并链接到摄取管道

template_body = {"index_patterns": ["imdb_movies*"],"template": {"settings": {"index": {"default_pipeline": "elser_e5_embed"}},"mappings": {"properties": {"budget_x": {"type": "double"},"country": {"type": "keyword"},"crew": {"type": "text"},"date_x": {"type": "date", "format": "MM/dd/yyyy||MM/dd/yyyy[ ]"},"genre": {"type": "keyword"},"names": {"type": "text"},"names_sparse": {"type": "sparse_vector"},"names_dense": {"type": "dense_vector"},"orig_lang": {"type": "keyword"},"orig_title": {"type": "text"},"overview": {"type": "text"},"overview_sparse": {"type": "sparse_vector"},"overview_dense": {"type": "dense_vector"},"revenue": {"type": "double"},"score": {"type": "double"},"status": {"type": "keyword"},}},},
}# Create the template
es.indices.put_index_template(name="imdb_movies", body=template_body)

创建采集管道

# Define the pipeline configuration
pipeline_body = {"processors": [{"inference": {"model_id": ".multilingual-e5-small","description": "embed names with e5 to names_dense nested field","input_output": [{"input_field": "names", "output_field": "names_dense"}],}},{"inference": {"model_id": ".multilingual-e5-small","description": "embed overview with e5 to names_dense nested field","input_output": [{"input_field": "overview", "output_field": "overview_dense"}],}},{"inference": {"model_id": ".elser_model_2","description": "embed overview with .elser_model_2 to overview_sparse nested field","input_output": [{"input_field": "overview", "output_field": "overview_sparse"}],}},{"inference": {"model_id": ".elser_model_2","description": "embed names with .elser_model_2 to names_sparse nested field","input_output": [{"input_field": "names", "output_field": "names_sparse"}],}},],"on_failure": [{"append": {"field": "_source._ingest.inference_errors","value": [{"message": "{{ _ingest.on_failure_message }}","pipeline": "{{_ingest.pipeline}}","timestamp": "{{{ _ingest.timestamp }}}",}],}}],
}# Create the pipeline
es.ingest.put_pipeline(id="elser_e5_embed", body=pipeline_body)

提取文档

这将

  • 进行一些预处理
  • 批量提取 10,178 条 IMDB 记录
  • 使用 ELSER 模型为 overview 和 name 字段生成稀疏向量嵌入
  • 使用 ELSER 模型为 overview 和 name 字段生成密集向量嵌入

使用上述分配设置通常需要一定的时间才能完成。这个依赖于你自己电脑的配置。

# Load CSV data into a pandas DataFrame
df = pd.read_csv("./content/imdb_movies.csv")# Replace all NaN values in DataFrame with None
df = df.where(pd.notnull(df), None)# Convert DataFrame into a list of dictionaries
# Each dictionary represents a document to be indexed
documents = df.to_dict(orient="records")# Define a function to generate actions for bulk API
def generate_bulk_actions(documents):for doc in documents:yield {"_index": "imdb_movies","_source": doc,}# Use the bulk helper to insert documents, 200 at a time
start_time = time.time()
helpers.bulk(es, generate_bulk_actions(documents), chunk_size=200)
end_time = time.time()print(f"The function took {end_time - start_time} seconds to run")

我们可以在 Kibana 中进行查看:

    

    

  

  

我们需要等一定的时间来完成上面的摄取工作。值得注意的是:在上面的代码中我把 chunk_size 设置为 20。这个是为了避免 "Connection timeout" 错误。如果我们把这个值设置很大,那么摄取的时间可能过长,那么就会发生 "Connection timeout" 这样的错误。我们在批量处理时,选择比较少的文档来完成摄取工作。有关如何设置这个 timeout 的时间,我们可以参考文章 “在 Elasticsearch 中扩展 ML 推理管道:如何避免问题并解决瓶颈”。

针对我的电脑,它花费了如下的时间来完成 10,178 个文档的摄取:

The function took 1292.8102316856384 seconds to run

这个将近20分钟。

缩小 ELSER 和 e5 模型

我们不需要大量的模型分配来进行测试查询,因此我们将每个模型分配缩小到 1 个

for model_id in [".elser_model_2","my-e5-model"]:result = es.perform_request("POST",f"/_ml/trained_models/{model_id}/deployment/_update",headers={"content-type": "application/json", "accept": "application/json"},body={"number_of_allocations": 1},)

Retriever 测试

我们将使用搜索输入 clueless slackers 在数据集中的 overview 字段(文本或嵌入)中搜索电影

请随意将下面的 movie_search 变量更改为其他内容

movie_search = "clueless slackers"

Standard - 搜索所有文本! - bm25

response = es.search(index="imdb_movies",body={"query": {"match": {"overview": movie_search}},"size": 3,"fields": ["names", "overview"],"_source": False,},
)for hit in response["hits"]["hits"]:print(f"{hit['fields']['names'][0]}\n- {hit['fields']['overview'][0]}\n")

    

kNN-搜索所有密集向量!

response = es.search(index="imdb_movies",body={"retriever": {"knn": {"field": "overview_dense","query_vector_builder": {"text_embedding": {"model_id": "my-e5-model","model_text": movie_search,}},"k": 5,"num_candidates": 5,}},"size": 3,"fields": ["names", "overview"],"_source": False,},
)for hit in response["hits"]["hits"]:print(f"{hit['fields']['names'][0]}\n- {hit['fields']['overview'][0]}\n")

  

text_expansion - 搜索所有稀疏向量! - elser

response = es.search(index="imdb_movies",body={"retriever": {"standard": {"query": {"text_expansion": {"overview_sparse": {"model_id": ".elser_model_2","model_text": movie_search,}}}}},"size": 3,"fields": ["names", "overview"],"_source": False,},
)for hit in response["hits"]["hits"]:print(f"{hit['fields']['names'][0]}\n- {hit['fields']['overview'][0]}\n")

  

rrf — 将所有事物结合起来!

response = es.search(index="imdb_movies",body={"retriever": {"rrf": {"retrievers": [{"standard": {"query": {"term": {"overview": movie_search}}}},{"knn": {"field": "overview_dense","query_vector_builder": {"text_embedding": {"model_id": "my-e5-model","model_text": movie_search,}},"k": 5,"num_candidates": 5,}},{"standard": {"query": {"text_expansion": {"overview_sparse": {"model_id": ".elser_model_2","model_text": movie_search,}}}}},],"window_size": 5,"rank_constant": 1,}},"size": 3,"fields": ["names", "overview"],"_source": False,},
)for hit in response["hits"]["hits"]:print(f"{hit['fields']['names'][0]}\n- {hit['fields']['overview'][0]}\n")

  

所有的源码可以在地址 elasticsearch-labs/supporting-blog-content/introducing-retrievers/retrievers_intro_notebook.ipynb at main · liu-xiao-guo/elasticsearch-labs · GitHub

下载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/874457.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux云计算 |【第一阶段】SERVICES-DAY5

主要内容: 源码编译安装、rsync同步操作、inotify实时同步、数据库服务基础 实操前骤:(所需tools.tar.gz与users.sql) 1.两台主机设置SELinnx和关闭防火墙 setenforce 0 systemctl stop firewalld.service //停止防火墙 sy…

MySQL定时备份数据,并上传到oss

1.环境准备 1.安装阿里云的ossutil 2.安装mysql 2.编写脚本 脚本内容如下 #!/bin/bash # 数据库的配置信息,根据自己的情况进行填写 db_hostlocalhost db_usernameroot db_passwordroot db_namedb_root # oss 存贮数据的bucket地址 bucket_namerbsy-backup-buck…

软件更新的双刃剑:从”微软蓝屏”事件看网络安全的挑战与对策

引言 原文链接 近日,一场由微软视窗系统软件更新引发的全球性"微软蓝屏"事件震惊了整个科技界。这次事件源于美国电脑安全技术公司"众击"提供的一个带有"缺陷"的软件更新,如同一颗隐形炸弹在全球范围内引爆,…

Python面试宝典第17题:Z字形变换

题目 将一个给定字符串 s 根据给定的行数numRows ,以从上往下、从左到右进行Z字形排列。比如:输入字符串为"PAYPALISHIRING",行数为3时,排列如下。最后,你的输出需要从左往右逐行读取,产生出一个…

unity 实现图片的放大与缩小(根据鼠标位置拉伸放缩)

1创建UnityHelper.cs using UnityEngine.Events; using UnityEngine.EventSystems;public class UnityHelper {/// <summary>/// 简化向EventTrigger组件添加事件的操作。/// </summary>/// <param name"_eventTrigger">要添加事件监听的UI元素上…

DevExpress中文教程 - 如何在.NET MAUI应用中实现Material Design 3?

DevExpress .NET MAUI多平台应用UI组件库提供了用于Android和iOS移动开发的高性能UI组件&#xff0c;该组件库包括数据网格、图表、调度程序、数据编辑器、CollectionView和选项卡组件等。 获取DevExpress v24.1正式版下载 Material Design是一个由Google开发的跨平台指南系统…

HydraRPC: RPC in the CXL Era——论文阅读

ATC 2024 Paper CXL论文阅读笔记整理 问题 远程过程调用&#xff08;RPC&#xff09;是分布式系统中的一项基本技术&#xff0c;它允许函数在远程服务器上通过本地调用执行来促进网络通信&#xff0c;隐藏底层通信过程的复杂性简化了客户端/服务器交互[15]。RPC已成为数据中心…

【Hot100】LeetCode—279. 完全平方数

目录 题目1- 思路2- 实现⭐完全平方数——题解思路 3- ACM 实现 题目 原题连接&#xff1a;279. 完全平方数 1- 思路 思路 动规五部曲 2- 实现 ⭐完全平方数——题解思路 class Solution {public int numSquares(int n) {// 1. 定义 dpint[] dp new int[n1];//2. 递推公式…

论文学习记录之一种具有边缘增强特点的医学图像分割网络

标题&#xff1a;一种具有边缘增强特点的医学图像分割网络 期刊&#xff1a;电子与信息学报-&#xff08;2022年5月出刊&#xff09; 摘要&#xff1a;针对传统医学图像分割网络存在边缘分割不清晰、缺失值大等问题&#xff0c;该文提出一种具有边缘增强特点的医学图像分割网…

社交圈子小程序搭建-源码部署-服务公司

消息通知:当有新的消息、评论或回复时&#xff0c;用户需要收到系统的推送通知&#xff0c;以便及时查看和回复 活动发布与参加:用户可以在社交圈子中发布各种类型的活动&#xff0c;如聚餐、旅游、运动等。其他用户可以参加这些活动&#xff0c;并与组织者进行交流和沟通 社交…

C#初级——输出语句和转义字符

输出语句 在C#中&#xff0c;C#的输出语句是通过Console类进行输出&#xff0c;该类是一个在控制台下的一个标准输入流、输出流和错误流。使用该类下的Write()函数&#xff0c;即可打印要输出的内容。 Console.Write("Hello World!"); //在控制台应用中打印Hell…

通过QT进行服务器和客户端之间的网络通信

客户端 client.pro #------------------------------------------------- # # Project created by QtCreator 2024-07-02T14:11:20 # #-------------------------------------------------QT core gui network #网络通信greaterThan(QT_MAJOR_VERSION, 4): QT widg…

Docker安装nacos(详细教程)

Nacos 是一个开源的动态服务发现、配置管理和服务管理平台&#xff0c;广泛用于微服务架构中。在本文章中&#xff0c;博主将详细介绍如何使用 Docker 来安装 Nacos&#xff0c;以便快速启动并运行这个强大的服务管理工具。 前置条件 在开始安装 Nacos 之前&#xff0c;请确保…

pytorch 笔记:torch.optim.Adam

torch.optim.Adam 是一个实现 Adam 优化算法的类。Adam 是一个常用的梯度下降优化方法&#xff0c;特别适合处理大规模数据集和参数的深度学习模型 torch.optim.Adam(params, lr0.001, betas(0.9, 0.999), eps1e-08, weight_decay0, amsgradFalse, *, foreachNone, maximizeFa…

I2C总线二级外设驱动开发(函数和代码详解)

I2C总线二级外设驱动开发是一个涉及多个步骤和函数调用的过程&#xff0c;主要目的是使得挂接在I2C总线上的外设能够正常工作。 一、I2C总线二级外设驱动开发概述 I2C总线是一种广泛使用的串行通信总线&#xff0c;用于连接微控制器及其外围设备。在Linux内核中&#xff0c;I2…

实验四 FPGA 使用Verilog HDL设计电机运动控制程序

实验目的 1.掌握使用GPIO控制直流电机的原理。 2.掌握使用Verilog HDL设计电机运动控制程序的方法。 实验要求 采用Verilog HDL语言设计直流电机运动控制程序&#xff0c;实现直流电机的运动控制&#xff0c;并通过数码管显示当前输出的PWM波的占空比。通过按键或拔位开关可…

ArcGIS Pro不能编辑ArcGIS10.X的注记的解决办法

​ 点击下方全系列课程学习 点击学习—>ArcGIS全系列实战视频教程——9个单一课程组合系列直播回放 点击学习——>遥感影像综合处理4大遥感软件ArcGISENVIErdaseCognition 一、问题 我们利用ArcGIS Pro编辑ArcGIS10.X系列软件生成的注记要素类的时候&#xff0c;会提示不…

Apache POI-Excel入门与实战

目录 一、了解Apache POI 1.1 什么是Apache POI 1.2 为什么要使用ApaChe POI 1.3 Apache POI应用场景 1.4 Apache POI 依赖 二、Apache POI-Excel 入门案例 2.1 写入Excel文件 2.2 读取文件 四、Apache POI实战 4.1 创建一个获取天气的API 4.2高德天气请求API与响应…

怎样使用 Juicer tools 的 dump 命令将.hic文件转换为交互矩阵matrix计数文件 (Windows)

创作日志&#xff1a; 万恶的生信…一个scHiC数据集没有提供处理好的计数文件&#xff0c;需要从.hic转换。Github一个个好长的文档看了好久才定位到 juicer tools 的dump命令&#xff0c;使用起来比想象中简单。 一、下载Juicer tools 注意&#xff1a;使用Juicer tools的前提…

邮件安全篇:邮件反垃圾系统运作机制简介

1. 什么是邮件反垃圾系统&#xff1f; 邮件反垃圾系统是一种专门设计用于检测、过滤和阻止垃圾邮件的技术解决方案。用于保护用户的邮箱免受未经请求的商业广告、诈骗信息、恶意软件、钓鱼攻击和其他非用户意愿接收的电子邮件的侵扰。 反垃圾系统的常见部署形式 2. 邮件反垃圾…