RPC教程 7.服务发现与注册中心

0.前言

这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体)这种就解决不了,也即是没有服务ip地址和服务实例的映射关系。

1.为什么需要注册中心

在上一节中,客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。这时可能会出问题,要是该服务实例ip改变了呢,该服务实例下线宕机了呢?这时如何是好。

// 调用单个服务实例
func clientCall(addr1, addr2 string) {d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()//省略其他......
}

这时,注册中心的重要性就出来了。

注册中心主要有三种角色:

  • 服务提供者(RPC Server):在启动时,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。
  • 服务消费者(RPC Client):在启动时,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。
  • 服务注册中心(Registry):用于保存 RPC Server 的注册信息,当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。

最后,RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起调用。

 当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcd、zookeeper、consul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。

2.Gee Registry

主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口也很多。所以这里我们选择自己实现一个简单的支持心跳保活的注册中心。

GeeRegistry 的代码独立放置在子目录 registry 中。

首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,超过5min没有收到该注册的服务的心跳,即视其为不可用状态。

//registry.go
type ServerItem struct {Addr  stringstart time.Time //用于心跳时间计算
}// GeeRegistry is a simple register center
type GeeRegistry struct {timeout time.Durationmutex   sync.Mutex //protcect serversservers map[string]*ServerItem
}const (defaultPath    = "/_rpc_/registry"defaultTimeout = time.Minute * 5
)func New(timeout time.Duration) *GeeRegistry {return &GeeRegistry{servers: make(map[string]*ServerItem),timeout: timeout,}
}var DefalultGeeRegister = New(defaultTimeout)

然后,为 GeeRegistry 实现添加服务实例和返回服务列表的方法。

  • putServer:添加服务实例,如果服务已经存在,则更新 start。
  • aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
