Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

1.引入RocketMQ依赖:首先,在pom.xml文件中添加RocketMQ的依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.配置RocketMQ连接信息:在application.propertiesapplication.yml中配置RocketMQ的连接信息,包括Name Server地址等:

spring:application:name: ${sn.publish}cloud:stream:rocketmq:binder:name-server: ${rocket-mq.name-server}bindings:output:producer:group: testSocketsync: truebindings:output:destination: test-topiccontent-type: application/json

3.消息发布组件

@Component
public class MqSourceComponent {@ResourceSource source;public void publishNotify(SampleNotifyDTO notify) {source.output().send(MessageBuilder.withPayload(notify).build());}
}

4.消息发布控制器

@RestController
@Api(tags = "rocketmq")
public class MqController {@ResourceMqSourceComponent mq;@ApiOperation(value = "测试发布消息")@PostMapping("test-publish")public JsonVO<String> testSend(SampleNotifyDTO notify) {mq.publishNotify(notify);return JsonVO.success("消息已发送");}
}

项目结构:

接下来是websocket模块的搭建

1. 依赖添加

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.application.yml配置文件

server:port: ${sp.ws}
spring:application:name: ${sn.ws}cloud:stream:rocketmq:binder:name-server: ${rocket-mq.name-server}bindings:input:destination: test-topiccontent-type: application/jsongroup: testSocket

3.将应用程序绑定到消息代理

@EnableBinding(Sink.class): 这是Spring Cloud Stream的注解,它用于将应用程序绑定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的预定义输入通道,允许你接收消息。通过这个注解,你的应用程序可以监听消息通道,并定义消息处理逻辑。

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class WsApplication {public static void main(String[] args) {SpringApplication.run(WsApplication.class, args);}}

4.消息订阅组件

监听消息通道中的消息,一旦有消息到达,就会触发listenNotify方法,该方法负责处理消息并通过chat服务发送响应。

@Component
@Slf4j
public class MqListenComponent {@ResourceChatService chat;@StreamListener(Sink.INPUT)public void listenNotify(SampleNotifyDTO notify) {log.info(notify.toString());chat.sendMessage(notify.getClientId(), notify);}
}

5.消息通知服务

package com.zeroone.star.ws.service;import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;@Component
@ServerEndpoint("/chat")
public class ChatService {/*** 连接会话池*/private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session) throws IOException {// 判断客户端对象是否存在if (SESSION_POOL.containsKey(session.getQueryString())) {CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID冲突,连接拒绝");session.getUserProperties().put("reason", closeReason);session.close();return;}// 将客户端对象存储到会话池SESSION_POOL.put(session.getQueryString(), session);System.out.println("客户端(" + session.getQueryString() + "):开启了连接");}@OnMessagepublic String onMessage(String msg, Session session) throws IOException {// 解析消息 ==> ID::消息内容String[] msgArr = msg.split("::", 2);// 处理群发消息,ID==all表示群发if ("all".equalsIgnoreCase(msgArr[0])) {for (Session one : SESSION_POOL.values()) {// 排除自己if (one == session) {continue;}// 发送消息one.getBasicRemote().sendText(msgArr[1]);}}// 指定发送else {// 获取接收方Session target = SESSION_POOL.get(msgArr[0]);if (target != null) {target.getBasicRemote().sendText(msgArr[1]);}}return session.getQueryString() + ":消息发送成功";}@OnClosepublic void onClose(Session session) {// 连接拒绝关闭会话Object reason = session.getUserProperties().get("reason");if (reason instanceof CloseReason) {CloseReason creason = (CloseReason) reason;if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {System.out.println("拒绝客户(" + session.getQueryString() + "):关闭连接");return;}}// 从会话池中移除会话SESSION_POOL.remove(session.getQueryString());System.out.println("客户端(" + session.getQueryString() + "):关闭连接");}@OnErrorpublic void onError(Session session, Throwable throwable) {System.out.println("客户端(" + session.getQueryString() + ")错误信息:" + throwable.getMessage());}@SneakyThrowspublic void sendMessage(String id, Object message) {// 群发if ("all".equalsIgnoreCase(id)) {for (Session one : SESSION_POOL.values()) {// 发送消息one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));}}// 指定发送else {// 获取接收方Session target = SESSION_POOL.get(id);if (target != null) {target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));}}}
}

