redis发布订阅c接口_Redis 发布/订阅机制原理分析

序:使用订阅发布功能的时候想查一下客户端是如何接收消息的(客户端订阅了频道之后也会注册一个回调函数,服务端publish消息之后回调函数会获取到消息。这块没看到太多内容只有C++的源码),无意中查到这篇博客写的简单明了。转一下也做一个记录。

这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。

本文通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。

订阅、发布和退订

在开始研究源码之前,不妨先来回顾一下几个相关命令的使用方式。

PUBLISH 命令用于向给定的频道发送信息,返回值为接收到信息的订阅者数量:

redis> PUBLISH treehole "top secret here ..."

(integer) 0

redis> PUBLISH chatroom "hi?"

(integer) 1

SUBSCRIBE 命令订阅给定的一个或多个频道:

redis> SUBSCRIBE chatroom

Reading messages... (press Ctrl-C to quit)

1) "subscribe" # 订阅反馈

2) "chatroom" # 订阅的频道

3) (integer) 1 # 目前客户端已订阅频道/模式的数量

1) "message" # 信息

2) "chatroom" # 发送信息的频道

3) "hi?" # 信息内容

SUBSCRIBE 的返回值当中, 1) 为 subscribe 的是订阅的反馈信息,而 1) 为 message 的则是订阅的频道所发送的信息。

SUBSCRIBE 还可以订阅多个频道,这样一来它接收到的信息就可能来自多个频道:

redis> SUBSCRIBE chatroom talk-to-jack

Reading messages... (press Ctrl-C to quit)

1) "subscribe" # 订阅 chatroom 的反馈

2) "chatroom"

3) (integer) 1

1) "subscribe" # 订阅 talk-to-jack 的反馈

2) "talk-to-jack"

3) (integer) 2

1) "message" # 来自 chatroom 的消息

2) "chatroom"

3) "yahoo"

1) "message" # 来自 talk-to-peter 的消息

2) "talk-to-jack"

3) "Goodmorning, peter."

PSUBSCRIBE 提供了一种订阅符合给定模式的所有频道的方法,比如说,使用 it.* 为输入,就可以订阅所有以 it. 开头的频道,比如 it.news 、 it.blog 、 it.tweets ,诸如此类:

redis> PSUBSCRIBE it.*

Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "it.*"

3) (integer) 1

1) "pmessage"

2) "it.*" # 匹配的模式

3) "it.news" # 消息的来源频道

4) "Redis 2.6rc5 release" # 消息内容

1) "pmessage"

2) "it.*"

3) "it.blog"

4) "Why NoSQL matters"

1) "pmessage"

2) "it.*"

3) "it.tweet"

4) "@redis: when will the 2.6 stable release?"

当然, PSUBSCRIBE 也可以接受多个参数,从而匹配多种模式。

最后, UNSUBSCRIBE 命令和 PUNSUBSCRIBE 负责退订给定的频道或模式。

发布和订阅机制

当一个客户端通过 PUBLISH 命令向订阅者发送信息的时候,我们称这个客户端为发布者(publisher)。

而当一个客户端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的时候,我们称这个客户端为订阅者(subscriber)。

为了解耦发布者(publisher)和订阅者(subscriber)之间的关系,Redis 使用了 channel (频道)作为两者的中介 —— 发布者将信息直接发布给 channel ,而 channel 负责将信息发送给适当的订阅者,发布者和订阅者之间没有相互关系,也不知道对方的存在:

知道了发布和订阅的机制之后,接下来就可以开始研究具体的实现了,我们从 Redis 的订阅命令开始说起。

SUBSCRIBE 命令的实现

前面说到,Redis 将所有接受和发送信息的任务交给 channel 来进行,而所有 channel 的信息就储存在 redisServer 这个结构中:

struct redisServer {

// 省略 ...

dict *pubsub_channels; // Map channels to list of subscribed clients

// 省略 ...

};

pubsub_channels 是一个字典,字典的键就是一个个 channel ,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。

举个例子,如果在一个 redisServer 实例中,有一个叫做 news 的频道,这个频道同时被client_123 和 client_456 两个客户端订阅,那么这个 redisServer 结构看起来应该是这样子:

可以看出,实现 SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中。

函数 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底层实现,它完成了将客户端添加到订阅链表中的工作:

// 订阅指定频道

// 订阅成功返回 1 ,如果已经订阅过,返回 0

