2019独角兽企业重金招聘Python工程师标准>>>
Redis发布与订阅——PUBLISH & SUBSCRIBE
一般来说,发布与订阅(又称pub/sub)的特点是订阅者(listener)负责订阅频道(channel),发送者(publisher)负责向频道发送二进制字符串消息(binary string message)。每当有消息发送至给定频道时,频道的订阅者都会收到消息。我们也可以把频道看作是电台,其中订阅者可以同时收听多个电台,而发送者则可以在任何电台发送消息。
发布与订阅的模式
Redis 的 SUBSCRIBE 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给所有订阅指定频道的客户端。
作为例子, 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
发布与订阅相关命令
订阅和发布
首先打开一个cli,订阅一个频道,如下,
127.0.0.1:7000> subscribe mychannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "mychannel"
3) (integer) 1
然后向该频道发送一个消息,如下,
127.0.0.1:7000> publish mychannel 'hell world'
(integer) 1
127.0.0.1:7000>
刚才的客户端已经订阅了该频道,所以该频道就会收到消息,如下,
➜ ~ redis-cli -p 7000
127.0.0.1:7000> subscribe mychannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "mychannel"
3) (integer) 1
1) "message"
2) "mychannel"
3) "hell world"
其他命令:
UNSUBSCRIBE——退订给定的一个或多个频道,如果执行是没有给定任何频道,那么退订所有频道
PSUBSCRIBE——PSUBSCRIBE pattern [pattern ...] 订阅与给定模式相匹配的所有频道
PUNSUBSCRIBE——PUNSUBSCRIBE [pattern [pattern ...]] 退订给定的模式,如果执行是没有给定任何模式,那么退订所有模式。
使用Jedis客户端实现发布与订阅
首先实现一个Listener,用于订阅消息,接收消息,
package com.usoft.jedis.sample;import redis.clients.jedis.JedisPubSub;/*** Created by xinxingegeya on 16/4/13.*/
public class MessageListener extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {System.out.println("channel:" + channel + ",message:" + message);//此处我们可以取消订阅if (message.equalsIgnoreCase("quit")) {this.unsubscribe(channel);}}
}
写一个测试类,实现订阅和发布,
package com.usoft.jedis.sample;import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** Created by xinxingegeya on 16/4/13.*/
public class PubSubTest {/*** jedis连接池*/public JedisPool jedisPool;/*** 发布和订阅的频道*/public String channel = "mychannel";@Beforepublic void before() {JedisPoolConfig config = new JedisPoolConfig();config.setMaxTotal(10);config.setMaxIdle(5);config.setMaxWaitMillis(5000);config.setTestOnBorrow(true);jedisPool = new JedisPool(config, "127.0.0.1", 7000);}@Afterpublic void after() {jedisPool.close();}@Testpublic void subscribe() {Jedis jedis = jedisPool.getResource();jedis.subscribe(new MessageListener(), channel);}/*** 发布9次消息后,在此发送quit消息,使listener(订阅者)关闭*/@Testpublic void publish() {Jedis jedis = jedisPool.getResource();for (int i = 0; i < 10; i++) {if (i == 9) {jedis.publish(channel, "quit");} else {jedis.publish(channel, "hello world");}}}
}
=========END=========