Springboot之整合SSE实现消息推送
前言
项目中涉及到部分请求,后端处理时间较长,使用常规Http请求,页面等待时间太长,对用户不友好,故考虑使用长链接进行消息推送,可选方案有WebSocket、SSE,WebSocket可实现双工通信,SSE仅支持服务端向客户端推送消息,根据实际使用场景,SSE即可满足,故选用SSE。
一、SSE是什么?
SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
- 注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持,但谷歌、火狐、360是可以的,IE不可以。
- 优点:SSE和WebSocket相比,最大的优势是便利,服务端不需要其他的类库,开发难度较低,SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。
- 缺点:如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数
- sse 规范:在 html5 的定义中,服务端 sse,一般需要遵循以下要求:
Content-Type: text/event-stream;
charset=UTF-8Cache-Control: no-cache
Connection: keep-alive
二、使用步骤
1.客户端代码示例
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>let source = null;// 用时间戳模拟登录用户const userId = new Date().getTime();if (!!window.EventSource) {// 建立连接source = new EventSource('http://ip:端口/CreateSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');/*** 连接一旦建立,就会触发open事件* 另一种写法:source.onopen = function (event) {}*/source.addEventListener('open', function (e) {setMessageInnerHTML("建立连接。。。");}, false);/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function (e) {setMessageInnerHTML(e.data);});/*** 如果发生通信错误(比如连接中断),就会触发error事件* 或者:* 另一种写法:source.onerror = function (event) {}*/source.addEventListener('error', function (e) {if (e.readyState === EventSource.CLOSED) {setMessageInnerHTML("连接关闭");} else {console.log(e);}}, false);} else {setMessageInnerHTML("你的浏览器不支持SSE");}// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据window.onbeforeunload = function () {closeSse();};// 关闭Sse连接function closeSse() {source.close();const httpRequest = new XMLHttpRequest();httpRequest.open('GET', 'http://localhost:8080/sse/CloseConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);httpRequest.send();console.log("close");}// 将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}
</script>
</html>
2.服务端整合
Controller:
/*** SSE长链接*/
@RestController
@RequestMapping("/sse")
public class SseEmitterController {@Autowiredprivate SseEmitterService sseEmitterService;/*** 创建SSE长链接** @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter**/@CrossOrigin //如果nginx做了跨域处理,此处可去掉@GetMapping("/CreateSseConnect")public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {return sseEmitterService.createSseConnect(clientId);}/*** 关闭SSE连接** @param clientId 客户端ID**/@GetMapping("/CloseSseConnect")public Result closeSseConnect(String clientId) {sseEmitterService.closeSseConnect(clientId);return ResultGenerator.genSuccessResult(true);}}
ServiceImpl
@Service
public class SseEmitterServiceImpl implements SseEmitterService {/*** 容器,保存连接,用于输出返回*/private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();@Overridepublic SseEmitter createSseConnect(String clientId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (StringUtils.isBlank(clientId)) {clientId = IdUtil.simpleUUID();}// 注册回调sseEmitter.onCompletion(completionCallBack(clientId));sseCache.put(clientId, sseEmitter);logger.info("创建新的sse连接,当前用户:{}", clientId);try {sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));} catch (IOException e) {logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);throw new BusinessException("创建连接异常!", e);}return sseEmitter;}@Overridepublic void closeSseConnect(String clientId) {SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter != null) {sseEmitter.complete();removeUser(clientId);}}// 根据客户端id获取SseEmitter对象@Overridepublic SseEmitter getSseEmitterByClientId(String clientId) {return sseCache.get(clientId);}// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息@Overridepublic void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {if (CollectionUtil.isEmpty(sseCache)) {return;}for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());}}/*** 推送消息到客户端* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改** @param clientId 客户端ID* @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可**/private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",clientId, sseEmitterResultVOList.toString());return;}SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);} catch (IOException e) {// 推送消息失败,记录错误日志,进行重推logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);boolean isSuccess = true;// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);continue;}sseEmitter.send(sendData);} catch (Exception ex) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);continue;}logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());return;}}}/*** 长链接完成后回调接口(即关闭连接时调用)** @param clientId 客户端ID* @return java.lang.Runnable**/private Runnable completionCallBack(String clientId) {return () -> {logger.info("结束连接:{}", clientId);removeUser(clientId);};}/*** 连接超时时调用** @param clientId 客户端ID* @return java.lang.Runnable**/private Runnable timeoutCallBack(String clientId) {return () -> {logger.info("连接超时:{}", clientId);removeUser(clientId);};}/*** 推送消息异常时,回调方法** @param clientId 客户端ID* @return java.util.function.Consumer<java.lang.Throwable>**/private Consumer<Throwable> errorCallBack(String clientId) {return throwable -> {logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);continue;}sseEmitter.send("失败后重新推送");} catch (Exception e) {e.printStackTrace();}}};}/*** 移除用户连接** @param clientId 客户端ID**/private void removeUser(String clientId) {sseCache.remove(clientId);logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);}
}
3. Nginx配置
如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置
proxy_set_header Host $http_host; ##proxy_set_header用来重定义发往后端服务器的请求头
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_http_version 1.1;
proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s
4. 请求示例
常见问题
1、前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
2、创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
3、推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;
总结
整体业务流程为:客户端创建链接——>服务端保持生成SseEmitter对象,并通过SseEmitter对象实现向客户端主动推送消息——>客户端收到推送消息后,刷新页面(根据推送消息,请求相关业务接口)