Spring 使用SSE(Server-Sent Events)学习

什么是SSE

SSE 即服务器发送事件(Server-Sent Events),是一种服务器推送技术,允许服务器在客户端建立连接后,主动向客户端推送数据。

SSE 基于 HTTP 协议,使用简单,具有轻量级、实时性和断线重连等特点。它在一些需要实时数据更新的场景中非常有用,如股票行情、实时通知等。与传统的轮询方式相比,SSE 可以减少不必要的网络请求,提高数据传输效率。

SSE 的主要优点包括:

实时性:服务器可以实时推送数据到客户端,无需客户端不断轮询。
轻量级:SSE 使用简单的文本协议,数据量小,对网络带宽要求较低。
兼容性好:SSE 基于 HTTP 协议,大多数现代浏览器都支持。
易于实现:服务器端和客户端的实现都相对简单。

然而,SSE 也有一些局限性:

单向通信:SSE 只允许服务器向客户端推送数据,客户端无法直接向服务器发送数据。
支持的浏览器有限:虽然大多数现代浏览器支持 SSE,但一些较旧的浏览器可能不支持。
数据格式受限:SSE 通常只能传输文本数据,对于二进制数据的支持有限。

与 HTTP 相比,SSE 提供了更高效的实时数据推送机制,减少了不必要的请求和响应,降低了服务器负载。但 HTTP 更适合一般性的请求-响应模式的数据传输。

SSE WebSocket 对比

SSE 的优点:

  • 简单易用:SSE 使用标准的 HTTP 协议,实现相对简单,不需要复杂的握手和协议转换。
  • 单向通信:适合只需从服务器向客户端推送数据的场景,减少了不必要的双向通信开销。
  • 低延迟:由于基于 HTTP 协议,数据可以在服务器有新数据时立即推送,延迟较低。
  • 兼容性好:大多数现代浏览器都支持 SSE,不需要特殊的插件或扩展。
  • 轻量级:相比 WebSocket,SSE 的实现相对较轻量,对服务器资源的消耗较少。
  • 自动重连:如果连接中断,SSE 会自动尝试重新连接,确保数据的持续推送。

SSE 的缺点:

  • 单向通信限制:SSE 只支持服务器向客户端发送数据,客户端无法向服务器发送数据。
  • 数据格式受限:SSE 通常只能发送文本数据,对于二进制数据的支持有限。
  • 连接管理:每个 SSE 连接在每次数据推送后都会关闭,然后需要重新建立连接,这可能会导致一些额外的开销。

** WebSocket 的优点:**

  • 全双工通信:支持双向通信,客户端和服务器可以随时互相发送数据,适用于实时交互性较高的应用。
  • 低延迟:建立连接后,数据可以实时传输,延迟较低。
  • 二进制支持:WebSocket 可以发送文本和二进制数据,更适合处理多媒体等二进制数据。
  • 较少的 HTTP 开销:由于建立了持久连接,减少了 HTTP 请求头和响应头的开销。

WebSocket 的缺点:

  • 协议复杂性:WebSocket 协议相对较复杂,需要更多的代码和服务器资源来处理连接和数据传输。
  • 兼容性问题:虽然大多数现代浏览器支持 WebSocket,但在一些旧版本的浏览器或特定环境中可能存在兼容性问题。
  • 安全风险:由于 WebSocket 可以实现双向通信,需要注意安全问题,如防止跨站脚本攻击(XSS)和跨站请求伪造(CSRF)。
  • 服务器资源消耗:相比 SSE,WebSocket 可能会消耗更多的服务器资源,特别是在处理大量并发连接时。

SSE 适用于简单的单向数据推送场景,如新闻更新、实时通知等,而 WebSocket 更适合需要双向实时通信的场景,如在线聊天、实时游戏等。在选择使用哪种技术时,需要根据具体的应用需求、浏览器兼容性和服务器资源等因素进行综合考虑

效果演示

话不多说。直接上代码

Controller

package cn.ideamake.feishu.web.controller.sse;import cn.hutool.core.thread.ThreadUtil;
import cn.ideamake.common.response.Result;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.Valid;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseController* @date 2024/6/5 10:14* @slogan: 源于生活 高于生活* @description:**/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/sse")
@Validated
public class SseController {private final SseService sseService;/*** 创建sse链接* @param clientId* @return*/@GetMapping("/createConnect")public SseEmitter createConnect(String clientId) {return sseService.createConnect(clientId);}/*** 给所有客户端发送消息* @param msg* @return*/@PostMapping("/broadcast")public Result<Boolean> sendMessageToAllClient(@RequestBody String msg) {ThreadUtil.execute(() -> {sseService.sendMessageToAllClient(msg);});return Result.ok(true);}/*** 给指定端发送消息* @param sseMessageDTO* @return*/@PostMapping("/sendMessage")public Result<Boolean> sendMessageToOneClient(@RequestBody @Valid SseMessageDTO sseMessageDTO) {ThreadUtil.execute(() -> {sseService.sendMessageToOneClient(sseMessageDTO.getClientId(), sseMessageDTO.getData());});return Result.ok(true);}/*** 关闭链接* @param clientId* @return*/@GetMapping("/closeConnect")public Result<Boolean> closeConnect(@RequestParam("clientId") String clientId) {ThreadUtil.execute(() -> {sseService.closeConnect(clientId);});return Result.ok(true);}}

