Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

1.引入RocketMQ依赖:首先,在pom.xml文件中添加RocketMQ的依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.配置RocketMQ连接信息:在application.propertiesapplication.yml中配置RocketMQ的连接信息,包括Name Server地址等:

spring:application:name: ${sn.publish}cloud:stream:rocketmq:binder:name-server: ${rocket-mq.name-server}bindings:output:producer:group: testSocketsync: truebindings:output:destination: test-topiccontent-type: application/json

3.消息发布组件

@Component
public class MqSourceComponent {@ResourceSource source;public void publishNotify(SampleNotifyDTO notify) {source.output().send(MessageBuilder.withPayload(notify).build());}
}

4.消息发布控制器

@RestController
@Api(tags = "rocketmq")
public class MqController {@ResourceMqSourceComponent mq;@ApiOperation(value = "测试发布消息")@PostMapping("test-publish")public JsonVO<String> testSend(SampleNotifyDTO notify) {mq.publishNotify(notify);return JsonVO.success("消息已发送");}
}

项目结构:

接下来是websocket模块的搭建

1. 依赖添加

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.application.yml配置文件

server:port: ${sp.ws}
spring:application:name: ${sn.ws}cloud:stream:rocketmq:binder:name-server: ${rocket-mq.name-server}bindings:input:destination: test-topiccontent-type: application/jsongroup: testSocket

3.将应用程序绑定到消息代理

@EnableBinding(Sink.class): 这是Spring Cloud Stream的注解,它用于将应用程序绑定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的预定义输入通道,允许你接收消息。通过这个注解,你的应用程序可以监听消息通道,并定义消息处理逻辑。

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class WsApplication {public static void main(String[] args) {SpringApplication.run(WsApplication.class, args);}}

4.消息订阅组件

监听消息通道中的消息,一旦有消息到达,就会触发listenNotify方法,该方法负责处理消息并通过chat服务发送响应。

@Component
@Slf4j
public class MqListenComponent {@ResourceChatService chat;@StreamListener(Sink.INPUT)public void listenNotify(SampleNotifyDTO notify) {log.info(notify.toString());chat.sendMessage(notify.getClientId(), notify);}
}

5.消息通知服务

package com.zeroone.star.ws.service;import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;@Component
@ServerEndpoint("/chat")
public class ChatService {/*** 连接会话池*/private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session) throws IOException {// 判断客户端对象是否存在if (SESSION_POOL.containsKey(session.getQueryString())) {CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID冲突,连接拒绝");session.getUserProperties().put("reason", closeReason);session.close();return;}// 将客户端对象存储到会话池SESSION_POOL.put(session.getQueryString(), session);System.out.println("客户端(" + session.getQueryString() + "):开启了连接");}@OnMessagepublic String onMessage(String msg, Session session) throws IOException {// 解析消息 ==> ID::消息内容String[] msgArr = msg.split("::", 2);// 处理群发消息,ID==all表示群发if ("all".equalsIgnoreCase(msgArr[0])) {for (Session one : SESSION_POOL.values()) {// 排除自己if (one == session) {continue;}// 发送消息one.getBasicRemote().sendText(msgArr[1]);}}// 指定发送else {// 获取接收方Session target = SESSION_POOL.get(msgArr[0]);if (target != null) {target.getBasicRemote().sendText(msgArr[1]);}}return session.getQueryString() + ":消息发送成功";}@OnClosepublic void onClose(Session session) {// 连接拒绝关闭会话Object reason = session.getUserProperties().get("reason");if (reason instanceof CloseReason) {CloseReason creason = (CloseReason) reason;if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {System.out.println("拒绝客户(" + session.getQueryString() + "):关闭连接");return;}}// 从会话池中移除会话SESSION_POOL.remove(session.getQueryString());System.out.println("客户端(" + session.getQueryString() + "):关闭连接");}@OnErrorpublic void onError(Session session, Throwable throwable) {System.out.println("客户端(" + session.getQueryString() + ")错误信息:" + throwable.getMessage());}@SneakyThrowspublic void sendMessage(String id, Object message) {// 群发if ("all".equalsIgnoreCase(id)) {for (Session one : SESSION_POOL.values()) {// 发送消息one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));}}// 指定发送else {// 获取接收方Session target = SESSION_POOL.get(id);if (target != null) {target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));}}}
}

