Redis发布与订阅

什么是发布与订阅

: redis发布订阅是一种消息通信通信模式,由发送者(pub)发送消息,订阅者(sub)接收消息。

如下图client2、4、5就是订阅着,订阅了channel1的消息。

在这里插入图片描述

channel1要发送消息时,这几个订阅者都会实时收到消息。

在这里插入图片描述

发布订阅的方式有哪几种知道吗?

答: 有两种,一种基于频道的,还有一种是基于匹配的模式:

基于频道的订阅案例

指令如下所示,可以看到订阅者可以订阅多个频道

subscribe channel [channel ...]

所以我们开启一个redis客户端,订阅一个channel:sport的频道

# 客户端1 订阅 channel:sport
127.0.0.1:6379> SUBSCRIBE channel:sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:sport"
3) (integer) 1

发布消息的指令格式为:

publish channel message

此时,我们再开启一个redis客户端,发布一条消息到channel:sport的频道

# 另一个客户端发送消息
127.0.0.1:6379> PUBLISH channel:sport "this is why we play"
(integer) 1
127.0.0.1:6379>

此时刚刚订阅消息的redis客户端就会实时的收到这条消息

127.0.0.1:6379> SUBSCRIBE channel:sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:sport"
3) (integer) 1# 如下便是实时收到的消息内容
1) "message"
2) "channel:sport" #频道
3) "this is why we play" # 消息内容

基于匹配模式的发布与订阅示例

有时候我们会订阅多个频道,我们不可能每次都去手动增加订阅的频道,例如我们当前订阅频道有:c1、c2、c3、c4、c5、c6、c7、c8。将来还可能出现c9等情况。我们不可能实时去添加订阅的频道。
观察上面的频道我们发现频道都是以c开头,后续的数字不断变化,所以我们完全可以使用模式匹配来实现频道订阅。

模式订阅和取消的命令为

psubscribe pattern [pattern...]
punsubscribe [pattern [pattern ...]]

关于parttern常见的匹配符有

1. *:表示任意占位符,例如c*,可以匹配c、c1、c111
2. ?*:匹配一个及以上个占位符
3. ?:表示匹配一个占位符

我们希望订阅c1-c9的频道基于模式匹配我们就能够做到这一点

我们首先开启一个客户端,使用模式匹配发起订阅

# 订阅匹配cxx相关的模式
PSUBSCRIBE c?*

然后我们在开启另一个客户端,发送消息到c1频道

127.0.0.1:6379> PUBLISH c1 "this is c1 message"
(integer) 1
127.0.0.1:6379>

刚刚订阅的客户端就会收到消息

127.0.0.1:6379> PSUBSCRIBE c?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"

注意:当你订阅PSUBSCRIBE c?* c1订阅时,若另一个客户端发送消息到c1你会收到两条消息(原因会在后文源码解析时补充)。

如下便是PSUBSCRIBE c?* c1的收到PUBLISH c1 "this is c1 message"的消息内容

127.0.0.1:6379> PSUBSCRIBE c?* c1
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "psubscribe"
2) "c1"
3) (integer) 2
1) "pmessage"
2) "c1"
3) "c1"
4) "this is c1 message"
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"

发布订阅的常见的使用场景和优缺点了解过嘛?

答: 使用场景如:聊天室、公告牌等需要实现消息解耦的场景都可以使用消息订阅发布,如下便是一个基于redis实现视频订阅的管理系统。

在这里插入图片描述

而优缺点也很明显:

缺点: 相较于KafkaRocketMQRedis的实现略显粗糙,无法实现消息堆积与回溯
优点: 实现简单,若不需要消息回溯等需求使用redis绰绰有余

redis是如何实现频道、模式发布与订阅的知道嘛?

答: 我们可以从源码的角度分析分析频道模式的发布与订阅:

频道发布订阅是基于字典这个数据结果,以频道作为key,用链表作为value存储所有订阅该频道的当订阅者,如下图,当channel1有消息时,程序就通过channel1订阅链,逐个给订阅者发送消息。

在这里插入图片描述

