Springboot集成SSE消息推送

SSE介绍
        SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。

        了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别:SSE 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道,客户端和服务端可以相互发消息

SSE
        1. 单向通信:SSE是一种服务器推送技术,服务器可以向客户端发送消息,但客户端无法主动发送消息到服务器。
        2. 持久连接:SSE在单个HTTP连接上建立持久连接,服务器可以多次发送事件到客户端,客户端只需保持连接不关闭。

        3 . 实时性:适用于需要从服务器获取实时更新的场景,如即时通知、实时数据更新

Websocket
        1. 双向通信:WebSocket提供了全双工通信,客户端和服务器可以双向发送消息,不需要等待请求-响应。
        2. 持久连接:WebSocket在单个TCP连接上实现持久连接,适用于双向通信的场景。
        3. 实时性:非常适合实时性要求高的应用,如在线游戏、实时聊天等。

使用方法:

Maven依赖

springboot中封装了sse代码,不需要额外的依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.21</version>
</dependency>

SseEmitterUtil工具类

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;/*** SSE长链接工具类*/
@Slf4j
public class SseEmitterUtil {/*** 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面*/private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();public static SseEmitter connect(Long userId) {// 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(userId));sseEmitter.onError(errorCallBack(userId));sseEmitter.onTimeout(timeoutCallBack(userId));sseEmitterMap.put(userId, sseEmitter);log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());return sseEmitter;}/*** 给制定用户发送消息** @param userId 指定用户名* @param sseMessage 消息体*/public static void sendMessage(Long userId, String sseMessage) {if (sseEmitterMap.containsKey(userId)) {try {sseEmitterMap.get(userId).send(sseMessage);log.info("用户 {} 推送消息 {}", userId, sseMessage);} catch (IOException e) {log.error("用户 {} 推送消息异常", userId, e);removeUser(userId);}} else {log.error("消息推送 用户 {} 不存在,链接总数 {}", userId, sseEmitterMap.size());}}/*** 群发消息*/public static void batchSendMessage(String message, List<Long> ids) {ids.forEach(userId -> sendMessage(userId, message));}/*** 群发所有人*/public static void batchSendMessage(String message) {sseEmitterMap.forEach((k, v) -> {try {v.send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("用户 {} 推送异常", k, e);removeUser(k);}});}/*** 移除用户连接** @param userId 用户 ID*/public static void removeUser(Long userId) {if (sseEmitterMap.containsKey(userId)) {sseEmitterMap.get(userId).complete();sseEmitterMap.remove(userId);log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());} else {log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());}}/*** 获取当前连接信息** @return 所有的连接用户*/public static List<Long> getIds() {return new ArrayList<>(sseEmitterMap.keySet());}/*** 获取当前的连接数量** @return 当前的连接数量*/public static int getUserCount() {return sseEmitterMap.size();}private static Runnable completionCallBack(Long userId) {return () -> {log.info("用户 {} 结束连接", userId);};}private static Runnable timeoutCallBack(Long userId) {return () -> {log.error("用户 {} 连接超时", userId);removeUser(userId);};}private static Consumer<Throwable> errorCallBack(Long userId) {return throwable -> {log.error("用户 {} 连接异常", userId);removeUser(userId);};}
}
Controller层
import cn.hutool.json.JSONUtil;
import com.geb.common.utils.SseEmitterUtil;
import com.geb.domain.SseMessage;
import com.geb.domain.WdAgent;
import io.grpc.internal.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {/*** 用于创建连接*/@GetMapping("/connect/{userId}")public SseEmitter connect(@PathVariable Long userId) {return SseEmitterUtil.connect(userId);}/*** 关闭连接*/@GetMapping("/close/{userid}")public void close(@PathVariable("userid") Long userid) {SseEmitterUtil.removeUser(userid);}@GetMapping("/sse")public void sse(){// 构建推送消息体SseMessage sseMessage = new SseMessage();sseMessage.setId(1L);sseMessage.setMsg("SSE测试发送消息");sseMessage.setAgentName("测试智能体推送消息");SseEmitterUtil.sendMessage(1L, JSONUtil.toJsonStr(sseMessage));}
}
测试

这里使用postman测试,输入链接点击运行后,会自动处于链接状态

我们发送一条消息测试,我这里对消息实体进行了封装,在postman新建窗口输入测试请求

消息发送完毕,在postman就可以看到刚刚的消息

可以多发送几条消息看看效果,关闭链接调用关闭的接口即可。

这是个本地的测试,如果使用了Nginx反向代理和Gateway网关,具体的使用情况需要根据测试或者生产环境进行相应的配置。

文章参考:Springboot集成SSE消息推送_springboot 集成sse推送-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/29808.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SAP BC 换了logo后,其他人的logo都已经换了,但是其中有一台就是PRD 显示DEV的logo,从smw0上下载的是PRD

昨天终于发现是缓存的问题 GUI登录后 选项-本地数据-缓存 删除本地缓存文件&#xff0c;问题解决了

机器学习课程复习——聚类算法

Q:什么是硬聚类,什么是软聚类? 如果一个样本只能属于一个类,则称为硬聚类(hard clustering); 如果一个样本可以属于多个类,则称为软聚类(soft clustering)。 Q:聚类和分类的区别? 聚类分类学习类型无监督学习方法 不需要事先标记的数据 通过发现数据中的模式或结构来组…

sprintboot依赖管理和自动配置

springboot依赖管理和自动配置 依赖管理和自动配置依赖管理什么是依赖管理修改自动仲裁/默认版本号 starter场景启动器starter场景启动器基本介绍官方提供的starter第三方starter 自动配置自动配置基本介绍SpringBoot自动配置了哪些?如何修改默认配置如何修改默认扫描包结构re…

深入解析 iOS 应用启动过程:main() 函数前的四大步骤

深入解析 iOS 应用启动过程&#xff1a;main() 函数前的四大步骤 背景描述&#xff1a;使用 Objective-C 开发的 iOS 或者 MacOS 应用 在开发 iOS 应用时&#xff0c;我们通常会关注 main() 函数及其之后的执行逻辑&#xff0c;但在 main() 函数之前&#xff0c;系统已经为我们…

C++及cmake语法介绍

c/cmake学习 文章目录 c/cmake学习1. c1.1 基本模型1.1.1 for循环1.1.2 main函数1.1.2 带参数函数编译函数 2. CMAKE2.1 相关命令2.1.1 编译基本命令2.1.2 动态库静态库编译2.1.3 消息输出2.1.4 cmake变量常见参数1. 设置构建类型2. 设置编译器标志3. 指定编译器4. 设置安装路径…

机器学习_PCA

目录 一、概念 二、原理 三、步骤 四、实战 1、数据处理——转rgb为灰度图像 2、手动实现pca降维 3、查看信息保留数量 4、调用第三方库实现pca降维 五、小结 引入&#xff1a; 当说黄河五路和渤海三路交叉口的时候&#xff0c;这些路就类似于我们说的坐标系。而城市中的…

高等数学笔记(三):导数

一、导数概念 1.1 导数的定义 1.1.1 函数在一点处的导数与导函数 1.1.2 单侧导数 1.2 导数的几何意义 1.3 函数可导性与连续性的关系 二、函数的求导法则 2.1 函数的和、差、积、商的求导法则 2.2 反函数的求导法则 2.3 复合函数的求导法则 2.4 基本求导法则与导数公式 三…

必看!!! 2024 最新 PG 硬核干货大盘点(上)

PGConf.dev&#xff08;原名PGCon&#xff0c;从2007年至2023年&#xff09;首次在风景如画的加拿大温哥华市举办。此次重新定位的会议带来了全新的视角和多项新的内容&#xff0c;参会体验再次升级。尽管 PGCon 历来更侧重于开发者&#xff0c;吸引来自世界各地的资深开发者、…

深入理解并打败C语言难关之一————指针(5)(最终篇)

前言&#xff1a; 仔细一想&#xff0c;小编已经把指针的大部分内容都说了一遍了&#xff0c;小编目前有点灵感枯竭了&#xff0c;今天决定就结束指针这一大山&#xff0c;可能很多小编并没有提到过&#xff0c;如果有些没说的小编会在后续博客进行补充道&#xff0c;不多废话了…

服务器数据恢复—NTFS文件系统下双循环riad5数据恢复案例

服务器存储数据恢复环境&#xff1a; EMC CX4-480存储&#xff0c;该存储中有10块硬盘&#xff0c;其中有3块磁盘为掉线磁盘&#xff0c;另外7块磁盘组成一组RAID5磁盘阵列。运维人员在处理掉线磁盘时只添加新的硬盘做rebuild&#xff0c;并没有将掉线的硬盘拔掉&#xff0c;所…

ARCGIS 如何对河流等线条图形进行Smooth处理——具有多个断点高阶版

1.线转点折点&#xff08;注意&#xff01;很重要&#xff0c;不是线转点&#xff09; 2.点转线步骤 ## 3 线的融合 2.1 新建Filed 》短精度类型》利用选择工具的 线文件。全选同一条河流点&#xff0c;进入Tabel的选择界面。给同一条河赋值同一个值。 大功告成&#xff01;…

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《计及多类型储能调频容量动态申报的电能量与调频市场联合出清方法研究》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

深入探究RTOS的任务调度

阅读引言&#xff1a; 此文将会从一个工程文件&#xff0c; 一步一步的分析RTOS的任务调度实现&#xff0c; 这里选用FreeRTOS分析&#xff0c; 别的也差不多的&#xff0c; 可能在细节上有少许不一样。 目录 1&#xff0c; 常见嵌入式实时操作系统 2&#xff0c; 任务调度的…

【机器学习】第11章 神经网络与深度学习(重中之重)

一、概念 1.神经元模型 &#xff08;1&#xff09;神经网络的基本组成单位 &#xff08;2&#xff09;生物上&#xff0c;每个神经元通过树突接受来自其他被激活神经元的信息&#xff0c;通过轴突释放出来的化学递质改变当前神经元内的电位。当神经元内的电位累计到一个水平时…

Kubernetes Dashboard

Dashboard Dashboard 的项目网站&#xff0c;可以查看说明文档和基本的使用情况。 下载yaml wget https://raw.githubusercontent.com/kubernetes/dashboard/v2.6.0/aio/deploy/recommended.yaml注意需要修改镜像&#xff0c;不然可能会拉去不下来镜像 cat recommended.yaml…

人工智能--自然语言处理NLP概述

欢迎来到 Papicatch的博客 目录 &#x1f349;引言 &#x1f348;基本概念 &#x1f348;核心技术 &#x1f348;常用模型和方法 &#x1f348;应用领域 &#x1f348;挑战和未来发展 &#x1f349;案例分析 &#x1f348;机器翻译中的BERT模型 &#x1f348;情感分析在…

函数(下) C语言

函数下 嵌套调用和链式访问1. 嵌套调用2. 链式访问 函数的声明和定义1. 单个文件2. 多个文件3. static 和 extern3.1 static 嵌套调用和链式访问 1. 嵌套调用 嵌套调用就是函数之间的互相调用&#xff0c;每个函数就像⼀个乐高零件&#xff0c;正是因为多个乐高的零件互相无缝…

Nginx缓存之代理缓存配置

Nginx 的缓存功能是集成在代理模块中的&#xff0c;当启用缓存功能时&#xff0c;Nginx 将请求返回的响应数据持久化在服务器磁盘中&#xff0c;响应数据缓存的相关元数据、有效期及缓存内容等信息将被存储在定义的共享内存中。当收到客户端请求时&#xff0c;Nginx 会在共享内…

当同时绑定mousedown和mouseup时,不执行mouseup

问题描述&#xff1a; 当我同时给一个标签添加mousedown和mouseup两个鼠标事件&#xff0c;点击span的时候会触发mousedown事件&#xff0c;但是不会执行mouseup事件&#xff1b;但是注释图二中的setCloudControl方法又能触发mouseup。 后来查阅资料&#xff0c;发现是在封装a…

[算法刷题积累] 两数之和以及进阶引用

两数之和很经典&#xff0c;通常对于首先想到的就是暴力的求解&#xff0c;当然这没有问题&#xff0c;但是我们如果想要追求更优秀算法&#xff0c;就需要去实现更加简便的复杂度。 这里就要提到我们的哈希表法: 我们可以使用unordered_map去实现&#xff0c;也可以根据题目&a…