【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息

【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息Pub/Sub Messaging

  • 一、发布消息Publishing (Sending Messages)
  • 二、订阅消息Subscribing (Receiving Messages)
    • 2.1 消息监听容器Message Listener Containers
    • 2.2 消息监听适配器The MessageListenerAdapter
  • 三、反应式消息监听器容器Reactive Message Listener Container
    • 3.1 通过template API订阅Subscribing via template API

Spring Data为Redis提供了专用的消息集成,在功能和命名方面与Spring Framework中的JMS集成类似。
Redis消息传递大致可以分为两个功能领域:

  • 消息的发布(publish)或生产
  • 消息的订阅(subscribe)或消费

这是一个通常称为发布/订阅(简称Pub/Sub)的模式示例。RedisTemplate类用于消息生成。对于类似于Java EE的消息驱动bean风格的异步接收,Spring Data提供了一个专用的消息监听器容器,用于创建消息驱动的POJO(MDP),对于同步接收,还提供RedisConnection。
org.springframework.data.redis.connection 和 org.springframework.data.redis.listener包提供Redis消息传递的核心功能。

一、发布消息Publishing (Sending Messages)

要发布消息,你可以像使用其他操作一样,使用低级的RedisConnection或高级的RedisOperations。这两个实体都提供了publish方法,该方法接受消息和目标通道(channel)作为参数。RedisConnection需要原始数据(字节数组),而RedisOperations允许任意对象作为消息传递,如下面的例子所示:

// send message through connection
RedisConnection con =byte[] msg =byte[] channel = …
con.pubSubCommands().publish(msg, channel);// send message through RedisOperations
RedisOperations operations =Long numberOfClients = operations.convertAndSend("hello!", "world");

一个相对完整的发布例子:

package com.example.demo;import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;public class RedisPublishMessage {public JedisConnectionFactory jedisConnectionFactory() {RedisStandaloneConfiguration redisStandaloneConfiguration =new RedisStandaloneConfiguration();redisStandaloneConfiguration.setHostName("localhost");redisStandaloneConfiguration.setDatabase(0);redisStandaloneConfiguration.setPassword(RedisPassword.of("123456"));redisStandaloneConfiguration.setPort(6379);return new JedisConnectionFactory(redisStandaloneConfiguration);}public static void main(String[] args) {JedisConnectionFactory connectionFactory = new RedisApplication().jedisConnectionFactory();connectionFactory.afterPropertiesSet();RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(StringRedisSerializer.UTF_8);template.afterPropertiesSet();
// send message through RedisOperationsRedisOperations operations = template;Long numberOfClients = operations.convertAndSend("mychannel", "This is Gabriel");System.out.println(numberOfClients);}
}

二、订阅消息Subscribing (Receiving Messages)

在接收端,可以通过直接命名或使用模式(pattern)匹配来订阅一个或多个通道(channels)。后一种方法非常有用,因为它不仅允许使用一个命令创建多个订阅(subscription),而且还可以监听订阅时尚未创建的通道(只要它们与模式匹配)。
在底层,RedisConnection提供了subscribe和pSubscribe方法,它们分别映射Redis命令的以通道或模式进行订阅。前述2个方法可以使用多个通道或多个模式做为参数。为了更改连接的订阅或检查连接是否在监听,RedisConnection提供了getSubscription和isSubscribed方法。
Spring Data Redis中的Subscription命令是阻塞的。也就是说,在连接上调用subscribe会导致当前线程在开始等待消息时阻塞。只有当订阅被取消时,线程才会被释放,当另一个线程在同一连接上调用unsubscribe或pUnsubscribe时,就会发生这种情况。有关此问题的解决方案,请参阅“2.1 消息监听容器”(本文档稍后部分)。
如前所述,一旦订阅,连接就会开始等待消息。只允许添加新订阅、修改现有订阅和取消现有订阅的命令。调用除subscribe, pSubscribe, unsubscribe, 和 pUnsubscribe之外的任何操作都会引发异常。
为了订阅消息,需要实现MessageListener回调。每次新消息到达时,都会调用回调,并通过onMessage方法运行用户代码。该接口不仅可以访问实际消息,还可以访问接收该消息的通道以及用于匹配订阅的通道的模式(如果有的话)。这些信息使被调用者来区分各种消息,不仅是通过内容,还可以通过检查其他细节。
一个相对完整的订阅例子:

package com.example.demo;import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.nio.charset.StandardCharsets;public class RedisSubscribeMessage {public JedisConnectionFactory jedisConnectionFactory() {RedisStandaloneConfiguration redisStandaloneConfiguration =new RedisStandaloneConfiguration();redisStandaloneConfiguration.setHostName("localhost");redisStandaloneConfiguration.setDatabase(0);redisStandaloneConfiguration.setPassword(RedisPassword.of("123456"));redisStandaloneConfiguration.setPort(6379);return new JedisConnectionFactory(redisStandaloneConfiguration);}public static void main(String[] args) {JedisConnectionFactory connectionFactory = new RedisApplication().jedisConnectionFactory();connectionFactory.afterPropertiesSet();RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(StringRedisSerializer.UTF_8);template.afterPropertiesSet();RedisConnection redisConnection = template.getConnectionFactory().getConnection();redisConnection.subscribe(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] bytes) {// 收到消息的处理逻辑System.out.println("Receive message : " + message);}}, "mychannel".getBytes(StandardCharsets.UTF_8));}
}

2.1 消息监听容器Message Listener Containers

由于其阻塞特性,低级别订阅(RedisConnection的订阅)没有吸引力,因为它需要对每个监听器进行连接和线程管理。为了缓解这个问题,Spring Data提供了RedisMessageListenerContainer,它完成了所有繁重的工作。如果你熟悉EJB和JMS,你应该会发现这些概念很熟悉,因为它的设计尽可能地接近Spring Framework的支持及其消息驱动的POJO(MDP)。
RedisMessageListenerContainer充当消息监听器容器。它用于从Redis通道(channel)接收消息,并驱动注入其中的MessageListener实例。监听器容器负责消息接收的所有线程,并将消息分派到监听器中进行处理。消息监听器容器是MDP和消息传递提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等。这使你作为应用程序开发人员能够编写与接收消息相关联的(可能复杂的)业务逻辑,并将Redis基础设施的公式化问题委托给框架。
MessageListener还可以实现SubscriptionListener,以便在确认订阅/取消订阅时接收通知。在同步调用时,监听订阅通知可能很有用。
为了最大限度地减少应用程序占用,RedisMessageListenerContainer允许多个监听器共享一个连接和一个线程,即使它们不共享订阅。因此,无论应用程序跟踪多少监听器或通道,运行时成本在其整个生命周期中都保持不变。另外,容器允许更改运行时配置,以便在应用程序运行时添加或删除监听器,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。如果所有监听器都被取消订阅,则会自动执行清理,并释放线程。
为了保证消息的异步特性,容器需要一个java.util.concurrent.Executor(或Spring的TaskExecutor)来分发消息。根据负载、监听器的数量和运行时环境,你应该调整executor以更好地满足你的需求。

2.2 消息监听适配器The MessageListenerAdapter

MessageListenerAdapter类是Spring异步消息传递支持的最后一个组件。简而言之,它允许你将几乎任何类公开为MDP(尽管有一些约束)。以下面的接口定义举例:

public interface MessageDelegate {void handleMessage(String message);void handleMessage(Map message);void handleMessage(byte[] message);void handleMessage(Serializable message);// pass the channel/pattern as wellvoid handleMessage(Serializable message, String channel);}

请注意,尽管上面的接口没有扩展MessageListener接口,但仍然可以通过使用MessageListenerAdapter类将其用作MDP。还请注意,各种消息处理方法是如何根据它们可以接收和处理的各种消息类型的内容进行强类型化的。此外,消息发送到的通道(channel)或模式(pattern)可以作为第二个String类型的参数传递给方法:

public class DefaultMessageDelegate implements MessageDelegate {// implementation elided for clarity...
}

注意上面的MessageDelegate接口的实现(上面的DefaultMessageDelegate类)是如何完全没有Redis依赖的。它确实是一个POJO,我们使用以下配置将其创建为MDP:

@Configuration
class MyConfig {// …@BeanDefaultMessageDelegate listener() {return new DefaultMessageDelegate();}@BeanMessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {return new MessageListenerAdapter(listener, "handleMessage");}@BeanRedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listener, ChannelTopic.of("chatroom"));return container;}
}

监听器主题可以是一个通道(例如,topic=“chatroom”),也可以是一种模式(例如,topic=“*room”)
前面的示例使用Redis命名空间来声明消息监听器容器,并自动将POJO注册为监听器。成熟beans的定义如下:

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"><constructor-arg><bean class="redisexample.DefaultMessageDelegate"/></constructor-arg>
</bean><bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="messageListeners"><map><entry key-ref="messageListener"><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="chatroom"/></bean></entry></map></property>
</bean>

每次接收到消息时,适配器都会自动且透明地在低级(low-level)格式和所需的对象类型之间执行转换(使用配置的RedisSerializer)。由方法调用引起的任何异常都将被捕获并由容器处理(默认情况下,异常将被记录)。

三、反应式消息监听器容器Reactive Message Listener Container

Spring Data提供了ReactiveRedisMessageListenerContainer,它帮助用户完成所有繁重的转换和订阅状态管理工作。
消息监听器容器本身不需要外部线程资源。它使用driver线程来发布消息。

ReactiveRedisConnectionFactory factory =ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

要等待并确保正确的订阅,可以使用receiveLater方法,该方法返回Mono<Flux>。返回的Mono与内部发布者一起完成,作为完成对给定主题的订阅的结果。通过拦截onNext信号,你可以同步服务器端订阅。

ReactiveRedisConnectionFactory factory =ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server).flatMapMany(Function.identity()).;

3.1 通过template API订阅Subscribing via template API

如上所述,你可以直接使用ReactiveRedisTemplate订阅通道/模式。这种方法提供了一种直接但有限的解决方案,因为你失去了在初始订阅之后添加订阅的选项。尽管如此,你仍然可以通过返回的Flux来控制消息流,例如使用take(Duration)。当读取完成、出错或取消时,所有绑定的资源将再次释放。

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {// message processing ...
}).subscribe();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/666514.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Transformer 自然语言处理(三)

原文&#xff1a;Natural Language Processing with Transformers 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 第八章&#xff1a;使 transformers 在生产中更高效 在之前的章节中&#xff0c;您已经看到了 transformers 如何被微调以在各种任务上产生出色的结果。…

AI 数字人从制作到变现

最近AI很火&#xff0c;无意中发现一个宝藏专栏《AI数字人从制作到变现》&#xff0c;原价599&#xff0c;现在推广阶段&#xff0c;只需要10元&#xff0c;专栏持续更新中&#xff0c;会有更多的知识后续分享。如有兴趣可以用微信扫描左侧海报二维码&#xff0c;下面我将介绍专…

『运维备忘录』之 Cron 命令详解

运维人员不仅要熟悉操作系统、服务器、网络等只是&#xff0c;甚至对于开发相关的也要有所了解。很多运维工作者可能一时半会记不住那么多命令、代码、方法、原理或者用法等等。这里我将结合自身工作&#xff0c;持续给大家更新运维工作所需要接触到的知识点&#xff0c;希望大…

计算机设计大赛 深度学习 机器视觉 人脸识别系统 - opencv python

文章目录 0 前言1 机器学习-人脸识别过程人脸检测人脸对其人脸特征向量化人脸识别 2 深度学习-人脸识别过程人脸检测人脸识别Metric Larning 3 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习 机器视觉 人脸识别系统 该项目…

ping 不支持代理,命令行测试外网网址请使用 curl 测试,如何测试?

如果你想通过命令行测试外网网址的可达性&#xff0c;并且因为 ping 命令不支持通过代理服务器进行操作&#xff0c;你可以使用 curl 命令来测试。curl 是一个强大的工具&#xff0c;可以用来传输数据&#xff0c;它支持多种协议&#xff0c;包括 HTTP、HTTPS 等&#xff0c;而…

必读:揭秘HR心理,高效制胜校招实习简历!

场景 很多同学之前私信过语兴简历修改问题&#xff0c;这期我找几个特定典型问题说 问题 不清楚校招实习简历该如何写为什么投了半天也没反馈&#xff1f;为什么面试官基本不问我简历的内容&#xff1f; 解答 第一点 &#x1f50d; 筛选简历角度&#xff1a;突显阿里P7及…

6个国内可用的chat大模型

文心一言 - 一款适合中国人使用的AI智能助理&#xff0c;能够帮助用户进行对话、生成内容等工作&#xff0c;提高工作效率和创作水平 文心一言 文心一言 App 是一款适合中国人的 AI 智能助理&#xff0c;它的功能点主要包括&#xff1a; 工作生活助理&#xff1a;该应用通过简…

OJ刷题:《剑指offer》之单身狗1、2 !(巧用位操作符,超详细讲解!)

目录 1.单身狗1 1.1 题目描述 1.2排序寻找 1.3巧用位操作符 2.单身狗2 1.1 题目描述 1.2排序寻找 1.3巧用位操作符 不是每个人都能做自己想做的事&#xff0c;成为自己想成为的人。 克心守己&#xff0c;律己则安&#xff01; 创作不易&#xff0c;宝子们&#xff01;如…

智慧未来已至:人工智能与数字孪生共筑城市新纪元

随着科技的飞速发展&#xff0c;人工智能与数字孪生技术正逐步成为智慧城市建设的核心驱动力。 这两项技术的结合&#xff0c;不仅将彻底改变城市的传统面貌&#xff0c;更将引领我们走向一个更加高效、便捷、绿色的未来。 一、智慧城市的新内涵 智慧城市&#xff0c;是指在城…

移动Web——less

1、less-简介 less是一个CSS预处理器&#xff0c;Less文件后缀是.less。扩充了CSS语言&#xff0c;使CSS具备一定的逻辑性、计算能力注意&#xff1a;浏览器不识别Less代码&#xff0c;目前阶段&#xff0c;网页要引入对应的CSS文件VS code插件&#xff1a;Easy LESS&#xff…

Kafka系列(二)将消息数据写入Kafka系统--生产者【异步发送、同步发送、单线程发送、多线程发送、配置生产者属性、自定义序列化、自定义主题分区】

Kafka系列 发送消息到 Kafka 主题了解异步模式了解同步模式线程发送消息的步骤生产者用单线程发送消息生产者用多线程发送消息 配置生产者属性保存对象的各个属性一序列化序列化一个对象序列化对象的存储格式自己实现 序列化的步骤1. 创建序列化对象2. 编写序列化工具类3. 编写…

13.2 Web与Servlet进阶(❤❤)

13.2 Web与Servlet进阶 1. 请求与响应1.1 URL与URI1.2 HTTP请求的结构1. 结构2.后端获取访问工具类型:getHeader().toLowerCase方法1.3 响应的结构1. 结构2. 响应常见状态码3. 后端设置响应参数4. 响应的ContentType作用1.4 请求转发与响应重定向应用1. 请求转发:getRequestDis…

Mysql运维篇(四) MySQL常用命令

一路走来&#xff0c;所有遇到的人&#xff0c;帮助过我的、伤害过我的都是朋友&#xff0c;没有一个是敌人。如有侵权&#xff0c;请留言&#xff0c;我及时删除&#xff01; 一、MySQL命令速查表 https://www.cnblogs.com/pyng/p/15560059.html Mysql DBA运维命令大全 - 墨…

【机器学习、深度学习和强化学习原理】

目录 机器学习、深度学习和强化学习都是人工智能的重要领域&#xff0c;它们的代码原理与实现有所不同。机器学习是一种通过训练模型来从数据中学习规律和模式的技术。其代码实现通常包括以下步骤&#xff1a;深度学习是一种模仿人脑神经网络的算法&#xff0c;通过多层神经网络…

批量注册与自动下单:探索速卖通跨境智能系统的操作方法

速卖通跨境智能系统是一款功能强大的软件&#xff0c;可以帮助用户批量注册速卖通买家号、绑定地址、加购加心愿单以及自动下单等任务。 该软件具有以下优势&#xff1a; 强大的指纹系统&#xff1a;采用最新的反指纹技术&#xff0c;可以设置与代理IP相对应的语言和时区&…

调试prplmesh指南

前提&#xff1a;安装ubuntu虚拟机 本文使用的linux发行版本&#xff1a;Ubuntu 20.04.6 LTS 1 安装依赖 安装相关依赖包 sudo apt-get update && sudo apt-get upgrade sudo apt-get install vim git unzip curl binutils cmake gcc bison curl flex gcovr binuti…

Java swing——创建对话框JDialog

之前我们讲了怎么建立一个简易的窗口&#xff0c;链接&#xff1a;http://t.csdnimg.cn/l7QSs&#xff0c;接下来继续讲解窗口的进阶。 对话框 上一篇文章中我们讲到了JFrame是一种顶层容器&#xff0c;本文接下来介绍其余的顶层容器。 跟JFrame一样&#xff0c;&#xff0c;这…

C/C++ 回调函数 callback 异步编程

一、C语言的回调函数 1.小试牛刀 #include <iostream> using namespace std; #include <memory> #include <stdlib.h>int add(int a, int b) {return a b; }void test01() {// 函数指针可以指向任何类型的函数&#xff0c;只要函数的参数列表和返回值类型…

如何结合ChatGPT生成个人魔法咒语词库

3.6.1 ChatGPT辅助力AI绘画 3.6.1.1 给定主题让ChatGPT直接描述 上面给了一个简易主题演示一下&#xff0c;这是完全我没有细化的提问&#xff0c;然后把直接把这些关键词组合在一起。 关键词&#xff1a; 黄山的美景&#xff0c;生机勃勃&#xff0c;湛蓝天空&#xff0c;青…

厕所革命与可持续发展的“九牧方案”

人类文明的历史&#xff0c;就是厕所的革命史&#xff0c;小小的厕所里&#xff0c;承载着大故事。 2015 年&#xff0c;印度一个名叫娜尔的女孩&#xff0c;因为丈夫不愿意在家盖厕所&#xff0c;向法庭提出了离婚申请&#xff0c;由此引发了全印度“无厕所&#xff0c;无新娘…