int pubsubSubscribeChannel(redisClient *c, robj *channel) {

struct dictEntry *de;

list *clients = NULL;

int retval = 0;

/* Add the channel to the client -> channels hash table */

// dictAdd 在添加新元素成功时返回 DICT_OK

// 因此这个判断句表示,如果新订阅 channel 成功,那么 。。。

if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {

retval = 1;

incrRefCount(channel);

/* Add the client to the channel -> list of clients hash table */

// 将 client 添加到订阅给定 channel 的链表中

// 这个链表是一个哈希表的值,哈希表的键是给定 channel

// 这个哈希表保存在 server.pubsub_channels 里

de = dictFind(server.pubsub_channels,channel);

if (de == NULL) {

// 如果 de 等于 NULL

// 表示这个客户端是首个订阅这个 channel 的客户端

// 那么创建一个新的列表, 并将它加入到哈希表中

clients = listCreate();

dictAdd(server.pubsub_channels,channel,clients);

incrRefCount(channel);

} else {

// 如果 de 不为空,就取出这个 clients 链表

clients = dictGetVal(de);

}

// 将客户端加入到链表中

listAddNodeTail(clients,c);

}

/* Notify the client */

addReply(c,shared.mbulkhdr[3]);

addReply(c,shared.subscribebulk);

// 返回订阅的频道

addReplyBulk(c,channel);

// 返回客户端当前已订阅的频道和模式数量的总和

addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

return retval;

}

PSUBSCRIBE 命令的实现

除了直接订阅给定 channel 外,还可以使用 PSUBSCRIBE 订阅一个模式(pattern),订阅一个模式等同于订阅所有匹配这个模式的 channel 。

和 redisServer.pubsub_channels 属性类似, redisServer.pubsub_patterns 属性用于保存所有被订阅的模式,和 pubsub_channels 不同的是, pubsub_patterns 是一个链表(而不是字典):

struct redisServer {

// 省略 ...

list *pubsub_patterns; // A list of pubsub_patterns

// 省略 ...

};

pubsub_patterns 的每一个节点都是一个 pubsubPattern 结构的实例,它保存了被订阅的模式,以及订阅这个模式的客户客户端:

typedef struct pubsubPattern {

redisClient *client;

robj *pattern;

} pubsubPattern;

举个例子,假设在一个 redisServer 实例中,有一个叫做 news.* 的模式同时被客户端client_789 和 client_999 订阅,那么这个 redisServer 结构看起来应该是这样子:

现在可以知道,实现 PSUBSCRIBE 命令的关键,就是将客户端和订阅的模式添加到redisServer.pubsub_patterns 当中。

pubsubSubscribePattern 是 PSUBSCRIBE 的底层实现,它将客户端和所订阅的模式添加到redisServer.pubsub_patterns 当中:

// 订阅指定模式

// 订阅成功返回 1 ,如果已经订阅过,返回 0

int pubsubSubscribePattern(redisClient *c, robj *pattern) {

int retval = 0;

// 向 c->pubsub_patterns 中查找指定 pattern

// 如果返回值为 NULL ,说明这个 pattern 还没被这个客户端订阅过

if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {

retval = 1;

// 添加 pattern 到客户端 pubsub_patterns

listAddNodeTail(c->pubsub_patterns,pattern);

incrRefCount(pattern);

// 将 pattern 添加到服务器

pubsubPattern *pat;

pat = zmalloc(sizeof(*pat));

pat->pattern = getDecodedObject(pattern);

pat->client = c;

listAddNodeTail(server.pubsub_patterns,pat);

}

/* Notify the client */

addReply(c,shared.mbulkhdr[3]);

addReply(c,shared.psubscribebulk);

// 返回被订阅的模式

addReplyBulk(c,pattern);

// 返回客户端当前已订阅的频道和模式数量的总和

addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

return retval;

}

PUBLISH 命令的实现

使用 PUBLISH 命令向订阅者发送消息,需要执行以下两个步骤:

1) 使用给定的频道作为键,在 redisServer.pubsub_channels 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。

2) 遍历 redisServer.pubsub_patterns 链表,将链表中的模式和给定的频道进行匹配,如果匹配成功,那么将消息发布到相应模式的客户端当中。

举个例子,假设有两个客户端分别订阅 it.news 频道和 it.* 模式,当执行命令PUBLISH it.news "hello moto" 的时候, it.news 频道的订阅者会在步骤 1 收到信息,而当PUBLISH 进行到步骤 2 的时候, it.* 模式的订阅者也会收到信息。

PUBLISH 命令的实际实现由 pubsubPublishMessage 函数完成,它的完整定义如下:

// 发送消息