当如果客户端 client10086 执行命令 SUBSCRIBE channel1 channel2 channel3时,程序就会将这个订阅者挂到对应三个频道中。
同理,订阅者通过UNSUBSCRIBE 退订频道,程序也是通过pubsub_channels找到对应的链表将这个节点移除。

在这里插入图片描述

说完了频道的,我们再来说说redis匹配模式发布与订阅:

与频道订阅有所不同,匹配模式订阅数据结构如下所示,可以看到client123、client256订阅了tweet.shop.* ,订阅者的数据结构是一个记录客户端以及客户端订阅的模式频道的结构体,从源码我们可以看到这个结构体是这样的:

typedef struct pubsubPattern {redisClient *client; //指向订阅者robj *pattern;//指向匹配模式
} pubsubPattern;

在这里插入图片描述

从上图我们也可以看到redisServer 就是记录一个个客户端匹配模式的结构体,从源码我们就可以看到,它包含了一个pubsub_patterns的指针用于记录每个模式订阅者

struct redisServer {// ...list *pubsub_patterns;// ...
};

例如client10086 订阅broadcast.live.*,程序就会在redisServer所指向的链表中挂一个新的pubsub_patterns ,如下图所示:

在这里插入图片描述

基于spring boot集成redis演示发布与订阅

需求

我们希望一个用户订阅channel:sport,一个用户订阅channel:stock,而另一个用户两个都订阅

实现

首先pom配置引入相关依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

spring boot的application.yml配置redis配置

spring:redis:host: 127.0.0.1port: 6379password:database: 0 

接收redis消息行为接口

package com.example.demo;import org.springframework.stereotype.Component;/*** 发布消息接口类*/
@Component
public interface RedisMsg {void receiveMessage(String message);
}

三个订阅者

/*** 订阅所有channel:*的消息*/
public class RedisSubPChannel implements RedisMsg {public void receiveMessage(String message) {System.out.println("RedisSubPChannel收到消息消息" + message);}
}
package com.example.demo;/*** 订阅所有channel:sport最新消息*/
public class RedisSubSport implements RedisMsg{public void receiveMessage(String message){System.out.println("RedisSubSport获取体育频道最新消息:"+message);}
}
package com.example.demo;/*** 订阅所有channel:stock消息*/
public class RedisSubStock implements RedisMsg {@Overridepublic void receiveMessage(String message) {System.out.println("RedisSubStock获取channel:stock最新消息:"+message);}
}

redis配置类

@Configuration
@EnableCaching
public class RedisConfig{/*** Redis消息监听器容器* @param connectionFactory* @return*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅三个频道container.addMessageListener(listenerAdapter(new RedisSubSport()),new PatternTopic("channel:sport"));container.addMessageListener(listenerAdapter(new RedisSubStock()),new PatternTopic("channel:stock"));container.addMessageListener(listenerAdapter(new RedisSubPChannel()),new PatternTopic("channel:*"));return container;}/*** 配置消息接收处理类* @param redisMsg  自定义消息接收类* @return*/@Bean()@Scope("prototype")MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”//也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看return new MessageListenerAdapter(redisMsg, "receiveMessage");//注意2个通道调用的方法都要为receiveMessage}}

消息发送者

//定时器
@EnableScheduling
@Component
public class SenderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;//向redis消息队列index通道发布消息@Scheduled(fixedRate = 2000)public void sendMessage() {stringRedisTemplate.convertAndSend("channel:stock", "股票涨了"+Math.random()+"个百分点");stringRedisTemplate.convertAndSend("channel:sport",  "湖人队 No."+((int)(Math.random()*100)+1)+"选手得分");}
}

运行结果

