RabbitMQ实践——使用WebFlux响应式方式实时返回队列中消息

大纲

  • Pom.xml
  • 监听队列
  • 实时返回消息
  • 测试
  • 完整代码
  • 工程代码

在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。
webflux是反应式Web框架,客户端可以通过一个长连接和服务端相连,后续服务端可以通过该连接持续给客户端发送消息。可以达到:发送一次,多次接收的效果。

Pom.xml

由于我们要使用Rabbitmq,所以要新增如下依赖

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId></dependency>

webflux的依赖如下:

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.6.7</version></dependency>

监听队列

下面代码会返回一个监听队列的Container

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

实时返回消息

一旦消费者读取到消息,onMessage方法会被调用。然后Flux的消费者会将消息投递到流上。

    public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}

测试

由于OpenApi不能支持实时展现流式数据,所以我们采用Postman来测试。
发送请求后,该页面一直处于滚动状态。
在这里插入图片描述
在管理后台发送一条消息
在这里插入图片描述
可以看到Postman收到了该消息
在这里插入图片描述
然后在发一条,Postman又会收到一条
在这里插入图片描述
这样我们就完成了“请求一次,多次返回”的效果。

完整代码

需要注意的是,返回的格式需要标记为produces = “text/event-stream”。

// controller
package com.rabbitmq.consumer.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.consumer.service.ConsumerService;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/consumer")
public class ConsumerController {@Autowiredprivate ConsumerService comsumerService;@GetMapping(value = "/listen", produces = "text/event-stream")public Flux<String> listen(@RequestParam String queueName) {return comsumerService.listen(queueName);}
}
// service
package com.rabbitmq.consumer.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class ConsumerService {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;private final ReentrantLock lock = new ReentrantLock();private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
}

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/consumer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/33028.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

捷云等保一体机 产品服务一站式等保合规交付解决方案

等保2.0的变化 2019 年 5 月 13 日&#xff0c;网络安全等级保护制度 2.0 国家标准&#xff08;简称“等保 2.0”&#xff09;正式发布&#xff0c;将等保 2.0 基本要求、测评要求、安全设计技术要求框架统一为安全管理中心支持下的三重防护结构框架。定级对象在按照等保 2.0 …

Python爬虫-贝壳新房

前言 本文是该专栏的第32篇,后面会持续分享python爬虫干货知识,记得关注。 本文以某房网为例,如下图所示,采集对应城市的新房房源数据。具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。接下来,跟着笔者直接往下看正文详细内容。(附带完整代码) 正文 地…

TensorFlow高阶API使用与PyTorch的安装

欢迎来到 Papicatch的博客 文章目录 &#x1f349;TensorFlow高阶API使用 &#x1f348;示例1&#xff1a;使用tf.keras构建模型 &#x1f34d;通过“序贯式”方法构建模型 &#x1f34d;通过“函数式”方法构建模型 &#x1f348;示例2&#xff1a;编译模型关键代码 &am…

ArkTS开发系列之导航 (2.6 图形)

上篇回顾&#xff1a;ArkTS开发系列之导航 (2.5.2 页面组件导航&#xff09; 本篇内容&#xff1a; 显示图片、自定义图形和画布自定义图形的学习使用 一、知识储备 1. 图片组件&#xff08;Image&#xff09; 可以展示jpg 、png 、svg 、gif等各格式的网络和本地资源文件图…

AI 开发平台(Coze)搭建小游戏《挑战花光10亿》

前言 本文讲解如何从零开始&#xff0c;使用扣子平台去搭建一个小游戏 这是成品链接&#xff1a;挑战花光10亿 - 扣子 AI Bot (coze.cn) 欢迎大家去体验一下 效果 正文 什么是coze平台&#xff1f; 扣子&#xff08;Coze&#xff09;是字节跳动推出的一站式 AI 开发平台&am…

周末设计高端企业_集团官网主题Discuz模板

风格名称: 周末设计_高端企业_集团官网 适用版本: Discuz! X3.0、X3.1、X3.2、X3.3、F1.0 风格编码: 使用语言包结构&#xff0c;适合全部编码 周末设计高端企业_集团官网主题Discuz模板

会话会话会话

目录 1.会话 1.1 为什么需要会话控制 1.2 域对象的范围 1.2.1 应用域的范围 1.2.2 请求域的范围 1.2.3 会话域的范围 1.3 Cookie技术 1.3.1 Cookie的概念 1.3.2 Cookie的作用 1.3.3 Cookie的应用场景 1.3.4 Cookie的入门案例 ① 目标 ② Cookie相关的API ③ Serv…

C++ | Leetcode C++题解之第187题重复的DNA序列

题目&#xff1a; 题解&#xff1a; class Solution {const int L 10;unordered_map<char, int> bin {{A, 0}, {C, 1}, {G, 2}, {T, 3}}; public:vector<string> findRepeatedDnaSequences(string s) {vector<string> ans;int n s.length();if (n < L…

