0.前言
这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体)这种就解决不了,也即是没有服务ip地址和服务实例的映射关系。
1.为什么需要注册中心
在上一节中,客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。这时可能会出问题,要是该服务实例ip改变了呢,该服务实例下线宕机了呢?这时如何是好。
// 调用单个服务实例
func clientCall(addr1, addr2 string) {d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()//省略其他......
}
这时,注册中心的重要性就出来了。
注册中心主要有三种角色:
- 服务提供者(RPC Server):在启动时,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。
- 服务消费者(RPC Client):在启动时,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。
- 服务注册中心(Registry):用于保存 RPC Server 的注册信息,当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。
最后,RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起调用。
当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcd、zookeeper、consul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。
2.Gee Registry
主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口也很多。所以这里我们选择自己实现一个简单的支持心跳保活的注册中心。
GeeRegistry 的代码独立放置在子目录 registry 中。
首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,超过5min没有收到该注册的服务的心跳,即视其为不可用状态。
//registry.go
type ServerItem struct {Addr stringstart time.Time //用于心跳时间计算
}// GeeRegistry is a simple register center
type GeeRegistry struct {timeout time.Durationmutex sync.Mutex //protcect serversservers map[string]*ServerItem
}const (defaultPath = "/_rpc_/registry"defaultTimeout = time.Minute * 5
)func New(timeout time.Duration) *GeeRegistry {return &GeeRegistry{servers: make(map[string]*ServerItem),timeout: timeout,}
}var DefalultGeeRegister = New(defaultTimeout)
然后,为 GeeRegistry 实现添加服务实例和返回服务列表的方法。
- putServer:添加服务实例,如果服务已经存在,则更新 start。
- aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
func (r *GeeRegistry) putServer(addr string) {r.mutex.Lock()defer r.mutex.Unlock()s := r.servers[addr]if s == nil {r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}} else {s.start = time.Now() // if exists, update start time to keep alive}
}func (r *GeeRegistry) aliveServers() []string {r.mutex.Lock()defer r.mutex.Unlock()var alive []stringfor addr, s := range r.servers {if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {alive = append(alive, addr)} else {delete(r.servers, addr)}}sort.Strings(alive)return alive
}
为了简单,那么rpc客户端通过HTTP去访问注册中心,且所有的有用信息都承载在 HTTP Header 中。
- Get:返回所有可用的服务列表,通过自定义字段 X-rpc-Servers 承载。
- Post:添加服务实例或发送心跳,通过自定义字段 X-rpc-Server 承载。
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {switch req.Method {case "GET":w.Header().Set("X-rpc-Servers", strings.Join(r.aliveServers(), ","))case "POST":addr := req.Header.Get("X-rpc-Servers")if addr == "" {w.WriteHeader(http.StatusInternalServerError)return}r.putServer(addr) //更新保存在注册中心的服务实例default:w.WriteHeader(http.StatusMethodNotAllowed)}
}func (r *GeeRegistry) HandleHTTP(registryPath string) {http.Handle(registryPath, r)
}func HandleHTTP() {DefalultGeeRegister.HandleHTTP(defaultPath)
}
另外,也要提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳(也是通过HTTP),默认周期比注册中心设置的过期时间少 1 min。
// only send once
func sendHeartbeat(registryURL, addr string) error {httpClient := &http.Client{Timeout: time.Second * 10}req, _ := http.NewRequest("POST", registryURL, nil)req.Header.Set("X-rpc-Servers", addr)resp, err := httpClient.Do(req)if err != nil {fmt.Println("rpc server: heart beat err:", err)return err}defer resp.Body.Close()return nil
}// Heartbeat send a heartbeat message every once in a while
func Heartbeat(registryURL, addr string, duration time.Duration) {if duration == 0 {duration = defaultTimeout - time.Duration(1)*time.Minute}err := sendHeartbeat(registryURL, addr)go func() {//创建一个定时器t := time.NewTicker(duration)for err == nil {<-t.Cerr = sendHeartbeat(registryURL, addr)}}()
}
3.需要注册中心的服务发现
上一节我们实现了一个不需要注册中心,服务列表由手工维护的服务发现的结构体MultiServersDiscovery。
而现在我们实现了注册中心,那这一节的服务发现就可以继承上一节的,并添加与注册中心相关的细节。
type GeeRegistryDiscovery struct {*MultiServerDiscoveryregistryAddr stringtimeout time.Duration //服务列表的过期时间lastUpdate time.Time
}const defaultUpdateTimeout = time.Second * 10func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {if timeout == 0 {timeout = defaultUpdateTimeout}return &GeeRegistryDiscovery{MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),registryAddr: registerAddr,timeout: timeout,}
}
- GeeRegistryDiscovery 嵌套了 MultiServersDiscovery,很多能力可以复用。
- registryAddr 即注册中心的地址
- timeout 服务列表的过期时间
- lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。
实现 Update 和 Refresh 方法,超时重新获取的逻辑在 Refresh 中实现:
func (d *GeeRegistryDiscovery) Update(servers []string) error {d.rwMutex.Lock()defer d.rwMutex.Unlock()d.servers = serversd.lastUpdate = time.Now()return nil
}// 刷新,有了注册中心,在客户端每次获取服务实例时候,需要刷新注册中心的保存的服务实例
func (d *GeeRegistryDiscovery) Refresh() error {d.rwMutex.Lock()defer d.rwMutex.Unlock()//注册中心保存的服务实例还没超时,不用更新if d.lastUpdate.Add(d.timeout).After(time.Now()) {return nil}httpClient := http.Client{Timeout: time.Second * 10} //http客户端最好有个超时resp, err := httpClient.Get(d.registryAddr)if err != nil {fmt.Println("rpc registry refresh err:", err)return err}defer resp.Body.Close()servers := strings.Split(resp.Header.Get("X-rpc-Servers"), ",")d.servers = make([]string, 0, len(servers))for _, server := range servers {//返回一个string类型,并将最前面和最后面的ASCII定义的空格去掉,中间的空格不会去掉s := strings.TrimSpace(server)if s != "" {d.servers = append(d.servers, s)}}d.lastUpdate = time.Now()return nil
}
Get
和 GetAll
与 MultiServersDiscovery 相似,唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期。
func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {if err := d.Refresh(); err != nil {return "", err}//d.Get(mode) 表示调用的是(GeeRegistryDiscovery).Getreturn d.MultiServerDiscovery.Get(mode) //d.MultiServerDiscovery是调用MultiServerDiscovery的Get()
}func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {if err := d.Refresh(); err != nil {return nil, err}return d.MultiServerDiscovery.GetAll()
}
4.测试
添加函数 startRegistry,之后需要稍微修改 startServer,定期向注册中心发送心跳保活(Heartbeat
)。
这里使用sync.WaitGroup是为了等待该操作执行完毕才会往后执行,因为这些函数都是新开协程运行。
func startServer(registryAddr string, wg *sync.WaitGroup) {var myServie Myl, _ := net.Listen("tcp", "localhost:0") //端口是0表示端口随机server := geerpc.NewServer()//这里一定要用&myServie,因为前面Sum方法的接受者是*My;若接受者是My,myServie或者&myServie都可以server.Register(&myServie)registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0) //定时发送心跳wg.Done()server.Accept(l)
}func startRegistry(wg *sync.WaitGroup) {l, _ := net.Listen("tcp", "localhost:9999")registry.HandleHTTP()wg.Done()http.Serve(l, nil)
}
接下来,将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery,不再需要硬编码服务列表。
这里就重点对比下NewGeeRegistryDiscovery方法和之前的不同之处。
// 调用单个服务实例
func clientCall(registryAddr string) {// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {log.Println("call Foo.Sum error:", err)}fmt.Println("reply: ", reply)}(i)}wg.Wait()
}func broadcast(registryAddr string) {// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("Broadcast reply: ", reply)ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)defer cancel()var replyTimeout int = 1324if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("timeout Broadcast reply: ", replyTimeout)}(i)}wg.Wait()
}
最后是main函数。
确保注册中心启动后,再启动 RPC 服务端,最后客户端远程调用。
func main() {registryAddr := "http://localhost:9999/_rpc_/registry"var wg sync.WaitGroupwg.Add(1)go startRegistry(&wg) //开启注册中心服务wg.Wait()time.Sleep(time.Second)wg.Add(2)go startServer(registryAddr, &wg)go startServer(registryAddr, &wg)wg.Wait()time.Sleep(time.Second)clientCall(registryAddr)broadcast(registryAddr)
}
运行结果:
代码: https://github.com/liwook/Go-projects/tree/main/geerpc/7-registry