RabbitMQ实践——使用WebFlux响应式方式实时返回队列中消息

大纲

  • Pom.xml
  • 监听队列
  • 实时返回消息
  • 测试
  • 完整代码
  • 工程代码

在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。
webflux是反应式Web框架,客户端可以通过一个长连接和服务端相连,后续服务端可以通过该连接持续给客户端发送消息。可以达到:发送一次,多次接收的效果。

Pom.xml

由于我们要使用Rabbitmq,所以要新增如下依赖

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId></dependency>

webflux的依赖如下:

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.6.7</version></dependency>

监听队列

下面代码会返回一个监听队列的Container

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

实时返回消息

一旦消费者读取到消息,onMessage方法会被调用。然后Flux的消费者会将消息投递到流上。

    public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}

测试

由于OpenApi不能支持实时展现流式数据,所以我们采用Postman来测试。
发送请求后,该页面一直处于滚动状态。
在这里插入图片描述
在管理后台发送一条消息
在这里插入图片描述
可以看到Postman收到了该消息
在这里插入图片描述
然后在发一条,Postman又会收到一条
在这里插入图片描述
这样我们就完成了“请求一次,多次返回”的效果。

完整代码

需要注意的是,返回的格式需要标记为produces = “text/event-stream”。

// controller
package com.rabbitmq.consumer.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.consumer.service.ConsumerService;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/consumer")
public class ConsumerController {@Autowiredprivate ConsumerService comsumerService;@GetMapping(value = "/listen", produces = "text/event-stream")public Flux<String> listen(@RequestParam String queueName) {return comsumerService.listen(queueName);}
}
// service
package com.rabbitmq.consumer.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class ConsumerService {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;private final ReentrantLock lock = new ReentrantLock();private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
}

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/consumer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/33028.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

捷云等保一体机 产品服务一站式等保合规交付解决方案

等保2.0的变化 2019 年 5 月 13 日&#xff0c;网络安全等级保护制度 2.0 国家标准&#xff08;简称“等保 2.0”&#xff09;正式发布&#xff0c;将等保 2.0 基本要求、测评要求、安全设计技术要求框架统一为安全管理中心支持下的三重防护结构框架。定级对象在按照等保 2.0 …

Python爬虫-贝壳新房

前言 本文是该专栏的第32篇,后面会持续分享python爬虫干货知识,记得关注。 本文以某房网为例,如下图所示,采集对应城市的新房房源数据。具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。接下来,跟着笔者直接往下看正文详细内容。(附带完整代码) 正文 地…

TensorFlow高阶API使用与PyTorch的安装

欢迎来到 Papicatch的博客 文章目录 &#x1f349;TensorFlow高阶API使用 &#x1f348;示例1&#xff1a;使用tf.keras构建模型 &#x1f34d;通过“序贯式”方法构建模型 &#x1f34d;通过“函数式”方法构建模型 &#x1f348;示例2&#xff1a;编译模型关键代码 &am…

ArkTS开发系列之导航 (2.6 图形)

上篇回顾&#xff1a;ArkTS开发系列之导航 (2.5.2 页面组件导航&#xff09; 本篇内容&#xff1a; 显示图片、自定义图形和画布自定义图形的学习使用 一、知识储备 1. 图片组件&#xff08;Image&#xff09; 可以展示jpg 、png 、svg 、gif等各格式的网络和本地资源文件图…

AI 开发平台(Coze)搭建小游戏《挑战花光10亿》

前言 本文讲解如何从零开始&#xff0c;使用扣子平台去搭建一个小游戏 这是成品链接&#xff1a;挑战花光10亿 - 扣子 AI Bot (coze.cn) 欢迎大家去体验一下 效果 正文 什么是coze平台&#xff1f; 扣子&#xff08;Coze&#xff09;是字节跳动推出的一站式 AI 开发平台&am…

周末设计高端企业_集团官网主题Discuz模板

风格名称: 周末设计_高端企业_集团官网 适用版本: Discuz! X3.0、X3.1、X3.2、X3.3、F1.0 风格编码: 使用语言包结构&#xff0c;适合全部编码 周末设计高端企业_集团官网主题Discuz模板

会话会话会话

目录 1.会话 1.1 为什么需要会话控制 1.2 域对象的范围 1.2.1 应用域的范围 1.2.2 请求域的范围 1.2.3 会话域的范围 1.3 Cookie技术 1.3.1 Cookie的概念 1.3.2 Cookie的作用 1.3.3 Cookie的应用场景 1.3.4 Cookie的入门案例 ① 目标 ② Cookie相关的API ③ Serv…

C++ | Leetcode C++题解之第187题重复的DNA序列