int pubsubPublishMessage(robj *channel, robj *message) {

int receivers = 0;

struct dictEntry *de;

listNode *ln;

listIter li;

/* Send to clients listening for that channel */

// 向所有频道的订阅者发送消息

de = dictFind(server.pubsub_channels,channel);

if (de) {

list *list = dictGetVal(de); // 取出所有订阅者

listNode *ln;

listIter li;

// 遍历所有订阅者, 向它们发送消息

listRewind(list,&li);

while ((ln = listNext(&li)) != NULL) {

redisClient *c = ln->value;

addReply(c,shared.mbulkhdr[3]);

addReply(c,shared.messagebulk);

addReplyBulk(c,channel); // 打印频道名

addReplyBulk(c,message); // 打印消息

receivers++; // 更新接收者数量

}

}

/* Send to clients listening to matching channels */

// 向所有被匹配模式的订阅者发送消息

if (listLength(server.pubsub_patterns)) {

listRewind(server.pubsub_patterns,&li); // 取出所有模式

channel = getDecodedObject(channel);

while ((ln = listNext(&li)) != NULL) {

pubsubPattern *pat = ln->value; // 取出模式

// 如果模式和 channel 匹配的话

// 向这个模式的订阅者发送消息

if (stringmatchlen((char*)pat->pattern->ptr,

sdslen(pat->pattern->ptr),

(char*)channel->ptr,

sdslen(channel->ptr),0)) {

addReply(pat->client,shared.mbulkhdr[4]);

addReply(pat->client,shared.pmessagebulk);

addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式

addReplyBulk(pat->client,channel); // 打印频道名

addReplyBulk(pat->client,message); // 打印消息

receivers++; // 更新接收者数量

}

}

decrRefCount(channel); // 释放用过的 channel

}

return receivers; // 返回接收者数量

}

UNSUBSCRIBE 和 PUNSUBSCRIBE 的实现

UNSUBSCRIBE 和 PUNSUBSCRIBE 分别是 SUBSCRIBE 和 PSUBSCRIBE 的反操作,如果明白了SUBSCRIBE 和 PSUBSCRIBE 的工作机制的话,应该不难理解这两个反操作的原理,所以这里就省略详细的分析了,有兴趣的可以直接看代码。

小节

Redis 的 pubsub 机制的分析就到此结束了,跟往常一样,带有注释的完整 pubsub.c 文件可以到原作者的 GITHUB 上找: https://github.com/huangz1990/reading_redis_source

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/555324.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

理解java注解的实现原理

JAVA 注解的基本原理(重点) https://www.cnblogs.com/yangming1996/p/9295168.html 从以下4个方面来系统的学习一下java注解 什么是注解注解的用途注解使用演示注解的实现原理 1,什么是注解 注解也叫元数据,例如我们常见的Ove…

centos7重新加载服务的命令_CentOS7 从查看、启动、停止服务说起systemctl

执行命令“systemctl status 服务名.service”可查看服务的运行状态,其中服务名后的.service 可以省略,这是CenOS7以后采用systemd作为初始化进程后产生的变化。Systemctl是一个systemd工具,主要负责控制systemd系统和服务管理器。Systemd是一…

c#switch语句判断成绩_switch语句成绩等级例子

求 用C语言中switch语句编写学生成绩问题 #include void main() {double score; printf("请输入分数:\n"); scanf("%lf",&score); switch((int)(score/10))//switch((int)score/10)出现严重错误,switch(((int)score)/10)出现严重错误, {case 10: ca…

一体化住户调查_曲麻莱县2020年城乡一体化住户调查表彰会暨年报部署会

为全面推进我县城乡一体化住户调查工作,总结经验、鼓励优秀,提高统计员和辅助调查员的工作积极性,提高账本数据质量,11月25日下午,县统计局组织召开2020年全县城乡一体化住户调查工作表彰会暨年报部署会。全县6个镇的统…

电力系统继电保护第二版张保会_电力系统继电保护试题

一、填空题(每小题1分,共20分)1.电气元件配置两套保护,一套保护不动作时另一套保护动作于跳闸,称为_近后备_保护。2.电流继电器的_返回_电流与动作电流的比值称为继电器的返回系数。3.反应电流增大而瞬时动作的保护称为__无时限电流速断保护_…

Linux中shell脚本详解

文章目录1、第一个脚本程序:2、shell获取字符串长度:3、shell变量:4、**引用shell变量**:5、shell变量的赋值、修改、删除:5、shell特殊变量:6、shell中字符串的拼接:**7、字符串的截取**8、she…

java递归实现多级菜单栏_Java构建树形菜单以及支持多级菜单的实例代码

