Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

目录

  • ES Python客户端介绍
  • 封装代码
  • 测试代码
  • 参考


ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dictclass ESClient:def __init__(self, host="127.0.0.1",index="", http_auth = None):self.index = indexif http_auth is None:self.es = Elasticsearch(hosts=host)else:self.es = Elasticsearch(hosts=host, http_auth=http_auth)print("success to connect " + host)def close(self):self.es.close()# 设置索引def set_index(self,index:str):self.index = index# 创建索引def create_index(self, index_name: str, mappings=None):res = self.es.indices.create(index=index_name, mappings=mappings)return res# 删除索引def delete_index(self, index_name: str):res = self.es.indices.delete(index=index_name)return res# 获取索引def get_index(self, index_name: str):res = self.es.indices.get(index=index_name)return res# 创建文档(单个)def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):res = self.es.create(index=self.index, body=body, id=_id)return res# 创建文档(批量)def create_doc_bulk(self, docs: List[Dict]):actions = []for doc in docs:action = {"_index": self.index,"_op_type": "create","_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))}for k,v in doc.items():action[k] = vactions.append(action)res = bulk(client=self.es, actions=actions)return res# 删除文档def delete_doc(self, doc_id):res = self.es.delete(index=self.index, id=doc_id)return res# 更新文档def update_doc(self, doc_id, doc:Dict):body = {"doc" : doc}res = self.es.update(index=self.index, id=doc_id, body=body)return res# 分页获取超过100000的文档def get_doc_scroll(self,query:Dict):res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")data_list = []hits = res.get("hits")scroll_id = res.get('_scroll_id')total_value = 0# total 可能为Dict或intif isinstance(hits.get('total'),Dict):total_value= hits.get('total').get('value')else:total_value = hits.get('total')if total_value>0:for data in hits.get('hits'):data_list.append(data.get('_source'))return scroll_id,data_list# 通过scroll_id分页获取后序文档def get_doc_by_scroll_id(self,scroll_id):page = self.es.scroll(scroll_id=scroll_id,scroll="5m")data_list = []scroll_id = page.get('_scroll_id')for data in page.get('hits').get('hits'):data_list.append(data)return scroll_id,data_list# 清空scroll_id,防止服务端不够用def clear_scroll(self,scroll_id):self.es.clear_scroll(scroll_id)# 获取索引的hits内容(一般用于获取文档id、总数)def get_doc_all(self):res = self.es.search(index=self.index)return res['hits']# 获取一个文档def get_doc_by_id(self, id_):res = self.es.get(index=self.index, id=id_)return res["_source"]# 获取所有文档的_source内容(小于100000)def get_doc(self,query:Dict,size:int=100000):query['size'] = sizeres = self.es.search(index=self.index,body=query)data_list = []hits = res.get("hits")total_value = 0# total 可能为Dict或intif isinstance(hits.get('total'), Dict):total_value = hits.get('total').get('value')else:total_value = hits.get('total')if total_value > 0:for data in hits.get('hits'):data_list.append(data.get('_source'))return data_list# 聚合查询(分组条件名为group_by,返回buckets)def get_doc_agg(self, query):res = self.es.search(index=self.index, body=query)return res['aggregations']['group_by'].get('buckets')# 统计查询(统计条件为stats_by,返回最值、平均值等)def get_doc_stats(self,query):res = self.es.search(index=self.index,body=query)return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClientcli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():res = cli.create_index(index_name="test")print(res)def test_delete_index():res = cli.delete_index(index_name="test")print(res)def test_get_index():res = cli.get_index(index_name="test")print(res)def test_set_index():cli.set_index(index="test")def test_create_doc():body = {"name": "lady_killer9","age": 19}res = cli.create_doc(body=body)print(res)def test_create_doc_bulk():from copy import deepcopybody = {"name": "lady_killer9"}users = []for i in range(100001):tmp = deepcopy(body)tmp["age"] = iusers.append(tmp)res = cli.create_doc_bulk(docs=users)print(res)def test_get_doc_all():res = cli.get_doc_all()print(res)def test_get_doc_by_id():res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")print(res)def test_get_doc():query = {"query": {"match_all": {}}}res = cli.get_doc(query=query,size=20)print(res)def test_update_doc():body={"name": "lady_killer_after_update"}res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)print(res)def test_delete_doc():res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")print(res)def test_get_doc_agg():query = {"aggs": {"group_by": {"terms": {"field": "age"}}}}res = cli.get_doc_agg(query=query)print(res)def test_get_doc_stats():query = {"aggs": {"stats_by": {"stats": {"field": "age"}}}}res = cli.get_doc_stats(query=query)print(res)def test_get_doc_scroll():query = {"query": {"match_all": {}}}scroll_id,data_list = cli.get_doc_scroll(query=query)res = []while data_list:res.extend(data_list)scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)print(len(res))if __name__ == '__main__':# test_delete_index()test_create_index()test_get_index()# test_set_index()# test_create_doc()# test_create_doc_bulk()# test_get_doc_all()# test_update_doc()# test_get_doc_by_id()# test_get_doc()# test_delete_doc()# test_get_doc_agg()# test_get_doc_stats()# test_get_doc_scroll()cli.close()