题目&#xff1a; 题解&#xff1a; class Solution {const int L 10;unordered_map<char, int> bin {{A, 0}, {C, 1}, {G, 2}, {T, 3}}; public:vector<string> findRepeatedDnaSequences(string s) {vector<string> ans;int n s.length();if (n < L…

文件传输 断点续传

什么是断点续传 简单来说断点续传指的是文件在上传或下载的过程中&#xff0c;由于网络差断开了&#xff0c;那么下次上传或下载时应该从断点处开始。 怎么实现 前端对文件进行分块前端使用多线程一块一块上传&#xff0c;上传前给服务端发一个消息检验该分块是否上传&#…

ubuntu编译和链接特定版本的opencv和boost

编译opencv&#xff0c;网上资料已经很多&#xff0c; cmake -D CMAKE_BUILD_TYPERelease \-D CMAKE_INSTALL_PREFIX../install \-D BUILD_DOCSON \-D BUILD_EXAMPLESON \-D OPENCV_GENERATE_PKGCONFIGON \ ..如上&#xff0c;make install后 安装到了父目录的install目录下 …

计算机网络协议常考点!!!

应用层协议 HTTP协议 基于TCP协议&#xff0c;是一种用于传输超文本和多媒体内容的协议&#xff0c;主要是为浏览器之前的通信而设计的&#xff1b; get请求和post请求区别 请求参数位置不同&#xff1a;get会将请求参数放在URL后面并通过&运算符连接&#xff0c;而pos…

C++ 重建二叉树(递归方法)

/*** struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* TreeNode(int x) : val(x), left(nullptr), right(nullptr) {}* };*/ #include <vector> class Solution {public:/*** 代码中的类名、方法名、参数名已经指定,请勿修改,直接…

等保测评中,关键数据的保护措施

简介 等保测评&#xff0c;即网络安全等级保护测评&#xff0c;是依据国家网络安全等级保护制度&#xff0c;对企业信息系统的安全保护水平进行的评估和认证过程。它通过一系列的标准流程和技术手段&#xff0c;确保信息系统在建设、运维过程中达到国家规定的安全保护等级要求…

GPOPS-II教程(1): 语法和一个最优控制问题案例

文章目录 一、写在前面二、GPOPS-II结构2.1 setup的语法2.2 function的语法2.2.1 setup.functions.continuousfun2.2.2 setup.functions.endpoint 2.3 bounds的语法setup.guessoutput 三、例题3.1 问题描述3.2 代码部分3.2.1 main function3.2.1.1 初始参数设置3.2.1.2 边界条件…

安装VEX外部编辑器

Houdini20配置VEX外部编辑器方法_哔哩哔哩_bilibili 下载并安装Visual Studio Code软件&#xff1a;Download Visual Studio Code - Mac, Linux, Windows 在Visual Studio Code软件内&#xff0c;安装相关插件&#xff0c;如&#xff1a; 中文汉化插件vex插件 安装Houdini Expr…

图像处理Python库--图片裁剪、缩放、灰度图、圆角等

图像处理Python库 py-img-processor1. 安装2. 使用(Usage)2.1 运行配置2.2 图像处理处理函数图像处理参数为字符串图像处理参数为JSON 命令行提取图像主色调 py-img-processor Image editor using Python and Pillow. 依赖Pillow开发的Python库&#xff0c;用于图像编辑处理。…

linux常用API接口

linux常用API接口 文章目录 linux常用API接口1.应用层内存映射mmap取消内存映射munmap终端打印可用方式1.puts 函数2.文件操作函数 fprintf3.字符输出函数 putchar4.fwrite 函数 2.内核层 1.应用层 内存映射mmap mmap 是一个用于内存映射的系统调用&#xff0c;它可以将一个文…

Java零基础-集合:List

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

nest.js关键笔记

Nest.js 介绍核心功能设计模式&#xff1a;IOC 控制反转 DI 依赖注入前置知识&#xff1a;装饰器前置知识装饰器-实现一个GET请求 Nestjs脚手架Nestjs cli 常用命令 RESTful 风格设计Nestjs 控制器控制器中常见的参数装饰器 Session 实例Nestjs 提供者**工厂模式**异步模式 Nes…

【Unity服务器01】之【AssetBundle上传加载u3d模型】

首先打开一个项目导入一个简单的场景 导入怪物资源&#xff0c; AssetBundle知识点&#xff1a; 1.指定资源的AssetBundle属性标签 &#xff08;1&#xff09;找到AssetBundle属性标签 &#xff08;2&#xff09;A标签 代表&#xff1a;资源目录&#xff08;决定打包之后在哪…