项目结构:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/123945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文件改名,轻松添加前缀顺序编号,文件改名更高效!

您是否曾经需要批量修改文件名&#xff0c;并希望在文件名中添加特定的前缀或顺序编号&#xff1f;现在&#xff0c;我们为您带来了一款全新的文件改名工具&#xff0c;帮助您轻松解决这个问题&#xff01; 第一步&#xff0c;进入文件批量改名高手主页面&#xff0c;在板块栏…

C++学习笔记之四(标准库、标准模板库、vector类)

C 1、C标准库2、C标准模板库2.1、vector2.1.1、vector与array2.1.2、vector与函数对象2.1.3、vector与迭代器2.1.4、vector与算法 1、C标准库 C C C标准库指的是标准程序库( S t a n d a r d Standard Standard L i b a r a y Libaray Libaray)&#xff0c;它定义了十个大类…

亚马逊,速卖通,美客多如何打造爆款商品,排名提升榜首

1、产品Listing的完整性 Listing是亚马逊A9算法认识你产品的基础&#xff0c;在发布一条listing的时候&#xff0c;尽可能地做到最好!在准备一条listing之前&#xff0c;一定事先要收集、整理足够多的产品关键词&#xff0c;在优化listing内容的时候填充进去。仔细观察优秀竞品…

Realrek 2.5G交换机 8+1万兆光RTL8373-VB-CG方案简介

新一代2.5G交换机方案RTL8373-VB-CG可以提供4中不同形态 a. 52.5G 电口110G光》RTL8373 b. 52.5G 电口110G电》RTL83738261 c. 82.5G 电口110G光》RTL83738224 d.82.5G 电口110G电口》RTL837382248261 1.概述 Realtek RTL8373-CG是一款低功耗、高性能、高度集成的八端口2.5G和一…

C++设计模式_19_Memento 备忘录(理解,目前多使用序列化方案来实现)

Memento 备忘录模式也属于“状态变化”模式&#xff0c;它是一个小模式&#xff0c;在今天来看有些过时&#xff0c;当今已经很少使用当前模式实现需求&#xff0c;思想却不变&#xff08;信息隐藏&#xff09;&#xff0c;目前多使用序列化方案来实现。本系列所介绍的模式&…

小程序开发——小程序项目的配置与生命周期

1.app.json配置属性 app.json配置属性 2.页面配置 app的页面配置指的是pages属性&#xff0c; pages数组的第一个页面将默认作为小程序的启动页。利用开发工具新建页面时&#xff0c;则pages属性对应的数组将自动添加该页面的路径&#xff0c;若是在硬盘中添加文件的形式则不…

C++数据结构X篇_23_快速排序(最快、不稳定的排序)

文章参考十大经典排序算法-快速排序算法详解进行整理补充。快速排序是最快的排序方法。 排序思路&#xff1a;分治法-挖坑填数&#xff1a;大问题分解为各个小问题&#xff0c;对小问题求解&#xff0c;使得大问题得以解决 文章目录 1. 什么是快速排序1.1 概念1.2 算法原理1.3 …

Linux rm命令:删除文件或目录

当 Linux 系统使用很长时间之后&#xff0c;可能会有一些已经没用的文件&#xff08;即垃圾&#xff09;&#xff0c;这些文件不但会消耗宝贵的硬盘资源&#xff0c;还是降低系统的运行效率&#xff0c;因此需要及时地清理。 rm 是强大的删除命令&#xff0c;它可以永久性地删除…

一百九十七、Java——IDEA项目中把多层文件夹拆开显示

一、目的 由于IDEA项目中&#xff0c;默认的是把文件夹连在一起显示&#xff0c;于是为了方便需要把这些连在一起的文件夹拆开&#xff0c;分层显示 如文件夹cn.kgc 二、解决措施 解决方法很简单 &#xff08;一&#xff09;找到IDEA项目上的小齿轮 &#xff08;二&#xf…

基于深度学习的单图像人群计数研究:网络设计、损失函数和监控信号

摘要 https://arxiv.org/pdf/2012.15685v2.pdf 单图像人群计数是一个具有挑战性的计算机视觉问题,在公共安全、城市规划、交通管理等领域有着广泛的应用。近年来,随着深度学习技术的发展,人群计数引起了广泛的关注并取得了巨大的成功。通过系统地回顾和总结2015年以来基于深…

