SSE(Server-Send-Event)服务端推送数据技术

SSE(Server-Send-Event)服务端推送数据技术

大家是否遇到过服务端需要主动传输数据到客户端的情况,目前有三种解决方案。

  1. 客户端轮询更新数据。
  2. 服务端与客户端建立 Socket 连接双向通信
  3. 服务端与客户建立 SSE 连接单向通信

几种方案的比较:

  1. 轮询:

    客户端通过频繁请求向服务端请求数据,达到类似实时更新的效果。轮询的优点是实现简单,但是会给服务端和网络带来额外的压力,且延迟较高。

  2. WebSocket连接:

    服务端与客户端建立Socket连接进行数据传输,Socket的传输方式是全双工的。WebSocket是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。

  3. SSE推送:

    SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术,只允许单向通讯。相较于 WebSocket,SSE 更简单、更轻量级。

下面是SpringBoot使用SSE的步骤和示例代码

  1. 配置依赖

    	    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
    

    SSE已经集成到spring-web中,所以可以直接使用。

  2. 后端代码

    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;
    import java.util.concurrent.CompletableFuture;@RestController
    @RequestMapping("/sse")
    @Slf4j
    @Validated
    public class SseTestController {@Autowiredprivate SseService service;@GetMapping("/testSse")public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {final SseEmitter emitter = service.getConn(clientId);CompletableFuture.runAsync(() -> {try {service.send(clientId);log.info("建立连接成功!clientId = {}", clientId);} catch (Exception e) {log.error("推送数据异常");}});return emitter;}@GetMapping("/sseConection")public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {return service.getConn(clientId);}@GetMapping("/sendMsg")public void sendMsg(@RequestParam("clientId") String clientId) {try {// 异步发送消息CompletableFuture.runAsync(() -> {try {service.send(clientId);} catch (Exception e) {log.error("推送数据异常");}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("/sendMsgToAll")public void sendMsgToAll() {try {//异步发送消息CompletableFuture.runAsync(() -> {try {service.sendToAll();} catch (Exception e) {e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("closeConn/{clientId}")public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {service.closeConn(clientId);return "连接已关闭";}}
    package com.wry.wry_test.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;public interface SseService {/*** 获取连接* @param clientId 客户端id* @return*/SseEmitter getConn(String clientId);/***  发送消息到指定客户端* @param clientId 客户端id* @throws Exception*/void send(String clientId);/*** 发送消息到所有SSE客户端* @throws Exception*/void sendToAll() throws Exception;/*** 关闭指定客户端的连接* @param clientId 客户端id*/void closeConn(String clientId);
    }
    package com.wry.wry_test.service.impl;import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;@Service
    @Slf4j
    public class SseServiceImpl implements SseService {private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();@Overridepublic SseEmitter getConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {return sseEmitter;} else {// 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用final SseEmitter emitter = new SseEmitter(600_000L);// 注册超时回调,超时后触发emitter.onTimeout(() -> {log.info("连接已超时,正准备关闭,clientId = {}", clientId);SSE_CACHE.remove(clientId);});// 注册完成回调,调用 emitter.complete() 触发emitter.onCompletion(() -> {log.info("连接已关闭,正准备释放,clientId = {}", clientId);SSE_CACHE.remove(clientId);log.info("连接已释放,clientId = {}", clientId);});// 注册异常回调,调用 emitter.completeWithError() 触发emitter.onError(throwable -> {log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);SSE_CACHE.remove(clientId);});SSE_CACHE.put(clientId, emitter);log.info("建立连接成功!clientId = {}", clientId);return emitter;}}/*** 模拟类似于 chatGPT 的流式推送回答** @param clientId 客户端 id* @throws IOException 异常*/@Overridepublic void send(@NotBlank String clientId) {final SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter == null) return;// 开始推送数据// todo 模拟推送数据for (int i = 0; i < 10000000; i++) {String msg = "SSE 测试数据";try {this.sseSend(emitter, msg, clientId);Thread.sleep(1000);} catch (Exception e) {log.error("推送数据异常", e);break;}}log.info("推送数据结束,clientId = {}", clientId);// 结束推流emitter.complete();}/*** 发送数据给所有连接*/public void sendToAll() {List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());for (int i = 0; i < 10000000; i++) {String msg = "SSE 测试数据";this.sseSend(emitters, msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void closeConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {sseEmitter.complete();}}/*** 推送数据封装** @param emitter  sse长连接* @param data     发送数据* @param clientId 客户端id*/private void sseSend(SseEmitter emitter, Object data, String clientId) {try {emitter.send(data);log.info("推送数据成功,clientId = {}", clientId);} catch (Exception e) {log.error("推送数据异常", e);throw new RuntimeException("推送数据异常");}}/*** 推送数据封装** @param emitter sse长连接* @param data    发送数据*/private void sseSend(List<SseEmitter> emitter, Object data) {emitter.forEach(e -> {try {e.send(data);} catch (IOException ioException) {log.error("推送数据异常", ioException);}});log.info("推送数据成功");}}
    

    实现效果如下:服务端不断推送数据到前端,前端可以也可以调用接口主动关闭连接。

    image-20240710180401231

适用场景:SSE由于是服务端单向通讯,所以适合那种需要单向持久的连接。比如:

  • ChatGPT这种实时加载会话数据
  • 文件下载,通过SSE异步下载文件
  • 服务端实时数据推送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/44115.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【前端】fis框架学习

文章目录 1. 介绍 1. 介绍 FIS是专为解决前端开发中自动化工具、性能优化、模块化框架、开发规范、代码部署、开发流程等问题的工具框架。 使用FIS我们可以快速的完成各种前端项目的资源压缩、合并等等各种性能优化工作&#xff0c;同时FIS还提供了大量的开发辅助功能 首先我们…

Nginx上配置多个网站

一、需求描述 我们只有一台安装了Nginx的服务器,但是我们需要实现在这台服务器上部署多个网站,用以对外提供服务。 二、Nginx上配置多个网站分析 一般网站的格式为:【http://ip地址:端口号/URI】(比如:http://192.168.3.201:80),IP地址也可用域名表示;那么要实现在Nginx…

QT实现WebSocket通信

文章目录 WebSocket服务端WebSocket客户端html websocket客户端在Qt5中实现WebSocket通信可以通过使用QtWebSockets模块来实现。这个模块提供了一个WebSocket客户端和服务器的实现,可以很方便地在你的应用程序中集成WebSocket功能。 使用的时候,首先在pro工程文件中添加对应的…

【Vue】vue-element-admin概述

一、项目简介 定位&#xff1a;vue-element-admin是一个后台集成解决方案&#xff0c;旨在提供一种快速开发企业级后台应用的方案&#xff0c;让开发者能更专注于业务逻辑和功能实现&#xff0c;而非基础架构的搭建。技术栈&#xff1a;该项目基于Vue.js、Element UI、Vue Rou…

Redis 7.x 系列【24】哨兵模式配置项

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 前言2. 配置项2.1 protected-mode2.2 port2.3 daemonize2.4 pidfile2.5 loglevel2.…

i18n、L10n、G11N 和 T9N 的含义

注&#xff1a;机翻&#xff0c;未校对。 Looking into localization for the first time can be terrifying, if only due to all of the abbreviations. But the meaning of i18n, L10n, G11N, and T9N, are all very easy to understand. 第一次研究本地化可能会很可怕&…

深入探索Python Web抓取世界:利用BeautifulSoup与Pandas构建全面的网页数据采集与分析流程

引言 在信息爆炸的时代&#xff0c;网络成为了一个无尽的知识宝库&#xff0c;其中包含了大量有价值的公开数据。Python作为一种灵活多变且具有强大生态系统支持的编程语言&#xff0c;尤其擅长于数据的收集、处理与分析工作。本文将聚焦于Python的两大利器——BeautifulSoup和…

如何做一个迟钝不受伤的打工人?

一、背景 在当前激烈的职场环境中&#xff0c;想要成为一个相对“迟钝”且不易受伤的打工人&#xff0c;以下是一些建议&#xff0c;但请注意&#xff0c;这里的“迟钝”并非指智力上的迟钝&#xff0c;而是指在应对复杂人际关系和压力时展现出的豁达与钝感力&#xff1a; 尊重…

【测开能力提升-fastapi框架】fastapi路由分发

1.7 路由分发 apps/app01.py from fastapi import APIRouterapp01 APIRouter()app01.get("/food") async def shop_food():return {"shop": "food"}app01.get("/bed") async def shop_food():return {"shop": "bed&…

部署stable-diffusion时遇到RuntimeError: Couldn‘t clone Stable Diffusion XL.问题

错误信息如下&#xff1a; venv "E:\AI\stable-diffusion-webui-master\venv\Scripts\Python.exe" fatal: ambiguous argument HEAD: unknown revision or path not in the working tree. Use -- to separate paths from revisions, like this: git <command>…

js前端隐藏列 并且获取值,列表复选框

列表框 <div class"block" id"psi_wh_allocation_m"><table id"result" class"list auto hover fixed" style"width:100%;border-collapse:collapse"><thead><tr><%--<th></th>--%&…

LabVIEW滤波器性能研究

为了研究滤波器的滤波性能&#xff0c;采用LabVIEW设计了一套滤波器性能研究系统。该系统通过LabVIEW中的波形生成函数&#xff0c;输出幅值及频率可调的正弦波和白噪声两种信号&#xff0c;并将白噪声与正弦波叠加&#xff0c;再通过滤波器输出纯净的正弦波信号。系统通过FFT&…

Python从0到100(三十八):json字符串的数据提取

JSON的数据提取 1.学习目标 掌握JSON相关的方法&#xff08;load, loads, dump, dumps&#xff09;了解JSONPath的使用&#xff08;提取JSON中的数据&#xff09; 2 复习什么是JSON JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式&#xff0c;它使得人们很容…

富文本braft-editor插件分享

效果展示 安装插件 npm install braft-editor 或者 yarn add braft-editor 主要代码 import React, { useState, forwardRef } from react //引入富文本编辑器 import BraftEditor from braft-editor // 引入编辑器样式 import braft-editor/dist/index.css import { B…

thinkphp8框架源码精讲

前言 很开心你能看到这个笔记&#xff0c;相信你对thinkphp是有一定兴趣的&#xff0c;正好大家都是志同道合的人。 thinkphp是我入门学习的第一个框架&#xff0c;经过这么多年了&#xff0c;还没好好的研究它&#xff0c;今年利用了空闲的时间狠狠的深入源码学习了一把&…

缺陷检测总结

基于深度学习的缺陷检测方法 1、全监督模型&#xff1a;基于表征学习的缺陷检测模型&#xff0c;基于度量学习的缺陷检测模型 1.1、基于表征学习的缺陷检测模型&#xff1a;分类网络&#xff0c;检测网络&#xff0c;分割网络&#xff1b; 其中分类网络的使用方式主要有三种…

2974. 最小数字游戏 Easy

你有一个下标从 0 开始、长度为 偶数 的整数数组 nums &#xff0c;同时还有一个空数组 arr 。Alice 和 Bob 决定玩一个游戏&#xff0c;游戏中每一轮 Alice 和 Bob 都会各自执行一次操作。游戏规则如下&#xff1a; 每一轮&#xff0c;Alice 先从 nums 中移除一个 最小 元素&a…

硅谷甄选运营平台-vue3组件通信方式

vue3组件通信方式 vue2组件通信方式&#xff1a; props:可以实现父子组件、子父组件、甚至兄弟组件通信自定义事件:可以实现子父组件通信全局事件总线$bus:可以实现任意组件通信pubsub:发布订阅模式实现任意组件通信vuex:集中式状态管理容器&#xff0c;实现任意组件通信ref:父…

camunda最终章-springboot

1.实现并行流子流程 1.画图 2.创建实体 package com.jmj.camunda7test.subProcess.entity;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable; import java.util.ArrayList; import java.util.List;Data …

C语言 | Leetcode C语言题解之第230题二叉搜索树中第K小的元素

题目&#xff1a; 题解&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/int search_num(struct TreeNode* root, int k, int *result, int num) {if(num k 1){retu…