SSE(Server Sent Event)实战(3)- Spring Web Flux 实现

上篇博客 SSE(Server Sent Event)实战(2)- Spring MVC 实现,我们用 Spring MVC 实现了简单的消息推送,并且留下了两个问题,这篇博客,我们用 Spring Web Flux 实现,并且看看这两个问题怎么解决。

一、服务端实现

/** XingPan.com* Copyright (C) 2021-2024 All Rights Reserved.*/
package com.sse.demo2.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.time.Duration;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author liuyuan* @version SseController.java, v 0.1 2024-07-15 14:24*/
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {private static final HttpClient HTTP_CLIENT = HttpClient.create().responseTimeout(Duration.ofSeconds(5));private final Map<String, FluxSink<String>> USER_CONNECTIONS = new ConcurrentHashMap<>();/*** 用来存储用户和本机地址,实际生成请用 redis*/private final Map<String, String> USER_CLIENT = new ConcurrentHashMap<>();/*** 创建连接*/@GetMapping(value = "/create-connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> createConnect(@RequestParam("userId") String userId) {// 获取本机地址String hostAddress = this.getHostAddress();Flux<String> businessData = Flux.create(sink -> {USER_CONNECTIONS.put(userId, sink);USER_CLIENT.put(userId, hostAddress);log.info("创建了用户[{}]的SSE连接", userId);sink.onDispose(() -> {USER_CONNECTIONS.remove(userId);USER_CLIENT.remove(userId);log.info("移除用户[{}]的SSE连接", userId);});});// 创建心跳Flux<String> heartbeat = Flux.interval(Duration.ofMinutes(1)).map(tick -> "data: heartbeat\n\n");return Flux.merge(businessData, heartbeat);}/*** 发送消息 gateway*/@GetMapping("/send-message-gateway")public Mono<RpcResult<Boolean>> sendMessageGateway(@RequestParam("userId") String userId, @RequestParam("message") String message) {String userHostAddress = USER_CLIENT.get(userId);if (userHostAddress == null) {log.info("用户[{}]的SSE连接不存在,无法发送消息", userId);return Mono.just(RpcResult.error("10001", "SSE连接不存在,无法发送消息"));}// 获取本机地址和用户连接地址比较,如果相同,直接使用localhost发消息String hostAddress = this.getHostAddress();userHostAddress = userHostAddress.equals(hostAddress) ? "localhost" : userHostAddress;String baseUrl = "http://" + userHostAddress + ":8080";log.info("发送消息 > baseUrl = {}", baseUrl);WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HTTP_CLIENT)).baseUrl(baseUrl).build();RpcResult<Boolean> errorResult = RpcResult.error("10002", "消息发送失败");return webClient.get().uri("/sse/send-message?userId={userId}&message={message}", userId, message).exchangeToMono(clientResponse -> {if (clientResponse.statusCode().is2xxSuccessful()) {log.info("消息发送成功 > 用户 = {},消息内容 = {}", userId, message);return Mono.just(RpcResult.success(true));} else {log.error("消息发送失败 > 状态码 = {},用户 = {},消息内容 = {}", clientResponse.statusCode().value(), userId, message);return Mono.just(errorResult);}}).onErrorResume(error -> {log.error("消息发送失败 > 用户 = {}, 消息内容 = {}, e = ", userId, message, error);return Mono.just(errorResult);});}/*** 发送消息*/@GetMapping("/send-message")public Mono<Void> sendMessage(@RequestParam("userId") String userId, @RequestParam("message") String message) {FluxSink<String> sink = USER_CONNECTIONS.get(userId);if (sink != null) {try {sink.next(message);log.info("给用户[{}]发送消息成功: {}", userId, message);} catch (Exception e) {log.error("向用户[{}]发送消息失败,sink可能已关闭或无效", userId, e);USER_CONNECTIONS.remove(userId);USER_CLIENT.remove(userId);}} else {log.info("用户[{}]的SSE连接不存在或已关闭,无法发送消息", userId);}return Mono.empty();}private String getHostAddress() {String hostAddress = "localhost";try {Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();while (networkInterfaces.hasMoreElements()) {NetworkInterface networkInterface = networkInterfaces.nextElement();Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();while (inetAddresses.hasMoreElements()) {InetAddress inetAddress = inetAddresses.nextElement();if (!inetAddress.isLoopbackAddress() && !inetAddress.getHostAddress().contains(":") && inetAddress.getHostAddress().startsWith("10.")) {hostAddress = inetAddress.getHostAddress();}}}} catch (SocketException e) {log.error("获取主机地址失败", e);}log.info("获取主机地址 > hostAddress = {}", hostAddress);return hostAddress;}
}
  1. 如果我们服务设置了最大连接时间,比如 3 分钟,而服务端又长时间没有消息推送给客户端,导致长连接被关闭该怎么办?

在创建连接时/create-connect,增加心跳,只要心跳频率小于超时时间,基本就可以解决这个问题,但是前端要注意隐藏心跳内容。

  1. 实际生产环境,我们肯定是多个实例部署,那么怎么保证创建连接和发送消息是在同一个实例完成?如果不是一个实例,就意味着用户没有建立连接,消息肯定发送失败。

a. 将用户id 和用户请求的实例 ip 绑定,我这里用的是Map(USER_CLIENT)存储,生产请换成分布式缓存;
b. 服务端发送消息使用/send-message-gateway接口,这个接口只做消息分发,不真实发送消息。从USER_CLIENT中获取用户所在的实例,然后将请求分发到具体实例;
c. /send-message-gateway将请求打到/send-message,然后给用户推送消息;

二、客户端实现


<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE Demo</title><script>        document.addEventListener('DOMContentLoaded', function () {var userId = "1";// 创建一个新的EventSource对象var source = new EventSource('http://localhost:8080/sse/create-connect?userId=' + userId);// 当连接打开时触发source.onopen = function (event) {console.log('SSE连接已打开');};// 当从服务器接收到消息时触发source.onmessage = function (event) {// event.data 包含服务器发送的文本数据console.log('接收到消息:', event.data);// 在页面上显示消息var messagesDiv = document.getElementById('messages');if (messagesDiv) {messagesDiv.innerHTML += '<p>' + event.data + '</p>'; // 直接使用event.data} else {console.error('未找到消息容器元素');}};// 当发生错误时触发source.onerror = function (event) {console.error('SSE连接错误:', event);};});</script>
</head>
<body>
<div id="messages"><!-- 这里将显示接收到的消息 -->
</div>
</body>
</html>

三、启动项目

  1. 运行 Spring 项目
  2. 浏览器打开 index.html文件
  3. 调用发送消息接口
    curl http://localhost:8080/sse/send-message-gateway?userId=1&message=test0001
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/47493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32(六):STM32指南者-定时器实验

目录 一、基本概念1、常规定时器2、内核定时器 二、基本定时器实验1、实验说明2、编程过程&#xff08;1&#xff09;配置LED&#xff08;2&#xff09;配置定时器&#xff08;3&#xff09;设定中断事件&#xff08;4&#xff09;主函数计数 3、工程代码 三、通用定时器实验实…

【Neural signal processing and analysis zero to hero】- 2

Nonstationarities and effects of the FT course from youtube: 传送地址 why we need extinguish stationary and non-stationary signal, because most of neural signal is non-stationary. Welch’s method for smooth spectral decomposition Full FFT method y…

【TDA4板端部署】基于 Pytorch 训练并部署 ONNX 模型在 TDA4

1 将torch模型转onnx模型 Ti转换工具只支持以下格式&#xff1a; Caffe - 0.17 (caffe-jacinto in gitHub) Tensorflow - 1.12 ONNX - 1.3.0 (opset 9 and 11) TFLite - Tensorflow 2.0-Alpha 基于 Tensorflow、Pytorch、Caffe 等训练框架&#xff0c;训练模型&#xff1a;选择…

数据结构与算法(2):顺序表与链表

1.前言 哈喽大家好喔&#xff0c;今天博主继续进行数据结构的分享与学习&#xff0c;今天的主要内容是顺序表与链表&#xff0c;是最简单但又相当重要的数据结构&#xff0c;为以后的学习有重要的铺垫&#xff0c;希望大家一起交流学习&#xff0c;互相进步&#xff0c;让我们…

数据结构之跳表SkipList、ConcurrentSkipListMap

概述 SkipList&#xff0c;跳表&#xff0c;跳跃表&#xff0c;在LevelDB和Lucene中都广为使用。跳表被广泛地运用到各种缓存实现当中&#xff0c;跳跃表使用概率均衡技术而不是使用强制性均衡&#xff0c;因此对于插入和删除结点比传统上的平衡树算法更为简洁高效。 Skip lis…

AQS详解(详细图文)

目录 AQS详解1、AQS简介AbstractQueuedSynchronizer的继承结构和类属性AQS的静态内部类Node总结AQS的实现思想总结AQS的实现原理AQS和锁的关系 2、AQS的核心方法AQS管理共享资源的方式独占方式下&#xff0c;AQS获取资源的流程详解独占方式下&#xff0c;AQS释放资源的流程详解…

如何通过DBC文件看懂CAN通信矩阵

实现汽车CAN通信开发&#xff0c;必不可少要用到DBC文件和CAN通信矩阵。 CAN通信矩阵是指用于描述 CAN 网络中各个节点之间通信关系的表格或矩阵。它通常记录了每个节点能够发送和接收的消息标识符&#xff08;ID&#xff09;以及与其他节点之间的通信权限。 通信矩阵在 CAN 网…

利用Msfvenom获取WindowsShell

一、在kali主机上利用msfvenom生成windows端的安装程序(exe文件),程序名最好取一个大家经常安装的程序,如腾讯视频、爱奇艺等。 (1)由于生成的程序可能会被杀毒软件识别,我们比较一下使用单个编码器生成的程序与用两个编码器生成的程序,哪个更容易被识别。 利用单个编码…

SSE(Server Sent Event)实战(2)- Spring MVC 实现

一、服务端实现 使用 RestController 注解创建一个控制器类&#xff08;Controller&#xff09; 创建一个方法来创建一个客户端连接&#xff0c;它返回一个 SseEmitter&#xff0c;处理 GET 请求并产生&#xff08;produces&#xff09;文本/事件流 (text/event-stream) 创建…

如何使用Milvus Cloud进行稀疏向量搜索

如何使用Milvus Cloud进行向量搜索Milvus Cloud 是一款高度可扩展、性能出色的开源向量数据库。在最新的 2.4 版本中,Milvus Cloud 支持了稀疏和稠密向量(公测中)。本文将利用 Milvus Cloud 2.4 来存储数据集并执行向量搜索。 接下来,我们将演示如何利用 Milvus Cloud 在 M…

[GXYCTF2019]Ping Ping Ping1

打开靶机 结合题目名称&#xff0c;考虑是命令注入&#xff0c;试试ls 结果应该就在flag.php。尝试构造命令注入载荷。 cat flag.php 可以看到过滤了空格,用 $IFS$1替换空格 还过滤了flag&#xff0c;我们用字符拼接的方式看能否绕过,ag;cat$IFS$1fla$a.php。注意这里用分号间隔…

睡前故事—绿色科技的未来:可持续发展的梦幻故事

欢迎来到《Bedtime Stories Time》。这是一个我们倾听、放松、并逐渐入睡的播客。感谢你收听并支持我们&#xff0c;希望你能将这个播客作为你睡前例行活动的一部分。今晚我们将讲述绿色科技的未来&#xff1a;可持续发展的梦幻故事的故事。一个宁静的夜晚&#xff0c;希望你现…

0602STM32定时器输出比较

STM32定时器输出比较 PWM简介 主要用来输出PWM波形&#xff0c;PWM波形又是驱动电机的必要条件&#xff0c;所以如果想用STM32做一些有电机的项目&#xff0c;比如智能车&#xff0c;机器人等。那输出比较功能就要认真掌握 1.PWM驱动LED呼吸灯 2.PWM驱动舵机 3.PWM驱动直流电机…

搜维尔科技:【研究】触觉技术将在5年内以8种方式改变人们的世界

触觉技术在过去几年中发展迅猛&#xff0c;大大提高了反馈的精确度和真实度。其应用产生了真正的影响&#xff0c;数百家公司和企业都集成了触觉技术来增强培训和研究模拟。 虽然触觉技术主要用于 B2B 层面&#xff0c;但触觉技术可能会彻底改变我们的生活&#xff0c;尤其是通…

视频共享融合赋能平台LntonCVS视频监控业务平台技术方案详细介绍

LntonCVS国标视频综合管理平台是一款智慧物联应用平台&#xff0c;核心技术基于视频流媒体&#xff0c;采用分布式和负载均衡技术开发&#xff0c;提供广泛兼容、安全可靠、开放共享的视频综合服务。该平台功能丰富&#xff0c;包括视频直播、录像、回放、检索、云存储、告警上…

【数据结构】详解堆

一、堆的概念 堆(Heap)是计算机科学中一类特殊的数据结构的统称。堆通常是一个可以被看做一棵 完全二叉树的 数组对象。 堆是非线性数据结构&#xff0c;相当于一维数组&#xff0c;有两个直接后继。 如果有一个关键码的集合K { k₀&#xff0c;k₁&#xff0c;k₂ &#xff0…

数据结构(双向链表)

链表的分类 链表的结构⾮常多样&#xff0c;以下情况组合起来就有8种&#xff08;2 x 2 x 2&#xff09;链表结构&#xff1a; 虽然有这么多的链表的结构&#xff0c;但是我们实际中最常⽤还是两种结构&#xff1a;单链表和双向带头循环链表 1.⽆头单向⾮循环链表&#xff1a…

第十课:telnet(远程登入)

如何远程管理网络设备&#xff1f; 只要保证PC和路由器的ip是互通的&#xff0c;那么PC就可以远程管理路由器&#xff08;用telnet技术管理&#xff09;。 我们搭建一个下面这样的简单的拓扑图进行介绍 首先我们点击云&#xff0c;把云打开&#xff0c;点击增加 我们绑定vmn…

【面试题】Redo log和Undo log

Redo log 介绍Redo log之前我们需要了解一下&#xff0c;mysql数据操作的流程&#xff1a; 上述就是数据操作的流程图&#xff0c;可以发现sql语句并不是直接操作的磁盘而是通过操作内存&#xff0c;然后进行内存到磁盘的一个同步。这里我们必须要了解一些区域&#xff1a; 缓…

华为HCIP Datacom H12-821 卷42

42.填空题 如图所示&#xff0c;MSTP网络中SW1为总根&#xff0c;请将以下交换机与IST域根和主桥配对。 参考答案&#xff1a;主桥1468 既是IST域根又是主桥468 既不是又不是就是25 解析&#xff1a; 主桥1468 既是IST域根又是主桥468 既不是又不是就是25 43.填空题 网络有…