python -【es】基本使用

一. 前言

在Python中使用Elasticsearch(ES)通常涉及安装Elasticsearch的Python客户端库,然后通过该库与Elasticsearch集群进行交互。

二. 基本使用

1. 安装Elasticsearch Python客户端库

首先,你需要安装elasticsearch库。你可以使用pip来安装它:

pip install elasticsearch

2. 连接到Elasticsearch集群

在Python中,你可以通过创建一个Elasticsearch对象来连接到Elasticsearch集群。

from elasticsearch import Elasticsearch# 创建Elasticsearch客户端实例
es = Elasticsearch(['http://localhost:9200'])# 检查连接是否成功
if es.ping():print("Successfully connected to Elasticsearch!")
else:print("Could not connect to Elasticsearch")

3. 执行索引操作

创建索引
在Elasticsearch中,索引类似于关系型数据库中的表。可以使用客户端实例的indices.create()方法来创建一个新的索引。

# 创建索引的请求体(这里是一个简单的例子,实际使用中可能更复杂)
index_body = {"settings": {"number_of_shards": 1,"number_of_replicas": 1},"mappings": {"properties": {"field1": {"type": "text"},"field2": {"type": "integer"}}}
}# 创建索引
es.indices.create(index='my_index', body=index_body)

添加文档
可以使用index()方法来向索引中添加文档。

# 添加文档的请求体
doc_body = {"field1": "value1","field2": 123
}# 添加文档(指定索引名和文档ID)
es.index(index='my_index', id=1, body=doc_body)

4. 执行搜索操作

使用search()方法来执行搜索查询。

# 查询DSL
query_body = {"query": {"match": {"field1": "value1"}}
}# 执行搜索
response = es.search(index='my_index', body=query_body)# 处理响应
for hit in response['hits']['hits']:print(hit['_source'])

三. 整合封装成一个类来使用

