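# This script automatically collects information from a Proxmox VE (PVE) platform. Feel free to take a look if you are interested or need it.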
#@author:bbxwg
#@explain:Automatically collects the name, type, node, remark and status of PVE platform resources.
#@Date:2024-3-29
import json
import os
import re
import subprocess
from datetime import datetime


# lkh:@ function that fetches virtual machine IP addresses
def _collect_ipv4_entries(interfaces, qemu_id):
    """Return one ``{"ip_address", "qemu_id"}`` dict per IPv4 address reported."""
    entries = []
    for interface in interfaces:
        if not isinstance(interface, dict):
            continue
        if "ip-addresses" in interface:
            candidates = interface["ip-addresses"] or []
        elif "ip-address" in interface:
            # Flat layout: the address fields sit directly on the interface.
            candidates = [interface]
        else:
            candidates = []
        for ip_info in candidates:
            address = ip_info.get("ip-address")
            if ip_info.get("ip-address-type") == "ipv4" and isinstance(address, str):
                entries.append({"ip_address": address, "qemu_id": qemu_id})
    return entries


def _select_guest_ipv4(interfaces, qemu_id):
    """Pick the first non-loopback IPv4 entry, or the 0.0.0.0 placeholder."""
    for entry in _collect_ipv4_entries(interfaces, qemu_id):
        if not entry["ip_address"].startswith("127."):
            return entry
    return {"ip_address": "0.0.0.0", "qemu_id": qemu_id}


def Get_Ip_Virtual(nodes_list, qemu_ids_list, output_filename):
    """Append one IPv4 record per virtual machine to *output_filename*.

    For every id in *qemu_ids_list* the guest agent is queried through
    ``pvesh get /nodes/<node>/qemu/<id>/agent/network-get-interfaces`` and a
    one-element JSON list ``[{"ip_address": ..., "qemu_id": ...}]`` followed
    by a newline is appended to the output file.

    Args:
        nodes_list: Despite the name, this value is interpolated into the API
            path as a single node name.
        qemu_ids_list: Iterable of VM ids to query.
        output_filename: File opened in append mode for the records.

    The previous version always wrote the *second* IPv4 address it found and
    raised ``IndexError`` when fewer than two were reported.  The first
    non-loopback IPv4 address is used instead.  When the command fails, its
    output is not valid JSON, or no usable address exists, the placeholder
    address ``0.0.0.0`` is written so every VM still gets a record.
    """
    with open(output_filename, 'a') as output_file:
        for qemu_id in qemu_ids_list:
            # Argument list instead of a shell string: no quoting issues.
            command = [
                "pvesh", "get",
                f"/nodes/{nodes_list}/qemu/{qemu_id}/agent/network-get-interfaces",
                "--output-format", "json",
            ]
            try:
                result = subprocess.check_output(command, universal_newlines=True)
                parsed_result = json.loads(result)
            except (subprocess.CalledProcessError, OSError, ValueError):
                # Agent not running, pvesh unavailable, or unparsable output.
                record = {"ip_address": "0.0.0.0", "qemu_id": qemu_id}
            else:
                interfaces = None
                if isinstance(parsed_result, dict):
                    interfaces = parsed_result.get("result")
                if not isinstance(interfaces, list):
                    interfaces = []
                record = _select_guest_ipv4(interfaces, qemu_id)
            output_file.write(json.dumps([record], indent=4))
            output_file.write("\n")
#lkh:@获取节点IP地址函数
def Get_Ip_Nodes(nodes_list, output_filename):
    """Append the IPv4 address and short hostname of each node to a file.

    For every node in *nodes_list* the hosts data is fetched with
    ``pvesh get /nodes/<node>/hosts`` and its ``data`` text is scanned line
    by line for ``<ipv4> <hostname> ...`` entries.  One JSON list holding a
    single ``{"ip": ..., "hostname": ...}`` record is appended per node.

    Args:
        nodes_list: Iterable of node names.
        output_filename: File opened in append mode for the records.

    The previous version always wrote the second matching entry and raised
    ``IndexError`` when fewer than two existed.  The first non-loopback entry
    is used instead.  A node whose command fails, whose output cannot be
    parsed, or that has no usable entry gets no record.
    """
    # Compiled once; the pattern does not depend on the node.
    ipv4_pattern = re.compile(r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
    with open(output_filename, 'a') as output_file:
        for node in nodes_list:
            command = ["pvesh", "get", f"/nodes/{node}/hosts", "--output-format", "json"]
            try:
                result = subprocess.check_output(command, universal_newlines=True)
                parsed_result = json.loads(result)
            except (subprocess.CalledProcessError, OSError, ValueError) as e:
                print(f"命令执行失败,错误信息:{e}")
                continue
            print(parsed_result)

            hosts_text = parsed_result.get("data", "") if isinstance(parsed_result, dict) else ""
            if not isinstance(hosts_text, str):
                hosts_text = ""

            chosen = None
            for line in hosts_text.split("\n"):
                parts = line.split()
                if len(parts) < 2 or not ipv4_pattern.match(parts[0]):
                    continue
                if parts[0].startswith("127."):
                    # Loopback entries (localhost) are never the node address.
                    continue
                chosen = {"ip": parts[0], "hostname": parts[1].split('.')[0]}
                break

            if chosen is None:
                # Nothing usable: write no record, like the failure path above.
                continue
            output_file.write(json.dumps([chosen], indent=4))
            output_file.write("\n")


# lkh:@ merge function (combines the remark data into the resource data)
def combin_json_data(Masterfile, slave_data, output_path="/Python_GetInfo/Data.json"):
    """Merge remarks into the resource list and write the combined report.

    Args:
        Masterfile: Path of a JSON file containing a list of resource dicts.
        slave_data: Iterable of ``{"id": ..., "remark": ...}`` dicts.
        output_path: Destination of the report.  Defaults to the path that
            used to be hard-coded, so existing callers are unaffected.

    Every master entry whose ``type`` is ``"node"`` or ``"qemu"`` receives the
    ``remark`` of the first *slave_data* entry with the same ``id``.  The
    report starts with a ``Time = <timestamp>`` line followed by the merged
    list as indented JSON.  An empty master file is treated as an empty list.
    """
    with open(Masterfile, 'r') as masterfile:
        master_data = masterfile.read()
    master_value = json.loads(master_data) if master_data.strip() else []

    # Index remarks by id once instead of scanning slave_data per resource.
    # setdefault keeps the first occurrence, matching the former next() scan.
    remarks_by_id = {}
    for info in slave_data:
        if "id" in info and "remark" in info:
            remarks_by_id.setdefault(info["id"], info["remark"])

    for node in master_value:
        if node.get("type") in ("node", "qemu") and node.get("id") in remarks_by_id:
            node["remark"] = remarks_by_id[node["id"]]

    with open(output_path, 'w') as output:
        formatted_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        output.write(f"Time = {formatted_time}")
        output.write("\n")
        output.write(json.dumps(master_value, indent=4))


# lkh:@ function that gets the running status and type of virtual machines and nodes
def _filter_resources(resources):
    """Reduce pvesh resource entries to id/type/status/node dicts.

    Accepts a list of entries or a single entry.  Entries missing any of the
    four fields, and entries of type ``storage`` or ``sdn``, are dropped.
    """
    wanted_fields = ("id", "type", "status", "node")
    if isinstance(resources, dict):
        resources = [resources]
    elif not isinstance(resources, list):
        return []
    return [
        {field: entry[field] for field in wanted_fields}
        for entry in resources
        if isinstance(entry, dict)
        and all(field in entry for field in wanted_fields)
        and entry["type"] not in ("storage", "sdn")
    ]


def Node_and_virtual_status_type(output_filename):
    """Write the id, type, status and node of cluster resources as JSON.

    Runs ``pvesh get /cluster/resources --output-format json``, filters the
    result and writes the remaining entries as an indented JSON list to
    *output_filename*.  The file is truncated and left empty when nothing
    passes the filter.

    When the command cannot be run or its output is not valid JSON, an error
    message is printed and the file is not touched.

    The previous single-object branch built its dict with ``.get``, so its
    "all keys present" check could never fail and entries with ``None``
    fields slipped through.  It also skipped the storage/sdn filter.  Both
    shapes now go through the same filter.
    """
    command = ["pvesh", "get", "/cluster/resources", "--output-format", "json"]
    try:
        result_text = subprocess.check_output(command, text=True)
        resources_list = json.loads(result_text)
    except (subprocess.CalledProcessError, OSError, ValueError) as e:
        print(f"命令执行失败,错误信息:{e}")
        return

    filtered_data = _filter_resources(resources_list)
    with open(output_filename, 'w') as output_file:
        if filtered_data:
            output_file.write(json.dumps(filtered_data, indent=4))


# lkh:@ function that gets node remark information
def get_and_concatenate_files(dir_list, target_filename, output_filename):
    """Append one remark record per node directory to *output_filename*.

    Args:
        dir_list: Node directories; the last path component is the node name.
        target_filename: Name of the file inside each directory whose content
            is used as the remark.
        output_filename: File opened in append mode; receives one JSON object
            ``{"id": "node/<name>", "remark": <content>}`` per line.

    The file is read directly instead of spawning ``cat`` through a shell,
    which broke on paths with shell metacharacters and raised
    ``CalledProcessError`` when a node had no such file.  A missing or
    unreadable file now yields an empty remark.
    """
    with open(output_filename, 'a', encoding='utf-8') as output_file:
        for src_dir in dir_list:
            filepath = os.path.join(src_dir, target_filename)
            # Tolerate a trailing slash so the node name is never empty.
            node_identifier = src_dir.rstrip('/').split('/')[-1]
            try:
                with open(filepath, 'r', encoding='utf-8', errors='replace') as src_file:
                    remark = src_file.read().strip()
            except OSError:
                remark = ""
            record = {"id": f"node/{node_identifier}", "remark": remark}
            output_file.write(json.dumps(record))
            output_file.write('\n')


# lkh:@ function that gets virtual machine remark information
def get_and_concatenate_files_virtual(dir_list, output_filename):
    """Write one remark record per ``*.conf`` file found in *dir_list*.

    Args:
        dir_list: Directories to scan for ``.conf`` files.
        output_filename: File that is (re)created; receives one JSON object
            ``{"id": "qemu/<name>", "remark": ...}`` per line.

    The remark is the first line of the file with its leading ``#``
    characters removed.  When the first line is not a comment the remark is
    the message ``virtual: <file> no remarks``.

    Changes from the previous version: the output file is opened once in
    write mode instead of being deleted and then reopened for every config
    file, so it now exists even when no config file is found.  Directories
    that do not exist are skipped instead of raising.  Files are processed in
    sorted order so the output is deterministic.  UTF-8 is used explicitly
    because records are written with ``ensure_ascii=False``.
    """
    with open(output_filename, 'w', encoding='utf-8') as output:
        for src_dir in dir_list:
            if not os.path.isdir(src_dir):
                continue
            for filename in sorted(os.listdir(src_dir)):
                if not filename.endswith('.conf'):
                    continue
                conf_path = os.path.join(src_dir, filename)
                with open(conf_path, 'r', encoding='utf-8', errors='replace') as src_file:
                    # Only the first line is inspected.
                    first_line = src_file.readline().strip()
                new_filename = "qemu/" + filename[:-5]
                if first_line.startswith('#'):
                    remark = first_line.lstrip('#').strip()
                else:
                    remark = f"virtual: {filename} no remarks"
                json_data = {"id": new_filename, "remark": remark}
                output.write(json.dumps(json_data, ensure_ascii=False))
                output.write('\n')


# lkh:@ converts the remark format into dictionaries
def get_json_remarks(input_file):
    """Convert a JSON-lines remark file into the text of one JSON array.

    Args:
        input_file: Path of a file holding one JSON object per line.

    Returns:
        A string of the form ``"[\\n<obj>,\\n<obj>\\n]"`` with one serialized
        object per line.  Callers can hand it to ``json.loads``.

    Blank lines are skipped; they used to make ``json.loads`` raise.  The
    unused intermediate dump of the whole list was removed.
    """
    with open(input_file, 'r', encoding='utf-8') as file:
        json_objects = [json.loads(line) for line in file if line.strip()]
    return "[\n{}\n]".format(",\n".join(map(json.dumps, json_objects)))


# List every entry (files and directories) under /etc/pve/nodes/
entries_IP = os.listdir('/etc/pve/nodes/')
# Keep only the directories.
directories_only_IP = [d for d in entries_IP if os.path.isdir(os.path.join('/etc/pve/nodes/', d))]

# Virtual machine IP information.
all_file_names = []
directories_IP = '/etc/pve/nodes/'
tail_IP = '/qemu-server/'
output_file_ip = '/Python_GetInfo/Ip.json'
output_file_node_and_virtual = '/Python_GetInfo/Node_and_virtual_status_type.json'

# Node information: source directory, source file name and output file.
directories = '/etc/pve/nodes/'
target_file_node = 'config'
output_file_remark = '/Python_GetInfo/Remarks.txt'

# Virtual machine information: source directory and sub-directory.
Virture_directories = '/etc/pve/nodes/'
tail = '/qemu-server'

# List every entry (files and directories) under /etc/pve/nodes/ and keep
# only the directories.
entries = os.listdir('/etc/pve/nodes/')
directories_only = [d for d in entries if os.path.isdir(os.path.join('/etc/pve/nodes/', d))]
node_list = [directories + item for item in directories_only]
virtual_list = [Virture_directories + item + tail for item in directories_only]

# Every output file lives in this directory; create it up front so the first
# open() does not fail on a fresh host.
os.makedirs(os.path.dirname(output_file_remark), exist_ok=True)

Node_and_virtual_status_type(output_file_node_and_virtual)

# lkh:@ collect the remarks (virtual machines first, then nodes)
get_and_concatenate_files_virtual(virtual_list, output_file_remark)
get_and_concatenate_files(node_list, target_file_node, output_file_remark)
formatted_json_with_formatting = get_json_remarks(output_file_remark)
formatted_json_list = json.loads(formatted_json_with_formatting.replace('\n', ''))

# lkh:@ merge the remarks into the status/type data
combin_json_data(output_file_node_and_virtual, formatted_json_list)

# lkh:@ IP collection. The Get_Ip_Nodes / Get_Ip_Virtual calls are left
# disabled, as in the original script; only the VM ids are printed.
#Get_Ip_Nodes(directories_only_IP,output_file_ip)
for file_path in directories_only_IP:
    # A separate name is used here; the original loop rebound node_list
    # (a list of node paths) to this string.
    qemu_server_dir = directories_IP + file_path + tail_IP
    if os.path.isdir(qemu_server_dir):
        # Keep regular files ending in .conf and drop the extension, which
        # leaves the VM ids.
        file_path_ip = [
            os.path.splitext(f)[0]
            for f in os.listdir(qemu_server_dir)
            if os.path.isfile(os.path.join(qemu_server_dir, f)) and f.endswith('.conf')
        ]
        all_file_names.extend(file_path_ip)
        print(all_file_names)
        #Get_Ip_Virtual(file_path, all_file_names, output_file_ip)
        all_file_names.clear()