GPOPS-II教程(1): 语法和一个最优控制问题案例

文章目录 一、写在前面二、GPOPS-II结构2.1 setup的语法2.2 function的语法2.2.1 setup.functions.continuousfun2.2.2 setup.functions.endpoint 2.3 bounds的语法setup.guessoutput 三、例题3.1 问题描述3.2 代码部分3.2.1 main function3.2.1.1 初始参数设置3.2.1.2 边界条件…

安装VEX外部编辑器

Houdini20配置VEX外部编辑器方法_哔哩哔哩_bilibili 下载并安装Visual Studio Code软件&#xff1a;Download Visual Studio Code - Mac, Linux, Windows 在Visual Studio Code软件内&#xff0c;安装相关插件&#xff0c;如&#xff1a; 中文汉化插件vex插件 安装Houdini Expr…

图像处理Python库--图片裁剪、缩放、灰度图、圆角等

图像处理Python库 py-img-processor1. 安装2. 使用(Usage)2.1 运行配置2.2 图像处理处理函数图像处理参数为字符串图像处理参数为JSON 命令行提取图像主色调 py-img-processor Image editor using Python and Pillow. 依赖Pillow开发的Python库&#xff0c;用于图像编辑处理。…

nest.js关键笔记

Nest.js 介绍核心功能设计模式&#xff1a;IOC 控制反转 DI 依赖注入前置知识&#xff1a;装饰器前置知识装饰器-实现一个GET请求 Nestjs脚手架Nestjs cli 常用命令 RESTful 风格设计Nestjs 控制器控制器中常见的参数装饰器 Session 实例Nestjs 提供者**工厂模式**异步模式 Nes…

【Unity服务器01】之【AssetBundle上传加载u3d模型】

首先打开一个项目导入一个简单的场景 导入怪物资源&#xff0c; AssetBundle知识点&#xff1a; 1.指定资源的AssetBundle属性标签 &#xff08;1&#xff09;找到AssetBundle属性标签 &#xff08;2&#xff09;A标签 代表&#xff1a;资源目录&#xff08;决定打包之后在哪…

如何给文档设置密码?电脑文件安全加密的详细操作步骤(10种方法)

在数字化时代&#xff0c;电脑文件的安全和隐私至关重要。通过给电脑的文件或者文件夹设置密码和加密&#xff0c;可以有效保护你的重要文件不被未经授权的人员访问&#xff0c;特别是公司的重要岗位&#xff0c;一些特殊的机密文件&#xff0c;投标文件&#xff0c;资金文件等…

动手学深度学习(Pytorch版)代码实践 -深度学习基础-10权重衰减

10权重衰减 """ 正则化是处理过拟合的常用方法&#xff1a;在训练集的损失函数中加入惩罚项&#xff0c;以降低学习到的模型的复杂度。 保持模型简单的一个特别的选择是使用L2惩罚的权重衰减。这会导致学习算法更新步骤中的权重衰减。 """impor…

html--好看的手机充值单页

<!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><title>线上充值-首页</title><meta content"widthdevice-width,initial-scale1.0,maximum-scale1.0,user-scalable0" name"viewport&…

maya模型仓鼠制作

小仓鼠建模&#xff08;6&#xff09;_哔哩哔哩_bilibili 20240623作品---个人评价&#xff1a;第一次做的&#xff0c;虽然有点丑&#xff0c;但是还能看&#xff01;希望后面有些进步

论文阅读--Efficient Hybrid Zoom using Camera Fusion on Mobile Phones

这是谷歌影像团队 2023 年发表在 Siggraph Asia 上的一篇文章&#xff0c;主要介绍的是利用多摄融合的思路进行变焦。 单反相机因为卓越的硬件性能&#xff0c;可以非常方便的实现光学变焦。不过目前的智能手机&#xff0c;受制于物理空间的限制&#xff0c;还不能做到像单反一…

线程封装,互斥

文章目录 线程封装线程互斥加锁、解锁认识接口解决问题理解锁 线程封装 C/C代码混编引起的问题 此处pthread_create函数要求传入参数为void * func(void * )类型,按理来说ThreadRoutine满足,但是 这是在内类完成封装,所以ThreadRoutine函数实际是两个参数,第一个参数Thread* …

【建设方案】大数据湖一体化建设方案(ppt原件)

1、背景&#xff1a;大数据湖的发展背景与建设理念 2、体系&#xff1a;大数据湖体系规划与建设思路 3、生态圈&#xff1a;探索新兴业务入湖建设模式 4、共享&#xff1a;大数据湖统一访问共享规划 5、运营&#xff1a;大数据湖一体化运营管理建设 &#xff08;本方案及更多方…