前言
博主在学习 Redis 实现发布订阅功能的时候,踩了太多的坑。
不是讲解不详细,看的一知半解;就是代码有问题,实际压根跑不起来!
于是博主萌生了自己写一个最新版且全程无错的博客供各位参考。希望各位不要把我才过的坑再踩一遍。(实战篇的所有代码均由本人测试,全程无Bug。)
废话不多说,让我们进入实战篇的学习!
在开始实战篇的之前,我们先一起回顾下原理篇的内容。
Redis 发布/订阅的优点
- 高性能:Redis 作为内存存储,具备极高的读写性能,能够快速处理发布和订阅消息。
- 简单易用:Redis 的发布/订阅接口简单,易于集成和使用。
- 实时性强:发布的消息会立即传递给所有订阅者,具备高实时性。
Redis 发布/订阅的缺点
- 消息丢失:由于 Redis 是内存存储,如果 Redis 实例宕机,未处理的消息可能会丢失。
- 无法持久化:Redis 的发布/订阅模式不支持消息持久化,无法存储和检索历史消息。
- 订阅者不可控:发布者无法控制订阅者的数量和状态,无法保证所有订阅者都能接收到消息。
- 无确认机制:发布者无法确认消息是否被订阅者接收和处理。
正如上述中Redis
的缺点,Redis
的发布订阅功能并不可靠,如果我们需要保证消息的可靠性、包括确认、重试等要求,我们还是要选择使用MQ
实现发布订阅。
Redis的发布/订阅应用场景:
- 对于消息处理可靠性要求不强
- 消息无需持久化
- 消费能力无需通过增加消费方进行增强
- 架构简单 中小型系统不希望应用过多中间件
Redis发布订阅命令
命令 | 描述 |
Redis Unsubscribe 命令 | 指退订给定的频道 |
Redis Subscribe 命令 | 订阅给定的一个或多个频道的信息 |
Redis Pubsub 命令 | 查看订阅与发布系统状态 |
Redis Punsubscribe 命令 | 退订所有给定模式的频道 |
Redis Publish 命令 | 将信息发送到指定的频道 |
Redis Psubscribe 命令 | 订阅一个或多个符合给定模式的频道 |
正式开始
一、添加依赖
首先,确保你已经安装并配置好 Redis 服务器,并创建了 Spring Boot 项目,在pom.xml中引入依赖。
<!-- 所需依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
二、配置文件中配置Redis
spring:# 项目名称application:name: test-redis-boot# Redis 配置data:redis:host: 填写自己的主机IPport: 8000password: 有则填,没有去掉这个属性database: 1# 连接超时时间timeout: 10slettuce:pool:# 连接池中的最小空闲连接min-idle: 5# 连接池中的最大空闲连接max-idle: 8# 连接池的最大数据库连接数max-active: 20# #连接池最大阻塞等待时间(使用负值表示没有限制)max-wait: -1ms
三、创建Redis配置类
创建一个配置类,用于配置 Redis
连接工厂和消息监听器监听通道信息。
注:此配置类无需死记硬背。只需大致了解每个方法的作用即可。
/*** @Description Redis 配置类,用于配置 Redis 连接工厂和消息监听器监听通道信息* @Author gongming.Zhang* @Date 2024/9/11 18:27* @Version 1.0*/
@Configuration
@Slf4j
public class RedisConfig {/*** 自定义 RedisTemplate 序列化方式** @param redisConnectionFactory Redis 连接的线程安全工厂* @return 模板类*/@Beanpublic RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();// 绑定 RedisConnectionFactoryredisTemplate.setConnectionFactory(redisConnectionFactory);// 创建 Jackson2JsonRedisSerializer 序列方式,对象类型使用 Object 类型。ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(objectMapper, Object.class);// 设置 RedisTemplate 序列化规则,因为 key 通常是普通的字符串,所以使用StringRedisSerializer即可,而 value 是对象时,才需要使用序列化与反序列化。redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// hash key 序列化规则redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// 属性设置后操作redisTemplate.afterPropertiesSet();log.info("RedisTemplate 自定义序列化配置完毕...");return redisTemplate;}/*** 配置主题订阅* * 可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定* addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定** @param connectionFactory Redis 连接的线程安全工厂* @return 容器对象*/@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取container.setConnectionFactory(connectionFactory);// 订阅名称叫 cache 的通道, 类似 Redis 中的 subscribe 命令container.addMessageListener(listenerAdapter, new ChannelTopic("cache"));// 订阅名称以 'test-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令container.addMessageListener(listenerAdapter, new PatternTopic("test-*"));log.info("消息监听器和通道绑定完毕...");return container;}/*** 配置消息监听适配器** @param redisReceiveListener* @return*/@Beanpublic MessageListenerAdapter listenerAdapter(RedisReceiveListener redisReceiveListener) {return new MessageListenerAdapter(redisReceiveListener);}
}
四、创建消息发送服务端
创建一个消息发布类,用于向客户端发布消息。
/*** @Description 消息发布服务端* @Author gongming.Zhang* @Date 2024/9/12 9:42* @Version 1.0*/
@Component
@Slf4j
public class MessagePublisher {@Autowiredprivate RedisTemplate<Object, Object> redisTemplate;/*** 服务端发布消息** @param channel 通道名* @param message 待发送的消息*/public void sendMessage(String channel, String message) {redisTemplate.convertAndSend(channel, message);log.info("消息发送成功... channel={}, message={}", channel, message);}}
五、创建监听器(客户端)
用于监听服务端发送的消息,每次服务端发送新消息时,都会自动调用onMessage()
方法。
/*** @Description Redis 消息监听器* @Author gongming.Zhang* @Date 2024/9/11 18:53* @Version 1.0*/
/*当收到订阅的消息时,会将消息交给这个类处理。* 可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.* 自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。*/
@Component
@Slf4j
public class RedisReceiveListener implements MessageListener {@Autowiredprivate RedisTemplate<Object, Object> redisTemplate;/*** 处理回调逻辑。每次新消息到达时,都会调用此方法。通过 onMessage 方法执行业务代码* <p>* 该接口不仅可以访问实际消息,还可以访问接收消息的频道(Channel),以及订阅时用于匹配频道(Channel)的模式。* 此信息使被调用者不仅可以通过内容区分各种消息,还可以检查其他详细信息。** @param message 消息对象,不能为 null* @param pattern 与通道匹配的模式(如果指定),可以为 null*/@Overridepublic void onMessage(Message message, byte[] pattern) {// 1.获取消息所属的通道 --> 首先获取 字符串序列化器,再从给定的二进制数据中反序列化对象。String channel = redisTemplate.getStringSerializer().deserialize(message.getChannel());// 2.获取客户端发送的消息内容 --> 后期可以根据自己项目中 消息 的类型,来确定用什么序列化器Object msg = redisTemplate.getValueSerializer().deserialize(message.getBody());log.info("收到Redis订阅消息: channel={} msg={}", channel, msg);}
}
六、编写Controller测试
/*** @Description 测试订阅发布功能* @Author gongming.Zhang* @Date 2024/9/12 10:13* @Version 1.0*/
@RestController
@RequestMapping(value = "/api/v1/publish")
public class PublisherController {@Autowiredprivate MessagePublisher publisher;@GetMapping("/sendMessage")public String sendMessage(@RequestParam(value = "message") String message, @RequestParam(value = "channel") String channel) {publisher.sendMessage(channel, message);return "Message published: " + message;}
}
七、测试响应数据
至此,我们 SpringBoot 整合 Redis 实现发布订阅功能已经完成!
总结
通过本文,我们详细介绍了如何在 SpringBoot 中整合 Redis 实现发布/订阅功能,并提供了详细的代码示例。Redis 发布/订阅模式以其高性能和简单易用的特点,在实时消息传递场景中有着广泛的应用,但同时也需要注意其消息丢失和无法持久化等缺点,需要根据实际业务需求选择。