测试截图
在这里插入图片描述
更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。

参考

github-elasticsearch
github-elasticsearch-dsl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/20134.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线上通过Nginx部署前端工程,并且配置SSL

介绍、为了更好的帮助大家学习,减少歧义,IP地址我就不隐藏了,公司也是我自己的公司。你们就别来攻击了。 下面给出步骤: 一、前期准备工作 通过在目标服务器上安装宝塔面板、安装redis、mysql、nginx、jdk环境等 1、 2、前端工程通过npm run build 打…

MMDeploy安装、python API测试及C++推理

服务器配置如下: Cuda版本:11.1 Cudnn版本:8.2.0 显卡版本:RTX3090 使用转换脚本将.pth模型转换为ONNX格式 python mmdeploy/tools/deploy.py \mmdeploy/configs/mmdet/detection/detection_onnxruntime_dynamic.py \mmdetect…

go 语言实战入门案例之命令行排版词典

文章和代码已经归档至【Github仓库:https://github.com/timerring/backend-tutorial 】或者公众号【AIShareLab】回复 go 也可获取。 文章目录 命令行排版的词典生成 request body解析 response body打印结果结构完善 命令行排版的词典 先看一下用到的 API &#x…

【C++】STL——set和map及multiset和multiset的介绍及使用

🚀 作者简介:一名在后端领域学习,并渴望能够学有所成的追梦人。 🚁 个人主页:不 良 🔥 系列专栏:🛸C 🛹Linux 📕 学习格言:博观而约取&#xff0…

kafka 理论知识

1 首先要了解kafka是什么 Kafka是一个分布式的消息订阅系统 1.1 kafka存储消息的过程 消息被持久化到一个topic中,topic是按照“主题名-分区”存储的,一个topic可以分为多个partition,在parition(分区)内的每条消息都有一个有序的id号&am…

wxwidgets Ribbon使用简单实例

// RibbonSample.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <wx/wx.h> #include "wx/wxprec.h" #include "wx/app.h" #include "wx/frame.h" #include "wx/textctrl.h" #include "…

大数据学习教程:Linux 高级教程(上)

一、Linux用户与权限 1. 用户和权限的基本概念 1.1、基本概念 用户 是Linux系统工作中重要的一环, 用户管理包括 用户 与 组 管理 在Linux系统中, 不论是由本级或是远程登录系统, 每个系统都必须拥有一个账号, 并且对于不同的系统资源拥有不同的使用权限 对 文件 / 目录 的…

[C++项目] Boost文档 站内搜索引擎(1): 项目背景介绍、相关技术栈、相关概念介绍...

项目背景 Boost库是C中一个非常重要的开源库. 它实现了许多C标准库中没有涉及的特性和功能, 一度成为了C标准库的拓展库. C新标准的内容, 很大一部分脱胎于Boost库中. Boost库的高质量代码 以及 提供了更多实用方便的C组件, 使得Boost库在C开发中会被高频使用 为方便开发者学…

C语言实现定时器,定时触发函数

最近想到使用C语言实现一个简单的定时器。使用操作系统windows.h提供的多线程API就能实现 首先定义一个定时器结构体&#xff0c;包含定时时间和触发的函数指针 typedef struct Stimer{int valid;//定时器有效long timingMS;//定时时间TriggerFunc tf;//触发函数 }Stimer;创建…

【数据结构|二叉树遍历】递归与非递归实现前序遍历、中序遍历、后序遍历

