基于WebFlux的websocket的分组和群发实现

一,分组发送

在WebFlux中实现分组发送数据和群发数据给所有客户端发送,可以借助Sinks.Many来管理消息流,并使用Flux进行订阅和发送消息。以下是一个示例代码,演示如何实现这两个功能:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;@Component
public class ChatWebSocketHandler implements WebSocketHandler {private final Map<String, Many<ChatMessage>> groupChatMessages = new HashMap<>();private final Many<ChatMessage> allChatMessages = Sinks.many().multicast().onBackpressureBuffer();@Overridepublic Mono<Void> handle(WebSocketSession session) {String username = session.getId(); // 使用WebSocket连接的唯一标识符作为用户名// 处理新用户加入聊天室Mono<Void> join = session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(group -> joinGroup(username, group)).then();// 处理接收到的消息Mono<Void> chat = session.receive().map(WebSocketMessage::getPayloadAsText).map(message -> {ChatMessage chatMessage = new ChatMessage();chatMessage.setSender(username);chatMessage.setMessage(message);chatMessage.setTimestamp(LocalDateTime.now());return chatMessage;}).doOnNext(this::sendMessageToGroup).then();// 关闭连接时,将用户从所有分组中移除Mono<Void> leave = session.close().doFinally(signal -> leaveAllGroups(username));return Mono.zip(join, chat, leave).then();}private void joinGroup(String username, String group) {Many<ChatMessage> groupMessages = groupChatMessages.getOrDefault(group, Sinks.many().multicast().onBackpressureBuffer());groupChatMessages.put(group, groupMessages);groupMessages.asFlux().subscribe(); // 强制激活订阅以开始接收消息}private void sendMessageToGroup(ChatMessage chatMessage) {String group = chatMessage.getGroup();if (group != null && groupChatMessages.containsKey(group)) {Many<ChatMessage> groupMessages = groupChatMessages.get(group);groupMessages.tryEmitNext(chatMessage);}allChatMessages.tryEmitNext(chatMessage); // 群发给所有客户端}private void leaveAllGroups(String username) {groupChatMessages.values().forEach(groupMessages -> groupMessages.tryEmitNext(new ChatMessage(username, "left the chat room.", LocalDateTime.now())));}
}

在上述代码中,我们使用Sinks.Many来管理分组消息流和群发消息流。当有用户加入分组时,我们使用joinGroup()方法创建一个新的Many实例,并将其存储在groupChatMessages中。当收到消息时,我们通过sendMessageToGroup()方法选择性地将消息发送给特定分组的所有成员,并使用tryEmitNext()方法来发送消息到对应的消息流。同时,我们还维护了一个allChatMessages的消息流,用于群发给所有客户端。

通过使用groupChatMessagesallChatMessages管理消息流,你可以实现分组发送数据和群发数据给所有客户端发送的功能。

二, 广播发送

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.UnicastProcessor;@Component
public class BroadcastWebSocketHandler implements WebSocketHandler {private final UnicastProcessor<String> messagePublisher;private final Flux<String> outputMessages;public BroadcastWebSocketHandler() {this.messagePublisher = UnicastProcessor.create();this.outputMessages = messagePublisher.replay(25).autoConnect();}public void broadcastMessage(String message) {messagePublisher.onNext(message);}@Overridepublic Mono<Void> handle(WebSocketSession session) {Flux<WebSocketMessage> input = session.receive();Flux<WebSocketMessage> output = outputMessages.map(session::textMessage);return session.send(output).and(input.then());}
}

上述代码中的BroadcastWebSocketHandler类实现了广播消息给所有连接的WebSocket客户端的功能。让我们逐行解释代码的实现:

  • messagePublisher是一个UnicastProcessor,用于发布新的消息。
  • outputMessages是一个Flux,用于保存最新的25条消息,并在每个新的WebSocket连接时自动发送这些消息。
  • broadcastMessage()方法用于向所有连接的WebSocket客户端广播消息。调用该方法时,新的消息将被发布到messagePublisher
  • handle()方法中,我们创建了一个input流来接收从客户端发送的消息。
  • 我们使用outputMessages流来创建一个output流,将最新的消息转换为WebSocketMessage对象。
  • 最后,我们将output流发送到客户端,并在input流完成时结束WebSocket会话。

通过这样的实现,当有新的消息到达时,所有连接的WebSocket客户端都会收到该消息。

三. 向特定用户发送消息:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
public class UserWebSocketHandler implements WebSocketHandler {private final Map<String, Sinks.Many<String>> userSinks = new ConcurrentHashMap<>();public void sendMessageToUser(String userId, String message) {Sinks.Many<String> userSink = userSinks.get(userId);if (userSink != null) {userSink.tryEmitNext(message);}}@Overridepublic Mono<Void> handle(WebSocketSession session) {String userId = session.getHandshakeInfo().getUri().getQuery();Sinks.Many<String> userSink = Sinks.many().unicast().onBackpressureBuffer();userSinks.put(userId, userSink);Flux<String> input = session.receive().map(WebSocketMessage::getPayloadAsText).subscribeOn(Schedulers.boundedElastic()).doFinally(signalType -> userSinks.remove(userId));Flux<String> output = userSink.asFlux().doOnNext(message ->session.send(Mono.just(session.textMessage(message))));return session.send(output).and(input.then());}
}

上述代码中的UserWebSocketHandler类实现了向特定用户发送消息的功能。让我们逐行解释代码的实现:

