SSE(Server-Send-Event)服务端推送数据技术
大家是否遇到过服务端需要主动传输数据到客户端的情况,目前有三种解决方案。
- 客户端轮询更新数据。
- 服务端与客户端建立 Socket 连接双向通信
- 服务端与客户建立 SSE 连接单向通信
几种方案的比较:
-
轮询:
客户端通过频繁请求向服务端请求数据,达到类似实时更新的效果。轮询的优点是实现简单,但是会给服务端和网络带来额外的压力,且延迟较高。
-
WebSocket连接:
服务端与客户端建立Socket连接进行数据传输,Socket的传输方式是全双工的。WebSocket是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。
-
SSE推送:
SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术,只允许单向通讯。相较于 WebSocket,SSE 更简单、更轻量级。
下面是SpringBoot使用SSE的步骤和示例代码
-
配置依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
SSE已经集成到spring-web中,所以可以直接使用。
-
后端代码
import com.wry.wry_test.service.SseService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank; import java.util.concurrent.CompletableFuture;@RestController @RequestMapping("/sse") @Slf4j @Validated public class SseTestController {@Autowiredprivate SseService service;@GetMapping("/testSse")public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {final SseEmitter emitter = service.getConn(clientId);CompletableFuture.runAsync(() -> {try {service.send(clientId);log.info("建立连接成功!clientId = {}", clientId);} catch (Exception e) {log.error("推送数据异常");}});return emitter;}@GetMapping("/sseConection")public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {return service.getConn(clientId);}@GetMapping("/sendMsg")public void sendMsg(@RequestParam("clientId") String clientId) {try {// 异步发送消息CompletableFuture.runAsync(() -> {try {service.send(clientId);} catch (Exception e) {log.error("推送数据异常");}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("/sendMsgToAll")public void sendMsgToAll() {try {//异步发送消息CompletableFuture.runAsync(() -> {try {service.sendToAll();} catch (Exception e) {e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("closeConn/{clientId}")public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {service.closeConn(clientId);return "连接已关闭";}}
package com.wry.wry_test.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;public interface SseService {/*** 获取连接* @param clientId 客户端id* @return*/SseEmitter getConn(String clientId);/*** 发送消息到指定客户端* @param clientId 客户端id* @throws Exception*/void send(String clientId);/*** 发送消息到所有SSE客户端* @throws Exception*/void sendToAll() throws Exception;/*** 关闭指定客户端的连接* @param clientId 客户端id*/void closeConn(String clientId); }
package com.wry.wry_test.service.impl;import com.wry.wry_test.service.SseService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;@Service @Slf4j public class SseServiceImpl implements SseService {private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();@Overridepublic SseEmitter getConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {return sseEmitter;} else {// 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用final SseEmitter emitter = new SseEmitter(600_000L);// 注册超时回调,超时后触发emitter.onTimeout(() -> {log.info("连接已超时,正准备关闭,clientId = {}", clientId);SSE_CACHE.remove(clientId);});// 注册完成回调,调用 emitter.complete() 触发emitter.onCompletion(() -> {log.info("连接已关闭,正准备释放,clientId = {}", clientId);SSE_CACHE.remove(clientId);log.info("连接已释放,clientId = {}", clientId);});// 注册异常回调,调用 emitter.completeWithError() 触发emitter.onError(throwable -> {log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);SSE_CACHE.remove(clientId);});SSE_CACHE.put(clientId, emitter);log.info("建立连接成功!clientId = {}", clientId);return emitter;}}/*** 模拟类似于 chatGPT 的流式推送回答** @param clientId 客户端 id* @throws IOException 异常*/@Overridepublic void send(@NotBlank String clientId) {final SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter == null) return;// 开始推送数据// todo 模拟推送数据for (int i = 0; i < 10000000; i++) {String msg = "SSE 测试数据";try {this.sseSend(emitter, msg, clientId);Thread.sleep(1000);} catch (Exception e) {log.error("推送数据异常", e);break;}}log.info("推送数据结束,clientId = {}", clientId);// 结束推流emitter.complete();}/*** 发送数据给所有连接*/public void sendToAll() {List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());for (int i = 0; i < 10000000; i++) {String msg = "SSE 测试数据";this.sseSend(emitters, msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void closeConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {sseEmitter.complete();}}/*** 推送数据封装** @param emitter sse长连接* @param data 发送数据* @param clientId 客户端id*/private void sseSend(SseEmitter emitter, Object data, String clientId) {try {emitter.send(data);log.info("推送数据成功,clientId = {}", clientId);} catch (Exception e) {log.error("推送数据异常", e);throw new RuntimeException("推送数据异常");}}/*** 推送数据封装** @param emitter sse长连接* @param data 发送数据*/private void sseSend(List<SseEmitter> emitter, Object data) {emitter.forEach(e -> {try {e.send(data);} catch (IOException ioException) {log.error("推送数据异常", ioException);}});log.info("推送数据成功");}}
实现效果如下:服务端不断推送数据到前端,前端可以也可以调用接口主动关闭连接。
适用场景:SSE由于是服务端单向通讯,所以适合那种需要单向持久的连接。比如:
- ChatGPT这种实时加载会话数据
- 文件下载,通过SSE异步下载文件
- 服务端实时数据推送