基于 Redis 发布订阅实现服务注册与发现

写在前面

其实很少有公司会使用 Redis 来实现服务注册与发现,通常是ETCD、NACOS、ZOOKEEPER等等,但是也不妨碍我们了解。本文会先介绍 Redis 的发布/订阅模式,接着基于这个模式实现服务注册与发现。

Redis发布订阅流程图:
在这里插入图片描述

Redis 发布订阅

1. 简介

Redis的发布订阅功能主要由PUBLISHSUBSCRIBEPSUBSCRIBE 等命令组成的。

通过执行 SUBSCRIBE 命令,客户端可以订阅一个或多个频道,从而成为这个频道的订阅者。
在这里插入图片描述

每当有其他客户端向这个被订阅的频道发送消息的时候,频道的所有订阅者都会收到这条消息。

在这里插入图片描述
当然,客户端还可以通过PSUBSCRIBE订阅一个或多个模式,从而成为这些模式的订阅者,也就是模糊匹配

2. 订阅

每当一个客户端执行SUBSCRIBE命令订阅某个或某些频道的时候,这个客户端与被订阅者之间就会建立起一种订阅关系。而Redis会将这种订阅关系保存到pubsub_channels 这个字典中,这个字典的键是某个被订阅的频道,而值是一个链表,这个链表记录了所有订阅这个频道的客户端

在这里插入图片描述
每当有客户端执行了SUBSCRIBE命令订阅某个或某些频道的时候,服务器都会将客户端与被订阅的频道在 pubsub_channels字典中进行关联。

3. 退订

如果进行退订UNSUBSCRIBE,那么服务器会从pubsub_channels中接触客户端与被退订频道之间的关联。当这个key中,已经没有订阅者,那么会将这个key进行删除。例如下面的client7
在这里插入图片描述

4. 发布消息

当一个Redis客户端执行 PUBLISH channel message 命令将消息 message 发送给channel的时候,将消息发送给channel频道的所有订阅者(本文不讨论pattern模式)

服务注册与发现

我们了解完redis的发布订阅流程之后,我们来基于这个发布订阅来实现一个服务注册与发现的功能。

Redis服务发现与注册流程图:
在这里插入图片描述

1. 对象定义

redis服务发现与注册的结构体

type RedisRegistryService struct {config *RedisConfig // the config about rediscli *redis.Client // client for redisrwLock *sync.RWMutex // rwLock lock groupList when update service instance// vgroupMapping to store the cluster group// eg: map[cluster_name_key]cluster_namevgroupMapping map[string]string// groupList store all addresses under this cluster// eg: map[cluster_name][]{service_instance1,service_instance2...}groupList map[string][]*ServiceInstancectx context.Context
}

订阅的消息内容,为key 以及 value ,而key就是服务的name,value就是服务的具体地址

type NotifyMessage struct {// key = registry.redis.${cluster}_ip:portKey   string `json:"key"`Value string `json:"value"`
}

2. 对象加载

新建一个redis服务注册与发现对象,并且在创建的这个对象的时候,我们会做两件事情

  1. 将redis中所已存在的key都load一次,存到本地缓存中。
  2. 开启一些协程进行发布订阅,不断监听上游的注册消息
func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService {if redisConfig == nil {log.Fatalf("redis config is nil")panic("redis config is nil")}cfg := &redis.Options{Addr:     redisConfig.ServerAddr,Username: redisConfig.Username,Password: redisConfig.Password,DB:       redisConfig.DB,}cli := redis.NewClient(cfg)vgroupMapping := config.VgroupMappinggroupList := make(map[string][]*ServiceInstance)redisRegistryService := &RedisRegistryService{config:        redisConfig,cli:           cli,ctx:           context.Background(),rwLock:        &sync.RWMutex{},vgroupMapping: vgroupMapping,groupList:     groupList,}// loading all server at init timeredisRegistryService.load()// subscribe at real timego redisRegistryService.subscribe()return redisRegistryService
}

3. 服务加载

load 函数:将所有 key 都 scan 出来,再遍历所有的key,拿到对应的value,进行一次初始化操作,加载到本地缓存中

func (s *RedisRegistryService) load() {// find all the server list redis register by redisFileKeyPrefixkeys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result()if err != nil {log.Errorf("RedisRegistryService-Scan-Key-Error:%s", err)return}for _, key := range keys {clusterName := s.getClusterNameByKey(key)val, err := s.cli.Get(s.ctx, key).Result()if err != nil {log.Errorf("RedisRegistryService-Get-Key:%s, Err:%s", key, err)continue}ins, err := s.getServerInstance(val)if err != nil {log.Errorf("RedisRegistryService-getServerInstance-val:%s, Err:%s", val, err)continue}// put server instance list in group lists.rwLock.Lock()if s.groupList[clusterName] == nil {s.groupList[clusterName] = make([]*ServiceInstance, 0)}s.groupList[clusterName] = append(s.groupList[clusterName], ins)s.rwLock.Unlock()}
}

