1、使用过程:
发布创建channel1消息
redis-cli> PUBLISH channel1 "Hello, world!"
redis-cli> SUBSCRIBE channel1
优点:
1、采用Reactor事件单线程去驱动发布订阅事件的,实时性高。
2、从redis架构去思考,拓展哨兵、master、salve都相对简单容易, 扩展性高。
缺点:
1、可靠性一般,redis只管发送消息,不会等待订阅该频道的实例响应。
2、高频次访问发布消息,容易阻塞挤压,说白了还是Reactor单线程驱动缺点。
2、实现过程:主从切换通过发布订阅模式实现的。
2-1、实例化pubsub_channels(频道)哈希表,当前为空。
initServer函数
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate(); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
2-2、发布命令
初始化发布命令,往pubsub_channels添加channel和对应的订阅者列表。hash key存储了channel名称,value存储了订阅者列表。
initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
struct redisCommand redisCommandTable[] = {...{"publish",publishCommand,3,"pltr",0,NULL,0,0,0,0,0},{"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},...
}
publishCommand函数
void publishCommand(redisClient *c) {//发布命令,从pubsub_channels哈希表中查询到对应发布的消息。int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);if (server.cluster_enabled)//如果是cluster节点,则使用集群发布模式clusterPropagatePublish(c->argv[1],c->argv[2]);else//forceCommandPropagation(c,REDIS_PROPAGATE_REPL);//返回接收消息的订阅者数量addReplyLongLong(c,receivers);
}
2-3、订阅消息
从pubsub_channels适配channel对应的订阅者列表。
initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
批量订阅和退订指令加载
struct redisCommand redisCommandTable[] = {...{"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},{"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},{"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},{"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},...
}
void subscribeCommand(redisClient *c) {int j;for (j = 1; j < c->argc; j++)pubsubSubscribeChannel(c,c->argv[j]);
}void unsubscribeCommand(redisClient *c) {if (c->argc == 1) {pubsubUnsubscribeAllChannels(c,1);} else {int j;for (j = 1; j < c->argc; j++)pubsubUnsubscribeChannel(c,c->argv[j],1);}
}void psubscribeCommand(redisClient *c) {int j;for (j = 1; j < c->argc; j++)pubsubSubscribePattern(c,c->argv[j]);
}void punsubscribeCommand(redisClient *c) {if (c->argc == 1) {pubsubUnsubscribeAllPatterns(c,1);} else {int j;for (j = 1; j < c->argc; j++)pubsubUnsubscribePattern(c,c->argv[j],1);}
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or* 0 if the client was already subscribed to that channel. ** 设置客户端 c 订阅频道 channel 。** 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。*/
int pubsubSubscribeChannel(redisClient *c, robj *channel) {dictEntry *de;list *clients = NULL;int retval = 0;/* Add the channel to the client -> channels hash table */// 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {retval = 1;incrRefCount(channel);// 关联示意图// {// 频道名 订阅频道的客户端// 'channel-a' : [c1, c2, c3],// 'channel-b' : [c5, c2, c1],// 'channel-c' : [c10, c2, c1]// }/* Add the client to the channel -> list of clients hash table */// 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表// 如果 channel 不存在于字典,那么添加进去de = dictFind(server.pubsub_channels,channel);if (de == NULL) {clients = listCreate();dictAdd(server.pubsub_channels,channel,clients);incrRefCount(channel);} else {clients = dictGetVal(de);}// before:// 'channel' : [c1, c2]// after:// 'channel' : [c1, c2, c3]// 将客户端添加到链表的末尾listAddNodeTail(clients,c);}/* Notify the client */// 回复客户端。// 示例:// redis 127.0.0.1:6379> SUBSCRIBE xxx// Reading messages... (press Ctrl-C to quit)// 1) "subscribe"// 2) "xxx"// 3) (integer) 1addReply(c,shared.mbulkhdr[3]);// "subscribe\n" 字符串addReply(c,shared.subscribebulk);// 被订阅的客户端addReplyBulk(c,channel);// 客户端订阅的频道和模式总数addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));return retval;
}