一、服务端实现
-
使用
@RestController
注解创建一个控制器类(Controller) -
创建一个方法来创建一个客户端连接,它返回一个
SseEmitter
,处理 GET 请求并产生(produces)文本/事件流 (text/event-stream
) -
创建一个新的 SseEmitter, 保存它并从方法中返回
-
在另一个线程中异步发送事件, 先拿到保存的 SseEmitter 并根据需要多次调用调用
SseEmitter.send()
方法 -
完成事件发送, 调用
SseEmitter.complete()
方法 -
要异常完成发送事件,请调用
SseEmitter.completeWithError()
方法
/** xxx.com* Copyright (C) 2021-2024 All Rights Reserved.*/
package com.sse.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author xxx* @version SseController.java, v 0.1 2024-07-11 10:11*/
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {private static final Map<String, SseEmitter> SSE_EMITTER_MAP = new ConcurrentHashMap<>();/*** 创建连接*/@GetMapping("/create-connect")public SseEmitter createConnect(@RequestParam("userId") String userId) {try {// 设置超时时间,0表示不过期。默认30秒SseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(() -> removeSseConnection(userId, "SSE连接已关闭"));sseEmitter.onError(throwable -> removeSseConnection(userId, "SSE连接出现错误"));sseEmitter.onTimeout(() -> removeSseConnection(userId, "SSE连接超时"));SSE_EMITTER_MAP.put(userId, sseEmitter);log.info("创建了用户[{}]的SSE连接", userId);return sseEmitter;} catch (Exception e) {log.error("创建新的SSE连接异常,当前用户:" + userId, e);return null;}}/*** 发送消息*/@GetMapping("/send-message")public void sendMessage(@RequestParam("userId") String userId, @RequestParam("message") String message) {SseEmitter sseEmitter = SSE_EMITTER_MAP.get(userId);if (sseEmitter != null) {try {sseEmitter.send(SseEmitter.event().name("message").data(message).reconnectTime(5000));log.info("给用户[{}]发送消息成功: {}", userId, message);} catch (Exception e) {log.error("给用户[{}]发送消息失败: {}", userId, e.getMessage(), e);// 如果发送失败,尝试从map中移除失效的SseEmitterremoveSseConnection(userId, "发送消息失败");}} else {log.info("用户[{}]的SSE连接不存在或已关闭,无法发送消息", userId);}}private void removeSseConnection(String userId, String reason) {SSE_EMITTER_MAP.computeIfPresent(userId, (key, sseEmitter) -> {sseEmitter.complete();log.info("用户[{}]的SSE连接已移除,原因:{}", userId, reason);return null;});}
}
二、客户端实现
创建多个 index.html文件,放在 static 目录下,用不同的浏览器打开,实现向多个用户推送的场景。
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE Demo</title><script> document.addEventListener('DOMContentLoaded', function () {var userId = "1";// 创建一个新的EventSource对象var source = new EventSource('http://localhost:8080/sse/create-connect?userId=' + userId);// 当连接打开时触发source.onopen = function (event) {console.log('SSE连接已打开');};// 当从服务器接收到消息时触发source.onmessage = function (event) {// event.data 包含服务器发送的文本数据console.log('接收到消息:', event.data);// 在页面上显示消息var messagesDiv = document.getElementById('messages');if (messagesDiv) {messagesDiv.innerHTML += '<p>' + event.data + '</p>'; // 直接使用event.data} else {console.error('未找到消息容器元素');}};// 当发生错误时触发source.onerror = function (event) {console.error('SSE连接错误:', event);};});</script>
</head>
<body>
<div id="messages"><!-- 这里将显示接收到的消息 -->
</div>
</body>
</html>
三、启动项目
- 运行 Spring 项目
- 浏览器打开 index.html文件
- 调用发送消息接口
curl http://localhost:8080/sse/send-message\?userId\=1\&message\=test0001
打开多个连接,用 userId 就可以实现向不同的用户推送的逻辑了。
四、总结
上面已经实现了最基本的消息推送需求,但是我们还可以思考一下实际生产中,我们还需要做哪些优化?
- 如果我们服务设置了最大连接时间,比如 3 分钟,而服务端又长时间没有消息推送给客户端,导致长连接被关闭该怎么办?
- 实际生产环境,我们肯定是多个实例部署,那么怎么保证创建连接和发送消息是在同一个实例完成?如果不是一个实例,就意味着用户没有建立连接,消息肯定发送失败。
下一篇博客,再做具体优化。