Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

目录

  • ES Python客户端介绍
  • 封装代码
  • 测试代码
  • 参考


ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dictclass ESClient:def __init__(self, host="127.0.0.1",index="", http_auth = None):self.index = indexif http_auth is None:self.es = Elasticsearch(hosts=host)else:self.es = Elasticsearch(hosts=host, http_auth=http_auth)print("success to connect " + host)def close(self):self.es.close()# 设置索引def set_index(self,index:str):self.index = index# 创建索引def create_index(self, index_name: str, mappings=None):res = self.es.indices.create(index=index_name, mappings=mappings)return res# 删除索引def delete_index(self, index_name: str):res = self.es.indices.delete(index=index_name)return res# 获取索引def get_index(self, index_name: str):res = self.es.indices.get(index=index_name)return res# 创建文档(单个)def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):res = self.es.create(index=self.index, body=body, id=_id)return res# 创建文档(批量)def create_doc_bulk(self, docs: List[Dict]):actions = []for doc in docs:action = {"_index": self.index,"_op_type": "create","_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))}for k,v in doc.items():action[k] = vactions.append(action)res = bulk(client=self.es, actions=actions)return res# 删除文档def delete_doc(self, doc_id):res = self.es.delete(index=self.index, id=doc_id)return res# 更新文档def update_doc(self, doc_id, doc:Dict):body = {"doc" : doc}res = self.es.update(index=self.index, id=doc_id, body=body)return res# 分页获取超过100000的文档def get_doc_scroll(self,query:Dict):res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")data_list = []hits = res.get("hits")scroll_id = res.get('_scroll_id')total_value = 0# total 可能为Dict或intif isinstance(hits.get('total'),Dict):total_value= hits.get('total').get('value')else:total_value = hits.get('total')if total_value>0:for data in hits.get('hits'):data_list.append(data.get('_source'))return scroll_id,data_list# 通过scroll_id分页获取后序文档def get_doc_by_scroll_id(self,scroll_id):page = self.es.scroll(scroll_id=scroll_id,scroll="5m")data_list = []scroll_id = page.get('_scroll_id')for data in page.get('hits').get('hits'):data_list.append(data)return scroll_id,data_list# 清空scroll_id,防止服务端不够用def clear_scroll(self,scroll_id):self.es.clear_scroll(scroll_id)# 获取索引的hits内容(一般用于获取文档id、总数)def get_doc_all(self):res = self.es.search(index=self.index)return res['hits']# 获取一个文档def get_doc_by_id(self, id_):res = self.es.get(index=self.index, id=id_)return res["_source"]# 获取所有文档的_source内容(小于100000)def get_doc(self,query:Dict,size:int=100000):query['size'] = sizeres = self.es.search(index=self.index,body=query)data_list = []hits = res.get("hits")total_value = 0# total 可能为Dict或intif isinstance(hits.get('total'), Dict):total_value = hits.get('total').get('value')else:total_value = hits.get('total')if total_value > 0:for data in hits.get('hits'):data_list.append(data.get('_source'))return data_list# 聚合查询(分组条件名为group_by,返回buckets)def get_doc_agg(self, query):res = self.es.search(index=self.index, body=query)return res['aggregations']['group_by'].get('buckets')# 统计查询(统计条件为stats_by,返回最值、平均值等)def get_doc_stats(self,query):res = self.es.search(index=self.index,body=query)return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClientcli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():res = cli.create_index(index_name="test")print(res)def test_delete_index():res = cli.delete_index(index_name="test")print(res)def test_get_index():res = cli.get_index(index_name="test")print(res)def test_set_index():cli.set_index(index="test")def test_create_doc():body = {"name": "lady_killer9","age": 19}res = cli.create_doc(body=body)print(res)def test_create_doc_bulk():from copy import deepcopybody = {"name": "lady_killer9"}users = []for i in range(100001):tmp = deepcopy(body)tmp["age"] = iusers.append(tmp)res = cli.create_doc_bulk(docs=users)print(res)def test_get_doc_all():res = cli.get_doc_all()print(res)def test_get_doc_by_id():res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")print(res)def test_get_doc():query = {"query": {"match_all": {}}}res = cli.get_doc(query=query,size=20)print(res)def test_update_doc():body={"name": "lady_killer_after_update"}res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)print(res)def test_delete_doc():res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")print(res)def test_get_doc_agg():query = {"aggs": {"group_by": {"terms": {"field": "age"}}}}res = cli.get_doc_agg(query=query)print(res)def test_get_doc_stats():query = {"aggs": {"stats_by": {"stats": {"field": "age"}}}}res = cli.get_doc_stats(query=query)print(res)def test_get_doc_scroll():query = {"query": {"match_all": {}}}scroll_id,data_list = cli.get_doc_scroll(query=query)res = []while data_list:res.extend(data_list)scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)print(len(res))if __name__ == '__main__':# test_delete_index()test_create_index()test_get_index()# test_set_index()# test_create_doc()# test_create_doc_bulk()# test_get_doc_all()# test_update_doc()# test_get_doc_by_id()# test_get_doc()# test_delete_doc()# test_get_doc_agg()# test_get_doc_stats()# test_get_doc_scroll()cli.close()