项目结构:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/123945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文件改名,轻松添加前缀顺序编号,文件改名更高效!

您是否曾经需要批量修改文件名&#xff0c;并希望在文件名中添加特定的前缀或顺序编号&#xff1f;现在&#xff0c;我们为您带来了一款全新的文件改名工具&#xff0c;帮助您轻松解决这个问题&#xff01; 第一步&#xff0c;进入文件批量改名高手主页面&#xff0c;在板块栏…

C++学习笔记之四(标准库、标准模板库、vector类)

C 1、C标准库2、C标准模板库2.1、vector2.1.1、vector与array2.1.2、vector与函数对象2.1.3、vector与迭代器2.1.4、vector与算法 1、C标准库 C C C标准库指的是标准程序库( S t a n d a r d Standard Standard L i b a r a y Libaray Libaray)&#xff0c;它定义了十个大类…

js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件

js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件 js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件使用 FileReader js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件 使用 F…

【python爬虫】设计自己的爬虫 1. request封装

通过requests.session().request 封装request方法 考虑到请求HTTP/2.0 同时封装httpx 来处理HTTP/2.0的请求 封装requests # 遇到请求失败的情况时 重新请求&#xff0c;请求5次等待2s retry(stop_max_attempt_number5, retry_on_resultlambda re_data: re_data is None, wai…

亚马逊,速卖通,美客多如何打造爆款商品,排名提升榜首

1、产品Listing的完整性 Listing是亚马逊A9算法认识你产品的基础&#xff0c;在发布一条listing的时候&#xff0c;尽可能地做到最好!在准备一条listing之前&#xff0c;一定事先要收集、整理足够多的产品关键词&#xff0c;在优化listing内容的时候填充进去。仔细观察优秀竞品…

Realrek 2.5G交换机 8+1万兆光RTL8373-VB-CG方案简介

新一代2.5G交换机方案RTL8373-VB-CG可以提供4中不同形态 a. 52.5G 电口110G光》RTL8373 b. 52.5G 电口110G电》RTL83738261 c. 82.5G 电口110G光》RTL83738224 d.82.5G 电口110G电口》RTL837382248261 1.概述 Realtek RTL8373-CG是一款低功耗、高性能、高度集成的八端口2.5G和一…

C++设计模式_19_Memento 备忘录(理解,目前多使用序列化方案来实现)

Memento 备忘录模式也属于“状态变化”模式&#xff0c;它是一个小模式&#xff0c;在今天来看有些过时&#xff0c;当今已经很少使用当前模式实现需求&#xff0c;思想却不变&#xff08;信息隐藏&#xff09;&#xff0c;目前多使用序列化方案来实现。本系列所介绍的模式&…

小程序开发——小程序项目的配置与生命周期

1.app.json配置属性 app.json配置属性 2.页面配置 app的页面配置指的是pages属性&#xff0c; pages数组的第一个页面将默认作为小程序的启动页。利用开发工具新建页面时&#xff0c;则pages属性对应的数组将自动添加该页面的路径&#xff0c;若是在硬盘中添加文件的形式则不…

C++数据结构X篇_23_快速排序(最快、不稳定的排序)

文章参考十大经典排序算法-快速排序算法详解进行整理补充。快速排序是最快的排序方法。 排序思路&#xff1a;分治法-挖坑填数&#xff1a;大问题分解为各个小问题&#xff0c;对小问题求解&#xff0c;使得大问题得以解决 文章目录 1. 什么是快速排序1.1 概念1.2 算法原理1.3 …

手写常用的javascript函数

// 获取url参数 function getQueryString(name) {var reg new RegExp((^|&) name ([^&]*)(&|$), i);var r window.location.search.substr(1).match(reg);if (r ! null) {return unescape(r[2]);}return null; }// 验证手机号码(strict) function validPhoneNu…

