参考文章目录
elasticdump之shell备份脚本
前言
在企业实际生产环境中,避免不了要对es集群进行迁移、数据备份与恢复,以此来确保数据的可用性及完整性。因此,就涉及到了数据备份与恢复。本章主要以elasticdump+python为主,实现es集群索引备份、es集群索引恢复、两个网络互通的es集群直接迁移索引(通过nc命令判断网络是否互通)。其余备份和恢复方法见elasticsearch博客目录,总结来自于生产实战。
提示:以下是本篇文章正文内容,下面案例可供参考
一、脚本阅读注意事项
0、注意事项:a、python3.11版本b、es 6.8.13版本(其余版本es没有演练过)c、该脚本全程通过elasticdump插件进行索引的备份与恢复d、依赖的模块在requirements.txt文件中,在新机器中执行pip3 install -r requirements.txt 安装即可1、esdump_back.py脚本的作用:a、实现es集群索引备份b、实现es集群索引恢复c、实现两个网络互通的es集群直接迁移索引(通过nc命令判断网络是否互通)2、具体功能:a、将相关配置写入到json文件中,通过解析json文件拿到配置配置文件包含源(目的)es集群以下配置:用户名密码地址需要迁移的索引名称需要迁移的索引相关类型(例如索引的analyzer、mapping、data、settings...)存放备份好的索引数据目录日志文件b、有两种模式a、判断源es与目的es是否互通,如果通,则执行脚本时调用配置文件中pong配置b、如果不通,则在源机器上执行脚本时调用配置文件esdump_source配置,自行下载上传备份文件至目标机器后,在目标机器上执行脚本时调用配置文件esdump_dest配置c、支持迁移索引时,将迁移的索引名、大小保存到日志文件中3、执行步骤:a、源es与目的es网络互通python3 esdump_backup.py pongb、源es与目的es网络不通源es执行索引备份: python3 esdump_backup.py esdump_source目的es执行索引恢复: python3 esdump_backup.py esdump_dest4、日志文件:随着脚本的启动产生日志文件位置: 与脚本同级位置日志文件名称: esdump.log
二、模块文件的产生与安装
模块导出到requirements.txt文件pip freeze > requirements.txt
安装requirements.txt文件pip install -r requirements.txt
三、脚本依赖的配置文件详解
config.json
可以在配置文件中添加对应的配置,执行简单
{"pong" : { #两个网络互通的es集群直接迁移索引时,脚本的位置参数"source": {"source_index": ["xxx-search-test-v1"], #源索引名称(有几个,添加几个。格式["xx","xxx"])"source_type": ["analyzer","mapping","data"], #源索引依赖的类型(需要什么添加什么就行,例如分词、mapping、data、settings...)"endpoints": "xx.xx.xx.xx:9200", #源es地址"es_user": "xx", #源es用户"es_password": "xx" #源es密码},"dest": {"es_user": "xx", #目的es用户"es_password": "xx", #目的es密码"dest_type": ["analyzer","mapping","data"], #目的es索引依赖的类型(源es索引有什么类型,目的索引也必要有)"endpoints": "xxx.xxx.xxx.xx:9200" #目的es地址}},"esdump_source": { #源es执行索引备份时,脚本的位置参数"source_index": ["xxx-search-test-v1"],"source_type": ["analyzer","mapping","data"],"endpoints": "xxx.xxx.xxx.xx:9200","es_user": "xx","es_password": "xx","source_backupdir": "/export/backup/" #源es索引备份后存放的目录},"esdump_dest": { #目的es执行索引恢复时,脚本的位置参数"es_user": "xx","es_password": "xx","dest_type": ["analyzer","mapping","data"],"endpoints": "xxx.xxx.xxx.xx:9200","dest_backupdir": "/export/backup/" #源es执行索引备份后上传到目的服务器的目录位置}
}
四、python代码
代码如下(示例)esdump_back.py :
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#author: wxd
#date: 2024/04/13 20:08:13
import json
import os
import subprocess
import sys
import logging
import requests
from requests.auth import HTTPBasicAuth
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClientelasdump_dir = "/usr/local/bin/elasticdump"
nc_dir="/usr/bin/nc"
# 配置日志记录
logging.basicConfig(filename='esdump.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 连接es集群
def connect_es(value):url = "http://{}".format(value["endpoints"])es = Elasticsearch([url], http_auth=(value["es_user"], value["es_password"]),verify_certs=False)return es
# 检查并安装elasticdump工具
def check_and_install_tool():try:# 检查工具是否存在if not os.path.exists(elasdump_dir) and not os.path.exists(nc_dir):logging.info("elasticdump和nc 工具未找到,开始安装...")install_esdump = "yum -y install npm nodejs nc && npm config set registry https://registry.npmmirror.com/ && npm install elasticdump -g"install_result = subprocess.run(install_esdump, shell=True, capture_output=True, text=True)if install_result.returncode != 0:logging.error("安装 elasticdump、nc 工具失败: %s", install_result.stderr)return Falseelse:logging.info("elasticdump、nc 工具安装成功")return Trueelse:logging.info("elasticdump、nc 工具已安装")return Trueexcept Exception as e:logging.error("安装工具异常: %s", e)return False
#获取索引信息
def get_index_stats(endpoint, index_name,user,password):try:url = f"http://{endpoint}/{index_name}/_stats"response = requests.get(url,auth=HTTPBasicAuth(user, password))if response.status_code == 200 and response.status_code < 400:stats = response.json()#总文档数total_docs = stats["_all"]["primaries"]["docs"]["count"]#索引大小index_size = stats["_all"]["primaries"]["store"]["size_in_bytes"]return total_docs, index_sizeelse:logging.error(f"Failed to get stats for index {index_name}: {response.text}")return None, Noneexcept Exception as e:logging.error(f"获取索引信息异常: {e}")return None, None#索引备份操作
def esdump_back(value):# exist_ok=True 当设置为True时,如果目标目录已经存在,os.makedirs()函数不会引发错误,而是静默地忽略这种情况。如果设置为False(默认值),并且目标目录已经存在,那么会引发FileExistsError异常try:os.makedirs(value["source_backupdir"], exist_ok=True)# 执行备份脚本for item in value["source_index"]:for items in value["source_type"]:url = f'http://{value["es_user"]}:{value["es_password"]}@{value["endpoints"]}/{item}'dir=os.path.join(value["source_backupdir"],items)os.makedirs(dir,exist_ok=True)output_file = f'{dir}/{item}_{items}.json'commands = f'{elasdump_dir} --input {url} --output {output_file} --type={items} --limit=10000'result = subprocess.run(commands, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)if result.returncode == 0:logging.info(f"{items}备份命令 {commands} 执行成功")else:logging.error(f"{items}备份命令 {commands} 执行失败")logging.error(f'{items}命令失败原因是{result.stderr.decode("utf-8")}')except Exception as e:logging.error("备份索引异常: %s", e)return False#索引备份入口
def esdump_source(file):try:# 检查配置文件中是否存在 esdump_sourceif "esdump_source" not in file:logging.error("配置文件中未找到 'esdump_source' 键")return#判断es集群连接是否成功value = file["esdump_source"]es = connect_es(value)if not es.ping():logging.error("ES集群连接失败")return# 判断工具安装是否成功if not check_and_install_tool():logging.error("未检测到所需工具或安装失败")return#执行索引备份入口logging.info("开始备份~~~")esdump_back(value)for idx in value["source_index"]:if es.indices.exists(idx):# 获取索引统计信息total_docs, index_size = get_index_stats(value["endpoints"], idx,value["es_user"],value["es_password"])if total_docs is not None and index_size is not None:logging.info(f"索引 '{idx}' 的总文档数: {total_docs}")logging.info(f"索引 '{idx}' 的大小: {index_size} bytes")else:logging.error(f"索引 '{idx}' 不存在")except Exception as e:logging.error("开启索引备份异常: %s", e)return False#索引恢复操作
def esdump_restore(value):try:#判断是否已存在索引恢复目录if not os.path.exists(value["dest_backupdir"]):logging.error(f'恢复索引备份失败 {value["dest_backupdir"]} 不存在')return#执行恢复脚本url = f'http://{value["es_user"]}:{value["es_password"]}@{value["endpoints"]}'# 读取配置中的类型列表types = value["dest_type"]for type_idx in types:dir_path = os.path.join(value["dest_backupdir"], type_idx)for file in os.listdir(dir_path):if file.endswith(f"_{type_idx}.json"):index_name = file.split("_")[0]input_file = os.path.join(dir_path, file)output_url = f"{url}/{index_name}"command = [elasdump_dir, "--input", input_file, "--output", output_url, f"--type={type_idx}","--limits=10000"]result =subprocess.run(command,stdout=subprocess.PIPE, stderr=subprocess.PIPE)if result.returncode == 0:logging.info(f"{type_idx}恢复命令 {command} 执行成功")else:logging.error(f"{type_idx}备份命令 {command} 执行失败:")logging.error(f'{type_idx}命令失败原因是{result.stderr.decode("utf-8")}')total_docs, index_size = get_index_stats(value["endpoints"], index_name,value["es_user"],value["es_password"])if total_docs is not None and index_size is not None:logging.info(f"索引 '{index_name}' 的总文档数: {total_docs}")logging.info(f"索引 '{index_name}' 的大小: {index_size} bytes")except Exception as e:logging.error("索引恢复异常: %s", e)return False
#将本地备份拷贝到目标机器,恢复入口
def esdump_dest(file):try:# 检查配置文件中是否存在 esdump_destif "esdump_dest" not in file:logging.error("配置文件中未找到 'esdump_dest' 键")return#判断es集群连接是否成功value = file["esdump_dest"]es = connect_es(value)if not es.ping():logging.error("ES集群连接失败")return#判断工具安装是否成功if not check_and_install_tool():logging.error("未检测到所需工具或安装失败")return#索引恢复入口logging.info("开始恢复~~~")esdump_restore(value)except Exception as e:logging.error("恢复索引异常: %s", e)return False#两台服务器网络相通,走这个索引迁移函数
def migrate_index(source_url, dest_url, source_index, source_type,source_es_user,source_es_password,dest_es_user,dest_es_password):try:# 执行索引迁移的逻辑commands = f'{elasdump_dir} --input http://{source_es_user}:{source_es_password}@{source_url}/{source_index} \--output http://{dest_es_user}:{dest_es_password}@{dest_url}/{source_index} --type={source_type} --limit=10000'result = subprocess.run(commands, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)if result.returncode != 0:logging.error(f"索引: {source_index} 命令: {commands} 执行失败")logging.error(f'索引: {source_index} 命令失败原因是: {result.stderr.decode("utf-8")}')return Falselogging.info(f"索引: {source_index} 命令: {commands} 执行成功")return Trueexcept Exception as e:logging.error("migrate_index函数迁移索引异常: %s", e)return Falsedef pong(file):try:# 检查配置文件中是否存在 esdump_sourceif "pong" not in file:logging.error("配置文件中未找到 'pong' 键")return#判断es集群连接是否成功value = file["pong"]source_es = connect_es(value["source"])dest_es= connect_es(value["dest"])if not source_es.ping() or not dest_es.ping():logging.error("源ES或目的ES集群连接失败")return# 判断工具安装是否成功if not check_and_install_tool():logging.error("未检测到所需工具或安装失败")return# 判断源es和目的es网络是否互通nc_command=f'{nc_dir} -zv {value["dest"]["endpoints"].split(":")[0]} {value["dest"]["endpoints"].split(":")[1]}'result_nc = subprocess.run(nc_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)if result_nc.returncode != 0:logging.error(f"命令: {nc_command} 执行失败,网络不通")logging.error(f'命令失败返回的结果是: {result_nc.stderr.decode("utf-8")}')return#执行索引迁移for source_index in value["source"]["source_index"]:if source_es.indices.exists(source_index):# 获取源索引统计信息total_docs, index_size = get_index_stats(value["source"]["endpoints"],source_index,value["source"]["es_user"],value["source"]["es_password"])if total_docs is not None and index_size is not None:logging.info(f"源索引 '{source_index}' 的总文档数: {total_docs}")logging.info(f"源索引 '{source_index}' 的大小: {index_size} bytes")for source_type in value["source"]["source_type"]:success=migrate_index(value["source"]["endpoints"],value["dest"]["endpoints"],source_index,source_type,value["source"]["es_user"],value["source"]["es_password"],value["dest"]["es_user"],value["dest"]["es_password"])if success:logging.info("等待索引迁移完毕...")# 获取迁移后索引的信息total_docs, index_size = get_index_stats(value["dest"]["endpoints"],source_index,value["dest"]["es_user"],value["dest"]["es_password"])if total_docs is not None and index_size is not None:logging.info(f"目的索引 '{source_index}' 迁移后的总文档数: {total_docs}")logging.info(f"目的索引 '{source_index}' 迁移后的大小: {index_size} bytes")else:logging.error("索引迁移失败")except Exception as e:logging.error("不同集群间迁移索引异常: %s", e)return Falseif __name__ == "__main__":if len(sys.argv) != 2:print("Usage: python script.py pong|esdump_source|esdump_dest")else:with open("config.json", "r") as file:response_file = json.load(file)command = sys.argv[1]if command == "pong" and "pong" in response_file:pong(response_file)if command == "esdump_source" and "esdump_source" in response_file:esdump_source(response_file)if command == "esdump_dest" and "esdump_dest" in response_file:esdump_dest(response_file)
五、执行脚本示例
源es与目的es网络互通[root@python1 esdump]# python3 esdump_backup.py pong #此处的pong就是config.json文件中第一行指定的key[root@python1 esdump]# tail -f esdump.log2024-04-13 18:22:43,694 - INFO - HEAD http://xxx.xxx.xxx.xxx:9200/ [status:200 request:0.003s]2024-04-13 18:22:43,694 - INFO - elasticdump、nc 工具已安装2024-04-13 18:22:43,709 - INFO - HEAD http://xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 [status:200 request:0.003s]2024-04-13 18:22:43,721 - INFO - 源索引 'xxx-search-test-v1' 的总文档数: 14372024-04-13 18:22:43,721 - INFO - 源索引 'xxx-search-test-v1' 的大小: 17846006 bytes2024-04-13 18:23:14,688 - INFO - 索引: xxx-search-test-v1 命令: /usr/local/bin/elasticdump --input http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --output http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --type=analyzer --limit=10000 执行成功2024-04-13 18:23:15,609 - INFO - 索引: xxx-search-test-v1 命令: /usr/local/bin/elasticdump --input http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --output http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --type=mapping --limit=10000 执行成功2024-04-13 18:23:25,345 - INFO - 索引: xxx-search-test-v1 命令: /usr/local/bin/elasticdump --input http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --output http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --type=data --limit=10000 执行成功2024-04-13 18:23:25,345 - INFO - 等待索引迁移完毕...2024-04-13 18:23:25,374 - INFO - 目的索引 'xxx-search-test-v1' 迁移后的总文档数: 14372024-04-13 18:23:25,374 - INFO - 目的索引 'xxx-search-test-v1' 迁移后的大小: 15417346 bytes
源es与目的es网络不通源es执行索引备份:[root@python1 esdump]# python3 esdump_backup.py esdump_source[root@python1 esdump]# tail -f esdump.log2024-04-13 18:24:25,542 - INFO - HEAD http://xxx.xxx.xxx.xxx:9200/ [status:200 request:0.009s]2024-04-13 18:24:25,542 - INFO - elasticdump、nc 工具已安装2024-04-13 18:24:25,542 - INFO - 开始备份~~~2024-04-13 18:24:26,319 - INFO - analyzer备份命令 /usr/local/bin/elasticdump --input http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --output /export/backup/analyzer/xxx-search-test-v1_analyzer.json --type=analyzer --limit=10000 执行成功2024-04-13 18:24:27,278 - INFO - mapping备份命令 /usr/local/bin/elasticdump --input http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --output /export/backup/mapping/xxx-search-test-v1_mapping.json --type=mapping --limit=10000 执行成功2024-04-13 18:24:28,613 - INFO - data备份命令 /usr/local/bin/elasticdump --input http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 --output /export/backup/data/xxx-search-test-v1_data.json --type=data --limit=10000 执行成功2024-04-13 18:24:28,618 - INFO - HEAD http://xxx.xxx.xxx.xxx:9200/xxx-search-test-v1 [status:200 request:0.004s]2024-04-13 18:24:28,644 - INFO - 索引 'xxx-search-test-v1' 的总文档数: 14372024-04-13 18:24:28,645 - INFO - 索引 'xxx-search-test-v1' 的大小: 17846006 bytes
源es与目的es网络不通目的es执行索引恢复:[root@python1 esdump]# python3 esdump_backup.py esdump_dest[root@python1 esdump]# tail -f esdump.log2024-04-13 18:25:05,002 - INFO - HEAD http://xxx.xxx.xxx.xxx:9200/ [status:200 request:0.004s]2024-04-13 18:25:05,002 - INFO - elasticdump、nc 工具已安装2024-04-13 18:25:05,002 - INFO - 开始恢复~~~2024-04-13 18:25:36,020 - INFO - analyzer恢复命令 ['/usr/local/bin/elasticdump', '--input', '/export/backup/analyzer/xxx-search-test-v1_analyzer.json', '--output', 'http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1', '--type=analyzer', '--limits=10000'] 执行成功2024-04-13 18:25:36,728 - INFO - mapping恢复命令 ['/usr/local/bin/elasticdump', '--input', '/export/backup/mapping/xxx-search-test-v1_mapping.json', '--output', 'http://xx:xx@xxx.xxx.xxx.xxx:9200/xxx-search-test-v1', '--type=mapping', '--limits=10000'] 执行成功2024-04-13 18:25:57,440 - INFO - data恢复命令 ['/usr/local/bin/elasticdump', '--input', '/export/backup/data/xxx-search-test-v1_data.json', '--output', 'http://xx:xx@xxx.xxx.xxx.xxx:9200/huizhanyun-search-test-v1', '--type=data', '--limits=10000'] 执行成功2024-04-13 18:25:57,458 - INFO - 索引 'xxx-search-test-v1' 的总文档数: 14372024-04-13 18:25:57,458 - INFO - 索引 'xxx-search-test-v1' 的大小: 17456898 bytes
总结
以上就是今天的python脚本内容分享,因为最近刚好在学python,趁此机会强化一下自身的python基础,夯实自身.多了解一些python自动化运维常用的模块及json序列化,加油,奥里给!!!