这篇文章主要介绍了Java构建树形菜单的实例代码(支持多级菜单),非常不错,具有参考借鉴价值,需要的朋友可以参考下效果图:支持多级菜单。菜单实体类:public class Menu {// 菜单idprivate String id;// 菜单名称private String nam…

java中clone方法的理解(深拷贝、浅拷贝)

文章目录前言:知识点一:什么是浅拷贝?知识点二:什么是深拷贝?知识点三、java拷贝(clone)的前提:知识点四:浅拷贝案例:拷贝类:测试类:总…

mysql实现内容加密_简单为mysql 实现透明加密方法

一般用户在数据库中保存数据,虽然数据库存储的是二进制,无法直接明文打开查看,但是如果是一个外行人,直接连接进入mysql中,还是可以直接查看数据的。所以对于一些核心数据,特别是企业重要数据资产&#xff…

Arrays.sort() 的一些用法

Arrays.sort() 的一些用法 介绍 sort(T[] a)&#xff1a;对指定T型数组按数字升序排序。sort(T[] a,int formIndex, int toIndex)&#xff1a;对指定T型数组的指定范围按数字升序排序。sort(T[] a, Comparator<? supre T> c): 根据指定比较器产生的顺序对指定对象数组…

foreach 语法糖

// 源码 public class Test {public static void test1() {int[] arr {1, 2, 3, 4, 5};for (int i : arr) {System.out.println(i);}}public static void main(String[] args) {test1();} }// 反编译 public class Test {public Test() {}public static void test1() {int[] a…

DB2连接不上mysql数据库_一次DB2数据库连接失败(SQLSTATE=08001)的解决方法

有一次&#xff0c;在使用DbVisualizer工具连接自己linux虚拟机上的DB2数据库时&#xff0c;报如下错误&#xff1a;Product: DbVisualizer Pro 9.1Build: #2050 (2013/09/08 11:03)Java VM: Java HotSpot(TM)64-Bit Server VMJava Version:1.6.0_43Java Vendor: Sun Microsyst…

Java设置时间为0时0分0秒和23时59分59秒

Calendar calendar Calendar.getInstance(); // 0时0分0秒 calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0);// 23时59分59秒 calendar.set(Calendar.HOUR_OF_DAY, 23);…

mysql分片库分页查询_准备开发一个数据库分片的中间件,请问下分页查询用什么样的算法效率较高?...

假设你说的用户&#xff0c;不是开发人员&#xff0c;是终端用户&#xff0c;比如saas之类的系统用户。如果对于用户是透明的&#xff0c;意味着每个用户只需要看到自己的数据&#xff0c;那么比较经济的处理方式是&#xff0c;把用户id的哈希值作为分配的条件&#xff0c;这样…

Java之AQS(AbstractQueuedSynchronizer)

Java之AQS&#xff08;AbstractQueuedSynchronizer&#xff09; AQS 介绍 AQS 的全称为 AbstractQueuedSynchronizer &#xff0c;翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks 包下面。 ● 是用来实现锁或者其他同步器组件的公共基础部分的抽象实…

java中Arrays类的讲解

介绍 在java.util中有一个Arrays类&#xff0c;此类包含用于操纵数组的各种方法&#xff0c;例如&#xff1a;二分查找&#xff08;binarySearch&#xff09;、拷贝操作&#xff08;copyOf&#xff09;、比较(equals)、填充(fill)、排序&#xff08;sort&#xff09;等&#xf…

mysql 1055 解决方案_MySQL报错1055解决方案 - 树懒学堂

相信大家在使用MySQL数据库的过程中&#xff0c;或多或少都遇到报错。在mysql版本更新到5.7之后&#xff0c;有一个新的报错出现的次数越来越频繁&#xff0c;它就是MySQL 1055 报错。本文就给大家介绍一下MySQL 1055报错的原因以及解决的思路&#xff0c;并给出两种具体的解决…

SpringBoot 3.0最低版本要求的JDK 17,这几个新特性不能不知道

最近&#xff0c;有很多人在传说 SpringBoot要出3.0的版本了&#xff0c;并且宣布不再支持 Java 8&#xff0c;最低要求是 Java 17了。 其实&#xff0c;早在2021年9月份&#xff0c;关于 Spring Framework 6.0的消息出来的时候&#xff0c;Spring 官方就已经明确了不会向下兼…

jmeter mysql数据导出_Jmeter连接mysql

一、下载添加jar包image.png添加方法&#xff1a;1.拷贝到jmeter/lib目录下&#xff0c;此方法需重启jmeter2.直接在jmeter的测试计划中导入image.png二、连接mysql数据库添加配置元件-JDBC Connection Configurationimage.pngimage.png1.Variable Name for created pool&#…

判断一个坐标点是否在不规则多边形内部的算法

参考&#xff1a;https://wrf.ecse.rpi.edu//Research/Short_Notes/pnpoly.html 在GIS&#xff08;地理信息管理系统&#xff09;中&#xff0c;判断一个坐标是否在多边形内部是个经常要遇到的问题。乍听起来还挺复杂。根据W. Randolph Franklin 提出的PNPoly算法&#xff0c;…