import json
import uuid
from datetime import datetime, timedeltafrom elasticsearch import Elasticsearchfrom base.common.time_format import get_isoformat_time
from configure.configure import config
from configure.server_config import logger
import time
import tracebackes_conf = config['elasticsearch']class ElasticSearchService(Elasticsearch):es_service = Nonemappings = {"properties": {# "id": {"type": "keyword"},"content": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},"qst_id": {"type": "keyword"},"reply_type": {"type": "integer"}}}def __init__(self, index_name, addrs, *args, **kwargs):self.max_result_window = es_conf['max_result_window']self.username = es_conf['username']self.password = es_conf['password']self.index_name = index_nameself.addrs = addrssuper().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)# 1.查询index是否存在if not self.indices.exists(index=self.index_name):self.create_index(self.index_name)if not self.ping():logger.error(f"ElasticSearchHandler Connection failed")logger.info(f"Connect to ElasticService successfully!!! addrs:{addrs}, index_name:{self.index_name}")def create_index(self, index_name):# 创建索引if not self.indices.exists(index=index_name):response = self.indices.create(index=index_name, body={})logger.info(f"Index [{index_name}] created successfully!")# 检查索引是否创建成功if not response.get('acknowledged'):logger.info(f"Failed to create index '{index_name}'. Response: {response}")return Falseself.create_mapping_session_history()self.create_index_setting()if response.get('shards_acknowledged'):logger.info(f"Index '{index_name}' All shards are acknowledged.")else:logger.info(f"Index '{index_name}' Not all shards are acknowledged.")def create_mapping_session_history(self):mapping = ElasticSearchService.mappings# 将mapping添加到索引response = self.indices.put_mapping(index=self.index_name, body=mapping)# 检查索引是否创建成功if response.get('acknowledged'):logger.info(f"Index '{self.index_name}' created successfully with mapping.")else:logger.info(f"Failed to create index '{self.index_name}'. Response: {response}")def create_index_setting(self):setting = {"number_of_replicas": "0"}# 将setting添加到索引response = self.indices.put_settings(index=self.index_name, body=setting)# 检查索引是否创建成功if response.get('acknowledged'):logger.info(f"Index '{self.index_name}' created successfully with setting.")else:logger.info(f"Failed to create index setting'{self.index_name}'. Response: {response}")def delete_index(self, index_name):res = self.indices.delete(index=index_name)logger.info(f"Index [{index_name}] deleted successfully!, res: {res}")return resdef insert_doc(self, hist_hash_id: str, doc_body: dict):"""新增数据:param hist_hash_id::param doc::return:"""try:self.index(index=self.index_name, id=hist_hash_id, body=doc_body)# 刷新索引以确保文档立即可见res = self.indices.refresh(index=self.index_name)logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")return resexcept Exception as e:logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")def bulk_insert_docs(self, session_histories: list):"""批量新增数据:param chitchat_list::return:"""try:# 准备批量数据bulk_actions = []failed_list = []batch_count = 1000for i in range(0, len(session_histories), batch_count):for item in session_histories[i:i + batch_count]:doc = {# "id": item.get('id', 0),"you_feild": item.get('you_feild', ''),...}action = {"index": {  # Use "index" as the action"_index": self.index_name,# 如果需要指定文档ID,可以取消下面的注释"_id": item.get('hist_hash_id', '')}}# 将 action 和 doc 作为元组的两个元素添加到 bulk_actions 列表中bulk_actions.append(action)bulk_actions.append(doc)print(f"insert data -> {item}")response = self.bulk(index=self.index_name, body=bulk_actions)# 检查响应中的成功和失败项for item in response['items']:if item['index']['status'] != 201:failed_list.append(item)logger.info(f"Elasticsearch 批量新增完成,failed_list:{failed_list}")# 刷新索引以确保文档立即可见self.indices.refresh(index=self.index_name)return failed_listexcept Exception as e:traceback.print_exc()logger.error(f"Elasticsearch bulk insert doc error:{e}")def delete_doc_by_id(self, doc_ids):""" 删除文档 """try:failed_list = []for doc_id in doc_ids:response = self.delete(index=self.index_name, id=doc_id)# 检查响应状态if response.meta.status != 200:failed_list.append(doc_id)logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted failed!")logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")return failed_listexcept Exception as e:traceback.print_exc()logger.error(f"Elasticsearch delete_doc error:{e}")def delete_docs_by_query_body(self, query_body):# 使用_delete_by_query API 删除所有文档# 注意:这里我们使用了一个匹配所有文档的查询:{"query": {"match_all": {}}}try:response = self.delete_by_query(index=self.index_name, body=query_body)logger.info("Deleted documents:", response['_deleted'])  # 这将显示被删除的文档数量except Exception as e:# 捕获并处理异常logger.error(f"Deleted docs error: {e}")def update_doc(self, datas: list):""" 更新文档 """try:failed_list = []for data in datas:# 更新文档(实际上是重新索引)response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])logger.info("Update Response:", response)if response.meta.status != 200:failed_list.append(data)# 刷新索引以立即应用更改(可选)self.indices.refresh(index=self.index_name)logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")except Exception as e:traceback.print_exc()logger.error(f"Elasticsearch update_doc error: {e}")def get_doc(self, doc_id):""" 获取文档数据 """try:doc = self.get(index=self.index_name, id=doc_id)['_source']return docexcept Exception as e:logger.error(f"Error retrieving document {doc_id}: {e}")return Nonedef search_index(self, query_body):"""检索文档query_body:查询体(Query DSL)"""try:logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')response = self.search(index=self.index_name, body=query_body)logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):logger.info(f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")# logger.info(f"search response -> {response}")if response['hits']['total']['value'] > 0:return responsereturn Nonereturn Noneexcept Exception as e:traceback.print_exc()logger.error(f"ElasticService search_index error:{e}")def search_index_by_scroll_api(self, query_body):"""分页查询query_body:查询体(Query DSL)"""try:logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')response = self.search(index=self.index_name, body=query_body, scroll='1m')logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):logger.info(f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")# logger.info(f"search response -> {response}")if response['hits']['total']['value'] > 0:return responsereturn Nonereturn Noneexcept Exception as e:traceback.print_exc()logger.error(f"ElasticService search_index error:{e}")def search_by_sql(self, sql_body):try:logger.info(f'elasticsearch search index_name={self.index_name},sql_body={sql_body}')response = self.sql.query(body=sql_body)logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')if response.meta.status == 200:columns = response.get('columns')rows = response.get('rows')# 提取列名column_names = [col['name'] for col in columns]# 组织成字典格式result_dicts = []for row in rows:result_dict = {column_names[i]: row[i] for i in range(len(column_names))}result_dicts.append(result_dict)logger.info(f"Search index successful! match count={len(result_dicts)}")return result_dictsreturn []except Exception as e:traceback.print_exc()logger.error(f"ElasticService search_by_sql error:{e}")return []def get_elastic_instance(index_name, addrs):_es_service = None_wait_times = 0_try_count = 5_interval_seconds = 10for i in range(_try_count):  # 初始化后,尝试启动5次,第次间隔10秒try:_es_service = ElasticSearchService(index_name, addrs)if _es_service:logger.info(f"ElasticService initial successfully!")print(f"ElasticService initial successfully!")return _es_servicelogger.warning(f"Connect to elasticServer failed, try reconnect to elasticServer [{i}]!")except Exception as e:traceback.print_exc()logger.warning(f"初始化ElasticService失败,结果:{_es_service}, 异常原因:{str(e)}, 应用将在{_interval_seconds}秒后重新尝试.")time.sleep(_interval_seconds)es_service = None
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}", ]if config['elasticsearch']['enabled']:index_name = config['elasticsearch']['session_history_index']es_service = get_elastic_instance(index_name, addrs)
else:logger.info(f"[elasticsearch] 未启用! enabled -> {config['elasticsearch']['enabled']}")if __name__ == '__main__':index_name = config['elasticsearch']['session_history_index']es_service = get_elastic_instance(index_name, addrs)# 添加文档docs = [{# "id": i + 1,"you_feild": "",...} for i in range(5)]# 插入数据# es_service.insert_doc('2', doc)print(es_service.bulk_insert_docs(docs))# 删除index# print(es_service.delete_index(index_name))# 获取文档# print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))# logger.info(es_service.get_doc('2'))# 删除文档# logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))# query_body = {"query": {"match_all": {}}}# logger.info(es_service.delete_docs_by_query_body(query_body))# 更新数据# datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]# print(es_service.update_doc(datas))# 查询数据keyword = "缴清"query_body = {"query": {"multi_match": {"query": keyword,"fields": ["reply_content", "qst_content", "standard_qst"]}},"from": 0,"size": 10,"sort": [{"chat_qst_time": "desc"}]}# print(es_service.search_index(query_body))

四. 总结

以上是使用Python与Elasticsearch进行交互的基本步骤。可以根据实际需求扩展这些操作,例如处理更复杂的查询、使用聚合、批量操作等。Elasticsearch的Python客户端库提供了丰富的API,可以满足大多数与Elasticsearch交互的需求。
希望对你有所帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66011.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GitHub Actions 自动构建和部署容器到 Azure Web App

在当今快速发展的软件开发世界中,持续集成和持续部署(CI/CD)已成为提高效率和保证质量的关键实践。本文将详细介绍如何使用 GitHub Actions 来自动构建 Docker 容器并将其部署到 Azure Web App。这个自动化流程不仅能节省大量时间,还能显著减少人为错误,让开发团队更专注于…

自行下载foremos命令

文章目录 问题描述其他小伙伴的成功解决方案,但对我不适用解决思路失败告终 最终解决成功解决思路解决步骤 问题描述 在kali系统终端中输入foremost,显示无此命令 其他小伙伴的成功解决方案,但对我不适用 解决思路 正常来说使用命令 apt-g…

LED背光驱动芯片RT9293应用电路

一)简介: RT9293 是一款高频、异步的 Boost 升压型 LED 定电流驱动控制器,其工作原理如下: 1)基本电路结构及原理 RT9293的主要功能为上图的Q1. Boost 电路核心原理:基于电感和电容的特性实现升压功能。当…

