from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import NoNodeError,NodeExistsError,NotEmptyError
import json
# Module-level ZooKeeper client configured for the server at 127.0.0.1:2181.
zk = KazooClient(hosts='127.0.0.1:2181')
# NOTE: start() runs at import time, so importing this module calls into the
# client immediately, before any of the helper functions below are used.
zk.start()
# Node path defined at module level; how it is consumed is not visible in
# this section of the file.
path = "/main_node/sub_node"
def ensure_path_exists(zk, path):
    """Create every missing ancestor of ``path`` as an empty node.

    Only the parents are created; the final component of ``path`` is left
    for the caller to create with its own data.

    :param zk: KazooClient instance
    :param path: node path whose parent nodes must exist
    """
    parts = path.strip('/').split('/')
    for depth in range(1, len(parts)):
        # Keep the leading slash so every ancestor is addressed by its
        # absolute path.
        current_path = '/' + '/'.join(parts[:depth])
        if zk.exists(current_path):
            continue
        try:
            zk.create(current_path, b"")
        except NodeExistsError:
            # Another client created it between the check and the create;
            # the ancestor exists, which is all that is required here.
            pass


def create_node(zk, path, data):
    """Create a ZooKeeper node, creating missing parent nodes first.

    Prints a message instead of raising when the node already exists.

    :param zk: KazooClient instance
    :param path: node path
    :param data: node data (bytes, expected to be UTF-8 decodable for the
        success message)
    """
    try:
        ensure_path_exists(zk, path)
        zk.create(path, data)
        print(f"Node {path} created successfully with data: {data.decode('utf-8')}")
    except NodeExistsError:
        print(f"Node {path} already exists.")


def update_node(zk, path, data):
    """Replace the data stored in an existing ZooKeeper node.

    Prints a message instead of raising when the node does not exist.

    :param zk: KazooClient instance
    :param path: node path
    :param data: new node data (bytes)
    """
    try:
        zk.set(path, data)
        print(f"Node {path} updated successfully with new data: {data.decode('utf-8')}")
    except NoNodeError:
        print(f"Node {path} does not exist. Cannot update non-existing node.")


def delete_node(zk, path, recursive=False):
    """Delete a ZooKeeper node.

    All failures are reported via ``print``; nothing is raised to the caller.

    :param zk: KazooClient instance
    :param path: node path
    :param recursive: whether to delete the node together with all of its
        children
    """
    try:
        zk.delete(path, recursive=recursive)
        print(f"Node {path} deleted successfully")
    except NoNodeError:
        print(f"Node {path} does not exist.")
    except NotEmptyError:
        print(
            f"Node {path} is not empty. "
            "Use recursive=True to delete it and its children."
        )
    except Exception as e:
        # Deliberate best-effort catch-all: report the error, never raise.
        print(f"An error occurred: {e}")


def read_node(zk, path):
    """Read a ZooKeeper node and return its data decoded as UTF-8.

    :param zk: KazooClient instance
    :param path: node path
    :return: ``(text, stat)`` for an existing node, ``(None, None)`` when the
        node does not exist
    """
    try:
        data, stat = zk.get(path)
    except NoNodeError:
        print(f"Node {path} does not exist.")
        return None, None

    text = data.decode('utf-8')
    print(f"Node {path} data: {text}")
    return text, stat


def write_json_to_node(zk, path, data):
    """Serialize ``data`` to JSON and store it in a ZooKeeper node.

    The node is created (together with any missing parents) when it does not
    exist yet; otherwise its data is overwritten.

    :param zk: KazooClient instance
    :param path: node path
    :param data: dict or other JSON-serializable object to write
    """
    # Only the serialization step is guarded by TypeError, so a TypeError
    # raised by a ZooKeeper call is not misreported as a JSON problem.
    try:
        payload = json.dumps(data).encode('utf-8')
    except TypeError:
        print("Provided data is not serializable to JSON.")
        return

    if zk.exists(path):
        zk.set(path, payload)
        print(f"JSON data written to node {path}")
        return

    try:
        # Same parent handling as create_node, so a missing ancestor does
        # not abort the write.
        ensure_path_exists(zk, path)
        zk.create(path, payload)
    except NodeExistsError:
        # The node appeared between the exists() check and create(); still
        # store the payload rather than silently dropping it.
        zk.set(path, payload)
        print(f"JSON data written to node {path}")
    else:
        print(f"Node {path} created and JSON data written.")


def read_json_from_node(zk, path):
    """Read a ZooKeeper node and parse its data as JSON.

    :param zk: KazooClient instance
    :param path: node path
    :return: the deserialized JSON value, or ``None`` when the node does not
        exist or its data is not valid UTF-8 encoded JSON
    """
    try:
        data, _stat = zk.get(path)
    except NoNodeError:
        print(f"Node {path} does not exist.")
        return None

    try:
        json_data = json.loads(data.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        print("Failed to decode JSON from node data.")
        return None

    print(f"JSON data read from node {path}: {json_data}")
    return json_data