python多方式操作elasticsearch介绍

python多方式操作elasticsearch介绍

1. requests模块操作ES

requests 是一个 Python HTTP 库,它简化了发送 HTTP 请求和处理响应的过程。通过 requests 模块,开发人员可以轻松地与 Web 服务进行通信,包括获取网页内容、执行 API 请求等。requests 提供了简洁而直观的 API,使得发送 GET、POST、PUT、DELETE 等类型的请求变得容易。它支持各种认证方式、持久连接、会话管理、文件上传等功能,同时提供了丰富的响应处理方法,包括 JSON 解析、内容解码、状态码检查等。由于其简单易用的特点,requests 成为了 Python 社区中最受欢迎的 HTTP 库之一,被广泛应用于网络爬虫、Web 开发和数据采集等场景。

  • 使用requests库操作ElasticSearch的基本方法
    1.创建/更新文档:使用PUT方法
    2.获取文档:使用GET方法
    3.删除文档:使用DELETE方法
    4.搜索文档:使用GET方法,并在URL中添加搜索参数
    pip install requests
import requestses_url = 'http://wangting_host:9200'#### 通过接口简单获取API信息
res = requests.get(f'{es_url}/_cat/nodes')
print(res.text)"""
Output:
192.170.0.181 28 92 0 0.00 0.01 0.05 cdfhilmrstw - ops03
192.170.0.150 43 98 0 0.01 0.04 0.05 cdfhilmrstw - ops01
192.170.0.13  24 87 0 0.01 0.02 0.05 cdfhilmrstw * ops02
"""#### 创建/更新文档方法 (存在更新,不存在创建)
def create_or_update_doc(index_name, doc_id, document):url = f'{es_url}/{index_name}/_doc/{doc_id}'response = requests.put(url, json=document)print(response.json())# 调用方法实现功能(调用通常使用main函数,这里直接使用)
# 调用创建
index_name = 'test0329'
doc_id = '1'
document = {'标题': 'Python ElasticSearch','正文': 'ElasticSearch is a great tool for full-text search.'
}create_or_update_doc(index_name, doc_id, document)
"""
Output:
{'_index': 'test0329', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}命令行执行效果:
GET /_cat/indices?vhealth status index              uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   test0329           HPGaPO2xQiGNOVpozZ5wSg   1   1          1            0     11.3kb          5.6kb
"""#### 获取文档方法
def get_doc(index_name, doc_id):url = f'{es_url}/{index_name}/_doc/{doc_id}'response = requests.get(url)print(response.json())get_doc(index_name, doc_id)
"""
Output:
{'_index': 'test0329', '_id': '1', '_version': 1, '_seq_no': 0, '_primary_term': 1, 'found': True, '_source': {'标题': 'Python ElasticSearch', '正文': 'ElasticSearch is a great tool for full-text search.'}}
"""#### 搜索文档方法
def search_doc(index_name, query):url = f'{es_url}/{index_name}/_search'response = requests.get(url, params=query)print(response.json())query = {'q': 'Python'}
search_doc(index_name, query)"""
Output:
{'took': 9, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 0.2876821, 'hits': [{'_index': 'test0329', '_id': '1', '_score': 0.2876821, '_source': {'标题': 'Python ElasticSearch', '正文': 'ElasticSearch is a great tool for full-text search.'}}]}}
"""    #### 删除文档方法
def delete_doc(index_name, doc_id):url = f'{es_url}/{index_name}/_doc/{doc_id}'response = requests.delete(url)print(response.json())# 调用删除
delete_doc(index_name, doc_id)"""
再次使用get_doc获取方法验证
get_doc(index_name, doc_id)
Output:
{'_index': 'test0329', '_id': '1', 'found': False}
"""

2. elasticsearch模块操作ES

elasticsearch 是 Python 中用于与 Elasticsearch 进行交互的官方库,它提供了一种方便的方式来执行各种操作,包括索引、搜索、删除文档等。这个库封装了与 Elasticsearch REST API 的交互细节,使得开发人员能够更轻松地与 Elasticsearch 集群进行通信。

使用 elasticsearch 库,您可以轻松地连接到 Elasticsearch 集群,并执行各种操作。

  1. 连接到 Elasticsearch 集群:使用 Elasticsearch 类连接到 Elasticsearch 集群,可以指定多个节点,并支持 HTTP Basic 认证和其他连接参数。

  2. 索引文档:使用 index 方法可以将文档索引到 Elasticsearch 中,您可以指定索引名称、文档类型、文档 ID 和文档内容。

  3. 获取文档:使用 get 方法可以根据索引名称、文档类型和文档 ID 获取特定文档的内容。

  4. 搜索文档:使用 search 方法可以执行搜索操作,您可以指定查询条件、排序规则、分页参数等。

  5. 删除文档:使用 delete 方法可以根据索引名称、文档类型和文档 ID 删除特定文档。

  6. 批量操作elasticsearch 库支持批量索引、更新和删除文档,可以显著提高性能。

  7. 异常处理elasticsearch 库提供了对 Elasticsearch 返回的各种错误和异常的处理机制,使得开发人员能够更好地处理异常情况。

  8. 灵活性elasticsearch 库允许您以不同的方式指定查询条件、索引文档和执行其他操作,以满足各种需求。