Linux rm命令:删除文件或目录

当 Linux 系统使用很长时间之后&#xff0c;可能会有一些已经没用的文件&#xff08;即垃圾&#xff09;&#xff0c;这些文件不但会消耗宝贵的硬盘资源&#xff0c;还是降低系统的运行效率&#xff0c;因此需要及时地清理。 rm 是强大的删除命令&#xff0c;它可以永久性地删除…

一百九十七、Java——IDEA项目中把多层文件夹拆开显示

一、目的 由于IDEA项目中&#xff0c;默认的是把文件夹连在一起显示&#xff0c;于是为了方便需要把这些连在一起的文件夹拆开&#xff0c;分层显示 如文件夹cn.kgc 二、解决措施 解决方法很简单 &#xff08;一&#xff09;找到IDEA项目上的小齿轮 &#xff08;二&#xf…

基于深度学习的单图像人群计数研究:网络设计、损失函数和监控信号

摘要 https://arxiv.org/pdf/2012.15685v2.pdf 单图像人群计数是一个具有挑战性的计算机视觉问题,在公共安全、城市规划、交通管理等领域有着广泛的应用。近年来,随着深度学习技术的发展,人群计数引起了广泛的关注并取得了巨大的成功。通过系统地回顾和总结2015年以来基于深…

基于STM32的多功能智能密码锁控制设计

**单片机设计介绍&#xff0c;1653基于STM32的多功能智能密码锁控制设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序文档 六、 文章目录 一 概要 基于STM32的多功能智能密码锁控制设计是一种用STM32微控制器开发的系统&#xff0c;用于控制和管理密码…

手机桌面待办事项APP推荐,手机上可使用哪些待办事项APP

生活中&#xff0c;无论你是一名专业人士&#xff0c;学生&#xff0c;还是家庭主妇&#xff0c;总有各种各样的任务等待着你&#xff0c;有时候需要额外的工具来提醒和管理这些待办事项。手机上的待办事项APP软件成为了这个任务的好帮手&#xff0c;为我们提供了快速、方便的方…

前端学习路线指南:从入门到精通【①】

前言 作为一个前端开发者&#xff0c;学习前端技术是必不可少的。然而&#xff0c;由于前端领域的广阔和不断演进的技术栈&#xff0c;对于初学者来说可能会感到困惑。本篇文章将为你提供一个清晰的前端学习路线&#xff0c;帮助你系统地掌握前端开发技能&#xff0c;并成为一名…

spring boot配置ssl(多cer格式)保姆级教程

1. 准备cer格式的证书&#xff1b; 2. 合并cer证书并转化成jks格式的证书 为啥有这一步&#xff0c;因为cer证书配置在spring boot项目中&#xff0c;项目启动不起来。如果有大佬想指导一下可以给我留言&#xff0c;在此先谢过大佬。 1&#xff09;先创建一个jks格式的证…

Kotlin(八) 数据类、单例

目录 一&#xff1a;创建数据类 二&#xff1a;单例类 一&#xff1a;创建数据类 和Java的不同&#xff0c;kotlin的数据类比较简单&#xff0c;New→Kotlin File/Class&#xff0c;在弹出的对话框中输入“Book”&#xff0c;创建类型选择“Data”。如图&#xff1a; 然后编…

Keil Map信息解析

基本功能&#xff1a; 1.在Keil里面&#xff0c;通过App.Map复制所有信息。然后解析剪辑版内容。 2.随意输入一个函数内存地址&#xff0c;即可遍历出该内存地址属于哪个.c或者函数名。或者能遍历出变量。 强化功能&#xff1a; 1.通过Keil5 命令 Save xxxxxxx\1.Hex 0x200173…

使用设计模式省去大量的if-elsef分支

1.测试类 Testpublic void test7() {/*** 使用设计模式前*///模拟入参String name "?";if("张三".equals(name)){System.out.println("按照张三的策略执行的任务!");}else if ("李四".equals(name)){System.out.println("按照李…