Springboot集成SSE消息推送

SSE介绍
        SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。

        了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别:SSE 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道,客户端和服务端可以相互发消息

SSE
        1. 单向通信:SSE是一种服务器推送技术,服务器可以向客户端发送消息,但客户端无法主动发送消息到服务器。
        2. 持久连接:SSE在单个HTTP连接上建立持久连接,服务器可以多次发送事件到客户端,客户端只需保持连接不关闭。

        3 . 实时性:适用于需要从服务器获取实时更新的场景,如即时通知、实时数据更新

Websocket
        1. 双向通信:WebSocket提供了全双工通信,客户端和服务器可以双向发送消息,不需要等待请求-响应。
        2. 持久连接:WebSocket在单个TCP连接上实现持久连接,适用于双向通信的场景。
        3. 实时性:非常适合实时性要求高的应用,如在线游戏、实时聊天等。

使用方法:

Maven依赖

springboot中封装了sse代码,不需要额外的依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.21</version>
</dependency>

SseEmitterUtil工具类

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;/*** SSE长链接工具类*/
@Slf4j
public class SseEmitterUtil {/*** 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面*/private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();public static SseEmitter connect(Long userId) {// 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(userId));sseEmitter.onError(errorCallBack(userId));sseEmitter.onTimeout(timeoutCallBack(userId));sseEmitterMap.put(userId, sseEmitter);log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());return sseEmitter;}/*** 给制定用户发送消息** @param userId 指定用户名* @param sseMessage 消息体*/public static void sendMessage(Long userId, String sseMessage) {if (sseEmitterMap.containsKey(userId)) {try {sseEmitterMap.get(userId).send(sseMessage);log.info("用户 {} 推送消息 {}", userId, sseMessage);} catch (IOException e) {log.error("用户 {} 推送消息异常", userId, e);removeUser(userId);}} else {log.error("消息推送 用户 {} 不存在,链接总数 {}", userId, sseEmitterMap.size());}}/*** 群发消息*/public static void batchSendMessage(String message, List<Long> ids) {ids.forEach(userId -> sendMessage(userId, message));}/*** 群发所有人*/public static void batchSendMessage(String message) {sseEmitterMap.forEach((k, v) -> {try {v.send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("用户 {} 推送异常", k, e);removeUser(k);}});}/*** 移除用户连接** @param userId 用户 ID*/public static void removeUser(Long userId) {if (sseEmitterMap.containsKey(userId)) {sseEmitterMap.get(userId).complete();sseEmitterMap.remove(userId);log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());} else {log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());}}/*** 获取当前连接信息** @return 所有的连接用户*/public static List<Long> getIds() {return new ArrayList<>(sseEmitterMap.keySet());}/*** 获取当前的连接数量** @return 当前的连接数量*/public static int getUserCount() {return sseEmitterMap.size();}private static Runnable completionCallBack(Long userId) {return () -> {log.info("用户 {} 结束连接", userId);};}private static Runnable timeoutCallBack(Long userId) {return () -> {log.error("用户 {} 连接超时", userId);removeUser(userId);};}private static Consumer<Throwable> errorCallBack(Long userId) {return throwable -> {log.error("用户 {} 连接异常", userId);removeUser(userId);};}
}
Controller层
import cn.hutool.json.JSONUtil;
import com.geb.common.utils.SseEmitterUtil;
import com.geb.domain.SseMessage;
import com.geb.domain.WdAgent;
import io.grpc.internal.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {/*** 用于创建连接*/@GetMapping("/connect/{userId}")public SseEmitter connect(@PathVariable Long userId) {return SseEmitterUtil.connect(userId);}/*** 关闭连接*/@GetMapping("/close/{userid}")public void close(@PathVariable("userid") Long userid) {SseEmitterUtil.removeUser(userid);}@GetMapping("/sse")public void sse(){// 构建推送消息体SseMessage sseMessage = new SseMessage();sseMessage.setId(1L);sseMessage.setMsg("SSE测试发送消息");sseMessage.setAgentName("测试智能体推送消息");SseEmitterUtil.sendMessage(1L, JSONUtil.toJsonStr(sseMessage));}
}
测试

这里使用postman测试,输入链接点击运行后,会自动处于链接状态

我们发送一条消息测试,我这里对消息实体进行了封装,在postman新建窗口输入测试请求

消息发送完毕,在postman就可以看到刚刚的消息

可以多发送几条消息看看效果,关闭链接调用关闭的接口即可。

这是个本地的测试,如果使用了Nginx反向代理和Gateway网关,具体的使用情况需要根据测试或者生产环境进行相应的配置。

文章参考:Springboot集成SSE消息推送_springboot 集成sse推送-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/29808.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SAP BC 换了logo后,其他人的logo都已经换了,但是其中有一台就是PRD 显示DEV的logo,从smw0上下载的是PRD

昨天终于发现是缓存的问题 GUI登录后 选项-本地数据-缓存 删除本地缓存文件&#xff0c;问题解决了

机器学习课程复习——聚类算法

Q:什么是硬聚类,什么是软聚类? 如果一个样本只能属于一个类,则称为硬聚类(hard clustering); 如果一个样本可以属于多个类,则称为软聚类(soft clustering)。 Q:聚类和分类的区别? 聚类分类学习类型无监督学习方法 不需要事先标记的数据 通过发现数据中的模式或结构来组…

sprintboot依赖管理和自动配置

springboot依赖管理和自动配置 依赖管理和自动配置依赖管理什么是依赖管理修改自动仲裁/默认版本号 starter场景启动器starter场景启动器基本介绍官方提供的starter第三方starter 自动配置自动配置基本介绍SpringBoot自动配置了哪些?如何修改默认配置如何修改默认扫描包结构re…

深入解析 iOS 应用启动过程:main() 函数前的四大步骤

深入解析 iOS 应用启动过程&#xff1a;main() 函数前的四大步骤 背景描述&#xff1a;使用 Objective-C 开发的 iOS 或者 MacOS 应用 在开发 iOS 应用时&#xff0c;我们通常会关注 main() 函数及其之后的执行逻辑&#xff0c;但在 main() 函数之前&#xff0c;系统已经为我们…

创建最基本的web服务器-http模块

在Node.js中&#xff0c;可以使用内置的http模块来创建一个最基本的web服务器。以下是一个简单的示例&#xff0c;它创建了一个HTTP服务器&#xff0c;该服务器监听一个端口&#xff0c;并在接收到请求时发送一个“Hello, World!”的响应。 // 引入http模块 const http requi…

leetcode 二分查找·系统掌握 寻找比目标字母大的最小字母

题目&#xff1a; 给你一个字符数组 letters&#xff0c;该数组按非递减顺序排序&#xff0c;以及一个字符 target。letters 里至少有两个不同的字符。 返回 letters 中大于 target 的最小的字符。如果不存在这样的字符&#xff0c;则返回 letters 的第一个字符。 题解&…

C++及cmake语法介绍

c/cmake学习 文章目录 c/cmake学习1. c1.1 基本模型1.1.1 for循环1.1.2 main函数1.1.2 带参数函数编译函数 2. CMAKE2.1 相关命令2.1.1 编译基本命令2.1.2 动态库静态库编译2.1.3 消息输出2.1.4 cmake变量常见参数1. 设置构建类型2. 设置编译器标志3. 指定编译器4. 设置安装路径…

机器学习_PCA

目录 一、概念 二、原理 三、步骤 四、实战 1、数据处理——转rgb为灰度图像 2、手动实现pca降维 3、查看信息保留数量 4、调用第三方库实现pca降维 五、小结 引入&#xff1a; 当说黄河五路和渤海三路交叉口的时候&#xff0c;这些路就类似于我们说的坐标系。而城市中的…

高等数学笔记(三):导数

一、导数概念 1.1 导数的定义 1.1.1 函数在一点处的导数与导函数 1.1.2 单侧导数 1.2 导数的几何意义 1.3 函数可导性与连续性的关系 二、函数的求导法则 2.1 函数的和、差、积、商的求导法则 2.2 反函数的求导法则 2.3 复合函数的求导法则 2.4 基本求导法则与导数公式 三…

read code and make summer (python)

read code and make summer ==标题==:语法==标题==:类的定义==标题==:继承==标题==:多态==标题==:__all__ = [create_dataset, create_dataloader]==标题==:yield==标题==: f-string(格式化字符串)==标题==:getattr()==标题==:logging==标题==:seed==标题==:slice…

必看!!! 2024 最新 PG 硬核干货大盘点(上)

PGConf.dev&#xff08;原名PGCon&#xff0c;从2007年至2023年&#xff09;首次在风景如画的加拿大温哥华市举办。此次重新定位的会议带来了全新的视角和多项新的内容&#xff0c;参会体验再次升级。尽管 PGCon 历来更侧重于开发者&#xff0c;吸引来自世界各地的资深开发者、…

Nginx常用配置、反向代理

目录 1. 常用配置 基本设置 HTTP配置 虚拟主机配置 2. 高级配置 反向代理配置 SSL/TLS配置 负载均衡配置 1. 常用配置 基本设置 user nginx; worker_processes auto; error_log /var/log/nginx/error.log warn; pid /var/run/nginx.pid;user nginx;: 指定Nginx worker…

深入理解并打败C语言难关之一————指针(5)(最终篇)

前言&#xff1a; 仔细一想&#xff0c;小编已经把指针的大部分内容都说了一遍了&#xff0c;小编目前有点灵感枯竭了&#xff0c;今天决定就结束指针这一大山&#xff0c;可能很多小编并没有提到过&#xff0c;如果有些没说的小编会在后续博客进行补充道&#xff0c;不多废话了…

服务器数据恢复—NTFS文件系统下双循环riad5数据恢复案例

服务器存储数据恢复环境&#xff1a; EMC CX4-480存储&#xff0c;该存储中有10块硬盘&#xff0c;其中有3块磁盘为掉线磁盘&#xff0c;另外7块磁盘组成一组RAID5磁盘阵列。运维人员在处理掉线磁盘时只添加新的硬盘做rebuild&#xff0c;并没有将掉线的硬盘拔掉&#xff0c;所…

ARCGIS 如何对河流等线条图形进行Smooth处理——具有多个断点高阶版

1.线转点折点&#xff08;注意&#xff01;很重要&#xff0c;不是线转点&#xff09; 2.点转线步骤 ## 3 线的融合 2.1 新建Filed 》短精度类型》利用选择工具的 线文件。全选同一条河流点&#xff0c;进入Tabel的选择界面。给同一条河赋值同一个值。 大功告成&#xff01;…

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《计及多类型储能调频容量动态申报的电能量与调频市场联合出清方法研究》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

深入探究RTOS的任务调度

阅读引言&#xff1a; 此文将会从一个工程文件&#xff0c; 一步一步的分析RTOS的任务调度实现&#xff0c; 这里选用FreeRTOS分析&#xff0c; 别的也差不多的&#xff0c; 可能在细节上有少许不一样。 目录 1&#xff0c; 常见嵌入式实时操作系统 2&#xff0c; 任务调度的…

HDFS 面试题(一)

1. 简述什么是HDFS&#xff0c;以及HDFS作用 &#xff1f; HDFS&#xff0c;全称为Hadoop Distributed File System&#xff0c;即Hadoop分布式文件系统&#xff0c;是一个分布式文件系统&#xff0c;由Apache Hadoop项目的一部分。它被设计用来在廉价的硬件上运行&#xff0c…

练习题-18 计算两个积分

计算积分 I ∫ R e − t 4 d t . I\int_{\mathbb{R}} e^{-t^4} dt. I∫R​e−t4dt. 解&#xff1a;令 x t 4 xt^4 xt4. 则 I 2 ∫ 0 ∞ e − x ⋅ 1 4 ⋅ x − 3 / 4 d x 1 2 Γ ( 1 4 ) I 2\int_0^\infty e^{-x} \cdot \frac{1}{4}\cdot x^{-3/4} dx\frac{1}{2} \Gamma(…

SQLite Delete 语句

SQLite Delete 语句 SQLite 的 DELETE 语句用于从表中删除数据。它是 SQL 数据库管理中非常基础且重要的操作之一。在使用 DELETE 语句时&#xff0c;可以删除表中的特定行&#xff0c;也可以删除整个表的数据。本文将详细介绍 SQLite 中的 DELETE 语句&#xff0c;包括其语法…