基于STM32的多功能智能密码锁控制设计

**单片机设计介绍&#xff0c;1653基于STM32的多功能智能密码锁控制设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序文档 六、 文章目录 一 概要 基于STM32的多功能智能密码锁控制设计是一种用STM32微控制器开发的系统&#xff0c;用于控制和管理密码…

手机桌面待办事项APP推荐,手机上可使用哪些待办事项APP

生活中&#xff0c;无论你是一名专业人士&#xff0c;学生&#xff0c;还是家庭主妇&#xff0c;总有各种各样的任务等待着你&#xff0c;有时候需要额外的工具来提醒和管理这些待办事项。手机上的待办事项APP软件成为了这个任务的好帮手&#xff0c;为我们提供了快速、方便的方…

spring boot配置ssl(多cer格式)保姆级教程

1. 准备cer格式的证书&#xff1b; 2. 合并cer证书并转化成jks格式的证书 为啥有这一步&#xff0c;因为cer证书配置在spring boot项目中&#xff0c;项目启动不起来。如果有大佬想指导一下可以给我留言&#xff0c;在此先谢过大佬。 1&#xff09;先创建一个jks格式的证…

Kotlin(八) 数据类、单例

目录 一&#xff1a;创建数据类 二&#xff1a;单例类 一&#xff1a;创建数据类 和Java的不同&#xff0c;kotlin的数据类比较简单&#xff0c;New→Kotlin File/Class&#xff0c;在弹出的对话框中输入“Book”&#xff0c;创建类型选择“Data”。如图&#xff1a; 然后编…

Keil Map信息解析

基本功能&#xff1a; 1.在Keil里面&#xff0c;通过App.Map复制所有信息。然后解析剪辑版内容。 2.随意输入一个函数内存地址&#xff0c;即可遍历出该内存地址属于哪个.c或者函数名。或者能遍历出变量。 强化功能&#xff1a; 1.通过Keil5 命令 Save xxxxxxx\1.Hex 0x200173…

为什么我觉得Rust比C++复杂得多?

为什么我觉得Rust比C复杂得多&#xff1f; Rust自学确实有一定门槛&#xff0c;很多具体问题解决起来搜索引擎也不太帮的上忙&#xff0c;会出现卡住的情况&#xff0c;卡的时间长了就放弃了。最近很多小伙伴找我&#xff0c;说想要一些c语言资料&#xff0c;然后我根据自己从…

SpringCloudAlibaba实战-nacos集群部署

写在前面&#xff1a;在学习阶段&#xff0c;我们想快速学习SpringCloudAlibaba功能&#xff0c;但总是花费大量时间跟着视频或博客做组件配置。由于版本的更迭&#xff0c;我们学习时的组件版本很可能和作者的不一致&#xff0c;又或者是各自环境不一&#xff0c;只能一坑又一…

kubernets挑战实验一(| pvc | pod | services | rolebinding | context)

参考&#xff1a; https://kubernetes.io/zh-cn/docs/tasks/access-application-cluster/configure-access-multiple-clusters/ Deploy the given architecture diagram for implementing a Jekyll SSG. 1、创建pvc使用,以下条件限制 Storage Request: 1Gi Access modes: Re…

【WSL 2】Windows10 安装 WSL 2,并配合 Windows Terminal 和 VSCode 使用

【WSL 2】Windows10 安装 WSL 2&#xff0c;并配合 Windows Terminal 和 VSCode 使用 1 安装 Windows Terminal2 安装 WSL 23 在 Windows 文件资源管理器中打开 WSL 项目4 在 VSCode 中使用 WSL 24.1 必要准备4.2 从 VSCode 中 Connect WSL4.3 从 Linux 中打开 VSCode 1 安装 W…

✔ ★【备战实习(面经+项目+算法)】 10.29学习

✔ ★【备战实习&#xff08;面经项目算法&#xff09;】 坚持完成每天必做如何找到好工作1. 科学的学习方法&#xff08;专注&#xff01;效率&#xff01;记忆&#xff01;心流&#xff01;&#xff09;2. 每天认真完成必做项&#xff0c;踏实学习技术 认真完成每天必做&…