对于大量数据,可以使用 Elasticsearch 的 scroll API 来分批次地读取数据,以避免一次性读取所有数据造成的内存负担。这段代码使用滚动查询(scroll)来分批次地读取数据。首先,它发送初始的搜索请求,并获取第一批数据。然后,使用滚动查询持续获取剩余的数据,直到所有数据都被读取完毕。最后,记得清除滚动查询的状态,释放相关资源。以下是使用 scroll API 的示例代码:
from elasticsearch import Elasticsearchdef scroll_query(es, index_name, query, scroll_size=1000, scroll_time='1m'):"""执行滚动查询,逐批获取数据并处理结果Args:- es: Elasticsearch 实例- index_name: 要查询的索引名称- query: 查询语句- scroll_size: 每次滚动查询获取的文档数量,默认为 1000- scroll_time: 滚动查询的保持时间,默认为 '1m'Returns:- None"""# 初始化滚动查询result = es.search(index=index_name, body=query, size=scroll_size, scroll=scroll_time)scroll_id = result['_scroll_id']total_docs = result['hits']['total']['value']# 处理第一批数据hits = result['hits']['hits']for hit in hits:source_data = hit['_source']print(source_data) # 处理你的数据,这里简单打印出来作为示例# 继续滚动查询获取剩余数据while len(hits) > 0:result = es.scroll(scroll_id=scroll_id, scroll=scroll_time)hits = result['hits']['hits']total_docs += result['hits']['total']['value']for hit in hits:source_data = hit['_source']print(source_data) # 处理你的数据,这里简单打印出来作为示例print(total_docs) # # 数据总数# 清除滚动查询es.clear_scroll(scroll_id=scroll_id)# 连接到Elasticsearch集群
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])# 确保ES实例可用
if es.ping():print("Connected to Elasticsearch")
else:print("Could not connect to Elasticsearch")# 定义查询语句
query = {"query": {"match_all": {} # 匹配所有文档}
}# 调用滚动查询函数
scroll_query(es, 'your_index_name', query, scroll_size=1000, scroll_time='1m')
在这个函数中,我们将滚动查询的逻辑封装在 scroll_query 函数中,可以通过传入 Elasticsearch 实例、索引名称、查询语句以及其他参数来执行滚动查询。你可以根据需要调整 scroll_size 和 scroll_time 参数来控制每次查询的文档数量和滚动查询的保持时间。调用该函数后,会依次输出查询到的数据。