pip install elasticsearch

2-1. elasticsearch模块基本用法

from elasticsearch import Elasticsearchdef create_index(client, index_name):if not client.indices.exists(index=index_name):result = client.indices.create(index=index_name)print(f'< def create_index >: Index[{index_name}] created successfully! {result}')else:print(f'< def create_index >: Index[{index_name}] already exists!')def delete_index(client, index_name):if client.indices.exists(index=index_name):result = client.indices.delete(index=index_name)print(f'< def delete_index >: Index[{index_name}] {result}')def insert_data(client, index_name, document_id, data):if client.indices.exists(index=index_name):result = client.index(index=index_name, id=document_id, body=data)print(f'< def insert_data >: Index[{index_name}] {result}')else:print(f'< def insert_data >: Index[{index_name}] is not exists')def query_data(client, index_name, query):if client.indices.exists(index=index_name):result = client.search(index=index_name, body=query)clean_data = dict(result)['hits']['hits'][0]['_source']for key, value in clean_data.items():print(f"{key}:{value}")else:print("index is not exists")def delete_data(client, index_name, document_id):result = client.delete(index=index_name, id=document_id)print(f'< def delete_data >: {result}')def main():es_url = "http://wangting_host:9200"client = Elasticsearch(es_url)index_name = "user_login"document_id = "1"data = {"username": "wangting_666","password": "12345678","phone": "13813812345"}query = {'query': {'match_all': {}}}create_index(client, index_name)insert_data(client, index_name, document_id, data)query_data(client, index_name, query)delete_data(client, index_name, document_id)delete_index(client, index_name)if __name__ == "__main__":main()##### 控制台输出 #####
### create_index
< def create_index >: Index[user_login] already exists!### insert_data
< def insert_data >: Index[user_login] {'_index': 'user_login', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}### query_data
username:wangting_666
password:12345678
phone:13813812345### delete_data
< def delete_data >: {'_index': 'user_login', '_id': '1', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}### delete_index
< def delete_index >: Index[user_login] {'acknowledged': True}

2-2. elasticsearch模块业务使用方式

  • 生产环境使用demo

README.md :

项目描述:
本项目demo是一个使用 Python 编写的示例代码,演示了如何使用 ElasticSearch 的官方 Python 客户端库 elasticsearch 进行索引管理和文档操作。项目主要包括创建索引、定义映射、插入文档、更新文档、查询文档和删除文档等功能,并且利用 Python 内置的 logging 模块实现了日志记录功能,将运行时的信息输出到指定的日志文件中。功能概述:1. 初始化 ES 连接:根据配置文件中的信息初始化 ElasticSearch 连接。
2. 创建索引:如果指定的索引不存在,则创建新的索引;如果索引已存在,则忽略。
3. 删除索引:如果指定的索引存在,则删除该索引。
4. 定义映射:为指定索引定义文档的映射。
5. 插入文档:向指定索引中插入新的文档。
6. 更新文档:更新指定索引中的文档。
7. 查询文档:在指定索引中搜索文档。
8. 删除文档:从指定索引中删除文档。日志记录:
项目使用 logging 模块记录运行时的信息,包括索引的创建、映射的定义、文档的插入、更新、查询和删除等操作,将日志信息输出到项目根目录下的 log 目录中的 elasticsearch_dsl.log 文件中,方便开发者查看和调试。使用说明:1. 在配置文件 config.ini 中配置 ElasticSearch 的主机地址。
2. 运行 ElasticSearch_prd.py 文件,即可执行项目中的示例代码,演示 ElasticSearch 的索引管理和文档操作功能。该项目提供了一个简单而完整的示例,可供开发者学习和参考,帮助理解如何使用 elasticsearch-dsl 库进行 ElasticSearch 的操作。目录结构
ElasticSearch/
│
├── conf/                      # 配置文件目录
│   └── config.ini             # ElasticSearch 配置文件
│
├── log/                       # 日志文件目录
│   └── elasticsearch_dsl.log  # 日志文件
│
├── ElasticSearch_prd.py     # 项目主文件
│
└── README.md                  # 项目说明文件配置文件:
config.ini[elasticsearch]
host = http://wangting_host:9200