Service

package cn.ideamake.feishu.service.sse;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseService* @date 2024/6/5 10:18* @slogan: 源于生活 高于生活* @description:**/
public interface SseService {/*** 创建连接** @param clientId 客户端ID*/SseEmitter createConnect(String clientId);/*** 根据客户端id获取SseEmitter对象** @param clientId 客户端ID*/SseEmitter getSseEmitterByClientId(String clientId);/*** 发送消息给所有客户端** @param msg 消息内容*/void sendMessageToAllClient(String msg);/*** 给指定客户端发送消息** @param clientId 客户端ID* @param msg      消息内容*/void sendMessageToOneClient(String clientId, String msg);/*** 关闭连接** @param clientId 客户端ID*/void closeConnect(String clientId);}

ServiceImpl

package cn.ideamake.feishu.service.sse.impl;import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import cn.ideamake.feishu.pojo.dto.SseMessageDTO;
import cn.ideamake.feishu.service.sse.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseServiceImpl* @date 2024/6/5 10:18* @slogan: 源于生活 高于生活* @description:**/
@Slf4j
@RequiredArgsConstructor
@Service
public class SseServiceImpl implements SseService {/*** 容器,保存连接,用于输出返回 ;可使用其他方法实现*/private static final Map<String, SseEmitter> SSE_CACHE = MapUtil.newConcurrentHashMap();/*** 重试次数*/private final Integer RESET_COUNT = 3;/*** 重试等待事件 单位 ms*/private final Integer RESET_TIME = 5000;/*** 根据客户端id获取SseEmitter对象** @param clientId 客户端ID*/@Overridepublic SseEmitter getSseEmitterByClientId(String clientId) {return SSE_CACHE.get(clientId);}/*** 创建连接** @param clientId 客户端ID*/@Overridepublic SseEmitter createConnect(String clientId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (StrUtil.isBlank(clientId)) {clientId = IdUtil.simpleUUID();}// 注册回调// 长链接完成后回调接口(即关闭连接时调用)sseEmitter.onCompletion(completionCallBack(clientId));// 连接超时回调sseEmitter.onTimeout(timeoutCallBack(clientId));// 推送消息异常时,回调方法sseEmitter.onError(errorCallBack(clientId));SSE_CACHE.put(clientId, sseEmitter);log.info("创建新的sse连接,当前用户:{}    累计用户:{}", clientId, SSE_CACHE.size());try {// 注册成功返回用户信息sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));} catch (IOException e) {log.error("创建长链接异常,客户端ID:{}   异常信息:{}", clientId, e.getMessage());}return sseEmitter;}/*** 发送消息给所有客户端** @param msg 消息内容*/@Overridepublic void sendMessageToAllClient(String msg) {if (MapUtil.isEmpty(SSE_CACHE) || StringUtils.isBlank(msg)) {return;}// 判断发送的消息是否为空for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {SseMessageDTO sseMessageDTO = new SseMessageDTO();sseMessageDTO.setClientId(entry.getKey());sseMessageDTO.setData(msg);sendMsgToClientByClientId(entry.getKey(), sseMessageDTO, entry.getValue());}}/*** 给指定客户端发送消息** @param clientId 客户端ID* @param msg      消息内容*/@Overridepublic void sendMessageToOneClient(String clientId, String msg) {SseMessageDTO sseMessageDTO = new SseMessageDTO(clientId, msg);sendMsgToClientByClientId(clientId, sseMessageDTO, SSE_CACHE.get(clientId));}/*** 关闭连接** @param clientId 客户端ID*/@Overridepublic void closeConnect(String clientId) {SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {sseEmitter.complete();removeUser(clientId);}}/*** 推送消息到客户端* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改** @param clientId  客户端ID* @param sseMessageDTO 推送信息,此处结合具体业务,定义自己的返回值即可**/private void sendMsgToClientByClientId(String clientId, SseMessageDTO sseMessageDTO, SseEmitter sseEmitter) {if (sseEmitter == null) {log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", clientId, sseMessageDTO);return;}SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK)).data(sseMessageDTO, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);} catch (IOException e) {// 推送消息失败,记录错误日志,进行重推log.error("推送消息失败:{},尝试进行重推", sseMessageDTO);boolean isSuccess = true;// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < RESET_COUNT; i++) {try {Thread.sleep(RESET_TIME);sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter == null) {log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);continue;}sseEmitter.send(sendData);} catch (Exception ex) {log.error("{}的第{}次消息重推失败", clientId, i + 1, ex);continue;}log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, sseMessageDTO);return;}}}/*** 长链接完成后回调接口(即关闭连接时调用)** @param clientId 客户端ID**/private Runnable completionCallBack(String clientId) {return () -> {log.info("结束连接:{}", clientId);removeUser(clientId);};}/*** 连接超时时调用** @param clientId 客户端ID**/private Runnable timeoutCallBack(String clientId) {return () -> {log.info("连接超时:{}", clientId);removeUser(clientId);};}/*** 推送消息异常时,回调方法** @param clientId 客户端ID**/private Consumer<Throwable> errorCallBack(String clientId) {return throwable -> {log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < RESET_COUNT; i++) {try {Thread.sleep(RESET_TIME);SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter == null) {log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);continue;}sseEmitter.send("失败后重新推送");} catch (Exception e) {log.error("sse推送消息异常", e);}}};}/*** 移除用户连接** @param clientId 客户端ID**/private void removeUser(String clientId) {SSE_CACHE.remove(clientId);log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);}
}

DTO

package cn.ideamake.feishu.pojo.dto;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;import javax.validation.constraints.NotNull;/*** @author Barcke* @version 1.0* @projectName feishu-application* @className SseMessageDTO* @date 2024/6/5 10:19* @slogan: 源于生活 高于生活* @description:**/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Accessors(chain = true)
public class SseMessageDTO {/*** 客户端id*/@NotNull(message = "客户端id 不能为空")private String clientId;/*** 传输数据体(json)*/private String data;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/848392.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Go微服务: 基于rocketmq:5.2.0搭建RocketMQ环境,以及示例参考

概述 参考最新官方文档&#xff1a;https://rocketmq.apache.org/zh/docs/quickStart/03quickstartWithDockercompose以及&#xff1a;https://rocketmq.apache.org/zh/docs/deploymentOperations/04Dashboard综合以上两个文档来搭建环境 搭建RocketMQ环境 1 ) 基于 docker-c…

速通数据挖掘课程

速通 数据挖掘课程 大的分类 标签预测&#xff08;分类&#xff09; 和 数值预测&#xff08;预测呀&#xff09; 监督 非监督 是否 需要预先训练模型 然后预测 聚类&#xff1a;拿一个比一个&#xff0c;看看相似否&#xff0c;然后归一类 数据四种类型 数据属性有四种&…

simplicity studio 5 修改设备电压

工装板的soc额定输入电压为1.5v&#xff0c;而常态下ttl高电平为5v/3.3v&#xff0c;所以需要设定烧录程序时的设备电压。 确保连接设备&#xff0c;并且被识别。 进入管理员模式。 烧录.hex文件快捷方法。

人工智能芯片封装技术及应用趋势分析

简介人工智能&#xff08;AI&#xff09;、物联网&#xff08;IoT&#xff09;和大数据的融合正在开创全新的智能时代&#xff0c;以智能解决方案改变各行各业。人工智能芯片在支持人工智能学习和推理计算方面发挥着非常重要的作用&#xff0c;可实现各行各业的多样化应用。 本…

HTML+CSS+JS 动态登录表单

效果演示 实现了一个登录表单的背景动画效果,包括一个渐变背景、一个输入框和一个登录按钮。背景动画由多个不同大小和颜色的正方形组成,它们在页面上以不同的速度和方向移动。当用户成功登录后,标题会向上移动,表单会消失。 Code <!DOCTYPE html> <html lang=&q…

电阻、电容和电感测试仪设计

在现代化生产、学习、实验当中,往往需要对某个元器件的具体参数进行测量,在这之中万用表以其简单易用,功耗低等优点被大多数人所选择使用。然而万用表有一定的局限性,比如:不能够测量电感,而且容量稍大的电容也显得无能为力。所以制作一个简单易用的电抗元器件测量仪是很…

智能视频监控平台LntonCVS视频汇聚共享平台智慧楼宇应用方案

随着城市经济的迅速发展&#xff0c;大中型城市的写字楼数量不断增加。在像香港、台北、上海、北京等大城市&#xff0c;写字楼的安保成本相当高。为了降低这一成本&#xff0c;越来越多的物业公司开始采用技术手段。写字楼安防监控系统便是其中之一&#xff0c;它利用安全防范…

【control_manager】无法加载,gazebo_ros2_control 0.4.8,机械臂乱飞

删除URDF和SDRF文件中的特殊注释#, !,&#xff1a; xacro文件解析为字符串时出现报错 一开始疯狂报错Waiting for /controller_manager node to exist 1717585645.4673686 [spawner-2] [INFO] [1717585645.467015300] [spawner_joint_state_broadcaster]: Waiting for /con…

这家公司的39亿存款,无法收回了?

新闻提要 4日晚间&#xff0c;亿利洁能发布公告称&#xff0c;亿利财务公司对于公司存放在亿利财务公司的 39.06 亿元货币资金的用途主要是向亿利集团及其关联方发放贷款&#xff0c;近日公司获悉相关贷款已被划分为次级贷款&#xff08;不良贷款的一种&#xff09;&#xff0…

Pinterest免费引流实操演示

这篇文章中你将了解到 1.Pinterest网站介绍&#xff0c;用户群体&#xff0c;适合做什么品类。 2.现在的商家都在上面做什么&#xff1f;案例展示。 3.我们在这个站免费引流要怎么做以及注意事项。 1.Pinterest网站介绍&#xff0c;用户群体&#xff0c;适合做什么品类。 P…

Nginx配置详细解释

文章目录 一、配置详细解释关闭版本修改启动的进程数cpu与work进程绑定nginx进程的优先级work进程打开的文件个数event事件 二、Http设置协议配置说明mime虚拟主机aliaslocationaccess模块验证模块自定义错误页面自定义日志存放位置try_files检测文件是否存在长连接 一、配置详…

附件下载跨域问题-解决

1.若依附件下载跨域 源码&#xff1a; package com.ruoyi.common.utils.file;import java.io.*; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; i…

真实场景 这周的任意一天,获取上周一到周日的时间范围-作者:【小可耐教你学影刀RPA】

用户场景 我想在这周的任意一天&#xff0c;获取上周一到周日的时间范围&#xff0c;应该怎么做 解决办法1 用指令解决 最简单 解决办法2 自己写逻辑 不过要用到 获取当前日期指令 当前是礼拜几

Hugging Face系列2:详细剖析Hugging Face网站资源——实战六类开源库

Hugging Face系列2&#xff1a;详细剖析Hugging Face网站资源——实战六类开源库 前言本篇摘要2. Hugging Face开源库2.1 transformers2.1.1 简介2.1.2 实战1. 文本分类2. 图像识别3. 在Pytorch和TensorFlow中使用pipeline 2.2 diffusers2.2.1 简介2.2.2 实战1. 管线2. 模型和调…

俄罗斯ozon平台计算器,ozon定价计算器

在数字化飞速发展的今天&#xff0c;电商平台已成为商家们展示产品、吸引顾客的重要窗口。而在俄罗斯这一广阔的市场中&#xff0c;Ozon平台以其独特的优势&#xff0c;成为了众多电商卖家的首选。然而&#xff0c;想要在Ozon平台上脱颖而出&#xff0c;除了优质的产品和服务外…

初识C++ · 反向迭代器简介

目录 前言 反向迭代器的实现 前言 继模拟实现了list和vector之后&#xff0c;我们对迭代器的印象也是加深了许多&#xff0c;但是我们实现的都是正向迭代器&#xff0c;还没有实现反向迭代器&#xff0c;那么为什么迟迟不实现呢&#xff1f;因为难吗&#xff1f;实际上还好。…

《精品生活》万方普刊投稿发表简介

《精品生活》杂志是由国家新闻出版总署批准&#xff0c;南方出版传媒股份有限公司主管&#xff0c;广东大沿海出版工贸有限公司主办&#xff0c;广东精品生活杂志社出版的综合性文化期刊。主要栏目&#xff1a;教学研究、艺术教育、文化广角、民族文化、理论前沿、综合论坛。 刊…

【2024】Kafka Streams纤细介绍与具体使用(1)

目录 介绍关键特性应用场景核心概念部署方式kafka streams的处理模式 具体使用1、准备工作2、添加依赖3、代码实现3、测试 介绍 Kafka Streams是构建在Apache Kafka之上的客户端库&#xff0c;用于构建高效、实时的流处理应用。它允许你以高吞吐量和低延迟的方式处理记录流&am…

Prompt 指南之零样本与少样本提示,超详细解析!

前言 我将在本文中为你带来另外 2 种提示技术&#xff0c;它们分别是&#xff1a; 零样本提示&#xff08;Zero-shot Prompting&#xff09;少样本提示&#xff08;Few-shot Prompting&#xff09; shot 即代表示例 这两种技术利用 LLM 的强大预训练知识&#xff0c;通过最小…

云端狂飙:Django项目部署与性能优化的极速之旅

Hello&#xff0c;我是阿佑&#xff0c;这次阿佑将手把手带你亲自踏上Django项目从单机到云端的全过程&#xff0c;以及如何通过Docker实现项目的无缝迁移和扩展。不仅详细介绍了Docker的基本概念和操作&#xff0c;还深入探讨Docker Compose、Swarm和Kubernetes等高级工具的使…