Android中创建ViewModel的几种方法

文章目录 1. 使用 `ViewModelProvider`1.1 在 `Activity` 中创建 `ViewModel`1.2 在 `Fragment` 中创建 `ViewModel`2. 使用 `ViewModelFactory`2.1 创建 `ViewModel` 和 `ViewModelFactory`2.2 在 `Activity` 或 `Fragment` 中使用 `ViewModelFactory`3. 使用 `by viewModels(…

AI大模型语音识别转文字

提取音频 本项目作用在于将常见的会议录音文件、各种语种音频文件进行转录成相应的文字,也可从特定视频中提取对应音频进行转录成文字保存在本地。最原始的从所给网址下载对应视频和音频进行处理。下载ffmpeg(https://www.gyan.dev/ffmpeg/builds/packages/ffmpeg-…

CG顶会论文阅读|《科技论文写作》硕士课程报告

文章目录 一、基本信息1.1 论文基本信息1.2 课程基本信息1.3 博文基本信息 二、论文评述(中英双语)2.1 研究问题(Research Problem)2.2 创新点(Innovation/Contribution)2.3 优点(Why this pape…

JVM实战—9.线上FGC的几种案例

大纲 1.如何优化每秒十万QPS的社交APP的JVM性能(增加S区大小 优化内存碎片) 2.如何对垂直电商APP后台系统的FGC进行深度优化(定制JVM参数模版) 3.不合理设置JVM参数可能导致频繁FGC(优化反射的软引用被每次YGC回收) 4.线上系统每天数十次FGC导致频繁卡顿的优化(大对象问题…

FreshTomato 路由器固件常见配置以及踩坑记录

本文首发于只抄博客,欢迎点击原文链接了解更多内容。 前言 在上一篇文章《网件 R6400 梅林固件恢复官方固件后刷入 FreshTomato》中,我将网件 R6400 刷入了 FreshTomato 固件,目前已经使用了大半个月了,稳定性比起 380.70_0-X7.9…

嵌入式驱动开发详解8(阻塞/非阻塞/异步通信)

文章目录 前言阻塞非阻塞异步通知后续 前言 首先来回顾一下“中断”,中断是处理器提供的一种异步机制,我们配置好中断以后就 可以让处理器去处理其他的事情了,当中断发生以后会触发我们事先设置好的中断服务函数, 在中断服务函数…

Jmeter进阶篇(31)解决java.net.BindException: Address already in use: connect报错

📚前言 近期雪雪妹妹在使用Jmeter执行压测的时候,发现了一个非常让她头疼的问题,她使用20并发跑,正确率可以达到100%,但是一旦使用200并发,就会出现大量的报错,报错内容如下: java.net.BindException: Address already in use: connectat java.net.DualStackPlainSo…

leveldb的DBSequence从哪里来,到哪里去?

(Owed by: 春夜喜雨 http://blog.csdn.net/chunyexiyu) leveldb数据库的DBSequence从哪里来,到哪里去? 大概的情形是,leveldb的记录初始DBSequence为0,随着记录的增加,记录sequence不断随着增加,并持久化…

每日一学——日志管理工具(ELK Stack)

5.1 ELK Stack 5.1.1 Elasticsearch索引机制 嘿,小伙伴们!今天我们要聊聊ELK Stack——一套由Elasticsearch、Logstash和Kibana组成的强大日志管理工具集。通过这套工具,我们可以轻松地收集、存储、搜索和可视化日志数据。首先,…

PyTorch 自动混合精度AMP Grad Scaler 源码解析:_unscale_grads_ 与 unscale_ 函数

PyTorch AMP Grad Scaler 源码解析:_unscale_grads_ 与 unscale_ 函数 引言 本文详细解析 PyTorch 自动混合精度(AMP)模块中 grad_scaler.py 文件的两个关键函数:_unscale_grads_ 和 unscale_。这些函数在梯度缩放与反缩放过程中…

docker内外如何实现ROS通信

写在前面 在一台电脑上装有docker,docker内外均装有ROS系统,现在想要实现docker内外的ROS通信,怎么办呢? 首先,因为是同一台电脑的docker内外,所以IP本身是互通的,不需要在/etc/hosts中添加IP…

双指针与滑动窗口

双指针 相向双指针 两数之和 题意是找到不同两个数使得它们相加和为target,数组有序 利用数组有序的性质,判断指针前后的区间的性质 例如:2 3 4 6 8, target 9 2 8 10 > 9, 因为非递减序列,2之后的每个数都会大等于2&…

unity开发之shader 管道介质流动特效

效果 shader graph 如果出现下面的效果,那是因为你模型的问题,建模做贴图的时候没有设置好UV映射,只需重新设置下映射即可

python +tkinter绘制彩虹和云朵

python tkinter绘制彩虹和云朵 彩虹,简称虹,是气象中的一种光学现象,当太阳光照射到半空中的水滴,光线被折射及反射,在天空上形成拱形的七彩光谱,由外圈至内圈呈红、橙、黄、绿、蓝、靛、紫七种颜色。事实…

【华为OD-E卷 - 最优资源分配 100分(python、java、c++、js、c)】

【华为OD-E卷 - 最优资源分配 100分(python、java、c、js、c)】 题目 某块业务芯片最小容量单位为1.25G,总容量为M*1.25G,对该芯片资源编号为1,2,…,M。该芯片支持3种不同的配置,分…

stable diffusion安装mov2mov

第一步: 下载mov2mov,地址:https://gitcode.com/gh_mirrors/sd/sd-webui-mov2mov 下载包到web-ui的sd-webui-aki-v4.10\extensions文件夹面解压 第二步:在文件夹中调出cmd窗口,执行下列命令, git restore…

SpringSpringBoot常用注解总结

目录 1. SpringBootApplication 2. Spring Bean 相关 2.1. Autowired 2.2. Component,Repository,Service, Controller 2.3. RestController 2.4. Scope 2.5. Configuration 3. 处理常见的 HTTP 请求类型 3.1. GET 请求 3.2. POST 请求 3.3. PUT 请求 3.4. DELETE 请…