ElasticSearch_prd.py 完整代码:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# @Project  :ElasticSearch
# @File     :ElasticSearch-dsl.py
# @Time     :2024/3/30 20:24
# @Author   :wangting_666import os
import logging
import configparser
from elasticsearch import Elasticsearch# 设置日志格式和级别
log_dir = "./log"
if not os.path.exists(log_dir):os.makedirs(log_dir)logging.basicConfig(level=logging.INFO,format="%(asctime)s [%(levelname)s] %(message)s",handlers=[logging.FileHandler(os.path.join(log_dir, "elasticsearch_dsl.log")),logging.StreamHandler()]
)# 读取ES配置
config = configparser.ConfigParser()
config.read('./conf/config.ini')
es_host = config.get('elasticsearch', 'host')# 初始化ES连接
def init_es_client(host):return Elasticsearch(hosts=[host])# 创建索引方法
def create_index(es, index_name):"""创建索引,如果索引已存在则忽略"""if not es.indices.exists(index=index_name):es.indices.create(index=index_name)logging.info(f"Index '{index_name}' created successfully")else:logging.info(f"Index '{index_name}' already exists")# 删除索引方法
def delete_index(es, index_name):if es.indices.exists(index=index_name):es.indices.delete(index=index_name)logging.info(f"Index '{index_name}' deleted successfully")# 定义映射方法
def define_mapping(es, index_name, mapping):"""为索引定义映射"""es.indices.create(index=index_name, body=mapping, ignore=400)logging.info(f"Mapping for index '{index_name}' defined successfully")# 插入文档
def insert_document(es, index_name, doc_id=None, document=None):"""插入文档到指定索引"""es.index(index=index_name, id=doc_id, body=document)logging.info(f"Document inserted into index '{index_name}' with ID '{doc_id}'")# 更新文档
def update_document(es, index_name, doc_id=None, updated_doc=None):"""更新指定ID的文档"""es.update(index=index_name, id=doc_id, body={"doc": updated_doc})logging.info(f"Document with ID '{doc_id}' in index '{index_name}' updated successfully")# 查询文档
def search_documents(es, index_name, query):"""在指定索引中搜索文档"""result = es.search(index=index_name, body=query)logging.info(f"Search result for index '{index_name}': {result}")return result# 删除文档
def delete_document(es, index_name, doc_id=None):"""删除指定ID的文档"""es.delete(index=index_name, id=doc_id)logging.info(f"Document with ID '{doc_id}' deleted from index '{index_name}'")def main():es = init_es_client(es_host)index_name = "wangting_666"create_index(es, index_name)mapping = {"mappings": {"properties": {"name": {"type": "text"},"age": {"type": "integer"},"email": {"type": "keyword"}}}}define_mapping(es, index_name, mapping)doc = {"name": "小米su7","age": 18,"email": "wang@xiaomi.com"}insert_document(es, index_name, doc_id="1", document=doc)update_document(es, index_name, doc_id="1", updated_doc={"age": 66})query = {'query': {'match_all': {}}}search_documents(es, index_name, query)delete_document(es, index_name, doc_id="1")delete_index(es, index_name)if __name__ == "__main__":main()

代码运行日志内容示例:

elasticsearch_dsl.log