测试截图
在这里插入图片描述
更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。

参考

github-elasticsearch
github-elasticsearch-dsl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/20134.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线上通过Nginx部署前端工程,并且配置SSL

介绍、为了更好的帮助大家学习,减少歧义,IP地址我就不隐藏了,公司也是我自己的公司。你们就别来攻击了。 下面给出步骤: 一、前期准备工作 通过在目标服务器上安装宝塔面板、安装redis、mysql、nginx、jdk环境等 1、 2、前端工程通过npm run build 打…

MMDeploy安装、python API测试及C++推理

服务器配置如下: Cuda版本:11.1 Cudnn版本:8.2.0 显卡版本:RTX3090 使用转换脚本将.pth模型转换为ONNX格式 python mmdeploy/tools/deploy.py \mmdeploy/configs/mmdet/detection/detection_onnxruntime_dynamic.py \mmdetect…

go 语言实战入门案例之命令行排版词典

文章和代码已经归档至【Github仓库:https://github.com/timerring/backend-tutorial 】或者公众号【AIShareLab】回复 go 也可获取。 文章目录 命令行排版的词典生成 request body解析 response body打印结果结构完善 命令行排版的词典 先看一下用到的 API &#x…

【C++】STL——set和map及multiset和multiset的介绍及使用

🚀 作者简介:一名在后端领域学习,并渴望能够学有所成的追梦人。 🚁 个人主页:不 良 🔥 系列专栏:🛸C 🛹Linux 📕 学习格言:博观而约取&#xff0…

kafka 理论知识

1 首先要了解kafka是什么 Kafka是一个分布式的消息订阅系统 1.1 kafka存储消息的过程 消息被持久化到一个topic中,topic是按照“主题名-分区”存储的,一个topic可以分为多个partition,在parition(分区)内的每条消息都有一个有序的id号&am…

wxwidgets Ribbon使用简单实例

// RibbonSample.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <wx/wx.h> #include "wx/wxprec.h" #include "wx/app.h" #include "wx/frame.h" #include "wx/textctrl.h" #include "…

大数据学习教程:Linux 高级教程(上)

一、Linux用户与权限 1. 用户和权限的基本概念 1.1、基本概念 用户 是Linux系统工作中重要的一环, 用户管理包括 用户 与 组 管理 在Linux系统中, 不论是由本级或是远程登录系统, 每个系统都必须拥有一个账号, 并且对于不同的系统资源拥有不同的使用权限 对 文件 / 目录 的…

[C++项目] Boost文档 站内搜索引擎(1): 项目背景介绍、相关技术栈、相关概念介绍...

项目背景 Boost库是C中一个非常重要的开源库. 它实现了许多C标准库中没有涉及的特性和功能, 一度成为了C标准库的拓展库. C新标准的内容, 很大一部分脱胎于Boost库中. Boost库的高质量代码 以及 提供了更多实用方便的C组件, 使得Boost库在C开发中会被高频使用 为方便开发者学…

C语言实现定时器,定时触发函数

最近想到使用C语言实现一个简单的定时器。使用操作系统windows.h提供的多线程API就能实现 首先定义一个定时器结构体&#xff0c;包含定时时间和触发的函数指针 typedef struct Stimer{int valid;//定时器有效long timingMS;//定时时间TriggerFunc tf;//触发函数 }Stimer;创建…

【数据结构|二叉树遍历】递归与非递归实现前序遍历、中序遍历、后序遍历

