参考博客
1、服务实例接口与默认实现
type ServiceInstance interface {// 获取服务实例的唯一IDGetInstanceId() string// 获取服务IDGetServiceId() string// 获取服务实例的主机名或IP地址GetHost() string// 获取服务实例的端口号GetPort() int// 判断服务实例是否使用HTTPSIsSecure() bool// 获取服务实例的元数据(键值对形式)GetMetadata() map[string]string
}type DefaultServiceInstance struct {InstanceId stringServiceId stringHost stringPort intSecure boolMetadata map[string]string
}// 构造函数
func NewDefaultServiceInstance(serviceId string, host string, port int, secure bool,metadata map[string]string, instanceId string) (*DefaultServiceInstance, error) {// 如果没有传入 IP 则获取一下,这个方法在多网卡的情况下,并不好用if len(host) == 0 {localIP, err := util.GetLocalIP()if err != nil {return nil, err}host = localIP}//若instanceId为空,自动生成“服务ID-时间戳-随机数(1000-9999)”if len(instanceId) == 0 {instanceId = serviceId + "-" + strconv.FormatInt(time.Now().Unix(), 10) + "-" + strconv.Itoa(rand.Intn(9000)+1000)}return &DefaultServiceInstance{InstanceId: instanceId, ServiceId: serviceId, Host: host, Port: port, Secure: secure, Metadata: metadata}, nil
}func (serviceInstance DefaultServiceInstance) GetInstanceId() string {return serviceInstance.InstanceId
}func (serviceInstance DefaultServiceInstance) GetServiceId() string {return serviceInstance.ServiceId
}func (serviceInstance DefaultServiceInstance) GetHost() string {return serviceInstance.Host
}func (serviceInstance DefaultServiceInstance) GetPort() int {return serviceInstance.Port
}func (serviceInstance DefaultServiceInstance) IsSecure() bool {return serviceInstance.Secure
}func (serviceInstance DefaultServiceInstance) GetMetadata() map[string]string {return serviceInstance.Metadata
}
2、定义服务注册和剔除的方法
type ServiceRegistry interface {Register(serviceInstance cloud.ServiceInstance) boolDeregister()
}
具体实现:
import ("errors""fmt""github.com/hashicorp/consul/api""strconv""unsafe"
)type consulServiceRegistry struct {// 服务实例缓存(服务ID->实例ID->实例)serviceInstances map[string]map[string]cloud.ServiceInstance//Consul客户端client api.Client//当前本地服务实例localServiceInstance cloud.ServiceInstance
}func (c consulServiceRegistry) Register(serviceInstance cloud.ServiceInstance) bool {// 创建注册到consul的服务到registration := new(api.AgentServiceRegistration)registration.ID = serviceInstance.GetInstanceId()registration.Name = serviceInstance.GetServiceId()registration.Port = serviceInstance.GetPort()var tags []stringif serviceInstance.IsSecure() {tags = append(tags, "secure=true")} else {tags = append(tags, "secure=false")}if serviceInstance.GetMetadata() != nil {var tags []stringfor key, value := range serviceInstance.GetMetadata() {tags = append(tags, key+"="+value)}registration.Tags = tags}registration.Tags = tagsregistration.Address = serviceInstance.GetHost()// 增加consul健康检查回调函数check := new(api.AgentServiceCheck)schema := "http"if serviceInstance.IsSecure() {schema = "https"}check.HTTP = fmt.Sprintf("%s://%s:%d/actuator/health", schema, registration.Address, registration.Port)check.Timeout = "5s"check.Interval = "5s"check.DeregisterCriticalServiceAfter = "20s" // 故障检查失败30s后 consul自动将注册服务删除registration.Check = check// 注册服务到consulerr := c.client.Agent().ServiceRegister(registration)if err != nil {fmt.Println(err)return false}//初始化服务实例缓存结构//若未初始化,服务ID-实例ID-实例if c.serviceInstances == nil {c.serviceInstances = map[string]map[string]cloud.ServiceInstance{}}//获取特定服务的实例集合//从外层map中查询该服务ID对应的内层mapservices := c.serviceInstances[serviceInstance.GetServiceId()]//初始化空实例集合//处理首次注册情况,创建新的内层map,用处存储该服务的实例集合if services == nil {services = map[string]cloud.ServiceInstance{}}//添加当前实例到集合services[serviceInstance.GetInstanceId()] = serviceInstance//更新外层缓存 //确保外层map中服务ID对应内层map是最新版本 c.serviceInstances[serviceInstance.GetServiceId()] = services//记录当前实例c.localServiceInstance = serviceInstancereturn true
}// deregister a service
func (c consulServiceRegistry) Deregister() {//检查服务实例缓存是否初始化if c.serviceInstances == nil {return}//从缓存中获取当前服务ID对应的实例集合(实例ID->实例对象)services := c.serviceInstances[c.localServiceInstance.GetServiceId()]//检查实例集合是否存在if services == nil {return}//从本地缓存移除当前实例delete(services, c.localServiceInstance.GetInstanceId())//清空服务记录if len(services) == 0 {delete(c.serviceInstances, c.localServiceInstance.GetServiceId())}//从Consul注销服务_ = c.client.Agent().ServiceDeregister(c.localServiceInstance.GetInstanceId())//重置当前实例记录c.localServiceInstance = nil
}// new a consulServiceRegistry instance
// token is optional
func NewConsulServiceRegistry(host string, port int, token string) (*consulServiceRegistry, error) {if len(host) < 3 {return nil, errors.New("check host")}if port <= 0 || port > 65535 {return nil, errors.New("check port, port should between 1 and 65535")}config := api.DefaultConfig()config.Address = host + ":" + strconv.Itoa(port)config.Token = tokenclient, err := api.NewClient(config)if err != nil {return nil, err}return &consulServiceRegistry{client: *client}, nil
}
测试用例:
func TestConsulServiceRegistry(t *testing.T) {//初始化Consul注册中心客户端host := "127.0.0.1"port := 8500registryDiscoveryClient, _ := extension.NewConsulServiceRegistry(host, port, "")//获取本地IP地址ip, err := util.GetLocalIP()if err != nil {t.Error(err)}//创建服务实例信息serviceInstanceInfo, _ := cloud.NewDefaultServiceInstance("go-user-server", "", 8090,false, map[string]string{"user":"zyn"}, "")//注册服务实例
registryDiscoveryClient.Register(serviceInstanceInfo)r := gin.Default()// 健康检测接口,其实只要是 200 就认为成功了r.GET("/actuator/health", func(c *gin.Context) {c.JSON(200, gin.H{"message": "pong",})})err = r.Run(":8090")if err != nil{registryDiscoveryClient.Deregister()}
}
3、服务发现
- 获取所有的服务列表
- 获取指定的服务的所有实例信息
接口定义:
type DiscoveryClient interface {/*** Gets all ServiceInstances associated with a particular serviceId.* @param serviceId The serviceId to query.* @return A List of ServiceInstance.*/GetInstances(serviceId string) ([]cloud.ServiceInstance, error)/*** @return All known service IDs.*/GetServices() ([]string, error)
}
具体实现:
type consulServiceRegistry struct {serviceInstances map[string]map[string]cloud.ServiceInstanceclient api.ClientlocalServiceInstance cloud.ServiceInstance
}func (c consulServiceRegistry) GetInstances(serviceId string) ([]cloud.ServiceInstance, error) {//查询指定服务catalogService, _, _ := c.client.Catalog().Service(serviceId, "", nil)//若查询到服务实例if len(catalogService) > 0 {//转换Consul数据到标准格式result := make([]cloud.ServiceInstance, len(catalogService))for index, sever := range catalogService {s := cloud.DefaultServiceInstance{InstanceId: sever.ServiceID,ServiceId: sever.ServiceName,Host: sever.Address,Port: sever.ServicePort,Metadata: sever.ServiceMeta,}result[index] = s}return result, nil}return nil, nil
}
//返回服务名称列表
func (c consulServiceRegistry) GetServices() ([]string, error) {//查询所有服务services, _, _ := c.client.Catalog().Services(nil)result := make([]string, unsafe.Sizeof(services))index := 0for serviceName, _ := range services {result[index] = serviceNameindex++}return result, nil
}// new a consulServiceRegistry instance
// token is optional
func NewConsulServiceRegistry(host string, port int, token string) (*consulServiceRegistry, error) {if len(host) < 3 {return nil, errors.New("check host")}if port <= 0 || port > 65535 {return nil, errors.New("check port, port should between 1 and 65535")}config := api.DefaultConfig()config.Address = host + ":" + strconv.Itoa(port)config.Token = tokenclient, err := api.NewClient(config)if err != nil {return nil, err}return &consulServiceRegistry{client: *client}, nil
}
测试用例:
func TestConsulServiceDiscovery(t *testing.T) {host := "127.0.0.1"port := 8500token := ""registryDiscoveryClient, err := extension.NewConsulServiceRegistry(host, port, token)if err != nil {panic(err)}t.Log(registryDiscoveryClient.GetServices())t.Log(registryDiscoveryClient.GetInstances("go-user-server"))
}
结果:
consul_service_registry_test.go:57: [consul go-user-server ] <nil>consul_service_registry_test.go:59: [{go-user-server-1602590661-56179 go-user-server 127.0.0.1 8090 false map[user:zyn]}] <nil>