4.服务发现

通过 key 从 vgroupMapping 找到对应的 value

func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) {s.rwLock.RLock()defer s.rwLock.RUnlock()cluster := s.vgroupMapping[key]if cluster == "" {err = fmt.Errorf("cluster doesnt exit")return}r = s.groupList[cluster]return
}

5. 服务注册

  1. key 和 value set到 redis 中
  2. key 和 value 通过 Channel 发布出去
  3. 另外开启一个协程将进行保活
func (s *RedisRegistryService) register(key, value string) (err error) {_, err = s.cli.HSet(s.ctx, key, value).Result()if err != nil {return}msg := &NotifyMessage{Key:   key,Value: value,}s.cli.Publish(s.ctx, redisRegisterChannel, msg)go func() {s.keepAlive(s.ctx, key)}()return
}

6. 服务订阅

订阅 Subscribe Channel 监听上游服务,并对服务的 key 和 value 进行更新操作。 注意这里对map进行读写的时候要加上读写锁,防止线程不安全。

func (s *RedisRegistryService) subscribe() {go func() {msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel()for msg := range msgs {var data *NotifyMessageerr := json.Unmarshal([]byte(msg.Payload), &data)if err != nil {log.Errorf("RedisRegistryService-subscribe-Subscribe-Err:%+v", err)continue}// get cluster name by keyclusterName := s.getClusterNameByKey(data.Key)ins, err := s.getServerInstance(data.Value)if err != nil {log.Errorf("RedisRegistryService-subscribe-getServerInstance-value:%s, Err:%s", data.Value, err)continue}s.rwLock.Lock()if s.groupList[clusterName] == nil {s.groupList[clusterName] = make([]*ServiceInstance, 0)}s.groupList[clusterName] = append(s.groupList[clusterName], ins)s.rwLock.Unlock()}}()return
}

注意一点:redis的发布订阅的消息是不存储到日志的,也没有ack确认。 所以如果发生的消息的丢失,就需要业务自己承担了,比如自己实现一个ack,发送的时候进行消息日志的存储。

完整代码:
https://github.com/CocaineCong/incubator-seata-go/blob/discovery/redis/pkg/discovery/redis.go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/829616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云备份项目->配置环境

升级gcc到7.3版本 sudo yum install centos-release-scl-rh centos-release-scl sudo yum install devtoolset-7-gcc devtoolset-7-gcc-c source /opt/rh/devtoolset-7/enable echo "source /opt/rh/devtoolset-7/enable" >> ~/.bashrc 安装Jsoncpp库 sud…

MyBatis面试题总结,详细(2024最新)

面试必须要看看 1、MyBatis 中的一级缓存和二级缓存是什么?它们的区别是什么? MyBatis 中的一级缓存是指 SqlSession 对象内部的缓存,它是默认开启的。一级缓存的生命周期是与 SqlSession 对象绑定的,当 SqlSession 关闭时&#…

Linux--进程控制(2)--进程的程序替换(夺舍)

目录 进程的程序替换 0.相关函数 1.先看现象 2.解释原理 3.将代码改成多进程版 4.使用其它的替换函数,并且认识函数参数的含义 5.其它 进程的程序替换 0.相关函数 关于进程替换我们需要了解的6个函数: 函数解释: 这些函数如果调用成功则…

通过filebeat实现对docker服务的通用日志收集

平台 依赖 linux docker docker-compose 或者 docker compose 镜像 docker.elastic.co/beats/filebeat:8.12.2 docker.elastic.co/beats/kibana:8.12.2 docker.elastic.co/beats/elasticsearch:8.12.2 正文 背景 对于有自建机房的公司来说,如果公司的运维技术…

Stable Diffusion使用ControlNet:IP-Adapter实现图片风格迁移

IP-Adapter 全称是 Text Compatible Image Prompt Adapter for Text-to-Image Diffusion Models(文本到图像扩散模型的文本兼容图像提示适配器),是腾讯研究院出品的一个新的ControlNet模型,旨在使预训练的文本到图像扩散模型能够生…

【06】JAVASE-数组讲解【从零开始学JAVA】

Java零基础系列课程-JavaSE基础篇 Lecture:波哥 Java 是第一大编程语言和开发平台。它有助于企业降低成本、缩短开发周期、推动创新以及改善应用服务。如今全球有数百万开发人员运行着超过 51 亿个 Java 虚拟机,Java 仍是企业和开发人员的首选开发平台。…

最全GPTs使用教程+Prompt预设词教程

使用指南 直接复制使用 可以前往已经添加好Prompt预设的AI系统测试使用(可自定义添加使用) https://ai.sparkaigf.com 现已支持GPTs 雅思写作考官 我希望你假定自己是雅思写作考官,根据雅思评判标准,按我给你的雅思考题和对应…

从零入门区块链和比特币(第三期)

欢迎来到我的区块链与比特币入门指南!如果你对区块链和比特币感兴趣,但不知道从何开始,那么你来对地方了。本博客将为你提供一个简明扼要的介绍,帮助你了解这个领域的基础知识,并引导你进一步探索这个激动人心的领域。…

一些基础知识FK

1. 群体稳定性指数PSI 通过 PSI(Population Stability Index) 指标,可以得到不同样本下,模型在各分数段分布的稳定性。用于衡量两个群体(比如两个时间点、两个子群体等)之间稳定性的指标。通常PSI被用于评估信用风险模型、预测模型等在不同时间点或不同群…

安装配置Maven(idea里面配置)

放在这个路径下(如果需要可以免费发给你,dd我就好了) D:\IearnSoftware\maven\apache-maven-3.6.1-bin.zip(我自己的路径下面,防止忘记) 1.首先测试maven在不在,配置对不对 mvn -v 这样就是成…

STM32HAL库++ESP8266+cJSON连接阿里云物联网平台

实验使用资源:正点原子F1 USART1:PA9P、A10(串口打印调试) USART3:PB10、PB11(WiFi模块) DHT11:PG11(采集数据、上报) LED0、1:PB5、PE5&#xff…

【微信小程序调用百度API实现图像识别实战】-前后端加强版

前言:基于前面两篇图像识别项目实战文章进行了改造升级。 第一篇 入门【微信小程序调用百度API实现图像识别功能】----项目实战 第二篇 前后端结合 【微信小程序调用百度API实现图像识别实战】----前后端分离 这一篇主要讲述的是在第二篇的基础上新增意见反馈功能&a…

第72天:漏洞发现-Web框架中间件联动GobyAfrogXrayAwvsVulmap

案例一:某 APP-Web 扫描-常规&联动-Burp&Awvs&Xray Acunetix 一款商业的 Web 漏洞扫描程序,它可以检查 Web 应用程序中的漏洞,如 SQL 注入、跨站脚本攻击、身份验证页上的弱口令长度等。它拥有一个操作方便的图形用户界 面&#…

实验8 NAT配置

实验8 NAT配置 一、 原理描述二、 实验目的三、 实验内容1.实验场景2.实验要求 四、 实验配置五、 实验步骤2.静态NAT配置3.NAT Outbound配置4.NAT Easy-IP配置 一、 原理描述 2019年11月26日,全球43亿个IPv4地址正式耗尽,这意味着没有更多的IPv4地址可…

Taro引入echarts【兼容多端小程序(飞书/微信/支付宝小程序)】

近期接到公司新需求,开发飞书小程序,并且原型中含有大量的图表,本想使用飞书内置图表组件 —— chart-space,但官方表示已经停止维护了,无奈之下,只能另寻他路,于是乎,图表之王&…

【Godot4.2】自定义Todo清单类 - myTodoList

概述 在写myList类的时候,就想到可以写一个类似的Todo清单类。 基础思路 本质还是在内部维护一个数组,在其基础上进行增删改查操作的封装为了方便存储数据,编写一个自定义内置类TodoItem,内部数组就变成了Array[TodoItem]类型的…

【Flutter】GetX

前言 状态管理 / 路由管理 / 依赖管理 这三部分之间存在联系 参考文章 建议看官网文章,很详细 ,pub.dev搜索get pub.dev的文档 状态管理文章相关链接 状态管理 案例 实现一个计算器,运用GetX去管理它 构建界面 构建一个计算器界面 …

GateWay具体的使用之全链路跟踪TraceId日志

1.创建全局过滤器,在请求头上带入traceId参数,穿透到下游服务. package com.by.filter;import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.jwt.JWTValidator;…

数据结构1(初):时间复杂度和空间复杂度

目录 1、引言 1.1、什么是数据结构? 1.2、什么是算法? 1.3、如何学好数据结构和算法 ? 2、算法效率 2.1、如何衡量一个算法的好坏 2.2、算法的复杂度 3、时间复杂度 3.1、时间复杂度的概念 3.2、大O的渐进表示法 3.3、常见时间复杂…

C语言自定义类型【联合体与枚举】

文章目录 1.联合体1.1联合体的声明1.2联合体的特点1.3联合体的大小计算联合体的使用案例 2.枚举2.1枚举类型的声明2.2枚举类型的优点(为什么使用枚举)2.3枚举类型的使用 结语 1.联合体 1.1联合体的声明 和结构体一样,联合体也是由一个或多个成员构成,同…