python nacos-sdk-python 连接 nacos2.x版本,鉴权403解决办法

看nacos-sdk-python 的git项目提交记录,应该是已经解决了nacos2.x权限问题,但为什么还连接不上呢?因为最新代码,居然把以前鉴权代码删除了,具体原因不得而知。

解决办法:
1.把nacos-sdk-python里面params.py的代码,替换成下面的代码:

# VALID_CHAR = set(['_', '-', '.', ':'])
VALID_CHAR = {'_', '-', '.', ':'}
PARAM_KEYS = ["data_id", "group"]
DEFAULT_GROUP_NAME = "DEFAULT_GROUP"def is_valid(param):if not param:return Falsefor i in param:if i.isalpha() or i.isdigit() or i in VALID_CHAR:continuereturn Falsereturn Truedef check_params(params):for p in PARAM_KEYS:if p in params and not is_valid(params[p]):return Falsereturn Truedef group_key(data_id, group, namespace):return "+".join([data_id, group, namespace])def parse_key(key):sp = key.split("+")return sp[0], sp[1], sp[2]

2.然后同样的操作,把nacos-sdk-python里面client.py的代码,替换成下面的代码:
在这里插入图片描述

# -*- coding=utf-8 -*-
import base64
import functools
import hashlib
import logging
import os
import socket
import json
import platform
import threading
import time
import hmacimport nacos.client
from urllib3 import HTTPResponse
try:import ssl
except ImportError:ssl = Nonefrom multiprocessing import Process, Manager, Queue, pool
from threading import RLock, Threadtry:# python3.6from http import HTTPStatusfrom urllib.request import Request, urlopen, ProxyHandler, HTTPSHandler, build_openerfrom urllib.parse import urlencode, unquote_plus, quotefrom urllib.error import HTTPError, URLError
except ImportError:# python2.7import httplib as HTTPStatusfrom urllib2 import Request, urlopen, HTTPError, URLError, ProxyHandler, HTTPSHandler, build_openerfrom urllib import urlencode, unquote_plus, quotebase64.encodebytes = base64.encodestringfrom .commons import synchronized_with_attr, truncate, python_version_bellow
from .params import group_key, parse_key, is_valid
from .files import read_file_str, save_file, delete_file
from .exception import NacosException, NacosRequestException
from .listener import Event, SimpleListenerManager
from .timer import NacosTimer, NacosTimerManagerlogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)DEBUG = False
VERSION = "0.1.14"DEFAULT_GROUP_NAME = "DEFAULT_GROUP"
DEFAULT_NAMESPACE = ""
ADDRESS_SERVER_TIMEOUT = 3
WORD_SEPARATOR = u'\x02'
LINE_SEPARATOR = u'\x01'DEFAULTS = {"APP_NAME": "Nacos-SDK-Python","TIMEOUT": 3,  # in seconds"PULLING_TIMEOUT": 30,  # in seconds"PULLING_CONFIG_SIZE": 3000,"CALLBACK_THREAD_NUM": 10,"FAILOVER_BASE": "nacos-data/data","SNAPSHOT_BASE": "nacos-data/snapshot",
}OPTIONS = {"default_timeout", "pulling_timeout", "pulling_config_size", "callback_thread_num", "failover_base","snapshot_base", "no_snapshot", "proxies"}def process_common_config_params(data_id, group):if not group or not group.strip():group = DEFAULT_GROUP_NAMEelse:group = group.strip()if not data_id or not is_valid(data_id):raise NacosException("Invalid dataId.")if not is_valid(group):raise NacosException("Invalid group.")return data_id, groupdef parse_pulling_result(result):if not result:return list()ret = list()for i in unquote_plus(result.decode()).split(LINE_SEPARATOR):if not i.strip():continuesp = i.split(WORD_SEPARATOR)if len(sp) < 3:sp.append("")ret.append(sp)return retclass WatcherWrap:def __init__(self, key, callback, last_md5=None):self.callback = callbackself.last_md5 = last_md5self.watch_key = keyclass CacheData:def __init__(self, key, client):self.key = keylocal_value = read_file_str(client.failover_base, key) or read_file_str(client.snapshot_base, key)self.content = local_valueself.md5 = hashlib.md5(local_value.encode("UTF-8")).hexdigest() if local_value else Noneself.is_init = Trueif not self.md5:logger.info("[init-cache] cache for %s does not have local value" % key)class SubscribedLocalInstance(object):def __init__(self, key, instance):self.key = keyself.instance_id = instance["instanceId"]self.md5 = NacosClient.get_md5(str(instance))self.instance = instanceclass SubscribedLocalManager(object):def __init__(self):self.manager = {# "key1": {#     "LOCAL_INSTANCES": {#         "instanceId1": None,#         "instanceId2": None,#         "instanceId3": None,#         "instanceId4": None#     },#     "LISTENER_MANAGER": None# },# "key2": {#     "LOCAL_INSTANCES": {#         "instanceId1": "",#         "instanceId2": "",#         "instanceId3": "",#         "instanceId4": ""#     },#     "LISTENER_MANAGER": None# }}def do_listener_launch(self, key, event, slc):listener_manager = self.get_local_listener_manager(key)if listener_manager and isinstance(listener_manager, SimpleListenerManager):listener_manager.do_launch(event, slc)def get_local_listener_manager(self, key):key_node = self.manager.get(key)if not key_node:return Nonereturn key_node.get("LISTENER_MANAGER")def add_local_listener(self, key, listener_fn):if not self.manager.get(key):self.manager[key] = {}local_listener_manager = self.manager.get(key).get("LISTENER_MANAGER")if not local_listener_manager or not isinstance(local_listener_manager, SimpleListenerManager):self.manager.get(key)["LISTENER_MANAGER"] = SimpleListenerManager()local_listener_manager = self.manager.get(key).get("LISTENER_MANAGER")if not local_listener_manager:return selfif isinstance(listener_fn, list):listener_fn = tuple(listener_fn)local_listener_manager.add_listeners(*listener_fn)if isinstance(listener_fn, tuple):local_listener_manager.add_listeners(*listener_fn)#  just single listener functionelse:local_listener_manager.add_listener(listener_fn)return selfdef add_local_listener_manager(self, key, listener_manager):key_node = self.manager.get(key)if key_node is None:key_node = {}key_node["LISTENER_MANAGER"] = listener_managerreturn selfdef get_local_instances(self, key):if not self.manager.get(key):return Nonereturn self.manager.get(key).get("LOCAL_INSTANCES")def add_local_instance(self, slc):if not self.manager.get(slc.key):self.manager[slc.key] = {}if not self.manager.get(slc.key).get('LOCAL_INSTANCES'):self.manager.get(slc.key)['LOCAL_INSTANCES'] = {}self.manager.get(slc.key)['LOCAL_INSTANCES'][slc.instance_id] = slcreturn selfdef remove_local_instance(self, slc):key_node = self.manager.get(slc.key)if not key_node:return selflocal_instances_node = key_node.get("LOCAL_INSTANCES")if not local_instances_node:return selflocal_instance = local_instances_node.get(slc.instance_id)if not local_instance:return selflocal_instances_node.pop(slc.instance_id)return selfdef parse_nacos_server_addr(server_addr):sp = server_addr.split(":")if len(sp) == 3:return sp[0] + ":" + sp[1], int(sp[2])else:port = int(sp[1]) if len(sp) > 1 else 8848return sp[0], portclass NacosClient:debug = False@staticmethoddef set_debugging():if not NacosClient.debug:global loggerlogger = logging.getLogger("nacos")handler = logging.StreamHandler()handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s:%(message)s"))logger.addHandler(handler)logger.setLevel(logging.DEBUG)NacosClient.debug = True@staticmethoddef get_md5(content):return hashlib.md5(content.encode("UTF-8")).hexdigest() if content is not None else Nonedef get_server_from_url(self, url):server_list_content = urlopen(url, timeout=ADDRESS_SERVER_TIMEOUT).read()default_port = 8848server_list_temp = list()if server_list_content:for server_info in server_list_content.decode().strip().split("\n"):sp = server_info.strip().split(":")if len(sp) == 1:# endpoint中没有指定portserver_list_temp.append((sp[0], default_port))else:try:port = sp.strip().split("/")[0]server_list_temp.append((sp[0], int(port)))except ValueError:logger.warning("[get-server-list] bad server address:%s ignored" % server_info)if (self.server_list != server_list_temp):self.server_list = server_list_tempreturn server_list_tempdef get_server_from_url_task(self, url):while (True):try:time.sleep(10)self.get_server_from_url(url)except Exception as ex:logger.exception("get_server_from_url_task %s" % ex)def initLog(self, logDir):if logDir is None or logDir.strip() == "":logDir = os.path.expanduser("~") + "/logs/nacos/"if not logDir.endswith(os.path.sep):logDir += os.path.sepif not os.path.exists(logDir):os.makedirs(logDir)logPath = logDir + 'nacos-client-python.log'file_handler = logging.FileHandler(logPath)if nacos.NacosClient.debug:file_handler.setLevel(logging.DEBUG)else:file_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)logger.addHandler(file_handler)def __init__(self, server_addresses=None, endpoint=None, namespace=None, ak=None,sk=None, username=None, password=None, logDir=None):self.server_list = list()self.initLog(logDir)try:if server_addresses is not None and server_addresses.strip() != "":for server_addr in server_addresses.strip().split(","):self.server_list.append(parse_nacos_server_addr(server_addr.strip()))logger.info("user server address  " + server_addresses)elif endpoint is not None and endpoint.strip() != "":url = endpoint.strip()if ("?" not in endpoint):url = url + "?namespace=" + namespaceelse:url = url + "&namespace=" + namespacelogger.info("address server url " + url)self.get_server_from_url(url)partial_task_function = functools.partial(self.get_server_from_url_task,url)thread = threading.Thread(target=partial_task_function)thread.daemon = Truethread.start()else:logger.exception("[init] server address & endpoint must not both none")raise ValueError('server address & endpoint must not both none')except Exception as ex:logger.exception("[init] bad server address for %s" % server_addresses)raise exself.current_server = self.server_list[0]self.endpoint = endpointself.namespace = namespace or DEFAULT_NAMESPACE or ""self.ak = akself.sk = skself.username = usernameself.password = passwordself.server_list_lock = RLock()self.server_offset = 0self.watcher_mapping = dict()self.subscribed_local_manager = SubscribedLocalManager()self.subscribe_timer_manager = NacosTimerManager()self.pulling_lock = RLock()self.puller_mapping = Noneself.notify_queue = Noneself.callback_tread_pool = Noneself.process_mgr = Noneself.default_timeout = DEFAULTS["TIMEOUT"]self.auth_enabled = self.ak and self.skself.cai_enabled = Trueself.pulling_timeout = DEFAULTS["PULLING_TIMEOUT"]self.pulling_config_size = DEFAULTS["PULLING_CONFIG_SIZE"]self.callback_thread_num = DEFAULTS["CALLBACK_THREAD_NUM"]self.failover_base = DEFAULTS["FAILOVER_BASE"]self.snapshot_base = DEFAULTS["SNAPSHOT_BASE"]self.no_snapshot = Falseself.proxies = Noneself.logDir = logDirlogger.info("[client-init] endpoint:%s, tenant:%s" % (endpoint, namespace))def set_options(self, **kwargs):for k, v in kwargs.items():if k not in OPTIONS:logger.warning("[set_options] unknown option:%s, ignored" % k)continuelogger.debug("[set_options] key:%s, value:%s" % (k, v))setattr(self, k, v)def change_server(self):with self.server_list_lock:self.server_offset = (self.server_offset + 1) % len(self.server_list)self.current_server = self.server_list[self.server_offset]def get_server(self):logger.debug("[get-server] use server:%s" % str(self.current_server))return self.current_serverdef remove_config(self, data_id, group, timeout=None):data_id, group = process_common_config_params(data_id, group)logger.info("[remove] data_id:%s, group:%s, namespace:%s, timeout:%s" % (data_id, group, self.namespace, timeout))params = {"dataId": data_id,"group": group,}if self.namespace:params["tenant"] = self.namespacetry:resp = self._do_sync_req("/nacos/v1/cs/configs", None, None, params,timeout or self.default_timeout, "DELETE")c = resp.read()logger.info("[remove] remove group:%s, data_id:%s, server response:%s" % (group, data_id, c))return c == b"true"except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:logger.error("[remove] no right for namespace:%s, group:%s, data_id:%s" % (self.namespace, group, data_id))raise NacosException("Insufficient privilege.")else:logger.error("[remove] error code [:%s] for namespace:%s, group:%s, data_id:%s" % (e.code, self.namespace, group, data_id))raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[remove] exception %s occur" % str(e))raisedef publish_config(self, data_id, group, content, app_name=None, config_type=None, timeout=None):if content is None:raise NacosException("Can not publish none content, use remove instead.")data_id, group = process_common_config_params(data_id, group)if type(content) == bytes:content = content.decode("UTF-8")logger.info("[publish] data_id:%s, group:%s, namespace:%s, content:%s, timeout:%s" % (data_id, group, self.namespace, truncate(content), timeout))params = {"dataId": data_id,"group": group,"content": content.encode("UTF-8"),}if self.namespace:params["tenant"] = self.namespaceif app_name:params["appName"] = app_nameif config_type:params["type"] = config_typetry:resp = self._do_sync_req("/nacos/v1/cs/configs", None, None, params,timeout or self.default_timeout, "POST")c = resp.read()logger.info("[publish] publish content, group:%s, data_id:%s, server response:%s" % (group, data_id, c))return c == b"true"except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:logger.info("[publish] publish content fail result code :403, group:%s, data_id:%s" % (group, data_id))raise NacosException("Insufficient privilege.")else:raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[publish] exception %s occur" % str(e))raisedef get_config(self, data_id, group, timeout=None, no_snapshot=None):no_snapshot = self.no_snapshot if no_snapshot is None else no_snapshotdata_id, group = process_common_config_params(data_id, group)logger.debug("[get-config] data_id:%s, group:%s, namespace:%s, timeout:%s" % (data_id, group, self.namespace, timeout))params = {"dataId": data_id,"group": group,}if self.namespace:params["tenant"] = self.namespacecache_key = group_key(data_id, group, self.namespace)# get from failovercontent = read_file_str(self.failover_base, cache_key)if content is None:logger.debug("[get-config] failover config is not exist for %s, try to get from server" % cache_key)else:logger.debug("[get-config] get %s from failover directory, content is %s" % (cache_key, truncate(content)))return content# get from servertry:resp = self._do_sync_req("/nacos/v1/cs/configs", None, params, None, timeout or self.default_timeout)content = resp.read().decode("UTF-8")except HTTPError as e:if e.code == HTTPStatus.NOT_FOUND:logger.warning("[get-config] config not found for data_id:%s, group:%s, namespace:%s, try to delete snapshot" % (data_id, group, self.namespace))delete_file(self.snapshot_base, cache_key)return Noneelif e.code == HTTPStatus.CONFLICT:logger.error("[get-config] config being modified concurrently for data_id:%s, group:%s, namespace:%s" % (data_id, group, self.namespace))elif e.code == HTTPStatus.FORBIDDEN:logger.error("[get-config] no right for data_id:%s, group:%s, namespace:%s" % (data_id, group, self.namespace))raise NacosException("Insufficient privilege.")else:logger.error("[get-config] error code [:%s] for data_id:%s, group:%s, namespace:%s" % (e.code, data_id, group, self.namespace))if no_snapshot:raiseexcept Exception as e:logger.exception("[get-config] exception %s occur" % str(e))if no_snapshot:raiseif no_snapshot:return contentif content is not None:logger.debug("[get-config] content from server:%s, data_id:%s, group:%s, namespace:%s, try to save snapshot" % (truncate(content), data_id, group, self.namespace))try:save_file(self.snapshot_base, cache_key, content)except Exception as e:logger.exception("[get-config] save snapshot failed for %s, data_id:%s, group:%s, namespace:%s" % (data_id, group, self.namespace, str(e)))return contentlogger.info("[get-config] get config from server failed, try snapshot, data_id:%s, group:%s, namespace:%s" % (data_id, group, self.namespace))content = read_file_str(self.snapshot_base, cache_key)if content is None:logger.info("[get-config] snapshot is not exist for %s." % cache_key)else:logger.info("[get-config] get %s from snapshot directory, content is %s" % (cache_key, truncate(content)))return contentdef get_configs(self, timeout=None, no_snapshot=None, group="", page_no=1, page_size=1000):no_snapshot = self.no_snapshot if no_snapshot is None else no_snapshotlogger.info("[get-configs] namespace:%s, timeout:%s, group:%s, page_no:%s, page_size:%s" % (self.namespace, timeout, group, page_no, page_size))params = {"dataId": "","group": group,"search": "accurate","pageNo": page_no,"pageSize": page_size,}if self.namespace:params["tenant"] = self.namespacecache_key = group_key("", "", self.namespace)# get from failovercontent = read_file_str(self.failover_base, cache_key)if content is None:logger.debug("[get-config] failover config is not exist for %s, try to get from server" % cache_key)else:logger.debug("[get-config] get %s from failover directory, content is %s" % (cache_key, truncate(content)))return json.loads(content)# get from servertry:resp = self._do_sync_req("/nacos/v1/cs/configs", None, params, None, timeout or self.default_timeout)content = resp.read().decode("UTF-8")except HTTPError as e:if e.code == HTTPStatus.CONFLICT:logger.error("[get-configs] configs being modified concurrently for namespace:%s" % self.namespace)elif e.code == HTTPStatus.FORBIDDEN:logger.error("[get-configs] no right for namespace:%s" % self.namespace)raise NacosException("Insufficient privilege.")else:logger.error("[get-configs] error code [:%s] for namespace:%s" % (e.code, self.namespace))if no_snapshot:raiseexcept Exception as e:logger.exception("[get-config] exception %s occur" % str(e))if no_snapshot:raiseif no_snapshot:return json.loads(content)if content is not None:logger.info("[get-configs] content from server:%s, namespace:%s, try to save snapshot" % (truncate(content), self.namespace))try:save_file(self.snapshot_base, cache_key, content)for item in json.loads(content).get("pageItems"):data_id = item.get('dataId')group = item.get('group')item_content = item.get('content')item_cache_key = group_key(data_id, group, self.namespace)save_file(self.snapshot_base, item_cache_key, item_content)except Exception as e:logger.exception("[get-configs] save snapshot failed for %s, namespace:%s" % (str(e), self.namespace))return json.loads(content)logger.error("[get-configs] get config from server failed, try snapshot, namespace:%s" % self.namespace)content = read_file_str(self.snapshot_base, cache_key)if content is None:logger.warning("[get-configs] snapshot is not exist for %s." % cache_key)else:logger.debug("[get-configs] get %s from snapshot directory, content is %s" % (cache_key, truncate(content)))return json.loads(content)@synchronized_with_attr("pulling_lock")def add_config_watcher(self, data_id, group, cb, content=None):self.add_config_watchers(data_id, group, [cb], content)@synchronized_with_attr("pulling_lock")def add_config_watchers(self, data_id, group, cb_list, content=None):if not cb_list:raise NacosException("A callback function is needed.")data_id, group = process_common_config_params(data_id, group)logger.info("[add-watcher] data_id:%s, group:%s, namespace:%s" % (data_id, group, self.namespace))cache_key = group_key(data_id, group, self.namespace)wl = self.watcher_mapping.get(cache_key)if not wl:wl = list()self.watcher_mapping[cache_key] = wlif not content:content = self.get_config(data_id, group)last_md5 = NacosClient.get_md5(content)for cb in cb_list:wl.append(WatcherWrap(cache_key, cb, last_md5))logger.info("[add-watcher] watcher has been added for key:%s, new callback is:%s, callback number is:%s" % (cache_key, cb.__name__, len(wl)))if self.puller_mapping is None:logger.debug("[add-watcher] pulling should be initialized")self._init_pulling()if cache_key in self.puller_mapping:logger.debug("[add-watcher] key:%s is already in pulling" % cache_key)returnfor key, puller_info in self.puller_mapping.items():if len(puller_info[1]) < self.pulling_config_size:logger.debug("[add-watcher] puller:%s is available, add key:%s" % (puller_info[0], cache_key))puller_info[1].append(cache_key)self.puller_mapping[cache_key] = puller_infobreakelse:logger.debug("[add-watcher] no puller available, new one and add key:%s" % cache_key)key_list = self.process_mgr.list()key_list.append(cache_key)sys_os = platform.system()if sys_os == 'Windows' or sys_os == 'Darwin':puller = Thread(target=self._do_pulling, args=(key_list, self.notify_queue))else:puller = Process(target=self._do_pulling, args=(key_list, self.notify_queue))puller.daemon = Truepuller.start()self.puller_mapping[cache_key] = (puller, key_list)@synchronized_with_attr("pulling_lock")def remove_config_watcher(self, data_id, group, cb, remove_all=False):if not cb:raise NacosException("A callback function is needed.")data_id, group = process_common_config_params(data_id, group)if not self.puller_mapping:logger.warning("[remove-watcher] watcher is never started.")returncache_key = group_key(data_id, group, self.namespace)wl = self.watcher_mapping.get(cache_key)if not wl:logger.warning("[remove-watcher] there is no watcher on key:%s" % cache_key)returnwrap_to_remove = list()for i in wl:if i.callback == cb:wrap_to_remove.append(i)if not remove_all:breakfor i in wrap_to_remove:wl.remove(i)logger.info("[remove-watcher] %s is removed from %s, remove all:%s" % (cb.__name__, cache_key, remove_all))if not wl:logger.debug("[remove-watcher] there is no watcher for:%s, kick out from pulling" % cache_key)self.watcher_mapping.pop(cache_key)puller_info = self.puller_mapping[cache_key]puller_info[1].remove(cache_key)if not puller_info[1]:logger.debug("[remove-watcher] there is no pulling keys for puller:%s, stop it" % puller_info[0])self.puller_mapping.pop(cache_key)if isinstance(puller_info[0], Process):puller_info[0].terminate()def _do_sync_req(self, url, headers=None, params=None, data=None, timeout=None, method="GET", module="config"):all_headers = {}if headers:all_headers.update(headers)all_params = {}if params:all_params.update(params)self._inject_version_info(all_headers)self._inject_auth_info(all_headers, all_params, data, module)url = "?".join([url, urlencode(all_params)]) if all_params else urllogger.debug("[do-sync-req] url:%s, headers:%s, params:%s, data:%s, timeout:%s" % (url, all_headers, all_params, data, timeout))tries = 0while True:try:server_info = self.get_server()if not server_info:logger.error("[do-sync-req] can not get one server.")raise NacosRequestException("Server is not available.")address, port = server_infoserver = ":".join([address, str(port)])server_url = serverif not server_url.startswith("http"):server_url = "%s://%s" % ("http", server)if python_version_bellow("3"):req = Request(url=server_url + url, data=urlencode(data).encode() if data else None,headers=all_headers)req.get_method = lambda: methodctx = ssl.create_default_context()ctx.check_hostname = Falsectx.verify_mode = ssl.CERT_NONEelse:req = Request(url=server_url + url, data=urlencode(data).encode() if data else None,headers=all_headers, method=method)ctx = ssl.SSLContext()# build a new opener that adds proxy setting so that http request go through the proxyif self.proxies:proxy_support = ProxyHandler(self.proxies)https_support = HTTPSHandler(context=ctx)opener = build_opener(proxy_support, https_support)resp = opener.open(req, timeout=timeout)else:# for python version compatibilityif python_version_bellow("2.7.5"):resp = urlopen(req, timeout=timeout)else:resp = urlopen(req, timeout=timeout, context=ctx)logger.debug("[do-sync-req] info from server:%s" % server)return respexcept HTTPError as e:if e.code in [HTTPStatus.INTERNAL_SERVER_ERROR, HTTPStatus.BAD_GATEWAY,HTTPStatus.SERVICE_UNAVAILABLE]:logger.warning("[do-sync-req] server:%s is not available for reason:%s" % (server, e.msg))else:raiseexcept socket.timeout:logger.warning("[do-sync-req] %s request timeout" % server)except URLError as e:logger.warning("[do-sync-req] %s connection error:%s" % (server, e.reason))tries += 1if tries >= len(self.server_list):logger.error("[do-sync-req] %s maybe down, no server is currently available" % server)raise NacosRequestException("All server are not available")self.change_server()logger.warning("[do-sync-req] %s maybe down, skip to next" % server)def _do_pulling(self, cache_list, queue):cache_pool = dict()for cache_key in cache_list:cache_pool[cache_key] = CacheData(cache_key, self)while cache_list:unused_keys = set(cache_pool.keys())contains_init_key = Falseprobe_update_string = ""for cache_key in cache_list:cache_data = cache_pool.get(cache_key)if not cache_data:logger.debug("[do-pulling] new key added: %s" % cache_key)cache_data = CacheData(cache_key, self)cache_pool[cache_key] = cache_dataelse:unused_keys.remove(cache_key)if cache_data.is_init:contains_init_key = Truedata_id, group, namespace = parse_key(cache_key)probe_update_string += WORD_SEPARATOR.join([data_id, group, cache_data.md5 or "", self.namespace]) + LINE_SEPARATORfor k in unused_keys:logger.debug("[do-pulling] %s is no longer watched, remove from cache" % k)cache_pool.pop(k)logger.debug("[do-pulling] try to detected change from server probe string is %s" % truncate(probe_update_string))headers = {"Long-Pulling-Timeout": int(self.pulling_timeout * 1000)}# if contains_init_key:#     headers["longPullingNoHangUp"] = "true"data = {"Listening-Configs": probe_update_string}changed_keys = list()try:resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, None, data,self.pulling_timeout + 10, "POST")changed_keys = [group_key(*i) for i in parse_pulling_result(resp.read())]logger.info("[do-pulling] following keys are changed from server %s" % truncate(str(changed_keys)))except NacosException as e:logger.error("[do-pulling] nacos exception: %s, waiting for recovery" % str(e))time.sleep(1)except Exception as e:logger.exception("[do-pulling] exception %s occur, return empty list, waiting for recovery" % str(e))time.sleep(1)for cache_key, cache_data in cache_pool.items():cache_data.is_init = Falseif cache_key in changed_keys:data_id, group, namespace = parse_key(cache_key)content = self.get_config(data_id, group)cache_data.md5 = NacosClient.get_md5(content)cache_data.content = contentqueue.put((cache_key, cache_data.content, cache_data.md5))@synchronized_with_attr("pulling_lock")def _init_pulling(self):if self.puller_mapping is not None:logger.info("[init-pulling] puller is already initialized")returnself.puller_mapping = dict()self.notify_queue = Queue()self.callback_tread_pool = pool.ThreadPool(self.callback_thread_num)self.process_mgr = Manager()t = Thread(target=self._process_polling_result)t.setDaemon(True)t.start()logger.info("[init-pulling] init completed")def _process_polling_result(self):while True:cache_key, content, md5 = self.notify_queue.get()logger.info("[process-polling-result] receive an event:%s" % cache_key)wl = self.watcher_mapping.get(cache_key)if not wl:logger.warning("[process-polling-result] no watcher on %s, ignored" % cache_key)continuedata_id, group, namespace = parse_key(cache_key)plain_content = contentparams = {"data_id": data_id,"group": group,"namespace": namespace,"raw_content": content,"content": plain_content,}for watcher in wl:if not watcher.last_md5 == md5:logger.info("[process-polling-result] md5 changed since last call, calling %s with changed md5: %s ,params: %s"% (watcher.callback.__name__,md5, params))try:self.callback_tread_pool.apply(watcher.callback, (params,))except Exception as e:logger.exception("[process-polling-result] exception %s occur while calling %s " % (str(e), watcher.callback.__name__))watcher.last_md5 = md5@staticmethoddef _inject_version_info(headers):headers.update({"User-Agent": "Nacos-Python-Client:v" + VERSION})outtime = 0access_token = ""def _inject_auth_info(self, headers, params, data, module="config"):if self.username and self.password and params:params.update({"username": self.username, "password": self.password})if module == "login":returnif self.username and self.password:if time.time() > self.outtime:data: HTTPResponse = self._do_sync_req("/nacos/v1/auth/login", None, None,{"username": self.username, "password": self.password}, None,"POST", "login")body = json.loads(data.read())self.access_token = body["accessToken"]self.outtime = time.time() + body["tokenTtl"] - 1params["accessToken"] = self.access_tokenif not self.auth_enabled:return# in case tenant or group is nullif not params and not data:returnts = str(int(time.time() * 1000))ak, sk = self.ak, self.sksign_str = ""params_to_sign = params or data or {}# config signatureif "config" == module:headers.update({"Spas-AccessKey": ak,"timeStamp": ts,})tenant = params_to_sign.get("tenant")group = params_to_sign.get("group")if tenant:sign_str = tenant + "+"if group:sign_str = sign_str + group + "+"if sign_str:sign_str += tsheaders["Spas-Signature"] = self.__do_sign(sign_str, sk)# naming signatureelse:group = params_to_sign.get("groupName")service_name = params_to_sign.get("serviceName")if service_name:if "@@" in service_name or group is None or group == "":sign_str = service_nameelse:sign_str = group + "@@" + service_namesign_str = ts + "@@" + sign_strelse:sign_str = tsparams.update({"ak": ak,"data": sign_str,"signature": self.__do_sign(sign_str, sk),})def __do_sign(self, sign_str, sk):return base64.encodebytes(hmac.new(sk.encode(), sign_str.encode(), digestmod=hashlib.sha1).digest()).decode().strip()def _build_metadata(self, metadata, params):if metadata:if isinstance(metadata, dict):params["metadata"] = json.dumps(metadata)else:params["metadata"] = metadatadef add_naming_instance(self, service_name, ip, port, cluster_name=None, weight=1.0, metadata=None,enable=True, healthy=True, ephemeral=True,group_name=DEFAULT_GROUP_NAME):logger.info("[add-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s" % (ip, port, service_name, self.namespace))params = {"ip": ip,"port": port,"serviceName": service_name,"weight": weight,"enable": enable,"healthy": healthy,"clusterName": cluster_name,"ephemeral": ephemeral,"groupName": group_name}self._build_metadata(metadata, params)if self.namespace:params["namespaceId"] = self.namespacetry:resp = self._do_sync_req("/nacos/v1/ns/instance", None, None, params, self.default_timeout, "POST", "naming")c = resp.read()logger.info("[add-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s, server response:%s" % (ip, port, service_name, self.namespace, c))return c == b"ok"except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:raise NacosException("Insufficient privilege.")else:raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[add-naming-instance] exception %s occur" % str(e))raisedef remove_naming_instance(self, service_name, ip, port, cluster_name=None, ephemeral=True,group_name=DEFAULT_GROUP_NAME):logger.info("[remove-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s" % (ip, port, service_name, self.namespace))params = {"ip": ip,"port": port,"serviceName": service_name,"ephemeral": ephemeral,"groupName":group_name}if cluster_name is not None:params["clusterName"] = cluster_nameif self.namespace:params["namespaceId"] = self.namespacetry:resp = self._do_sync_req("/nacos/v1/ns/instance", None, None, params, self.default_timeout, "DELETE", "naming")c = resp.read()logger.info("[remove-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s, server response:%s" % (ip, port, service_name, self.namespace, c))return c == b"ok"except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:raise NacosException("Insufficient privilege.")else:raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[remove-naming-instance] exception %s occur" % str(e))raisedef modify_naming_instance(self, service_name, ip, port, cluster_name=None, weight=None, metadata=None,enable=None, ephemeral=True,group_name=DEFAULT_GROUP_NAME):logger.info("[modify-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s" % (ip, port, service_name, self.namespace))params = {"ip": ip,"port": port,"serviceName": service_name,"ephemeral": ephemeral,"groupName": group_name}if cluster_name is not None:params["clusterName"] = cluster_nameif enable is not None:params["enable"] = enableif weight is not None:params["weight"] = weightself._build_metadata(metadata, params)if self.namespace:params["namespaceId"] = self.namespacetry:resp = self._do_sync_req("/nacos/v1/ns/instance", None, None, params, self.default_timeout, "PUT", "naming")c = resp.read()logger.info("[modify-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s, server response:%s" % (ip, port, service_name, self.namespace, c))return c == b"ok"except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:raise NacosException("Insufficient privilege.")else:raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[modify-naming-instance] exception %s occur" % str(e))raisedef list_naming_instance(self, service_name, clusters=None, namespace_id=None, group_name=None, healthy_only=False):""":param service_name:        服务名:param clusters:            集群名称            字符串,多个集群用逗号分隔:param namespace_id:        命名空间ID:param group_name:          分组名:param healthy_only:         是否只返回健康实例   否,默认为false"""logger.info("[list-naming-instance] service_name:%s, namespace:%s" % (service_name, self.namespace))params = {"serviceName": service_name,"healthyOnly": healthy_only}if clusters is not None:params["clusters"] = clustersnamespace_id = namespace_id or self.namespaceif namespace_id:params["namespaceId"] = namespace_idgroup_name = group_name or 'DEFAULT_GROUP'if group_name:params['groupName'] = group_nametry:resp = self._do_sync_req("/nacos/v1/ns/instance/list", None, params, None, self.default_timeout, "GET", "naming")c = resp.read()logger.info("[list-naming-instance] service_name:%s, namespace:%s, server response:%s" %(service_name, self.namespace, c))return json.loads(c.decode("UTF-8"))except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:raise NacosException("Insufficient privilege.")else:raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[list-naming-instance] exception %s occur" % str(e))raisedef get_naming_instance(self, service_name, ip, port, cluster_name=None):logger.info("[get-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s" % (ip, port, service_name,self.namespace))params = {"serviceName": service_name,"ip": ip,"port": port,}if cluster_name is not None:params["cluster"] = cluster_nameparams["clusterName"] = cluster_nameif self.namespace:params["namespaceId"] = self.namespacetry:resp = self._do_sync_req("/nacos/v1/ns/instance", None, params, None, self.default_timeout, "GET", "naming")c = resp.read()logger.info("[get-naming-instance] ip:%s, port:%s, service_name:%s, namespace:%s, server response:%s" %(ip, port, service_name, self.namespace, c))return json.loads(c.decode("UTF-8"))except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:raise NacosException("Insufficient privilege.")else:raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[get-naming-instance] exception %s occur" % str(e))raisedef send_heartbeat(self, service_name, ip, port, cluster_name=None, weight=1.0, metadata=None, ephemeral=True,group_name=DEFAULT_GROUP_NAME):logger.info("[send-heartbeat] ip:%s, port:%s, service_name:%s, namespace:%s" % (ip, port, service_name,self.namespace))beat_data = {"serviceName": service_name,"ip": ip,"port": port,"weight": weight,"ephemeral": ephemeral}if cluster_name is not None:beat_data["cluster"] = cluster_nameif metadata is not None:if isinstance(metadata, str):beat_data["metadata"] = json.loads(metadata)else:beat_data["metadata"] = metadataparams = {"serviceName": service_name,"beat": json.dumps(beat_data),"groupName": group_name}if self.namespace:params["namespaceId"] = self.namespacetry:resp = self._do_sync_req("/nacos/v1/ns/instance/beat", None, params, None, self.default_timeout, "PUT", "naming")c = resp.read()logger.info("[send-heartbeat] ip:%s, port:%s, service_name:%s, namespace:%s, server response:%s" %(ip, port, service_name, self.namespace, c))return json.loads(c.decode("UTF-8"))except HTTPError as e:if e.code == HTTPStatus.FORBIDDEN:raise NacosException("Insufficient privilege.")else:raise NacosException("Request Error, code is %s" % e.code)except Exception as e:logger.exception("[send-heartbeat] exception %s occur" % str(e))raisedef subscribe(self,listener_fn, listener_interval=7, *args, **kwargs):"""reference at `/nacos/v1/ns/instance/list` in https://nacos.io/zh-cn/docs/open-api.html:param listener_fn           监听方法,可以是元组,列表,单个监听方法:param listener_interval     监听间隔,在 HTTP 请求 OpenAPI 时间间隔:return:"""service_name = kwargs.get("service_name")if not service_name:if len(args) > 0:service_name = args[0]else:raise NacosException("`service_name` is required in subscribe")self.subscribed_local_manager.add_local_listener(key=service_name, listener_fn=listener_fn)#  判断是否是第一次订阅调用class _InnerSubContext(object):first_sub = Truedef _compare_and_trigger_listener():#  invoke `list_naming_instance`latest_res = self.list_naming_instance(*args, **kwargs)latest_instances = latest_res['hosts']#  获取本地缓存实例local_service_instances_dict = self.subscribed_local_manager.get_local_instances(service_name)#  当前本地没有缓存,所有都是新的实例if not local_service_instances_dict:if not latest_instances or len(latest_instances) < 1:#  第一次订阅调用不通知if _InnerSubContext.first_sub:_InnerSubContext.first_sub = Falsereturnfor instance in latest_instances:slc = SubscribedLocalInstance(key=service_name, instance=instance)self.subscribed_local_manager.add_local_instance(slc)#  第一次订阅调用不通知if _InnerSubContext.first_sub:_InnerSubContext.first_sub = Falsereturnself.subscribed_local_manager.do_listener_launch(service_name, Event.ADDED, slc)else:local_service_instances_dict_copy = local_service_instances_dict.copy()for instance in latest_instances:slc = SubscribedLocalInstance(key=service_name, instance=instance)local_slc = local_service_instances_dict.get(slc.instance_id)# 本地不存在实例缓存if local_slc is None:self.subscribed_local_manager.add_local_instance(slc)self.subscribed_local_manager.do_listener_launch(service_name, Event.ADDED, slc)# 本地存在实例缓存else:local_slc_md5 = local_slc.md5local_slc_id = local_slc.instance_idlocal_service_instances_dict_copy.pop(local_slc_id)# 比较md5,存在实例变更if local_slc_md5 != slc.md5:self.subscribed_local_manager.remove_local_instance(local_slc).add_local_instance(slc)self.subscribed_local_manager.do_listener_launch(service_name, Event.MODIFIED, slc)#  still have instances in local marked deletedif len(local_service_instances_dict_copy) > 0:for local_slc_id, slc in local_service_instances_dict_copy.items():self.subscribed_local_manager.remove_local_instance(slc)self.subscribed_local_manager.do_listener_launch(service_name, Event.DELETED, slc)timer_name = 'service-subscribe-timer-{key}'.format(key=service_name)subscribe_timer = NacosTimer(name=timer_name,interval=listener_interval,fn=_compare_and_trigger_listener)subscribe_timer.scheduler()self.subscribe_timer_manager.add_timer(subscribe_timer)def unsubscribe(self, service_name, listener_name=None):"""remove listener from subscribed  listener manager:param service_name:    service_name:param listener_name:   listener name:return: """listener_manager = self.subscribed_local_manager.get_local_listener_manager(key=service_name)if not listener_manager:returnif listener_name:listener_manager.remove_listener(listener_name)returnlistener_manager.empty_listeners()def stop_subscribe(self):"""stop subscribe timer scheduler:return: """self.subscribe_timer_manager.stop()if DEBUG:NacosClient.set_debugging()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/664476.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis 6.x集群搭建

redis6集群搭建 安装文件下载 redis-6.2.6.tar.gz 编译 tar -zxvf redis-6.2.6.tar.gz cd redis-6.2.6/ make MALLOClibc make install PREFIX/opt/soft/redis复制可执行文件 cp /opt/soft/redis/redis-cli /usr/bin/redis-cli cp /opt/soft/redis/redis-server /usr/bi…

小鱼深度产品测评之:阿里云自研PolarDBMySQL 版 Serverless,真正达到100%兼容 MySQL,分析性能达到开源数据400倍。

阿里云自研PolarDBMySQL 版 Serverless测评 一、 开箱二、 试用教程三、 使用感受3.1 查看资源包信息3.2 列表3.2.1 列表展示3.2.2 集群名称 3.3创建账号3.4 实例配置3.5 Serverless弹性压测3.5.1 遇到问题3.5.2 实操 四、 总结 一、 开箱 又到了体验新产品的时候了。 话不多说…

基于Python的招聘网站爬虫及可视化的设计与实现

摘要&#xff1a;现在&#xff0c;随着互联网网络的飞速发展&#xff0c;人们获取信息的最重要来源也由报纸、电视转变为了互联网。互联网的广泛应用使网络的数据量呈指数增长&#xff0c;让人们得到了更新、更完整的海量信息的同时&#xff0c;也使得人们在提取自己最想要的信…

tcpdump在手机上的使用

首先手机得root才可以&#xff0c;主要分析手机与手机的通信协议 我使用的是一加9pro&#xff0c; root方法参考一加全能盒子、一加全能工具箱官方网站——大侠阿木 (daxiaamu.com)https://optool.daxiaamu.com/index.php tcpdump&#xff0c;要安装在/data/local/tmp下要arm6…

JProfiler for Mac/win:深度探索Java性能的终极工具

随着Java应用的日益普及&#xff0c;性能优化成为开发人员的重要任务。在众多性能分析工具中&#xff0c;JProfiler以其强大的功能和直观的界面脱颖而出。本文将深入探讨JProfiler的优势&#xff0c;以及如何利用它来提升Java应用的性能。 一、JProfiler的核心优势 全面性能监…

备战蓝桥杯---搜索(DFS基础2)

下面我主要介绍一下深搜的简单应用吧&#xff1a; 下面是分析&#xff1a; 我们对每行遍历一下&#xff0c;跟求排列差不多。在判断条件上&#xff0c;我们可以放一个存列的数组&#xff0c;对于对角线的判断&#xff0c;我们可以发现在主对角线上&#xff0c;列数-dep为恒定值…

Swift 入门之自定义类型的模式匹配(Pattern Matching)

概览 小伙伴们都知道 Swift 是一门简洁、类型安全、极富表现力以及“性感迷人”的编程语言。 和大多数语言一样&#xff0c;在 Swift 中也有一些隐藏着的、不为人知的宝藏特性。利用它们我们可以极大增加撸码的愉悦和成就感。 其中&#xff0c;模式匹配&#xff08;Pattern …

Linux---动静态库

动静态库的相关概念 静态库&#xff08;.a&#xff09;&#xff1a;程序在编译链接的时候把库的代码链接到可执行文件中。程序运行的时候将不再需要静态库动态库&#xff08;.so&#xff09;&#xff1a;程序在运行的时候才去链接动态库的代码&#xff0c;多个程序共享使用库的…

2024 年 11 款值得收藏的 iPhone 数据恢复软件和应用

数据丢失是任何人都无法承受的&#xff0c;因为它对每个人都至关重要。但是数据丢失的原因有很多&#xff0c;一些常见的原因是数据意外删除、设备被盗、iOS 越狱、硬件损坏、病毒感染等。我们列出了 iOS 的顶级恢复工具&#xff0c;其中将帮助您方便地恢复数据。 这是 11 款最…

k8s之安装部署及kuboard发布应用

目录 环境准备 系统规划 配置免密 将桥接的IPv4流量传递到iptables的链 系统基础配置 安装docker 安装docker及基础依赖 配置docker的仓库下载地址 部署k8s 添加阿里云的k8s源 安装kubeadm&#xff0c;kubelet和kubectl 初始化masteer节点 部署node节点 部署flanne…

docker相关问题解决(file exists、not a directory

背景 以下环境为wsl file exists 缓存没删干净 docker-compose down -v not a directory flags: 0x5000: not a directory: unknown: Are you trying to mount a directory onto a file (or vice-versa)? 明明我确定报错指示的位置就是文件而不是文件夹...相当神奇的错误 …

【C++初阶】--入门基础(二)

目录 一.C输出与输入 二.缺省参数 1.概念 2.缺省参数分类 (1) 全缺省参数 (2)半缺省参数 三.函数重载 1.概念 2.C支持函数重载的原理--名字修饰 四.引用 1.概念 2.语法 3.引用的特性 (1)引用在定义时必须初始化 (2)引用时不能改变指向 (3)一个变量…

UE 代码构建(BuildSystem)与源码编译相关

年底了&#xff0c;把之前的草稿文章整理一下&#xff0c;整理好的发出来 UnrealBuildTool简介 参考&#xff1a;https://docs.unrealengine.com/4.27/en-US/ProductionPipelines/BuildTools/UnrealBuildTool/ UE里的项目代码、包括UE本身的源码&#xff0c;都是划分成一个…

flv视频格式批量截取封面图(不占内存版)--其他视频格式也通用

flv视频格式批量截取封面图&#xff08;不占内存版&#xff09;--其他视频格式也通用 需求&#xff08;实现的效果&#xff09;功能实现htmlcssjs 需求&#xff08;实现的效果&#xff09; 批量显示视频&#xff0c;后端若返回有imgUrl,则直接显示图1&#xff0c; 若无&#xf…

Socket套接字类编译测试

目录 类设计 类实现 测试 测试服务器 测试客户端 测试结果 这一节相当于整合了之前的一些东西&#xff0c;重新过了一遍&#xff0c;这个就显得相对之前的版本更加完善一点 类设计 // 套接字类 #define MAX_LISTEN 1024 class Socket {private:int _sockfd;public:Socke…

闲人闲谈PS之五十三——离散制造中的魔鬼--物料套裁

惯例闲话&#xff1a;最近和老婆大人商议买车事宜&#xff0c;闲人以为会陷入买油车还是电车的纠结&#xff0c;没想到老婆大人无比坚定&#xff0c;买电车。在买车这方面&#xff0c;老婆的想法居然比闲人超前。闲人对车定位在代步工具&#xff0c;2年前&#xff0c;对车还是印…

SAP下载word

事务代码&#xff1a;STRANS 启动转换器 步骤 1. 将参数填入模板&#xff0c;并另存为word 2003 xml文档 2.使用网页打开xml文档&#xff0c;并将xml拷贝到转换器tt:template中&#xff0c;添加参数 3.替换参数&#xff0c;部分xml可能存在错误或者跑偏根据实际情况检查修改 …

洛谷 P1980 [NOIP2013 普及组] 计数问题

题目背景 NOIP2013 普及组 T1 题目描述 试计算在区间 1 到 n 的所有整数中&#xff0c;数字 x&#xff08;0≤x≤9&#xff09;共出现了多少次&#xff1f;例如&#xff0c;在 1 到 11 中&#xff0c;即在 1,2,3,4,5,6,7,8,9,10,11 中&#xff0c;数字 1 出现了 4 次。 输入…

Ubuntu18.04安装Matlab流程笔记

提示:博主取舍了很多大佬的博文并亲测有效,分享笔记邀大家共同学习讨论 Ubuntu18.04 安装Matlab流程 下载安装包和破解文件安装Matlab注册并运行 下载安装包和破解文件 matlabR2019A源码 提取码:2ztb 下载的Linux matlab2018a文件夹内有三个文件&#xff1a; # 解压Matlab201…

<网络安全>《15 移动安全管理系统》

1 概念 移动安全管理系统&#xff0c;MSM&#xff0c;Mobile security management,提供大而全的功能解决方案&#xff0c;覆盖了企业移动信息化中所涉及到安全沙箱、数据落地保护、威胁防护、设备管理、应用管理、文档管理、身份认证等各个维度。移动安全管理系统将设备管理和…