  • userSinks是一个ConcurrentHashMap,用于存储特定用户的Sinks.Many实例。
  • sendMessageToUser()方法用于向特定用户发送消息。我们通过用户ID从userSinks中获取相应的Sinks.Many实例,并使用tryEmitNext()方法发送消息。
  • handle()方法中,我们首先从WebSocket会话中获取用户ID,并创建一个新的Sinks.Many实例,并将其放入userSinks中。
  • 我们创建一个input流来接收从客户端发送的消息。使用subscribeOn(Schedulers.boundedElastic())将其放入弹性调度器中,以避免阻塞WebFlux线程。
  • output流中,我们将userSink转换为Flux,并在每个新的消息到达时发送给客户端。
  • 最后,我们将output流发送到客户端,并在input流完成时结束WebSocket会话。
    很抱歉,我无法直接为您编写一篇博客。然而,我可以为您提供一个关于基于WebFlux的WebSocket分组和群发实现的简要代码案例分析,您可以根据此信息撰写自己的博客。

总结:

本文将介绍如何使用Spring WebFlux框架实现WebSocket的分组和群发功能。WebSocket提供了一种双向通信的机制,能够在服务器和客户端之间实现实时的数据传输。在本文中,我们将使用WebFlux的响应式编程模型和相关的类库来实现WebSocket的分组和群发功能。

通过本文的介绍和代码案例分析,我们了解了如何使用Spring WebFlux框架实现WebSocket的分组和群发功能。这种实现方式能够满足实时通信和群聊等需求,并且借助WebFlux的响应式编程模型,能够处理大量并发连接和高吞吐量的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/626233.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

读元宇宙改变一切笔记09_硬件与互操作性(下)

1. 移动互联网的继承者 1.1. 要想让元宇宙成为现实&#xff0c;需要开发新的标准&#xff0c;创建新的基础设施&#xff0c;可能还需要对长期存在的TCP/IP协议进行彻底改革 1.1.1. 采用新的设备和硬件&#xff0c;甚至可能打破技术巨头、独立开发者和终端用户之间的权利平衡 …

Go+快速开始详细指南

Go快速开始 Go编程语言是为工程、STEM教育和数据科学设计的。 对于工程:用儿童能掌握的最简单的语言工作。对于STEM教育:学习一门可以在未来工作中使用的工程语言。对于数据科学:用同一种语言与工程师交流。 安装方法 现在&#xff0c;我们建议您从源代码安装Go。 注意:需…

CDMP认证与CDGA/CDGP的区别有哪些?

&#x1f451;CDMP是DAMA国际组织的全英文考试 &#x1f308;CDGA和CDGP是DAMA中国主导&#xff0c;考试为中文考试。需要在规定的时间内报名后&#xff0c;参加线下笔试考试。 &#x1f44d;CDGA、CDGP证书和英文版的CDMP证书都是国际通用的&#xff0c;是数据管理领域专业的职…

js等于操作符和全等操作符(== 和 ===)的区别,在什么情况下使用

在JavaScript中&#xff0c;&#xff08;等于操作符&#xff09;和&#xff08;全等操作符&#xff09;都是用来比较两个值是否相等的工具&#xff0c;但它们有一些重要的区别。 会尝试进行类型转换&#xff0c;然后再比较。这意味着它可能会将不同类型的值转换为相同类型&…

【教3妹学编程-算法题】3008. 找出数组中的美丽下标 II

3妹&#xff1a;呜呜&#xff0c;烦死了&#xff0c; 脸上长了一个痘 2哥 : 不要在意这些细节嘛&#xff0c;不用管它&#xff0c;过两天自然不就好了。 3妹&#xff1a;切&#xff0c;你不懂&#xff0c;影响这两天的心情哇。 2哥 : 我看你是不急着找工作了啊&#xff0c; 工作…

IP 网络分为接入网、城域网和骨干网

根据前述的IP 网络设计思想&#xff0c;结合算力网络对 正网络的需求分析&#xff0c;卫网络的具体实现可以从架构设计利网络技术两个方面进行总体设计。 首先从架构设计上考虑&#xff0c;架构应尽量简化&#xff0c;做到“以简应繁”。因此&#xff0c;整体网络架构不宜设计…

爬虫-8-数据存储-mysql

#mysql占空间最小吧&#xff0c;数据存储没问题吧 (//∇//)

【.net core】yisha框架,bootstrap-table组件增加固定列功能

需要引入 bootstrap-table-fixed-columns.css和bootstrap-table-fixed-columns.js文件 文件代码&#xff1a; bootstrap-table-fixed-columns.css样式文件代码 .fixed-table-header-columns, .fixed-table-body-columns {position: absolute;background-color: #fff;displa…

C++系统笔记教程----vscode远程连接ssh

C系统笔记教程 文章目录 C系统笔记教程前言开发环境配置总结 前言 开发环境配置 Ubuntu20.24VScode 如果没有linux系统&#xff0c;但是想用其编译&#xff0c;可以使用ssh远程连接。 首先进入vscode,打开远程连接窗口&#xff08;蓝色的小箭头这&#xff09; 选择连接到主机…

FFmpeg之AVFormat

文章目录 一、概述二、解封装流程三、重要结构体3.1、AVFormatContext3.2、AVInputFormat3.3、AVOutputFormat3.4、AVStream 四、重要函数分析4.1、avformat_alloc_context4.2、avformat_open_input4.2.1、init_input4.2.2、av_probe_input_format2 4.3、avformat_find_stream_…

Linux环境搭建FastDFS文件服务器(附带Nginx安装)

本文主要介绍在linux服务器如何搭建FastDFS文件服务器。大概分为9个步骤&#xff0c;由于内容较为繁琐。下面带你入坑&#xff01; 首先简单介绍一下FastDFS是淘宝资深架构师余庆老师主导开源的一个分布式文件系统&#xff0c;用C语言编写。适应与中小企业&#xff0c;对文件不…

设计 Pastebin.com

设计 Pastebin.com 设计 Bit.ly 是一个类似的问题&#xff0c;区别是 pastebin 需要存储的是 paste 的内容&#xff0c;而不是原始的未短化的 url. 怎么处理这个问题&#xff1f; 第一步&#xff1a;概述用例和约束 收集这个问题的需求和范畴。问相关问题来明确用例和约束&…

Github 2024-01-16 Python开源项目日报 Top10

根据Github Trendings的统计&#xff0c;今日(2024-01-16统计)共有10个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目10HTML项目1 精心策划的Python资源列表 创建周期&#xff1a;3490 天开发语言&#xff1a;Python…

常用Java代码-Java中的Lambda表达式和函数式接口

Java中的Lambda表达式和函数式接口是Java 8中引入的一种新特性&#xff0c;允许编写简洁、可读性强的代码。Lambda表达式允许将简单的代码块作为参数传递给函数&#xff0c;而函数式接口则是一种只有一个抽象方法的接口&#xff0c;可以用于定义Lambda表达式。 下面是一个Lamb…

使用Python实现MySQL数据库表管理,有录播直播私教课视频教程

查看所有表格 from mysql.db_object import Databasedb Database(host"localhost", database"zdppy_mysql_demo", password"zhangdapeng520")# 查看所有表格 print(db.get_all_table())创建表格 from mysql.db_object import Databasedb Dat…

蓝桥杯备赛 | 洛谷做题打卡day3

蓝桥杯备赛 | 洛谷做题打卡day3 sort函数真的很厉害&#xff01; 文章目录 蓝桥杯备赛 | 洛谷做题打卡day3sort函数真的很厉害&#xff01;【深基9.例1】选举学生会题目描述输入格式输出格式样例 #1样例输入 #1 样例输出 #1 我的一些话 【深基9.例1】选举学生会 题目描述 学校…

MySQL深分页问题四种方案解析

mysql深分页问题&#xff1a; 这个问题在实际项目中很常见&#xff0c;当数据量大以后&#xff0c;分页会非常的慢&#xff08;几年前做过一个调度日志的分页查询&#xff0c;简直没法用&#xff09; 深分页为什么慢 前言&#xff1a;N个条件为索引&#xff0c;id为主键…

使用Python实现MySQL数据库的查询,有录播直播私教课视频教程

查询所有 from mysql.db_object import Databasedb Database(host"localhost",password"zhangdapeng520",database"zdppy_mysql_demo")# 创建表 table "test_user" sql """ create table test_user(id bigint prim…

23111 网络编程 day2

思维导图 重打代码 #include<myhead.h> #define SER_IP "192.168.122.150" //服务器ip #define SER_PORT 8888 //服务器端口int main(int argc, const char *argv[]) {//1.创建用于连接的套接字int sfdsocket(AF_INET,SOCK_STREAM,0);if(sfd-1){perror("…

Python - 深夜数据结构与算法之 位运算

目录 一.引言 二.位运算简介 1.二进制与十进制 2.左/右移 3.位运算 4.异或 XOR 5.指定位置的位运算 6.实战要点 三.经典算法实战 1.Number-1-of-bits [191] 2.Power-Of-Two [231] 3.Reverse-2-Bits [190] 4.N-Queens [51] 四.总结 一.引言 通常情况下我们计数采…