什么是SSE
SSE 即服务器发送事件(Server-Sent Events),是一种服务器推送技术,允许服务器在客户端建立连接后,主动向客户端推送数据。
SSE 基于 HTTP 协议,使用简单,具有轻量级、实时性和断线重连等特点。它在一些需要实时数据更新的场景中非常有用,如股票行情、实时通知等。与传统的轮询方式相比,SSE 可以减少不必要的网络请求,提高数据传输效率。
SSE 的主要优点包括:
实时性:服务器可以实时推送数据到客户端,无需客户端不断轮询。
轻量级:SSE 使用简单的文本协议,数据量小,对网络带宽要求较低。
兼容性好:SSE 基于 HTTP 协议,大多数现代浏览器都支持。
易于实现:服务器端和客户端的实现都相对简单。
然而,SSE 也有一些局限性:
单向通信:SSE 只允许服务器向客户端推送数据,客户端无法直接向服务器发送数据。
支持的浏览器有限:虽然大多数现代浏览器支持 SSE,但一些较旧的浏览器可能不支持。
数据格式受限:SSE 通常只能传输文本数据,对于二进制数据的支持有限。
与 HTTP 相比,SSE 提供了更高效的实时数据推送机制,减少了不必要的请求和响应,降低了服务器负载。但 HTTP 更适合一般性的请求-响应模式的数据传输。
SSE WebSocket 对比
SSE 的优点:
- 简单易用:SSE 使用标准的 HTTP 协议,实现相对简单,不需要复杂的握手和协议转换。
- 单向通信:适合只需从服务器向客户端推送数据的场景,减少了不必要的双向通信开销。
- 低延迟:由于基于 HTTP 协议,数据可以在服务器有新数据时立即推送,延迟较低。
- 兼容性好:大多数现代浏览器都支持 SSE,不需要特殊的插件或扩展。
- 轻量级:相比 WebSocket,SSE 的实现相对较轻量,对服务器资源的消耗较少。
- 自动重连:如果连接中断,SSE 会自动尝试重新连接,确保数据的持续推送。
SSE 的缺点:
- 单向通信限制:SSE 只支持服务器向客户端发送数据,客户端无法向服务器发送数据。
- 数据格式受限:SSE 通常只能发送文本数据,对于二进制数据的支持有限。
- 连接管理:每个 SSE 连接在每次数据推送后都会关闭,然后需要重新建立连接,这可能会导致一些额外的开销。
** WebSocket 的优点:**
- 全双工通信:支持双向通信,客户端和服务器可以随时互相发送数据,适用于实时交互性较高的应用。
- 低延迟:建立连接后,数据可以实时传输,延迟较低。
- 二进制支持:WebSocket 可以发送文本和二进制数据,更适合处理多媒体等二进制数据。
- 较少的 HTTP 开销:由于建立了持久连接,减少了 HTTP 请求头和响应头的开销。
WebSocket 的缺点:
- 协议复杂性:WebSocket 协议相对较复杂,需要更多的代码和服务器资源来处理连接和数据传输。
- 兼容性问题:虽然大多数现代浏览器支持 WebSocket,但在一些旧版本的浏览器或特定环境中可能存在兼容性问题。
- 安全风险:由于 WebSocket 可以实现双向通信,需要注意安全问题,如防止跨站脚本攻击(XSS)和跨站请求伪造(CSRF)。
- 服务器资源消耗:相比 SSE,WebSocket 可能会消耗更多的服务器资源,特别是在处理大量并发连接时。
SSE 适用于简单的单向数据推送场景,如新闻更新、实时通知等,而 WebSocket 更适合需要双向实时通信的场景,如在线聊天、实时游戏等。在选择使用哪种技术时,需要根据具体的应用需求、浏览器兼容性和服务器资源等因素进行综合考虑
效果演示
话不多说。直接上代码
Controller
package cn.ideamake.feishu.web.controller.sse;import cn.hutool.core.thread.ThreadUtil;
import cn.ideamake.common.response.Result;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.Valid;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseController* @date 2024/6/5 10:14* @slogan: 源于生活 高于生活* @description:**/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
@Validated
public class SseController {private final SseService sseService;/*** 创建sse链接* @param clientId* @return*/@GetMapping("/createConnect")public SseEmitter createConnect(String clientId) {return sseService.createConnect(clientId);}/*** 给所有客户端发送消息* @param msg* @return*/@PostMapping("/broadcast")public Result<Boolean> sendMessageToAllClient(@RequestBody String msg) {ThreadUtil.execute(() -> {sseService.sendMessageToAllClient(msg);});return Result.ok(true);}/*** 给指定端发送消息* @param sseMessageDTO* @return*/@PostMapping("/sendMessage")public Result<Boolean> sendMessageToOneClient(@RequestBody @Valid SseMessageDTO sseMessageDTO) {ThreadUtil.execute(() -> {sseService.sendMessageToOneClient(sseMessageDTO.getClientId(), sseMessageDTO.getData());});return Result.ok(true);}/*** 关闭链接* @param clientId* @return*/@GetMapping("/closeConnect")public Result<Boolean> closeConnect(@RequestParam("clientId") String clientId) {ThreadUtil.execute(() -> {sseService.closeConnect(clientId);});return Result.ok(true);}}
Service
package cn.ideamake.feishu.service.sse;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseService* @date 2024/6/5 10:18* @slogan: 源于生活 高于生活* @description:**/
public interface SseService {/*** 创建连接** @param clientId 客户端ID*/SseEmitter createConnect(String clientId);/*** 根据客户端id获取SseEmitter对象** @param clientId 客户端ID*/SseEmitter getSseEmitterByClientId(String clientId);/*** 发送消息给所有客户端** @param msg 消息内容*/void sendMessageToAllClient(String msg);/*** 给指定客户端发送消息** @param clientId 客户端ID* @param msg 消息内容*/void sendMessageToOneClient(String clientId, String msg);/*** 关闭连接** @param clientId 客户端ID*/void closeConnect(String clientId);}
ServiceImpl
package cn.ideamake.feishu.service.sse.impl;import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseServiceImpl* @date 2024/6/5 10:18* @slogan: 源于生活 高于生活* @description:**/
@Slf4j
@RequiredArgsConstructor
@Service
public class SseServiceImpl implements SseService {/*** 容器,保存连接,用于输出返回 ;可使用其他方法实现*/private static final Map<String, SseEmitter> SSE_CACHE = MapUtil.newConcurrentHashMap();/*** 重试次数*/private final Integer RESET_COUNT = 3;/*** 重试等待事件 单位 ms*/private final Integer RESET_TIME = 5000;/*** 根据客户端id获取SseEmitter对象** @param clientId 客户端ID*/@Overridepublic SseEmitter getSseEmitterByClientId(String clientId) {return SSE_CACHE.get(clientId);}/*** 创建连接** @param clientId 客户端ID*/@Overridepublic SseEmitter createConnect(String clientId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (StrUtil.isBlank(clientId)) {clientId = IdUtil.simpleUUID();}// 注册回调// 长链接完成后回调接口(即关闭连接时调用)sseEmitter.onCompletion(completionCallBack(clientId));// 连接超时回调sseEmitter.onTimeout(timeoutCallBack(clientId));// 推送消息异常时,回调方法sseEmitter.onError(errorCallBack(clientId));SSE_CACHE.put(clientId, sseEmitter);log.info("创建新的sse连接,当前用户:{} 累计用户:{}", clientId, SSE_CACHE.size());try {// 注册成功返回用户信息sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));} catch (IOException e) {log.error("创建长链接异常,客户端ID:{} 异常信息:{}", clientId, e.getMessage());}return sseEmitter;}/*** 发送消息给所有客户端** @param msg 消息内容*/@Overridepublic void sendMessageToAllClient(String msg) {if (MapUtil.isEmpty(SSE_CACHE) || StringUtils.isBlank(msg)) {return;}// 判断发送的消息是否为空for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {SseMessageDTO sseMessageDTO = new SseMessageDTO();sseMessageDTO.setClientId(entry.getKey());sseMessageDTO.setData(msg);sendMsgToClientByClientId(entry.getKey(), sseMessageDTO, entry.getValue());}}/*** 给指定客户端发送消息** @param clientId 客户端ID* @param msg 消息内容*/@Overridepublic void sendMessageToOneClient(String clientId, String msg) {SseMessageDTO sseMessageDTO = new SseMessageDTO(clientId, msg);sendMsgToClientByClientId(clientId, sseMessageDTO, SSE_CACHE.get(clientId));}/*** 关闭连接** @param clientId 客户端ID*/@Overridepublic void closeConnect(String clientId) {SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {sseEmitter.complete();removeUser(clientId);}}/*** 推送消息到客户端* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改** @param clientId 客户端ID* @param sseMessageDTO 推送信息,此处结合具体业务,定义自己的返回值即可**/private void sendMsgToClientByClientId(String clientId, SseMessageDTO sseMessageDTO, SseEmitter sseEmitter) {if (sseEmitter == null) {log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", clientId, sseMessageDTO);return;}SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK)).data(sseMessageDTO, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);} catch (IOException e) {// 推送消息失败,记录错误日志,进行重推log.error("推送消息失败:{},尝试进行重推", sseMessageDTO);boolean isSuccess = true;// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < RESET_COUNT; i++) {try {Thread.sleep(RESET_TIME);sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter == null) {log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);continue;}sseEmitter.send(sendData);} catch (Exception ex) {log.error("{}的第{}次消息重推失败", clientId, i + 1, ex);continue;}log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, sseMessageDTO);return;}}}/*** 长链接完成后回调接口(即关闭连接时调用)** @param clientId 客户端ID**/private Runnable completionCallBack(String clientId) {return () -> {log.info("结束连接:{}", clientId);removeUser(clientId);};}/*** 连接超时时调用** @param clientId 客户端ID**/private Runnable timeoutCallBack(String clientId) {return () -> {log.info("连接超时:{}", clientId);removeUser(clientId);};}/*** 推送消息异常时,回调方法** @param clientId 客户端ID**/private Consumer<Throwable> errorCallBack(String clientId) {return throwable -> {log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < RESET_COUNT; i++) {try {Thread.sleep(RESET_TIME);SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter == null) {log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);continue;}sseEmitter.send("失败后重新推送");} catch (Exception e) {log.error("sse推送消息异常", e);}}};}/*** 移除用户连接** @param clientId 客户端ID**/private void removeUser(String clientId) {SSE_CACHE.remove(clientId);log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);}
}
DTO
package cn.ideamake.feishu.pojo.dto;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;import javax.validation.constraints.NotNull;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseMessageDTO* @date 2024/6/5 10:19* @slogan: 源于生活 高于生活* @description:**/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Accessors(chain = true)
public class SseMessageDTO {/*** 客户端id*/@NotNull(message = "客户端id 不能为空")private String clientId;/*** 传输数据体(json)*/private String data;}