Springboot之整合SSE实现消息推送

Springboot之整合SSE实现消息推送


前言

项目中涉及到部分请求,后端处理时间较长,使用常规Http请求,页面等待时间太长,对用户不友好,故考虑使用长链接进行消息推送,可选方案有WebSocket、SSE,WebSocket可实现双工通信,SSE仅支持服务端向客户端推送消息,根据实际使用场景,SSE即可满足,故选用SSE。

一、SSE是什么?

SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。

  • 注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持,但谷歌、火狐、360是可以的,IE不可以。
  • 优点:SSE和WebSocket相比,最大的优势是便利,服务端不需要其他的类库,开发难度较低,SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。
  • 缺点:如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数
  • sse 规范:在 html5 的定义中,服务端 sse,一般需要遵循以下要求:
    Content-Type: text/event-stream;
    charset=UTF-8Cache-Control: no-cache
    Connection: keep-alive

二、使用步骤

1.客户端代码示例

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>let source = null;// 用时间戳模拟登录用户const userId = new Date().getTime();if (!!window.EventSource) {// 建立连接source = new EventSource('http://ip:端口/CreateSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');/*** 连接一旦建立,就会触发open事件* 另一种写法:source.onopen = function (event) {}*/source.addEventListener('open', function (e) {setMessageInnerHTML("建立连接。。。");}, false);/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function (e) {setMessageInnerHTML(e.data);});/*** 如果发生通信错误(比如连接中断),就会触发error事件* 或者:* 另一种写法:source.onerror = function (event) {}*/source.addEventListener('error', function (e) {if (e.readyState === EventSource.CLOSED) {setMessageInnerHTML("连接关闭");} else {console.log(e);}}, false);} else {setMessageInnerHTML("你的浏览器不支持SSE");}// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据window.onbeforeunload = function () {closeSse();};// 关闭Sse连接function closeSse() {source.close();const httpRequest = new XMLHttpRequest();httpRequest.open('GET', 'http://localhost:8080/sse/CloseConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);httpRequest.send();console.log("close");}// 将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}
</script>
</html>

2.服务端整合

Controller:

