SSE(Server-Send-Event)服务端推送数据技术

SSE(Server-Send-Event)服务端推送数据技术

大家是否遇到过服务端需要主动传输数据到客户端的情况,目前有三种解决方案。

  1. 客户端轮询更新数据。
  2. 服务端与客户端建立 Socket 连接双向通信
  3. 服务端与客户建立 SSE 连接单向通信

几种方案的比较:

  1. 轮询:

    客户端通过频繁请求向服务端请求数据,达到类似实时更新的效果。轮询的优点是实现简单,但是会给服务端和网络带来额外的压力,且延迟较高。

  2. WebSocket连接:

    服务端与客户端建立Socket连接进行数据传输,Socket的传输方式是全双工的。WebSocket是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。

  3. SSE推送:

    SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术,只允许单向通讯。相较于 WebSocket,SSE 更简单、更轻量级。

下面是SpringBoot使用SSE的步骤和示例代码

  1. 配置依赖

    	    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
    

    SSE已经集成到spring-web中,所以可以直接使用。

  2. 后端代码

    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;
    import java.util.concurrent.CompletableFuture;@RestController
    @RequestMapping("/sse")
    @Slf4j
    @Validated
    public class SseTestController {@Autowiredprivate SseService service;@GetMapping("/testSse")public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {final SseEmitter emitter = service.getConn(clientId);CompletableFuture.runAsync(() -> {try {service.send(clientId);log.info("建立连接成功!clientId = {}", clientId);} catch (Exception e) {log.error("推送数据异常");}});return emitter;}@GetMapping("/sseConection")public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {return service.getConn(clientId);}@GetMapping("/sendMsg")public void sendMsg(@RequestParam("clientId") String clientId) {try {// 异步发送消息CompletableFuture.runAsync(() -> {try {service.send(clientId);} catch (Exception e) {log.error("推送数据异常");}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("/sendMsgToAll")public void sendMsgToAll() {try {//异步发送消息CompletableFuture.runAsync(() -> {try {service.sendToAll();} catch (Exception e) {e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("closeConn/{clientId}")public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {service.closeConn(clientId);return "连接已关闭";}}
    package com.wry.wry_test.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;public interface SseService {/*** 获取连接* @param clientId 客户端id* @return*/SseEmitter getConn(String clientId);/***  发送消息到指定客户端* @param clientId 客户端id* @throws Exception*/void send(String clientId);/*** 发送消息到所有SSE客户端* @throws Exception*/void sendToAll() throws Exception;/*** 关闭指定客户端的连接* @param clientId 客户端id*/void closeConn(String clientId);
    }
    package com.wry.wry_test.service.impl;import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;@Service
    @Slf4j
    public class SseServiceImpl implements SseService {private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();@Overridepublic SseEmitter getConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {return sseEmitter;} else {// 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用final SseEmitter emitter = new SseEmitter(600_000L);// 注册超时回调,超时后触发emitter.onTimeout(() -> {log.info("连接已超时,正准备关闭,clientId = {}", clientId);SSE_CACHE.remove(clientId);});// 注册完成回调,调用 emitter.complete() 触发emitter.onCompletion(() -> {log.info("连接已关闭,正准备释放,clientId = {}", clientId);SSE_CACHE.remove(clientId);log.info("连接已释放,clientId = {}", clientId);});// 注册异常回调,调用 emitter.completeWithError() 触发emitter.onError(throwable -> {log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);SSE_CACHE.remove(clientId);});SSE_CACHE.put(clientId, emitter);log.info("建立连接成功!clientId = {}", clientId);return emitter;}}/*** 模拟类似于 chatGPT 的流式推送回答** @param clientId 客户端 id* @throws IOException 异常*/@Overridepublic void send(@NotBlank String clientId) {final SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter == null) return;// 开始推送数据// todo 模拟推送数据for (int i = 0; i < 10000000; i++) {String msg = "SSE 测试数据";try {this.sseSend(emitter, msg, clientId);Thread.sleep(1000);} catch (Exception e) {log.error("推送数据异常", e);break;}}log.info("推送数据结束,clientId = {}", clientId);// 结束推流emitter.complete();}/*** 发送数据给所有连接*/public void sendToAll() {List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());for (int i = 0; i < 10000000; i++) {String msg = "SSE 测试数据";this.sseSend(emitters, msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void closeConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {sseEmitter.complete();}}/*** 推送数据封装** @param emitter  sse长连接* @param data     发送数据* @param clientId 客户端id*/private void sseSend(SseEmitter emitter, Object data, String clientId) {try {emitter.send(data);log.info("推送数据成功,clientId = {}", clientId);} catch (Exception e) {log.error("推送数据异常", e);throw new RuntimeException("推送数据异常");}}/*** 推送数据封装** @param emitter sse长连接* @param data    发送数据*/private void sseSend(List<SseEmitter> emitter, Object data) {emitter.forEach(e -> {try {e.send(data);} catch (IOException ioException) {log.error("推送数据异常", ioException);}});log.info("推送数据成功");}}
    

    实现效果如下:服务端不断推送数据到前端,前端可以也可以调用接口主动关闭连接。

    image-20240710180401231

适用场景:SSE由于是服务端单向通讯,所以适合那种需要单向持久的连接。比如:

  • ChatGPT这种实时加载会话数据
  • 文件下载,通过SSE异步下载文件
  • 服务端实时数据推送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/44115.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx上配置多个网站

一、需求描述 我们只有一台安装了Nginx的服务器,但是我们需要实现在这台服务器上部署多个网站,用以对外提供服务。 二、Nginx上配置多个网站分析 一般网站的格式为:【http://ip地址:端口号/URI】(比如:http://192.168.3.201:80),IP地址也可用域名表示;那么要实现在Nginx…

i18n、L10n、G11N 和 T9N 的含义

注&#xff1a;机翻&#xff0c;未校对。 Looking into localization for the first time can be terrifying, if only due to all of the abbreviations. But the meaning of i18n, L10n, G11N, and T9N, are all very easy to understand. 第一次研究本地化可能会很可怕&…

如何做一个迟钝不受伤的打工人?

一、背景 在当前激烈的职场环境中&#xff0c;想要成为一个相对“迟钝”且不易受伤的打工人&#xff0c;以下是一些建议&#xff0c;但请注意&#xff0c;这里的“迟钝”并非指智力上的迟钝&#xff0c;而是指在应对复杂人际关系和压力时展现出的豁达与钝感力&#xff1a; 尊重…

【测开能力提升-fastapi框架】fastapi路由分发

1.7 路由分发 apps/app01.py from fastapi import APIRouterapp01 APIRouter()app01.get("/food") async def shop_food():return {"shop": "food"}app01.get("/bed") async def shop_food():return {"shop": "bed&…

js前端隐藏列 并且获取值,列表复选框

列表框 <div class"block" id"psi_wh_allocation_m"><table id"result" class"list auto hover fixed" style"width:100%;border-collapse:collapse"><thead><tr><%--<th></th>--%&…

LabVIEW滤波器性能研究

为了研究滤波器的滤波性能&#xff0c;采用LabVIEW设计了一套滤波器性能研究系统。该系统通过LabVIEW中的波形生成函数&#xff0c;输出幅值及频率可调的正弦波和白噪声两种信号&#xff0c;并将白噪声与正弦波叠加&#xff0c;再通过滤波器输出纯净的正弦波信号。系统通过FFT&…

Python从0到100(三十八):json字符串的数据提取

JSON的数据提取 1.学习目标 掌握JSON相关的方法&#xff08;load, loads, dump, dumps&#xff09;了解JSONPath的使用&#xff08;提取JSON中的数据&#xff09; 2 复习什么是JSON JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式&#xff0c;它使得人们很容…

富文本braft-editor插件分享

效果展示 安装插件 npm install braft-editor 或者 yarn add braft-editor 主要代码 import React, { useState, forwardRef } from react //引入富文本编辑器 import BraftEditor from braft-editor // 引入编辑器样式 import braft-editor/dist/index.css import { B…

thinkphp8框架源码精讲

前言 很开心你能看到这个笔记&#xff0c;相信你对thinkphp是有一定兴趣的&#xff0c;正好大家都是志同道合的人。 thinkphp是我入门学习的第一个框架&#xff0c;经过这么多年了&#xff0c;还没好好的研究它&#xff0c;今年利用了空闲的时间狠狠的深入源码学习了一把&…

硅谷甄选运营平台-vue3组件通信方式

vue3组件通信方式 vue2组件通信方式&#xff1a; props:可以实现父子组件、子父组件、甚至兄弟组件通信自定义事件:可以实现子父组件通信全局事件总线$bus:可以实现任意组件通信pubsub:发布订阅模式实现任意组件通信vuex:集中式状态管理容器&#xff0c;实现任意组件通信ref:父…

camunda最终章-springboot

1.实现并行流子流程 1.画图 2.创建实体 package com.jmj.camunda7test.subProcess.entity;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable; import java.util.ArrayList; import java.util.List;Data …

C语言 | Leetcode C语言题解之第230题二叉搜索树中第K小的元素

题目&#xff1a; 题解&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/int search_num(struct TreeNode* root, int k, int *result, int num) {if(num k 1){retu…

FastGPT连接OneAI接入网络模型

文章目录 FastGPT连接OneAI接入网络模型1.准备工作2.开始部署2.1下载 docker-compose.yml2.2修改docker-compose.yml里的参数 3.打开FastGPT添加模型3.1打开OneAPI3.2接入网络模型3.3重启服务 FastGPT连接OneAI接入网络模型 1.准备工作 本文档参考FastGPT的官方文档 主机ip接…

记一次若依框架和Springboot常见报错的实战漏洞挖掘

目录 前言 本次测实战利用图​ 1.判段系统框架 2.登录页面功能点测试 2.1 弱口令 2.2 webpack泄露信息判断 2.3 未授权接口信息发现 3.进一步测试发现新的若依测试点 3.1 默认弱口令 3.2 历史漏洞 4.访问8080端口发现spring经典爆粗 4.1 druid弱口令 4.2 SwaggerU…

浅析Kafka-Stream消息流式处理流程及原理

以下结合案例&#xff1a;统计消息中单词出现次数&#xff0c;来测试并说明kafka消息流式处理的执行流程 Maven依赖 <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusio…

音频语言学习领域数据集现状、分类及评估

Audio Language Learning (Audio-Text Learning) 是一个新兴的研究领域&#xff0c;专注于处理、理解和描述声音。它的发展动力是机器学习技术的进步以及越来越多地将声音与其相应的文本描述相结合的数据集的可用性。 Audio Language Models (ALMs) 是这个领域的关键技术&#…

MATLAB中的SDPT3、LMILab、SeDuMi工具箱

MATLAB中的SDPT3、LMILab、SeDuMi工具箱都是用于解决特定数学优化问题的工具箱&#xff0c;它们在控制系统设计、机器学习、信号处理等领域有广泛的应用。以下是对这三个工具箱的详细介绍&#xff1a; 1. SDPT3工具箱 简介&#xff1a; SDPT3&#xff08;Semidefinite Progra…

基于QT开发的反射内存小工具

前言 最近项目需要需要开发一个反射内存小工具&#xff0c;经过2天的修修改终于完成了。界面如下&#xff1a; 功能简介 反射内存指定地址数据读取反射内存指定地址数据写入反射内存指定地址数据清理十进制、十六进制、二进制数据相互转换 部分代码 void RfmMain::setWOthe…

攻防世界(PHP过滤器过滤)file_include

转换过滤器官方文档&#xff1a;https://www.php.net/manual/zh/filters.convert.php#filters.convert.iconv 这道题因为convert.base64-encode被过滤掉了&#xff0c;所以使用convert.iconv.*过滤器 在激活 iconv 的前提下可以使用 convert.iconv.* 压缩过滤器&#xff0c; 等…

Win10安装MongoDB(详细版)

文章目录 1、安装MongoDB Server1.1. 下载1.2. 安装 2、手动安装MongoDB Compass(GUI可视工具)2.1. 下载2.2.安装 3、测试连接3.1.MongoDB Compass 连接3.2.使用Navicat连接 1、安装MongoDB Server 1.1. 下载 官网下载地址 https://www.mongodb.com/try/download/community …