Springboot整合SSE实现实时消息推送

SSE详细介绍传送门:SSE实时消息推送

简单描述一下SSE推送在实际项目中应用的常见场景

1,项目页面中有消息通知板块,当信息有变化时,只有手动刷新页面,才会看到最新的数据,这里可以采用SSE技术实时推送最新消息
.
2,大屏数据,这种场景是可以用SSE进行推送,但是需要注意的是SSE是单向的服务端向前端推数据,一般要求的是大屏基本没有查询框条件这种,比较合适。

注意点:如果对于实时数据要求很高并且连接要求做到安全稳定,这里推荐用WebSocket,一般来说对于数据量小,并发连接不是很高要求的情况下,SSE足够,用而且SSE的配置对于前后端都比较简单,但是WebSocket的配置对于后端来说需要花费比较多的时间去完善,而且WebSocket是比较消耗服务器资源和网络带宽资源的,另外一个,如果项目中运维配置了代理服务器的话,可能代理服务器也要配置一些支持WebSocket的属性,总体来说WebSocket配置的位置比较多,容易出现各种坑bug,这里注意一下即可。

话不多说,总结一下Springboot整合SSE需要的步骤如下:

1,编写SSE的服务类:主要包括建立连接、关闭连接、异常连接、心跳检测、推送消息等
.
2,controller层写入SSE连接和关闭接口
.
3,在所需要的业务模块中直接调用SSE服务类中推送消息功能即可

SSE步骤简单,无需导入maven依赖,踩坑bug少,主要是SSE内部支持断线重连,爽爽爽

1,SSE服务类