/*** SSE长链接*/
@RestController
@RequestMapping("/sse")
public class SseEmitterController {@Autowiredprivate SseEmitterService sseEmitterService;/*** 创建SSE长链接** @param clientId   客户端唯一ID(如果为空,则由后端生成并返回给前端)* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter**/@CrossOrigin //如果nginx做了跨域处理,此处可去掉@GetMapping("/CreateSseConnect")public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {return sseEmitterService.createSseConnect(clientId);}/*** 关闭SSE连接** @param clientId 客户端ID**/@GetMapping("/CloseSseConnect")public Result closeSseConnect(String clientId) {sseEmitterService.closeSseConnect(clientId);return ResultGenerator.genSuccessResult(true);}}

ServiceImpl

@Service
public class SseEmitterServiceImpl implements SseEmitterService {/*** 容器,保存连接,用于输出返回*/private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();@Overridepublic SseEmitter createSseConnect(String clientId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (StringUtils.isBlank(clientId)) {clientId = IdUtil.simpleUUID();}// 注册回调sseEmitter.onCompletion(completionCallBack(clientId));sseCache.put(clientId, sseEmitter);logger.info("创建新的sse连接,当前用户:{}", clientId);try {sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));} catch (IOException e) {logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);throw new BusinessException("创建连接异常!", e);}return sseEmitter;}@Overridepublic void closeSseConnect(String clientId) {SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter != null) {sseEmitter.complete();removeUser(clientId);}}// 根据客户端id获取SseEmitter对象@Overridepublic SseEmitter getSseEmitterByClientId(String clientId) {return sseCache.get(clientId);}// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息@Overridepublic void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {if (CollectionUtil.isEmpty(sseCache)) {return;}for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());}}/*** 推送消息到客户端* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改** @param clientId               客户端ID* @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可**/private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",clientId, sseEmitterResultVOList.toString());return;}SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);} catch (IOException e) {// 推送消息失败,记录错误日志,进行重推logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);boolean isSuccess = true;// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);continue;}sseEmitter.send(sendData);} catch (Exception ex) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);continue;}logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());return;}}}/*** 长链接完成后回调接口(即关闭连接时调用)** @param clientId 客户端ID* @return java.lang.Runnable**/private Runnable completionCallBack(String clientId) {return () -> {logger.info("结束连接:{}", clientId);removeUser(clientId);};}/*** 连接超时时调用** @param clientId 客户端ID* @return java.lang.Runnable**/private Runnable timeoutCallBack(String clientId) {return () -> {logger.info("连接超时:{}", clientId);removeUser(clientId);};}/*** 推送消息异常时,回调方法** @param clientId 客户端ID* @return java.util.function.Consumer<java.lang.Throwable>**/private Consumer<Throwable> errorCallBack(String clientId) {return throwable -> {logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);continue;}sseEmitter.send("失败后重新推送");} catch (Exception e) {e.printStackTrace();}}};}/*** 移除用户连接** @param clientId 客户端ID**/private void removeUser(String clientId) {sseCache.remove(clientId);logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);}
}

3. Nginx配置

如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置

proxy_set_header Host $http_host;  ##proxy_set_header用来重定义发往后端服务器的请求头
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_http_version  1.1;
proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s

4. 请求示例

在这里插入图片描述
在这里插入图片描述

常见问题

1、前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
2、创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
3、推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;

总结

整体业务流程为:客户端创建链接——>服务端保持生成SseEmitter对象,并通过SseEmitter对象实现向客户端主动推送消息——>客户端收到推送消息后,刷新页面(根据推送消息,请求相关业务接口)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/554919.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue中npm run dev 和 npm run serve区别

在运行vue文件时&#xff0c;需要进行npm操作&#xff0c;但我们发现&#xff0c;有时候用的是npm run serve&#xff0c;而有的时候用的是npm run dev&#xff0c;二者有什么区别 在我们运行一些 vue 项目的时候&#xff0c;输入npm run serve或者 npm run dev的其中一个时&a…

SpringBoot导出数据为PDF

一、SpringBoot导出数据为PDF 1、添加所需依赖 <dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.11</version> </dependency> <dependency><groupId>com.itextpdf.…

php安装mem+cache扩展,安装memcached及php扩展

用的是centos系统1、安装memcachedyum -y install memcached安装完成后&#xff0c;memcached -h应该会出现memcached 参数说明2、memcached配置文件vi /etc/sysconfig/memcachedPORT"11210"USER"memcached"MAXCONN"1024"CACHESIZE"64"…

Springboot集成支付宝沙箱支付(完整版)

开发前准备 easy支付官方文档&#xff1a;https://opendocs.alipay.com/open/009ys9 通用版文档&#xff1a;https://opendocs.alipay.com/open/02np94 支付宝沙箱的配置 注册支付宝开发者账户&#xff0c;进入开发者控制台 https://openhome.alipay.com/platform/developer…

Springboot集成支付宝沙箱支付(退款功能)

包括&#xff1a; 支付宝沙箱 支付 异步通知 退款功能 正式版本的sdk 通用版本SDK文档&#xff1a;https://opendocs.alipay.com/open/02np94 <dependency><groupId>com.alipay.sdk</groupId><artifactId>alipay-sdk-java</artifactId><…

Java递归生成树

1.建菜单表 CREATE TABLE t_menu (id int(11) NOT NULL AUTO_INCREMENT,pid int(11) NOT NULL,name varchar(255) DEFAULT NULL,PRIMARY KEY (id) ) ENGINEInnoDB AUTO_INCREMENT11 DEFAULT CHARSETutf8mb4;2.造一些数据 注意&#xff1a;根节点的pid0&#xff0c;其他节点的p…

Springboot获取公网IP和当前所在城市(非常简单)

最近我们发现各大社交平台都出现了一个新的功能&#xff1a;IP属地。 比如某乎&#xff1a; 这个IP属地是怎么做到的呢&#xff1f;今天我来教教你&#xff0c;保证你看完直呼Easy~ 百度搜索 打开百度&#xff0c;搜索IP&#xff0c;你就能看到你当前的IP地址&#xff0c;类…

线程死锁——死锁产生的条件

什么是线程死锁 线程死锁是指由于两个或者多个线程互相持有对方所需要的资源&#xff0c;导致这些线程处于相互等待状态&#xff0c;若无外力作用&#xff0c;它们将无法继续执行下去。 造成死锁的原因可以概括成三句话&#xff1a; 当前线程拥有其他线程需要的资源当前线程…

TortoiseGit的使用详解

Git是什么&#xff0c;相信大家都很清楚。Git不就是分布式版本控制系统嘛&#xff1f;那你知道TortoiseGit是什么吗&#xff1f;下面我们就介绍一下TortoiseGit它是什么&#xff1f;如何使用&#xff1f;   TortoiseGit其实是一款开源的git的版本控制系统&#xff0c;也叫海龟…

将项目上传到Gitee上(命令方式使用TortoiseGit方式)

如何将项目上传到Gitee上&#xff08;命令方式&#xff09; 目录 将项目上传到Gitee是我们经常需要使用到的操作&#xff0c;因此我们要熟悉这些步骤 一、首先保证本机已经安装了Git git官网安装完成之后&#xff0c;鼠标右键会出现Git GUI Here和Git Bash Here 二、上传代…

BufferedImage类、Image类、Graphics类

BufferedImage Image是一个抽象类&#xff0c;BufferedImage是其实现类&#xff0c;是一个带缓冲区图像类&#xff0c;主要作用是将一幅图片加载到内存中&#xff08;BufferedImage生成的图片在内存里有一个图像缓冲区&#xff0c;利用这个缓冲区我们可以很方便地操作这个图片&…

Vue this.$refs的作用

案例一、ref 写在标签上时 <!-- ref 写在标签上时&#xff1a;this.$refs.名字 获取的是标签对应的dom元素ref 写在组件上时&#xff1a;这时候获取到的是 子组件&#xff08;比如counter&#xff09;的引用--><div id"root"><!-- ref hello&#…

linux电脑合盖后卡住了,解决ubuntu合盖后无法唤醒

解决办法&#xff1a;安装laptop-mode-tools工具包。1.检查是否安装了grep laptop-mode-tools 工具包$ dpkg -l | grep laptop-mode-tools如果执行命令无结果输出&#xff0c;表示未安装(如果已安装&#xff0c;忽略第2步)2.安装laptop-mode执行命令&#xff1a;$ sudo apt-get…

三列布局 css

实现如下图的三列布局&#xff1a; .box {width:1400px;margin:0 auto;padding-bottom:40px;> .left {float:left;width:180px;margin-top:100px;text-align:center;}> .center {float:left;margin-top:100px;margin-left:130px;item-box {float:left;text-align:left;…

axios和ajax的区别是什么

axios和ajax的区别&#xff1a; 1、axios是一个基于Promise的HTTP库&#xff0c;而ajax是对原生XHR的封装&#xff1b; 2、ajax技术实现了局部数据的刷新&#xff0c;而axios实现了对ajax的封装。 axios和ajax的区别是什么? axios和ajax的区别及优缺点: ajax&#xff1a; 1…

VUE学习笔记详细

VUE学习笔记 本文章以vue3来记录的&#xff0c;但并非记录vue3所有变化&#xff01; 1、ES6语法 1.1、let变量声明 let用于声明变量有局部作用域let声明的变量不会提升&#xff08;只能先定义后使用&#xff09; 1.2、const变量声明 const用于声明常量const声明的常量也不会…

Centos7配置gitlab服务器

Centos7配置gitlab服务器 1、安装SSH yum install -y curl policycoreutils-pythonopenssh-server设置开机自启 sudo systemctl enable sshd启动服务 sudo systemctl start sshd2、安装postfix 邮件服务 sudo yum install postfix设置开机自启 sudo systemctl enable po…

Jenkins学习笔记详细

最近接触了jenkins这个东西&#xff0c;所以花点时间了解了下。它可以在代码上传仓库&#xff08;如github,gitee&#xff0c;gitlab&#xff09;后&#xff0c;在jenkins&#xff08;一个网站界面&#xff09;中通过获取代码仓库中最新代码&#xff0c;进行自动化部署&#xf…

Form Data与Request Payload,你真的了解吗?

前言 做过前后端联调的小伙伴&#xff0c;可能有时会遇到一些问题。例如&#xff0c;我明明传递数据给后端了&#xff0c;后端为什么说没收到呢&#xff1f;这时候可能就会就会有小伙伴陷入迷茫&#xff0c;本文从chrome-dev-tools&#xff08;F12调试器&#xff09;中看到的F…

计算机网络知识点复习

基础 1.说下计算机网络体系结构 计算机网络体系结构&#xff0c;一般有三种&#xff1a;OSI 七层模型、TCP/IP 四层模型、五层结构。 简单说&#xff0c;OSI是一个理论上的网络通信模型&#xff0c;TCP/IP是实际上的网络通信模型&#xff0c;五层结构就是为了介绍网络原理而折…