递归与非递归实现二叉树的前序遍历、中序遍历、后序遍历。 二叉树图 定义 前序遍历&#xff08;Preorder Traversal&#xff09;&#xff1a; 前序遍历的顺序是先访问根节点&#xff0c;然后按照先左后右的顺序访问子节点。对于上面的二叉树&#xff0c;前序遍历的结果是&…

Stable Diffusion教程(8) - X/Y/Z 图表使用

1. 介绍 这项功能可以在 文生图/图生图 界面的左下角种 “脚本” 一栏内选择 “X/Y/Z 图表” 以启用。 它创建具有不同参数的图像网格。使用 X 类型和 Y 类型字段选择应由行和列共享的参数&#xff0c;并将这些参数以逗号分隔输入 X 值 / Y 值字段。支持整数、浮点数和范围。…

【工具使用】git基础操作1

目录 一.拉取git代码1.首次拉取命令2.使用图形化拉取代码3.Idea 开发工具拉取代码 二.查看当前状态1.查看在你上次提交之后是否有对文件进行再次修改 三.创建分支3.1.创建分支3.2.创建分支并切换至分支3.3.提交分支至远程仓 远程没有自动创建 四.查看分支4.1.查看本地分支 当前…

【iOS】json数据解析以及简单的网络数据请求

文章目录 前言一、json数据解析二、简单的网络数据请求三、实现访问API得到网络数据总结 前言 近期写完了暑假最后一个任务——天气预报&#xff0c;在里面用到了简单的网络数据请求以及json数据的解析&#xff0c;特此记录博客总结 一、json数据解析 JSON是一种轻量级的数据…

AP5179 高端电流采样降压恒流驱动IC SOP8 LED车灯电源驱动

产品描述 AP5179是一款连续电感电流导通模式的降压恒流源&#xff0c;用于驱动一颗或多颗串联LED输入电压范围从 5 V 到 60V&#xff0c;输出电流 最大可达 2.0A 。根据不同的输入电压和外部器件&#xff0c; 可以驱动高达数十瓦的 LED。内置功率开关&#xff0c;采用高端电流…

PHP8的运算符-PHP8知识详解

运算符是可以通过给出的一或多个值&#xff08;用编程行话来说&#xff0c;表达式&#xff09;来产生另一个值&#xff08;因而整个结构成为一个表达式&#xff09;的东西。 PHP8的运算符有很多&#xff0c;按类型分有一元运算符、二元运算符、三元运算符。 一元运算符只对一…

选择适合的项目管理系统,了解有哪些选择和推荐

随着科技的进步和全球竞争的加剧&#xff0c;项目管理已经成为企业成功的关键要素。为了更好地组织和监控项目&#xff0c;许多企业和组织正在采用项目管理系统(PMS)。本文将探讨项目管理系统的主要组成部分以及其在实际应用中的优势。 “项目管理系统有哪些?国际上比较常见的…

侧边栏的打开与收起

侧边栏的打开与收起 <template><div class"box"><div class"sideBar" :class"showBox ? : controller-box-hide"><div class"showBnt" click"showBox!showBox"><i class"el-icon-arrow-r…

天气API强势对接

&#x1f935;‍♂️ 个人主页&#xff1a;香菜的个人主页&#xff0c;加 ischongxin &#xff0c;备注csdn ✍&#x1f3fb;作者简介&#xff1a;csdn 认证博客专家&#xff0c;游戏开发领域优质创作者,华为云享专家&#xff0c;2021年度华为云年度十佳博主 &#x1f40b; 希望…

分布式系统的 38 个知识点

天天说分布式分布式&#xff0c;那么我们是否知道什么是分布式&#xff0c;分布式会遇到什么问题&#xff0c;有哪些理论支撑&#xff0c;有哪些经典的应对方案&#xff0c;业界是如何设计并保证分布式系统的高可用呢&#xff1f; 1. 架构设计 这一节将从一些经典的开源系统架…

静态路由下一跳地址怎么确定(静态路由配置及讲解)

一、用到的所有命令及功能 ①ip route-static 到达网络地址 子网掩码 下一跳 // 配置静态路由下一跳指的是和当前网络直接连接的路由器的接口地址非直连网段必须全部做路由路径是手工指定的&#xff0c;在大规模网络上不能用&#xff0c;效率低&#xff0c;路径是固定的稳定的…