func (r *GeeRegistry) putServer(addr string) {r.mutex.Lock()defer r.mutex.Unlock()s := r.servers[addr]if s == nil {r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}} else {s.start = time.Now() // if exists, update start time to keep alive}
}func (r *GeeRegistry) aliveServers() []string {r.mutex.Lock()defer r.mutex.Unlock()var alive []stringfor addr, s := range r.servers {if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {alive = append(alive, addr)} else {delete(r.servers, addr)}}sort.Strings(alive)return alive
}

 为了简单,那么rpc客户端通过HTTP去访问注册中心,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-rpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-rpc-Server 承载。
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {switch req.Method {case "GET":w.Header().Set("X-rpc-Servers", strings.Join(r.aliveServers(), ","))case "POST":addr := req.Header.Get("X-rpc-Servers")if addr == "" {w.WriteHeader(http.StatusInternalServerError)return}r.putServer(addr) //更新保存在注册中心的服务实例default:w.WriteHeader(http.StatusMethodNotAllowed)}
}func (r *GeeRegistry) HandleHTTP(registryPath string) {http.Handle(registryPath, r)
}func HandleHTTP() {DefalultGeeRegister.HandleHTTP(defaultPath)
}

另外,也要提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳(也是通过HTTP),默认周期比注册中心设置的过期时间少 1 min。

// only send once
func sendHeartbeat(registryURL, addr string) error {httpClient := &http.Client{Timeout: time.Second * 10}req, _ := http.NewRequest("POST", registryURL, nil)req.Header.Set("X-rpc-Servers", addr)resp, err := httpClient.Do(req)if err != nil {fmt.Println("rpc server: heart beat err:", err)return err}defer resp.Body.Close()return nil
}// Heartbeat send a heartbeat message every once in a while
func Heartbeat(registryURL, addr string, duration time.Duration) {if duration == 0 {duration = defaultTimeout - time.Duration(1)*time.Minute}err := sendHeartbeat(registryURL, addr)go func() {//创建一个定时器t := time.NewTicker(duration)for err == nil {<-t.Cerr = sendHeartbeat(registryURL, addr)}}()
}

3.需要注册中心的服务发现

上一节我们实现了一个不需要注册中心,服务列表由手工维护的服务发现的结构体MultiServersDiscovery。

而现在我们实现了注册中心,那这一节的服务发现就可以继承上一节的,并添加与注册中心相关的细节

type GeeRegistryDiscovery struct {*MultiServerDiscoveryregistryAddr stringtimeout      time.Duration //服务列表的过期时间lastUpdate   time.Time
}const defaultUpdateTimeout = time.Second * 10func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {if timeout == 0 {timeout = defaultUpdateTimeout}return &GeeRegistryDiscovery{MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),registryAddr:         registerAddr,timeout:              timeout,}
}
  • GeeRegistryDiscovery 嵌套了 MultiServersDiscovery,很多能力可以复用。
  • registryAddr 即注册中心的地址
  • timeout 服务列表的过期时间
  • lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。

实现 Update 和 Refresh 方法,超时重新获取的逻辑在 Refresh 中实现: 

func (d *GeeRegistryDiscovery) Update(servers []string) error {d.rwMutex.Lock()defer d.rwMutex.Unlock()d.servers = serversd.lastUpdate = time.Now()return nil
}// 刷新,有了注册中心,在客户端每次获取服务实例时候,需要刷新注册中心的保存的服务实例
func (d *GeeRegistryDiscovery) Refresh() error {d.rwMutex.Lock()defer d.rwMutex.Unlock()//注册中心保存的服务实例还没超时,不用更新if d.lastUpdate.Add(d.timeout).After(time.Now()) {return nil}httpClient := http.Client{Timeout: time.Second * 10} //http客户端最好有个超时resp, err := httpClient.Get(d.registryAddr)if err != nil {fmt.Println("rpc registry refresh err:", err)return err}defer resp.Body.Close()servers := strings.Split(resp.Header.Get("X-rpc-Servers"), ",")d.servers = make([]string, 0, len(servers))for _, server := range servers {//返回一个string类型,并将最前面和最后面的ASCII定义的空格去掉,中间的空格不会去掉s := strings.TrimSpace(server)if s != "" {d.servers = append(d.servers, s)}}d.lastUpdate = time.Now()return nil
}

 Get 和 GetAll 与 MultiServersDiscovery 相似,唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期

func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {if err := d.Refresh(); err != nil {return "", err}//d.Get(mode) 表示调用的是(GeeRegistryDiscovery).Getreturn d.MultiServerDiscovery.Get(mode) //d.MultiServerDiscovery是调用MultiServerDiscovery的Get()
}func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {if err := d.Refresh(); err != nil {return nil, err}return d.MultiServerDiscovery.GetAll()
}

4.测试

添加函数 startRegistry,之后需要稍微修改 startServer,定期向注册中心发送心跳保活(Heartbeat)。

这里使用sync.WaitGroup是为了等待该操作执行完毕才会往后执行,因为这些函数都是新开协程运行。

func startServer(registryAddr string, wg *sync.WaitGroup) {var myServie Myl, _ := net.Listen("tcp", "localhost:0") //端口是0表示端口随机server := geerpc.NewServer()//这里一定要用&myServie,因为前面Sum方法的接受者是*My;若接受者是My,myServie或者&myServie都可以server.Register(&myServie)registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)  //定时发送心跳wg.Done()server.Accept(l)
}func startRegistry(wg *sync.WaitGroup) {l, _ := net.Listen("tcp", "localhost:9999")registry.HandleHTTP()wg.Done()http.Serve(l, nil)
}

接下来,将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery,不再需要硬编码服务列表。 

这里就重点对比下NewGeeRegistryDiscovery方法和之前的不同之处。

// 调用单个服务实例
func clientCall(registryAddr string) {// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {log.Println("call Foo.Sum error:", err)}fmt.Println("reply: ", reply)}(i)}wg.Wait()
}func broadcast(registryAddr string) {// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("Broadcast reply: ", reply)ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)defer cancel()var replyTimeout int = 1324if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("timeout Broadcast reply: ", replyTimeout)}(i)}wg.Wait()
}

最后是main函数。

确保注册中心启动后,再启动 RPC 服务端,最后客户端远程调用。

func main() {registryAddr := "http://localhost:9999/_rpc_/registry"var wg sync.WaitGroupwg.Add(1)go startRegistry(&wg) //开启注册中心服务wg.Wait()time.Sleep(time.Second)wg.Add(2)go startServer(registryAddr, &wg)go startServer(registryAddr, &wg)wg.Wait()time.Sleep(time.Second)clientCall(registryAddr)broadcast(registryAddr)
}

运行结果:

代码: https://github.com/liwook/Go-projects/tree/main/geerpc/7-registry

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/651927.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

猿媛员的专属春联来咯

我们“因程序汇聚&#xff0c;因猿份相识”&#xff0c;今天来给辛苦了一年的“猿媛员”们送上几幅恶搞对联&#xff0c;为图一笑 &#x1f604; 闲言少叙&#xff0c;上对联 龙行多福 上联&#xff1a;龙龙龙龙龙龙龙 下联&#xff1a;福福福福福福福 形象版 上联&#…

centos 7安装MySQl

本文参考借鉴&#xff1a;https://cloud.tencent.com/developer/article/2353312&#xff0c;非常赞&#xff01; 为了避免权限不足的问题&#xff0c;建议切换至root用户进行安装 1.MySQL的清理与安装 查看是否存在MySQL服务 安装mysql之前&#xff0c;需要先看看要安装系…

【极数系列】Flink搭建入门项目Demo 秒懂Flink开发运行原理(05)

文章目录 引言1.创建mavenx项目2.包结构3.引入pom依赖4.增加log4j2.properties配置5.创建主启动类6.构建打jar包7.flinkUI页面部署 引言 gitee地址&#xff1a;https://gitee.com/shawsongyue/aurora.git 源码直接下载可运行&#xff0c;模块&#xff1a;aurora_flink Flink 版…

phar反序列化漏洞

基础&#xff1a; Phar是一种PHP文件归档格式&#xff0c;它类似于ZIP或JAR文件格式&#xff0c;可以将多个PHP文件打包成一个单独的文件&#xff08;即Phar文件&#xff09;。 打包后的Phar文件可以像普通的PHP文件一样执行&#xff0c;可以包含PHP代码、文本文件、图像等各…

Python爬虫---Scrapy框架---CrawlSpider

CrawlSpider 1. CrawlSpider继承自scrapy.Spider 2. CrawlSpider可以定义规则&#xff0c;再解析html内容的时候&#xff0c;可以根据链接规则提取出指定的链接&#xff0c;然后再向这些链接发送请求&#xff0c;所以&#xff0c;如果有需要跟进链接的需求&#xff0c;意思就是…

Redis实现多种限流算法

一 常见限流算法 1 固定窗口限流 每一个时间段计数器&#xff0c;当计数器达到阈值后拒绝&#xff0c;每过完这个时间段&#xff0c;计数器重置0&#xff0c;重新计数。 优点&#xff1a;实现简单&#xff0c;性能高&#xff1b; 缺点&#xff1a;明显的临界问题&#xff0c…

有手就行!阿里云上3分钟搞定幻兽帕鲁联机服务器搭建

幻兽帕鲁最近在社区呈现了爆火的趋势&#xff0c;在线人数已突破百万级别&#xff0c;官方服务器也开始出现不稳定&#xff0c;卡人闪退的情况。对于有一定财力的小伙伴&#xff0c;搭建一个私人服务器是一个最稳定而舒服的解决方案。 本文萝卜哥将讲解一下如何快速搭建 palwo…

看图说话:Git图谱解读

很多新加入公司的同学在使用Git各类客户端管理代码的过程中对于Git图谱解读不太理解&#xff0c;我们常用的Git客户端是SourceTree&#xff0c;配合P4Merge进行冲突解决基本可以满足日常工作大部分需要。不同的Git客户端工具对图谱展示会有些许差异&#xff0c;以下是SourceTre…

Java如何对OSS存储引擎的Bucket进行创建【OSS学习】

在前面学会了如何开通OSS&#xff0c;对OSS的一些基本操作&#xff0c;接下来记录一下如何通过Java代码通过SDK对OSS存储引擎里面的Bucket存储空间进行创建。 目录 1、先看看OSS&#xff1a; 2、代码编写&#xff1a; 3、运行效果&#xff1a; 1、先看看OSS&#xff1a; 此…

跟着cherno手搓游戏引擎【12】渲染context和首个三角形

渲染上下文&#xff1a; 目的&#xff1a;修改WindowsWindow的结构&#xff0c;把glad抽离出来 WindowsWindow.h:新建m_Context #pragma once #include "YOTO/Window.h" #include <YOTO/Renderer/GraphicsContext.h> #include<GLFW/glfw3.h> #include…

多种协议转IEC104网关BE110

随着电力系统信息化建设和数字化转型的进程不断加速&#xff0c;对电力能源的智能化需求也日趋增强。健全稳定的智慧电力系统能够为工业生产、基础设施建设以及国防建设提供稳定的能源支持。在此背景下&#xff0c;高性能的工业电力数据传输解决方案——协议转换网关应运而生&a…

03 Redis之命令(基本命令+Key命令+String型Value命令与应用场景)

Redis 根据命令所操作对象的不同&#xff0c;可以分为三大类&#xff1a;对 Redis 进行基础性操作的命令&#xff0c;对 Key 的操作命令&#xff0c;对 Value 的操作命令。 3.1 Redis 基本命令 一些可选项对大小写敏感, 所以应尽量将redis的所有命令大写输入 首先通过 redis-…

Pyecharts水球图全面指南:参数解读、代码实战与高级应用【第41篇—python:Pyecharts水球图】

文章目录 Pyecharts水球图绘制与交互的完整教程1. 简介2. 安装Pyecharts3. 基础水球图4. 自定义水球图样式5. 多水球图展示6. 水球图的动态效果7. 水球图与其他图表的组合8. 数据动态更新与实时展示9. 水球图的交互功能10. 导出水球图为图片或PDF11. 移动端适配 结语 Pyecharts…

S275 4G网络IO模块:智能酒店的理想选择

行业背景 随着物联网技术的发展&#xff0c;酒店服务也变得更加“智能”——自动灯光效果、室内温湿度控制、各种人性化操作等贴心服务&#xff0c;带给顾客真正的宾至如归之感。 同时&#xff0c;智慧酒店更为管理者提供了高效的管理手段&#xff0c;将酒店物耗、能耗、人员…

51-17 视频理解串讲— MViT 论文精读

继TimeSformer模型之后&#xff0c;咱们再介绍两篇来自Facebook AI的论文&#xff0c;即Multiscale Vision Transformers以及改进版MViTv2: Improved Multiscale Vision Transformers for Classification and Detection。 由于本司大模型组最近组织阅读的论文较多&#xff0c;…

解锁一些SQL注入的姿势

昨天课堂上布置了要去看一些sql注入的案例&#xff0c;以下是我的心得&#xff1a; ​​​​​​​ ​​​​​​​ ​​​​​​​ 1.新方法 打了sqli的前十关&#xff0c;我发现一般都是联合查询&#xff0c;但是有没有不是联合查询的方法呢&#xf…

go 实现暴力破解数独

一切罪恶的来源是昨晚睡前玩了一把数独&#xff0c;找虐的选了个最难的模式&#xff0c;做了一个多小时才做完&#xff0c;然后就睡不着了..........程序员不能受这委屈&#xff0c;今天咋样也得把这玩意儿破解了 破解思路&#xff08;暴力破解加深度遍历&#xff09; 把数独…

企业计算机中了360后缀勒索病毒怎么办,360后缀勒索病毒解密流程

企业计算机服务器在生产运营过程中发挥着巨大作用&#xff0c;为企业带来极大便利&#xff0c;存储着企业的重要核心数据&#xff0c;但同时也成为众多勒索病毒攻击的目标。近期&#xff0c;云天数据恢复中心接到很多企业的求助&#xff0c;企业的计算机服务器遭到了360后缀勒索…

大数据学习之Redis、从零基础到入门(一)

目录 一、Redis入门概述 1. 是什么&#xff1f; 官方解释&#xff1a; 2. 能干嘛&#xff1f; 2.1 主流功能与应用 2.1.1分布式缓存 2.1.2内存存储和持久化(RDBAOF) 2.1.3高可用架构搭建 2.1.4缓存穿透、击穿、雪崩 2.1.5分布式锁 2.1.6队列 2.2 总体功能概括 2.3…

【C++干货铺】C++中的IO流和文件操作

个人主页点击直达&#xff1a;小白不是程序媛 C系列专栏&#xff1a;C干货铺 代码仓库&#xff1a;Gitee 目录 C语言的输入输出 流是什么&#xff1f; C的IO流 C标准IO流 C文件IO流 文本文件读写 二进制文件的读写 stringstream的简单介绍 将数值类型数据格式化为字…