package com.bosera.salesioc.home.sse;
import com.alibaba.fastjson.JSONObject;
import com.bosera.salesioc.domain.home.vo.MessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;@Slf4j
@Component
public class SseEmitterServer{private static final ConcurrentHashMap<String, Map<String,SseEmitter>> sseEmitterPool = new ConcurrentHashMap<>();private static final ConcurrentHashMap<String, Timer>  headerPool = new ConcurrentHashMap<>();public  static ConcurrentHashMap<String, Map<String, SseEmitter>> getSseEmitterPool(){return sseEmitterPool;}/*** 建立连接*/public  SseEmitter connect(String  userCode, String userId){log.info("******************开始建立连接*****************");//设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常SseEmitter sseemitter = new SseEmitter(0L);//注册回调sseemitter.onCompletion(completionCallBack(userCode,userId));sseemitter.onError(errorCallBack(userCode,userId));sseemitter.onTimeout(timeoutCallBack(userCode,userId));sseEmitterPool.computeIfAbsent(userCode, k -> new ConcurrentHashMap<>()).put(userId, sseemitter);// 开启心跳活跃startHeartbeat(sseemitter,userId);return sseemitter;}/*** 关闭当前连接*/public void complete(String userCode, String userId){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null)map.get(userId).complete();}/*** 关闭所有连接*/public void completeAll(){if(!sseEmitterPool.isEmpty()){for (Map.Entry<String, Map<String, SseEmitter>> entry : sseEmitterPool.entrySet()) {Map<String, SseEmitter> userIdMap = entry.getValue();if(!userIdMap.isEmpty()){for (Map.Entry<String, SseEmitter> userIdEntry : userIdMap.entrySet()) {userIdEntry.getValue().complete();}}}sseEmitterPool.clear();}}private  Runnable completionCallBack(String userCode, String userId) {return () -> {removeUser(userCode,userId);log.info("{}结束连接:{}",userCode,userId);};}private  Runnable timeoutCallBack(String userCode, String userId){return ()->{removeUser(userCode,userId);log.error("{}连接超時:{}",userCode,userId);};}private  Consumer<Throwable> errorCallBack(String userCode, String userId){return throwable -> {log.error("{}连接异常:{}",userCode,userId);stopHeartbeat(userId);};}/*** 推送消息*/public  void sendMessage(String userCode, MessageVO message){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {for (Map.Entry<String, SseEmitter> entry : map.entrySet()) {try {// 发送事件entry.getValue().send(JSONObject.toJSONString(message));}catch (Exception e){log.error("{}连接信息:{}, 错误消息:{}",userCode,entry.getKey(),e.getMessage());}}}}private void removeUser(String userCode, String userId){try {Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {map.remove(userId);// 如果该用户的所有会话都已关闭,则移除整个映射if (map.isEmpty())sseEmitterPool.remove(userCode);}// 关闭心跳stopHeartbeat(userId);}catch (Exception e){log.error("关闭连接异常{}",e.getMessage());}}/*** 开启心跳*/public void startHeartbeat(SseEmitter sseemitter, String userId) {Timer heartbeatTimer = new Timer();headerPool.put(userId,heartbeatTimer);heartbeatTimer.schedule(new TimerTask() {@Overridepublic void run() {if (Objects.nonNull(headerPool.get(userId))) {// 发送心跳:保持长连接try {sseemitter.send("connect active");} catch (Exception e) {log.error("connect active error");}}}}, 25000, 25000);}/*** 关闭心跳* @param userId*/public void stopHeartbeat(String userId) {Timer timer = headerPool.get(userId);if (timer!= null)timer.cancel();headerPool.remove(userId);}
}

推送的消息可以统一定义一个类来封装信息
2,消息推送响应体

/*** @Author xiaozq* @Date 2024/2/21* @Description: 消息推送响应体*/
public class MessageVO<T> {// 主题:不同位置推送的内容不同private String topic;// 推送消息private T data;public void setTopic(String topic) {this.topic = topic;}public void setData(T data) {this.data = data;}public String getTopic() {return topic;}public T getData() {return data;}
}

3,controller层编写连接和关闭接口

@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController{@Autowiredprivate SseEmitterServer sseEmitterServer;/*** 用于创建连接*/@GetMapping(value = "/connect/{userCode}/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@PathVariable("userCode") String userCode, @PathVariable("userId") String userId){return sseEmitterServer.connect(userCode, userId);}/*** 关闭连接*/@GetMapping(value = "/close/{userCode}/{userId}")public void close(@PathVariable("userCode") String userCode,@PathVariable("userId") String userId ) {sseEmitterServer.complete(userCode, userId);}}

4,业务中实际应用:推送消息

@Autowired
SseInfoService  sseInfoService;
private void handlerMessageInform() {ConcurrentHashMap<String, Map<String, SseEmitter>> sessionPool = SseEmitterServer.getSseEmitterPool();for (Map.Entry<String, Map<String, SseEmitter>> entry : sessionPool.entrySet()) {// 封装消息MessageVO<List<MessageNotificationVO>> messageVO = new MessageVO();messageVO.setTopic(TopicTypeEnum.MESSAGE_INFORM.getTopic());messageVO.setData(messageService.getMessageList(request));// 推送消息sseEmitterServer.sendMessage(entry.getKey(), messageVO);}}

在实践过程中存在的问题:

1,报错504 gateway timeout:这里主要是原项目中配置了响应超时时间,不支持长连接,这里的做法是心跳活跃,保证连接不会被掐断,可以写一个定时任务,每天晚上定时去关闭所有连接,第二天用新的连接,这样可以尽量保证内存的连接数不会过多占用内存,因为夜深人静的时候谁还会打开web项目工作啊,哈哈太卷了吧,所以把时间定在晚上最好。
.
如果项目是集群模式的话,上述代码就得改造了,建议是把消息推送这块单独抽出一个微服务模块来,这样子保证所有的连接统一走单独的一个服务,因为SSE不是双向的,既然是单项连接,与后端集群下的其中一个服务建立连接产生的IO流这是只属于当前服务的本地IO,关闭IO只能连接对应的这台服务去关闭,否则关闭失效。总之,考虑的点还有很多,一般情况下,SSE够用啦

总体来说,应用是比较简单的,涉及到消息实时推送相关的业务,可以尝试SSE

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/711361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker技术概论(1):Docker与虚拟化技术比较

Docker技术概论&#xff08;1&#xff09; Docker与虚拟化技术比较 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https:…

深入解析Android-AutoLayout,2024安卓开发面试题及答案

前言 如果你也学习Android&#xff0c;那么你大概率会看过我的文章。经常有读者给我留言&#xff1a;“该怎么学习Android&#xff1f;”、“日常学习Android的方法是什么”。 所以&#xff0c;今天&#xff0c;我将献上一份《Android知识图谱》&#xff0c;以自身的经验 &…

ABAP 发送带EXCEL邮件

前言 没啥特殊需求&#xff0c;就是有个库龄报表用户想整邮件发送 实现 用的最简单的XLS文件作为excel附件发送出去 观察XLS文件的纯文本格式&#xff0c;每列之间用TAB制表符分隔&#xff0c;每行之间用回车符分隔 思路也比较明确&#xff0c;在SAP中实现这种格式&#xf…

.Net利用Microsoft.Extensions.DependencyInjection配置依赖注入

一、概述 为了让接口程序更加模块化和可测试,采用依赖注入的方式调用接口方法。 二、安装Microsoft.Extensions.DependencyInjection 在NuGet里面搜索Microsoft.Extensions.DependencyInjection,并进行安装。 三、代码编写 3.1 创建Service 实现类 /*****************…

【跨境电商须知】FP独立站的特点和痛点有哪些?

无论是做独立站&#xff0c;还是做亚马逊&#xff0c;都有各自的难点。自己做独立站若要在跨境行业长足发展&#xff0c;既要知道FP独立站有什么特点&#xff0c;要清楚FP独立站的痛点并一一克服。 一、FP独立站的特点 与依赖第三方平台相比&#xff0c;拥有自己的域名、服务器…

Doccano 修复 spacy.gold 的bug

引言 最初只是想把Doccano标注的数据集转换成BIO(类似conll2003数据集)的标注格式&#xff1b; 摘要 可先阅读一下教程&#xff1a;【已解决】关于如何将Doccano标注的文本转换成NER模型可以直接处理的CoNLL 2003格式 装包:pip install doccano-transformer 报错信息 运行…

Adam优化算法

Adam算法&#xff08;Adaptive Moment Estimation&#xff09;是一种用于深度学习模型优化的算法&#xff0c;它结合了动量&#xff08;Momentum&#xff09;和RMSprop&#xff08;Root Mean Square Propagation&#xff09;的概念。Adam算法自2015年提出以来&#xff0c;因其高…

【前端素材】推荐优质后台管理系统DAdmin平台模板(附源码)

一、需求分析 1、系统定义 后台管理系统是一种用于管理网站、应用程序或系统的管理界面&#xff0c;通常由管理员和工作人员使用。它提供了访问和控制网站或应用程序后台功能的工具和界面&#xff0c;使其能够管理用户、内容、数据和其他各种功能。 2、功能需求 后台管理系…

FreeCAD|读取STEP、创建平面、相交、瓶子

FreeCAD是一个基于OpenCASCADE的开源CAD/CAE工具。OpenCASCADE是一套开源的CAD/CAM/CAE几何模型核心&#xff0c;来自法国Matra Datavision公司&#xff0c;是著名的CAD软件EUCLID的开发平台。FreeCAD可运行于Windows以及Linux系统环境下&#xff0c;是一种通用的3D CAD建模工具…

记录 关于navicat连接数据库报错1045的问题

重装数据库之后就连接不上了 报错1045 而网上的解决方案大都是更改数据库密码&#xff0c;但是我在第一步就被卡住无法更改密码&#xff0c;输入指令也报错&#xff0c;检查的环境变量也没错&#xff0c;经过长时间的试错终于找到解决了办法 解决办法 删除data文件夹 如果无法…

积累:Qt 多种数据类型之间的转换方法

前言 开发时经常涉及到数据类型的转换&#xff0c;为方便温故知新、提升开发效率&#xff0c;现将 Qt 开发部分常用的数据类型转换方式形成工具文档供查询、参考。 1. int 转 QString 1&#xff09;函数&#xff1a;QString::number 2&#xff09;函数原型 //将数字&#xff0…

LD: 利用Plink软件进行连锁不平衡计算和绘图

输入文件详解 PLINK主要使用以下三种文件格式: .ped文件:文本文件,列出所有样本的基因型数据。每行代表一个样本,包含个体和家系信息,以及其对应的基因型数据。.map文件:文本文件,与.ped文件配合使用,列出了基因型数据中所有SNP的位置信息。每行代表一个SNP,包含染色…

Python:练习:输出int值a占b的百分之几。例如:输入1和4,输出:25%。

案例&#xff1a; 输出int值a占b的百分之几。例如&#xff1a;输入1和4&#xff0c;输出&#xff1a;25%。 思考&#xff1a; 所有的一步步思考&#xff0c;最后综合起来。 首先&#xff0c;确定 输出&#xff0c;那么就用input&#xff0c;而且是int值&#xff0c;所以肯定…

springboot2.6.5 下配置ForkJoinPool线程池大小

从java1.7开始&#xff0c;引入了parallelStream的方式使用ForkJoinPool多线程处理数据的方式&#xff0c;ForkJoinPool默认线程池大小是cpu内核数-1&#xff0c;并且可以通过以下方式配置线程池大小&#xff1a; System.setProperty("java.util.concurrent.ForkJoinPool…

C++设计模式_创建型模式_工厂方法模式

目录 C设计模式_创建型模式_工厂方法模式 一、简单工厂模式 1.1 简单工厂模式引入 1.2 简单工厂模式 1.3 简单工厂模式利弊分析 1.4 简单工厂模式的UML图 二、工厂方法模式 2.1 工厂模式和简单工厂模式比较 2.2 工厂模式代码实现 2.3 工厂模式UML 三、抽象工厂模式 3.1 战斗场景…

MDS300-16-ASEMI整流模块MDS300-16参数、封装、尺寸

编辑&#xff1a;ll MDS300-16-ASEMI整流模块MDS300-16参数、封装、尺寸 型号&#xff1a;MDS300-16 品牌&#xff1a;ASEMI 封装&#xff1a;M25 最大重复峰值反向电压&#xff1a;1600V 最大正向平均整流电流(Vdss)&#xff1a;300A 功率(Pd)&#xff1a;大功率 芯片…

centos 安装 glibc2.25

在 CentOS 7 系统上安装 glibc 2.25 需要非常谨慎&#xff0c;因为 glibc 是系统核心库之一&#xff0c;升级它可能导致与系统其他组件的兼容性问题。CentOS 7 自带的 glibc 版本较低&#xff0c;直接替换为高版本可能会导致依赖于旧版 glibc 的系统软件崩溃。 以下是一般情况…

Flink——芒果TV的实时数仓建设实践

目录 一、芒果TV实时数仓建设历程 1.1 阶段一&#xff1a;Storm/Flink JavaSpark SQL 1.2 阶段二&#xff1a;Flink SQLSpark SQL 1.3 阶段三&#xff1a;Flink SQLStarRocks 二、自研Flink实时计算调度平台介绍 2.1 现有痛点 2.2 平台架构设计 三、Flink SQL实时数仓分…

面试笔记系列三之spring基础知识点整理及常见面试题

目录 如何实现一个IOC容器? 说说你对Spring 的理解&#xff1f; 你觉得Spring的核心是什么&#xff1f; 说一下使用spring的优势&#xff1f; Spring是如何简化开发的&#xff1f; IOC 运行时序 prepareRefresh() 初始化上下文环境 obtainFreshBeanFactory() 创建并…

Linux系统加固:如何有效管理系统账号

Linux系统加固&#xff1a;如何有效管理系统账号 1.1 口令重复次数限制1.2 避免系统存在uid相同的账号1.3 空密码的帐户1.4 口令复杂度1.5 口令生存期1.6 登录失败次数锁定策略 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Linux系统中…