RedisSubStock获取channel:stock最新消息:股票涨了0.46015187165884985个百分点
RedisSubPChannel收到消息消息股票涨了0.46015187165884985个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.40选手得分
RedisSubPChannel收到消息消息湖人队 No.40选手得分
RedisSubStock获取channel:stock最新消息:股票涨了0.5293839028647649个百分点
RedisSubPChannel收到消息消息股票涨了0.5293839028647649个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.26选手得分
RedisSubPChannel收到消息消息湖人队 No.26选手得分
RedisSubPChannel收到消息消息股票涨了0.6779431428770823个百分点
RedisSubStock获取channel:stock最新消息:股票涨了0.6779431428770823个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.35选手得分
RedisSubPChannel收到消息消息湖人队 No.35选手得分
RedisSubStock获取channel:stock最新消息:股票涨了0.40332150203704个百分点
RedisSubPChannel收到消息消息股票涨了0.40332150203704个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.14选手得分
RedisSubPChannel收到消息消息湖人队 No.14选手得分
RedisSubStock获取channel:stock最新消息:股票涨了0.8407461728769783个百分点
RedisSubPChannel收到消息消息股票涨了0.8407461728769783个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.25选手得分
RedisSubPChannel收到消息消息湖人队 No.25选手得分

参考文献

Redis开发与运维

springboot入门–springboot集成redis实现消息发布订阅模式-双通道

Redis进阶 - 消息传递:发布订阅模式详解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/230292.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ STL泛型算法

泛型算法 <algorithm>定义了大约 80 个标准算法。 它们操作由一对迭代器定义的&#xff08;输入&#xff09;序列或单一迭代器定义的&#xff08;输出&#xff09;序列。 当对两个序列进行拷贝、比较操作时&#xff0c;第一个序列由一对迭代器[b,e)表示&#xff0c;但第…

移动零算法(leetcode第283题)

题目描述&#xff1a; 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。示例 1:输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0] 示例 2:输入: n…

用uniapp写一个点击左侧可以滑动的menu

完成后的图片&#xff08;点击左侧右边或滑动&#xff0c;滑动右边左侧的选中也会变化&#xff09;&#xff1a; 数据js &#xff08;classifyData&#xff09;&#xff1a; export default[{"name": "女装","foods": [{"name": &q…

消息幂等:如何保证消息不被重复消费?

应用的幂等是在分布式系统设计时必须要考虑的一个方面&#xff0c;如果对幂等没有额外的考虑&#xff0c;那么在消息失败重新投递&#xff0c;或者远程服务重试时&#xff0c;可能会出现许多诡异的问题。本文一起来看一下&#xff0c;在消息队列应用中&#xff0c;如何处理因为…

命名之美:探索Java的标识符与命名规范

目录 ​编辑 前言 一、Java关键字&#xff1a; class&#xff1a; public、private、protected&#xff1a; static&#xff1a; final&#xff1a; void&#xff1a; int、double、char、boolean&#xff1a; if、else、switch&#xff1a; for、while、do&#xf…

01到底应该怎么理解“平均负载”

1、如何了解系统的负载情况&#xff1f; 每次发现系统变慢时&#xff0c; 我们通常做的第⼀件事&#xff0c; 就是执⾏top或者uptime命令&#xff0c; 来了解系统的负载情况。 ⽐如像下⾯这样&#xff0c; 我在命令⾏⾥输⼊了uptime命令&#xff0c; 系统也随即给出了结果。 …

微服务组件OpenFeign的学习

OpenFeign 添加依赖OpenFeign的简单使用OpenFeign日志配置OpenFeign超时时间配置 添加依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency>OpenFeign的…

思码逸关钦杰:聊聊研效管理中的数据操纵

3月25日&#xff0c;思码逸咨询总监、研发过程提效专家关钦杰在 QECon 质效城市论坛【深圳站】分享了主题为《聊聊研效管理中的数据操纵》的演讲。 以下内容根据关钦杰老师分享内容整理&#xff1a; 在生活中&#xff0c;当我们去描述客观事实的时候&#xff0c;我们经常要用…

【Source Insight4.0】解决注释中文乱码

本来用的好好的&#xff0c;结果今天创建一个新的项目就出现注释中文乱码&#xff01;&#xff01;&#xff01; 然后上网查找说要修改为【Default encoding” &#xff1a;改成System Default(Windows ANSI) 或者Chinese Simplified(GB2312)】但是我的并没有效果。 最后是选…

Spring Boot Logging中文文档