递归与非递归实现二叉树的前序遍历、中序遍历、后序遍历。 二叉树图 定义 前序遍历&#xff08;Preorder Traversal&#xff09;&#xff1a; 前序遍历的顺序是先访问根节点&#xff0c;然后按照先左后右的顺序访问子节点。对于上面的二叉树&#xff0c;前序遍历的结果是&…

【FPGA IP系列】FIFO深度计算详解

FIFO(First In First Out)是一种先进先出的存储结构&#xff0c;经常被用来在FPGA设计中进行数据缓存或者匹配传输速率。 FIFO的一个关键参数是其深度&#xff0c;也就是FIFO能够存储的数据条数&#xff0c;深度设计的合理&#xff0c;可以防止数据溢出&#xff0c;也可以节省…

Stable Diffusion教程(8) - X/Y/Z 图表使用

1. 介绍 这项功能可以在 文生图/图生图 界面的左下角种 “脚本” 一栏内选择 “X/Y/Z 图表” 以启用。 它创建具有不同参数的图像网格。使用 X 类型和 Y 类型字段选择应由行和列共享的参数&#xff0c;并将这些参数以逗号分隔输入 X 值 / Y 值字段。支持整数、浮点数和范围。…

【AI】《动手学-深度学习-PyTorch版》笔记(七):自动微分

AI学习目录汇总 1、什么是自动微分 自动微分:automatic differentiation,深度学习框架通过自动计算导数,即自动微分,自动微分使系统能够随后反向传播梯度。 计算图:computational graph,根据设计好的模型,系统会构建一个计算图, 来跟踪计算是哪些数据通过哪些操作组合…

【工具使用】git基础操作1

目录 一.拉取git代码1.首次拉取命令2.使用图形化拉取代码3.Idea 开发工具拉取代码 二.查看当前状态1.查看在你上次提交之后是否有对文件进行再次修改 三.创建分支3.1.创建分支3.2.创建分支并切换至分支3.3.提交分支至远程仓 远程没有自动创建 四.查看分支4.1.查看本地分支 当前…

spring boot 2.x 使用 jpa 映射 json mysql列数据映射乱码

通过下面的依赖&#xff0c;可以将 mysql 的 json 列字段&#xff08;mysql 5.7及以上的版本支持&#xff09;&#xff0c;映射成 Java Bean <dependency><groupId>com.vladmihalcea</groupId><artifactId>hibernate-types-52</artifactId><v…

Python如何快速实现爬取网页?

首先我们对要编写的爬虫程序进行简单地分析&#xff0c;该程序可分为以下三个部分&#xff1a; 拼接 url 地址发送请求将照片保存至本地 明确逻辑后&#xff0c;我们就可以正式编写爬虫程序了。 导入所需模块 本节内容使用 urllib 库来编写爬虫&#xff0c;下面导入程序所用…

【iOS】json数据解析以及简单的网络数据请求

文章目录 前言一、json数据解析二、简单的网络数据请求三、实现访问API得到网络数据总结 前言 近期写完了暑假最后一个任务——天气预报&#xff0c;在里面用到了简单的网络数据请求以及json数据的解析&#xff0c;特此记录博客总结 一、json数据解析 JSON是一种轻量级的数据…

AP5179 高端电流采样降压恒流驱动IC SOP8 LED车灯电源驱动

产品描述 AP5179是一款连续电感电流导通模式的降压恒流源&#xff0c;用于驱动一颗或多颗串联LED输入电压范围从 5 V 到 60V&#xff0c;输出电流 最大可达 2.0A 。根据不同的输入电压和外部器件&#xff0c; 可以驱动高达数十瓦的 LED。内置功率开关&#xff0c;采用高端电流…

实验3-5 查询水果价格 (15 分)

实验3-5 查询水果价格 &#xff08;15 分&#xff09; 给定四种水果&#xff0c;分别是苹果&#xff08;apple&#xff09;、梨&#xff08;pear&#xff09;、桔子&#xff08;orange&#xff09;、葡萄&#xff08;grape&#xff09;&#xff0c;单价分别对应为3.00元/公斤、…

网络爬虫请求头中的Referer和User-Agent与代理IP的配合使用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8EJgMcgK-1691050515642)(https://cdn.nlark.com/yuque/0/2023/png/1313150/1691048724422-2a76d7b8-3ec3-48b7-9aec-d609d09b16d4.png#averageHue%2385b0a7&clientIdu3856fd20-7701-4&fromui&…