如果使用nacos-sdk-python(注意适用nacos版本),需要按照下面的链接修改源码的bug
https://github.com/nacos-group/nacos-sdk-python/issues/135
代码如下:
import nacos
import threading
import socket
import requests
import json
from flask import Flaskapp = Flask(__name__)class NacosReg:def __init__(self):self.SERVER_HOST = "127.0.0.1"self.SERVER_PORT = 8848self.SERVER_ADDRESSES = f"http://{self.SERVER_HOST}:{self.SERVER_PORT}"self.SERVICE_NAME = "test_server"self.GROUP_NAME = "DEFAULT_GROUP"self.username = "nacos"self.password = "nacos"self.client = nacos.NacosClient(self.SERVER_ADDRESSES, username=self.username, password=self.password)self.model_port = 8000def get_host_ip(self):try:s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)s.connect((self.SERVER_HOST, self.SERVER_PORT))ip = s.getsockname()[0]finally:s.close()return ipdef service_register_and_check(self):self.service_register_()self.set_http_check()def service_register(self):self.client.add_naming_instance(self.SERVICE_NAME,self.get_host_ip(),self.model_port, ephemeral=False, group_name=self.GROUP_NAME)def service_register_(self):params = {"ip": self.get_host_ip(),"port": self.model_port,"serviceName": self.SERVICE_NAME,"weight": 1.0,"enable": True,"healthy": True,"clusterName": "None","ephemeral": False,"groupName": self.GROUP_NAME,'username': self.username,'password': self.password}res = requests.post(f"{self.SERVER_ADDRESSES}/nacos/v1/ns/instance", params=params)print(res.text)def service_unregister(self):self.client.remove_naming_instance(self.SERVICE_NAME,self.get_host_ip(),self.model_port, ephemeral=False, group_name=self.GROUP_NAME)def set_http_check(self):data = {"serviceName": f"{self.GROUP_NAME}@@{self.SERVICE_NAME}","clusterName": "None","checkPort": 80,"useInstancePort4Check": True,"healthChecker": json.dumps({"type": "HTTP", "path": "/test"}),"namespaceId": ""}res = requests.put(f"{self.SERVER_ADDRESSES}/nacos/v1/ns/cluster",params={'username': self.username, 'password': self.password}, data=data)print(res.text)def service_list(self):return self.client.list_naming_instance(self.SERVICE_NAME, healthy_only=True)@app.route("/test", methods=['GET'])
def test():return "ok"if __name__ == '__main__':threading.Timer(1, NacosReg().service_register_and_check).start()app.run(port=8000)
服务器运行情况:
nacos服务器情况:
HTTP探活配置-- 单击上图集群配置:
参考资料:
GitHub - nacos-group/nacos-sdk-python: nacos python sdk
Open API 指南 | Nacos