本文为官方文档直译版本。原文链接 Spring Boot Logging中文文档 引言日志格式控制台输出彩色输出 文件输出文件轮转日志级别日志组使用日志关机钩子自定义日志配置Logback 扩展特定配置文件的配置环境属性 Log4j2 扩展特定配置文件的配置环境属性查找Log4j2 系统属性 引言 Sp…

Frida05 - 高级API用法

参考文档 https://api-caller.com/2019/03/30/frida-note/ https://frida.re/docs/javascript-api/#frida 数组打印 测试代码&#xff1a; private static class Bean {String a;int b;float c; }private void test() {Bean[] beans new Bean[3];beans[0] new Bean();be…

深度学习笔记_6经典预训练网络LeNet-18解决FashionMNIST数据集

1、 调用模型库&#xff0c;定义参数&#xff0c;做数据预处理 import numpy as np import torch from torchvision.datasets import FashionMNIST import torchvision.transforms as transforms from torch.utils.data import DataLoader import torch.nn.functional as F im…

Redis——Redis常用命令

Redis提供了丰富的命令&#xff0c;可以对数据库和各种数据类型进行操作&#xff0c;这些命令可以在Windows和Linux中使用。 1、键值相关命令 1.1、KEYS KEYS用于返回满足pattern的所有key&#xff0c;pattern支持以下通配符&#xff1a; *&#xff1a;匹配任意字符。&…

Python教程81:函数的位置参数、默认参数、动态参数、关键字参数(入门必看)

1.形式参数&#xff08;Formal Parameters&#xff09;和实际参数&#xff08;Actual Parameters&#xff09;是函数或方法定义和调用过程中的两个重要概念。举个例子&#xff0c;在下面的greet函数中&#xff0c;当我们调用greet(“李白”)时&#xff0c;"李白"就是…

electron这样使用更安全

背景&#xff1a; electron大家平时为了方便使用&#xff0c;或是一些网上demo的引导&#xff0c;会让渲染进程的业务界面支持直接使用nodejs&#xff0c;这种开发方式有一定的安全隐患&#xff0c;如果业务界面因为xss之类的漏洞被注入其他代码&#xff0c;危害非常大&#x…

Spring之容器:IOC(3)

学习的最大理由是想摆脱平庸&#xff0c;早一天就多一份人生的精彩&#xff1b;迟一天就多一天平庸的困扰。各位小伙伴&#xff0c;如果您&#xff1a; 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持&#xff0c;想组团高效学习… 想写博客但无从下手&#xff0c;急需…

华为云CodeArts Repo常见问答汇总

1.【Repo】codearts Repo最大支持上传文件大小 答&#xff1a;参考链接 https://support.huaweicloud.com/productdesc-codehub/codehub_pdtd_0005.html • 单文件上传大小限制&#xff08;评论中上传附件&#xff09;<50MB。 • 单文件上传大小限制&#xff08;代码…

某联合产权交易所持续购买监控易产品的维保服务,提升IT运维保障能力

在信息化时代&#xff0c;企业信息化的程度已经成为影响其核心竞争力的重要因素。某联合产权交易所&#xff08;以下简称“交易所”&#xff09;作为行业领导者&#xff0c;一直以来都积极推进信息化建设&#xff0c;致力于提升运维管理水平&#xff0c;以适应日益激烈的市场竞…

Rust 嵌入式开发

Rust 进行嵌入式开发: https://xxchang.github.io/book/intro/index.html # 列出所有目标平台 rustup target list# 安装目标平台工具链 rustup target add thumbv7m-none-eabi# 创建工程 cargo new demo && cd demo cargo add cortex-m-rt cargo add panic-halt carg…

二十九、获取文件属性及相关信息

二十九、获取文件属性及相关信息QFileInfo QFileInfo 提供有关文件在文件系统中的名称 位置 &#xff08;路径&#xff09;、访问权限及它是目录还是符号链接、等信息。文件的大小、最后修改/读取时间也是可用的。QFileInfo 也可以被用于获取信息有关 Qt resource . QFileInf…