2024-03-30 20:08:41,718 [INFO] HEAD http://wangting_host:9200/wangting_666 [status:404 duration:0.098s]
2024-03-30 20:08:41,857 [INFO] PUT http://wangting_host:9200/wangting_666 [status:200 duration:0.140s]
2024-03-30 20:08:41,857 [INFO] Index 'wangting_666' created successfully
2024-03-30 20:08:41,947 [INFO] PUT http://wangting_host:9200/wangting_666 [status:400 duration:0.063s]
2024-03-30 20:08:41,947 [INFO] Mapping for index 'wangting_666' defined successfully
2024-03-30 20:08:42,014 [INFO] PUT http://wangting_host:9200/wangting_666/_doc/1 [status:201 duration:0.066s]
2024-03-30 20:08:42,014 [INFO] Document inserted into index 'wangting_666' with ID '1'
2024-03-30 20:08:42,059 [INFO] POST http://wangting_host:9200/wangting_666/_update/1 [status:200 duration:0.044s]
2024-03-30 20:08:42,059 [INFO] Document with ID '1' in index 'wangting_666' updated successfully
2024-03-30 20:08:42,097 [INFO] POST http://wangting_host:9200/wangting_666/_search [status:200 duration:0.038s]
2024-03-30 20:08:42,097 [INFO] Search result for index 'wangting_666': {'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}
2024-03-30 20:08:42,162 [INFO] DELETE http://wangting_host:9200/wangting_666/_doc/1 [status:200 duration:0.064s]
2024-03-30 20:08:42,162 [INFO] Document with ID '1' deleted from index 'wangting_666'
2024-03-30 20:08:42,215 [INFO] HEAD http://wangting_host:9200/wangting_666 [status:200 duration:0.053s]
2024-03-30 20:08:42,283 [INFO] DELETE http://wangting_host:9200/wangting_666 [status:200 duration:0.068s]
2024-03-30 20:08:42,284 [INFO] Index 'wangting_666' deleted successfully

验证执行过程是否有问题,可以先不执行delete相关方法

执行代码后,在kibana上查询数据验证

3. elasticsearch-dsl模块使用

3-1. 什么是 elasticsearch-dsl?

​ Elasticsearch DSL(Domain Specific Language 领域特定语言)是 Elasticsearch 官方提供的一个 Python 客户端库,它允许开发者以一种更加 Pythonic 和直观的方式与 Elasticsearch 进行交互和查询。DSL 不是一种编程语言,而是一种专门针对某一领域(如 Elasticsearch 查询语言)设计的语言。在 Elasticsearch 中,DSL 用于构建复杂的搜索查询、聚合操作和过滤条件。

​ Elasticsearch DSL 提供了一个面向对象的接口,使得开发者可以使用 Python 中的类和方法来构建 Elasticsearch 查询,而不必直接编写 JSON 查询体。这种方式使得代码更加清晰易懂,并且可以利用 Python 的强大功能来构建动态查询。通过 Elasticsearch DSL,开发者可以以更加高效和灵活的方式构建 Elasticsearch 查询,同时还能够利用 Python 生态系统中丰富的工具和库来处理查询结果。

​ Elasticsearch DSL 使得 Python 开发者简化了与 Elasticsearch 的交互过程,并提供了更加直观和易于理解的接口。

3-2. elasticsearch-dsl特点

  • 简化了与 Elasticsearch 的交互过程,使得代码更加易于理解和维护。
  • 提供了一种更加直观的方式来构建查询和聚合操作,无需直接操作 JSON。
  • 支持类型检查和自动完成,减少了错误的可能性。
  • 可以更加灵活地构建动态查询,根据不同的条件生成不同的查询语句。

3-3. elasticsearch-dsl 的基本构件

在 elasticsearch-dsl 中,主要的构件包括:

  • 查询(Queries)
  • 过滤器(Filters)
  • 聚合(Aggregations)
  • 排序(Sorting)
  • 分页(Pagination)

3-4. elasticsearch-dsl使用

elasticsearch-dsl模块功能非常多,常见功能如表

方法功能代码示例
Search(index)创建一个搜索请求对象s = Search(index='my_index')
query(...)设置查询条件s = s.query('match', title='python')
filter(...)设置过滤条件s = s.filter('term', category='programming')
sort(...)设置排序条件s = s.sort('title')
source(...)设置返回字段s = s.source(['title', 'category'])
highlight(...)设置高亮显示字段s = s.highlight('title')
aggs.bucket(...)添加一个聚合桶s.aggs.bucket('by_category', 'terms', field='category')
aggs.metric(...)添加一个聚合指标s.aggs.metric('avg_age', 'avg', field='age')
execute()执行搜索请求response = s.execute()
response.hits.total.value获取搜索结果的总数total_hits = response.hits.total.value
response.hits.hits获取搜索结果的文档列表hits = response.hits.hits
response.aggregations获取聚合结果aggs_result = response.aggregations
Q('query_string', query=...)创建一个查询字符串查询q = Q('query_string', query='python')
A('terms', field=...)创建一个术语聚合a = A('terms', field='category')
Index(name)创建一个索引对象index = Index('my_index')
index.create()创建索引index.create()
index.delete()删除索引index.delete()
3-4-1. 匹配查询(Match Query)

匹配查询用于查找包含指定文本的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')# 匹配查询
s1 = s.query(Q('match', raw_text='碧桂园'))# 执行查询
response = s.execute()# 处理查询结果
for hit in response:print(hit.raw_text)# Output:
"""
标题:碧桂园简介(碧桂园简介)
标题:碧桂园(2007.HK)跌超4%,报1.56港元,碧桂园6月销售额创新低|碧桂园
标题:碧桂园被冻结3.8亿存款|碧桂园
标题:碧桂园杨惠妍:碧桂园不是家族企业
标题:碧桂园投资版图盘点 碧桂园
标题:碧桂园天誉(碧桂园简介)
标题:碧桂园近期转让多家公司股权|碧桂园
标题:中国平安(601318.SH):有关碧桂园的报道完全与事实不符 公司未持有碧桂园的股份|碧桂园
标题:森鹰窗业:公司正在履行合同中无碧桂园相关项目,也不存在碧桂园相关项目应收账款|碧桂园
标题:碧桂园新开楼盘排名(碧桂园山河城)
"""
3-4-2. 多字段匹配查询(Multi-match Query)

多字段匹配查询用于在多个字段中查找包含指定文本的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')# 多字段匹配查询
s = s.query(Q('match', raw_text='碧桂园') & Q('match', news_id='66432104581263876650'))# 执行查询
response = s.execute()# 处理查询结果
for hit in response:print(hit.doc_id, hit.news_id, hit.pubtime, hit.raw_text)# Output:
"""
f46b1be478ff3bb94f9b1b8f4d6283ce 66432104581263876650 2023-07-28 09:54:56 标题:港股房地产股多数走强,碧桂园涨近4%
1d06a2c58f536028da3de03db66d540b 66432104581263876650 2023-07-28 09:54:56 摘要:香港股市中的大多数房地产股票表现强劲
"""
3-4-3. 范围查询(Range Query)

范围查询用于查找字段值在指定范围内的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')# 添加范围查询
s = s.query(Q('range', news_id={'gte': 2073620092894144610, 'lte': 2073620092894144630}))# 执行查询
response = s.execute()# 处理查询结果
for hit in response:print(hit.news_id, hit.raw_text)# Output:
"""
2073620092894144620 标题:华为杨超斌:持续创新引领数字时代
2073620092894144623 摘要:华为公司高级副总裁杨超斌在2023年巴塞罗那世界移动通信大会上发表主题演讲,强调5G技术的发展推动社会进入智能世界。
"""
3-4-4. 通配符查询(Wildcard Query)

通配符查询用于查找符合指定模式的文档。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q# 创建 Elasticsearch 客户端
client = Elasticsearch(['http://wangting_host:9200'])# 创建一个 Search 对象
s = Search(using=client, index='storyline_base')# 通配符查询
s = s.query(Q('wildcard', doc_id='3dd7e3*'))# 执行查询
response = s.execute()# 处理查询结果
for hit in response:print(hit.doc_id, hit.raw_text)# Output:
"""
3dd7e3e90822340543f8a265ae564f9d 标题:华为杨超斌:持续创新引领数字时代
"""

4. Elasticsearch服务python日常巡检监控

es_check.py脚本内容

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import requests
from elasticsearch import Elasticsearch
import datetime
import os
import logging# 设置日志格式和级别
log_dir = "./log"
if not os.path.exists(log_dir):os.makedirs(log_dir)logging.basicConfig(level=logging.INFO,format="%(asctime)s [%(levelname)s] %(message)s",handlers=[logging.FileHandler(os.path.join(log_dir, "es_check.log")),logging.StreamHandler()]
)# Elasticsearch配置信息
es_host = 'http://wangting_host:9200'
es = Elasticsearch(es_host)# 函数:检查 Elasticsearch 集群是否可访问
def check_elasticsearch_availability():try:response = requests.get(es_host)if response.status_code == 200:logging.info(f'es {es_host} 健康,可以访问')else:logging.error(f'es {es_host} 不可访问! Status code:{response.status_code}')except requests.ConnectionError:logging.error("连接错误。es无法正常连接")# 获取集群健康状态
def check_cluster_health():health = es.cluster.health()logging.info(f"集群整体健康状态: {health['status']}")# 函数:检查节点信息
def check_node_info():try:response = requests.get(f'{es_host}/_nodes')if response.status_code == 200:nodes_info = response.json()nodes_count = len(nodes_info['nodes'])logging.info("es集群节点 node 个数: %d", nodes_count)else:logging.error("获取集群节点信息数据失败: %d", response.status_code)except requests.ConnectionError:logging.error("连接错误。es无法正常连接")# 获取所有索引的状态
def check_index_status():indices = es.indices.stats()['indices']for index_name, index_stats in indices.items():status = index_stats['status']logging.info(f"索引 {index_name} 状态: {status}")# 监控搜索性能
def monitor_search_performance():query = {'query': {'match_all': {}}}start_time = datetime.datetime.now()result = es.search(index='wangting_666', body=query)end_time = datetime.datetime.now()response_time = (end_time - start_time).total_seconds()logging.info(f"搜索查询性能 - 响应时间: {response_time} 秒, 搜索请求数量: {result['hits']['total']['value']}")# 监控索引性能
def monitor_indexing_performance():doc = {"name": "小米su7","age": 18,"email": "wang@xiaomi.com"}start_time = datetime.datetime.now()es.index(index='wangting_666', id="1", body=doc)end_time = datetime.datetime.now()response_time = (end_time - start_time).total_seconds()logging.info(f"索引性能 - 响应时间: {response_time} 秒")# 执行巡检任务
def run_daily_check():logging.info(f"开始每日巡检: {datetime.datetime.now()}...\n")check_elasticsearch_availability()logging.info("\n")check_node_info()logging.info("\n")check_cluster_health()logging.info("\n")check_index_status()logging.info("\n")monitor_search_performance()logging.info("\n")monitor_indexing_performance()logging.info("\n每日巡检完成.")# 主函数
if __name__ == "__main__":run_daily_check()

日志输出:

2024-03-30 15:11:26,799 [INFO] 开始每日巡检: 2024-03-30 15:11:26.799645...2024-03-30 15:11:26,934 [INFO] es http://wangting_host:9200 健康,可以访问
2024-03-30 15:11:26,934 [INFO] 2024-03-30 15:11:27,197 [INFO] es集群节点 node 个数: 3
2024-03-30 15:11:27,197 [INFO] 2024-03-30 15:11:27,450 [INFO] GET http://wangting_host:9200/_cluster/health [status:200 duration:0.254s]
2024-03-30 15:11:27,451 [INFO] 集群整体健康状态: green
2024-03-30 15:11:27,451 [INFO] 2024-03-30 15:11:27,696 [INFO] GET http://wangting_host:9200/_stats [status:200 duration:0.245s]
2024-03-30 15:11:27,696 [INFO] 索引 event_keystore 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 knowledge_keystore 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 test0330 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 product_keystore 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 storyline_base 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 storyline_demo 状态: open
2024-03-30 15:11:27,697 [INFO] 索引 wangting_666 状态: open
2024-03-30 15:11:27,697 [INFO] 2024-03-30 15:11:27,876 [INFO] POST http://wangting_host:9200/wangting_666/_search [status:200 duration:0.179s]
2024-03-30 15:11:27,876 [INFO] 搜索查询性能 - 响应时间: 0.178881 秒, 搜索请求数量: 1
2024-03-30 15:11:27,876 [INFO] 2024-03-30 15:11:28,002 [INFO] PUT http://wangting_host:9200/wangting_666/_doc/1 [status:200 duration:0.126s]
2024-03-30 15:11:28,003 [INFO] 索引性能 - 响应时间: 0.127108 秒
2024-03-30 15:11:28,003 [INFO] 
每日巡检完成.

5. Python同步MySQL数据至ElasticSearch

MySQL样例数据准备:

create database wow;DROP TABLE IF EXISTS `wow_info`;
CREATE TABLE `wow_info`  (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '角色id',`role` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '角色简称',`role_cn` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '角色类型',`role_pinyin` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '角色拼音',`zhuangbei` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '装备类型',`tianfu` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '天赋类型',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 14 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;INSERT INTO `wow_info` VALUES (1, 'fs', '法师', 'fashi', '布甲', '冰法|火法|奥法');
INSERT INTO `wow_info` VALUES (2, 'ms', '牧师', 'mushi', '布甲', '神牧|戒律|暗牧');
INSERT INTO `wow_info` VALUES (3, 'ss', '术士', 'shushi', '布甲', '毁灭|痛苦|恶魔');
INSERT INTO `wow_info` VALUES (4, 'dz', '盗贼', 'daozei', '皮甲', '狂徒|刺杀|敏锐');
INSERT INTO `wow_info` VALUES (5, 'ws', '武僧', 'wuseng', '皮甲', '酒仙|踏风|织雾');
INSERT INTO `wow_info` VALUES (6, 'xd', '德鲁伊', 'xiaode', '皮甲', '恢复|平衡|野性|守护');
INSERT INTO `wow_info` VALUES (7, 'dh', '恶魔猎手', 'emolieshou', '皮甲', '复仇|浩劫');
INSERT INTO `wow_info` VALUES (8, 'lr', '猎人', 'lieren', '锁甲', '兽王|生存|射击');
INSERT INTO `wow_info` VALUES (9, 'sm', '萨满', 'saman', '锁甲', '恢复|增强|元素');
INSERT INTO `wow_info` VALUES (10, 'long', '龙人', 'longren', '锁甲', '湮灭|恩护|增辉');
INSERT INTO `wow_info` VALUES (11, 'dk', '死亡骑士', 'siwangqishi', '板甲', '鲜血|冰霜|邪恶');
INSERT INTO `wow_info` VALUES (12, 'zs', '战士', 'zhanshi', '板甲', '武器|狂暴|防护');
INSERT INTO `wow_info` VALUES (13, 'sq', '圣骑士', 'shengqi', '板甲', '神圣|防护|惩戒');

python同步脚本MySQL_to_ElasticSearch.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-import pymysql
from elasticsearch import Elasticsearch# 连接到 MySQL 数据库
conn = pymysql.connect(host='192.168.1.1', user='root', password='123456', database='wow')
cursor = conn.cursor()# 连接到 Elasticsearch
es = Elasticsearch(['http://wangting_host:9200'])# 创建 Elasticsearch 索引
es.indices.create(index='es_wow_info', ignore=400)# 查询 MySQL 数据表
cursor.execute('SELECT id, role,role_cn,role_pinyin,zhuangbei,tianfu FROM wow_info')
wow_info_data = cursor.fetchall()# 将数据同步到 Elasticsearch
for data in wow_info_data:doc = {'role': data[1],'role_cn': data[2],'role_pinyin': data[3],'zhuangbei': data[4],'tianfu': data[5]}es.index(index='es_wow_info', id=data[0], body=doc)# 关闭连接
conn.close()

验证结果:

6. Python同步Hive数据至ElasticSearch

Hive样例数据准备:

CREATE TABLE products (id INT,name STRING,price FLOAT,description STRING
);INSERT INTO products VALUES
(1, 'Product 1', 19.99, 'Description for Product 1'),
(2, 'Product 2', 29.99, 'Description for Product 2'),
(3, 'Product 3', 39.99, 'Description for Product 3');

python同步脚本Hive_to_ElasticSearch.py

from pyhive import hive
from elasticsearch import Elasticsearch
import pandas as pd
from datetime import datetime# 连接到 Hive 数据库
conn = hive.Connection(host='192.168.3.1', port=10000)
cursor = conn.cursor()# 连接到 Elasticsearch
es = Elasticsearch(['http://wangting_host:9200'])# 查询 Hive 数据表
cursor.execute('SELECT * FROM products')
data = cursor.fetchall()# 将查询结果转换为 Pandas DataFrame
columns = [desc[0] for desc in cursor.description]
df = pd.DataFrame(data, columns=columns)# 将数据转换为 Elasticsearch 所需的格式
docs = []
for index, row in df.iterrows():doc = {'id': row['id'],'name': row['name'],'price': float(row['price']),'description': row['description'],'@timestamp': datetime.now().isoformat()}docs.append(doc)# 将数据同步到 Elasticsearch
for doc in docs:es.index(index='es_products_index', body=doc)# 关闭连接
cursor.close()
conn.close()

7. ElasticSearch大量数据写入实现

Bigdata2ES.py脚本内容:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-import time
from elasticsearch import Elasticsearch
from elasticsearch import helperses = Elasticsearch(['http://wangting_host:9200'])def timer(func):def wrapper(*args, **kwargs):start = time.time()res = func(*args, **kwargs)print('共耗时约 {:.2f} 秒'.format(time.time() - start))return resreturn wrapperdef create_data():""" 写入数据 """for line in range(10):es.index(index='bigdata_insert', body={'title': line})@timer
def gen():action = ({"_index": "bigdata_insert","_source": {"title": i}} for i in range(1000000))helpers.bulk(es, action)if __name__ == '__main__':# create_data()gen()"""
Output:
共耗时约 215.88 秒
"""

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/782465.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt for WebAssembly 环境搭建 - Windows新手入门

Qt for WebAssembly 环境搭建 - Windows新手入门 一、所需工具软件1、安装Python2、安装Git2.1 注册Github账号2.2 下载安装Git2.2.1配置Git&#xff1a;2.2.2 配置Git环境2.2.3解决gitgithub.com: Permission denied (publickey) 3 安装em编译器 二、Qt配置编译器三、参考链接…

怎么让ChatGPT批量写作原创文章

随着人工智能技术的不断发展&#xff0c;自然语言处理模型在文本生成领域的应用也日益广泛。ChatGPT作为其中的佼佼者之一&#xff0c;凭借其强大的文本生成能力和智能对话特性&#xff0c;为用户提供了一种高效、便捷的批量产出内容的解决方案。以下将就ChatGPT批量写作内容进…

厦门攸信技术亮相新技术研讨会,展现物流自动化解决方案新高度!

今日&#xff0c;厦门攸信信息技术有限公司受邀参加了一场备受行业关注的电子制造高端盛会——一步步新技术研讨会&#xff0c;凭借卓越的智能制造与物流自动化技术在会议中大放异彩。作为一家引领行业发展的企业&#xff0c;厦门攸信技术不仅展示了其深厚的技术底蕴&#xff0…

算法系列--动态规划--背包问题(4)--完全背包拓展题目

&#x1f495;"这种低水平质量的攻击根本就不值得我躲&#xff01;"&#x1f495; 作者&#xff1a;Lvzi 文章主要内容&#xff1a;算法系列–动态规划–背包问题(4)–完全背包拓展题目 大家好,今天为大家带来的是算法系列--动态规划--背包问题(4)--完全背包拓展题目…

《web应用技术》第一次课后练习

上机任务&#xff08;利用好chatgpt&#xff0c;文心一言等工具。&#xff09;&#xff1a; 1、下载软件&#xff0c;并安装。相关安装文件已上传至群文件。 JDK,TOMCAT&#xff0c;IDEA 2、学会用记事本编写jsp文件&#xff0c;并放进tomcat的相关目录下&#xff0c;运行。 …

使用Windows自带服务(BitLocker)加密U盘

第一步&#xff1a;启用 BitLocker 服务 1.1快捷键&#xff1a;WinR 调出运行框&#xff0c;输入services.msc 1.2找到服务列表中的BitLocker Drive Encryption Service&#xff0c;启动此项 第二步&#xff1a;加密U盘 把你的U盘插入电脑&#xff0c;打开“我的电脑”&#…

EFCore的空迁移(EFCore操作已存在的数据库表,不影响其中的数据)

背景&#xff1a;EFCore默认的会自动创建数据表&#xff0c;但是有时又是DBFirst&#xff0c;数据库写好了要用现成的表。这个时候就需要进行一些特殊的操作了 1、写出跟要对接数据库的实体类 比如我的表是这样创建的 create table mail_test (user_id bigint auto_increment …

OSCP靶场--Twiggy

OSCP靶场–Twiggy 考点(CVE-2020-11651[RCE]) 1.nmap扫描 ## ┌──(root㉿kali)-[~/Desktop] └─# nmap 192.168.216.62 -sV -sC -Pn --min-rate 2500 -p- Starting Nmap 7.92 ( https://nmap.org ) at 2024-03-30 06:43 EDT Nmap scan report for 192.168.216.62 Host i…

基础拓扑排序

前言 拓扑排序是一种针对“有向无环图”的算法&#xff0c;用于解决一些有“依赖关系”的问题。 拓扑排序保证了当处理到某个电时&#xff0c;其所有的如电都已经处理过了。 例如右边这个图&#xff0c;拓扑序可以保证处理点2之前&#xff0c;点4和点6都处理过了、处理点3之…

IDEA的使用(概念,安装,配置,)以及什么是字符集,模版

目录 Intellij IDEA IDE的概念 IntelliJ IDEA的安装 IntelliJ IDEA的使用 基本配置 JDK配置 创建Module 基本用法 字体配置 主题配置 字符集 设置IDEA默认字符集 注释模板 字符集 字符集简介 常见字符集 Intellij IDEA 我们不可能一直使用记事本之类变成&#…

何恺明重提十年之争——模型表现好是源于能力提升还是捕获数据集偏置?

想象一下&#xff0c;如果把世界上所有的图片都找来&#xff0c;给它们放到一块巨大的空地上&#xff0c;其中内容相似的图片放得近一些&#xff0c;内容不相似的图片放得远一些&#xff08;类比向量嵌入&#xff09;。然后&#xff0c;我随机地向这片空地撒一把豆子&#xff0…

【C#】知识点速通

前言&#xff1a; 笔者是跟着哔站课程&#xff08;Trigger&#xff09;学习unity才去学习的C#&#xff0c;并且C语言功底尚存&#xff0c;所以只是简单地跟着课程将unity所用的C#语言的关键部分进行了了解&#xff0c;然后在后期unity学习过程中加以深度学习。如需完善的C#知识…

码支付个人支付宝永不掉线使用教程

​支付宝免CK添加操作稍微繁琐点&#xff0c;请耐心观看 此通道必须关闭你的余额宝自动转入功能&#xff0c;否则可能造成不跳转 支付宝添加的所有通道均支持H5免输入收款 第一步&#xff1a;打开支付宝开发平台&#xff0c;然后用你的支付宝注册登陆&#xff1a;https://op…

算法学习——LeetCode力扣动态规划篇5

算法学习——LeetCode力扣动态规划篇5 198. 打家劫舍 198. 打家劫舍 - 力扣&#xff08;LeetCode&#xff09; 描述 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统…

python学习16:python中的布尔类型和条件语句的学习

python中的布尔类型和条件语句的学习 1.布尔&#xff08;bool&#xff09;类型的定义&#xff1a; 布尔类型的字面量&#xff1a;True表示真&#xff08;是、肯定&#xff09; False表示假&#xff08;否、否定&#xff09; True本质上是一个数字记作1&#xff0c;False记作0 …

遥感数字图像处理的学习笔记

相关链接&#xff1a; 遥感数字图像处理实验教程&#xff08;韦玉春&#xff09;--部分实验问题回答 目录 1.什么是图像&#xff0c;什么是数字图像&#xff1f; 2.什么是遥感数字图像&#xff1f;模拟图像(照片)与遥感数字图像有什么区别&#xff1f; 3.什么是遥感数字图像…

构建操作可靠的数据流系统

文章目录 前言数据流动遇到的困难先从简单开始可靠性延迟丢失 性能性能损失性能——分层重试 可扩展性总结 前言 在流式架构中&#xff0c;任何对非功能性需求的漏洞都可能导致严重后果。如果数据工程师没有将可伸缩性、可靠性和可操作性等非功能性需求作为首要考虑因素来构建…

智慧公厕的全域感知、全网协同、全业务融合和全场景智慧赋能

公共厕所是城市的重要组成部分&#xff0c;为市民提供基本的生活服务。然而&#xff0c;传统的公厕管理模式存在诸多问题&#xff0c;如排队等候时间长、卫生状况差、空气质量差等&#xff0c;严重影响市民的出行和生活质量。为了解决这些问题&#xff0c;智慧公厕应运而生&…

【Python基础教程】4 . 算法的空间复杂度

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;python基础教程 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、…

最短路-Floyd Dijkstrea

Floyd算法 一种求解“多源最短路”问题的算法 在Floyd算法中&#xff0c;图一般用邻接矩阵存储&#xff0c;边权可正可负&#xff08;但不允许负环&#xff09;&#xff0c;利用动态规划的思想&#xff0c;逐步求解出任意两点之间的最短距离 int d[N][N],初始为无穷 d[i][j…