spring boot 实现直播聊天室

spring boot 实现直播聊天室

技术方案:

  • spring boot
  • websocket
  • rabbitmq

使用 rabbitmq 提高系统吞吐量

引入依赖

<dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.42</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.23</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency>
</dependencies>

websocket 实现

MHttpSessionHandshakeInterceptor

参数拦截

/*** @Date: 2023/12/8 14:52* websocket 握手拦截* 1. 参数拦截(header或者 url 参数)* 2. token 校验*/
@Slf4j
public class MHttpSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {if (request instanceof ServletServerHttpRequest servletRequest){//ws://127.0.0.1:8080/group/2?username=xxxxHttpServletRequest httpServletRequest = servletRequest.getServletRequest();String requestURI = httpServletRequest.getRequestURI();String groupId = requestURI.substring(requestURI.lastIndexOf("/") + 1);String username = httpServletRequest.getParameter("username");log.info(">>>>>>> beforeHandshake groupId: {} - username: {}", groupId, username);attributes.put("username", username);//解析占位符attributes.put("groupId", groupId);}return super.beforeHandshake(request, response, wsHandler, attributes);}}
GroupWebSocketHandler

消息发送

@Slf4j
public class GroupWebSocketHandler implements WebSocketHandler {//Map<room,List<map<session,username>>>private ConcurrentHashMap<String, Queue<WebSocketSession>> sessionMap = new ConcurrentHashMap<>();@Autowiredprivate MessageClient messagingClient;@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info("{} 用户上线房间 {}", username, groupId);TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);SessionRegistry.getInstance().addSession(wsSession);}@Overridepublic void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {String groupId = (String) session.getAttributes().get("groupId");String username = (String) session.getAttributes().get("username");if (message instanceof PingMessage){log.info("PING");return;}else if (message instanceof TextMessage textMessage) {MessageDto messageDto = new MessageDto();messageDto.setSessionId(session.getId());messageDto.setGroup(groupId);messageDto.setFromUser(username);messageDto.setContent(new String(textMessage.getPayload()));messagingClient.sendMessage(messageDto);}}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info(">>> handleTransportError {} 用户上线房间 {}", username, groupId);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info("{} 用户下线房间 {}", username, groupId);TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);SessionRegistry.getInstance().removeSession(wsSession);}@Overridepublic boolean supportsPartialMessages() {return false;}}
WebSocketConfig

websocket 配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(myHandler(), "/group/{groupId}").addInterceptors(new MHttpSessionHandshakeInterceptor()).setAllowedOrigins("*");}@Beanpublic GroupWebSocketHandler myHandler() {return new GroupWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxTextMessageBufferSize(8192);  //文本消息最大缓存container.setMaxBinaryMessageBufferSize(8192);  //二进制消息大战缓存container.setMaxSessionIdleTimeout(3L * 60 * 1000); // 最大闲置时间,3分钟没动自动关闭连接container.setAsyncSendTimeout(10L * 1000); //异步发送超时时间return container;}}

session 管理

将 websocketSession进行抽像,websocketsession可以由不同容器实现

WsSession
public interface  WsSession {/*** session 组* @return*/String group();/*** session Id* @return*/String getId();/*** 用户名或其他唯一标识* @return*/String identity();/*** 发送文本消息* @param messageDto*/void sendTextMessage(MessageDto messageDto);
}public abstract class AbstractWsSession implements WsSession {private String id;private String group;private String identity;public AbstractWsSession(String id, String group, String identity) {this.id = id;this.group = group;this.identity = identity;}@Overridepublic String group() {return this.group;}@Overridepublic String getId() {return this.id;}@Overridepublic String identity() {return this.identity;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;AbstractWsSession that = (AbstractWsSession) o;//简单比较 sessionIdreturn Objects.equals(id, that.id);}@Overridepublic int hashCode() {return Objects.hash(id, group, identity);}
}
TomcatWsSession

默认session实现

@Slf4j
public class TomcatWsSession extends AbstractWsSession {private WebSocketSession webSocketSession;public TomcatWsSession(String id, String group, String identity, WebSocketSession webSocketSession) {super(id, group, identity);this.webSocketSession = webSocketSession;}@Overridepublic void sendTextMessage(MessageDto messageDto) {String content = messageDto.getFromUser() + " say: " + messageDto.getContent();try {webSocketSession.sendMessage(new TextMessage(content));} catch (IOException e) {log.error("TomcatWsSession sendTextMessage error: identity:{}-group:{}-msg: {}",super.identity(), super.group(), JSON.toJSONString(messageDto));}}
}

SessionRegistry

websocket session管理

public class SessionRegistry {private static SessionRegistry instance;private SessionRegistry() {}public static SessionRegistry getInstance() {if (instance == null) {synchronized (SessionRegistry.class) {if (instance == null) {instance = new SessionRegistry();}}}return instance;}//Map<group,List<Session>>private ConcurrentHashMap<String, Queue<WsSession>> sessionMap = new ConcurrentHashMap<>();/*** 添加 session* @param wsSession*/public void addSession(WsSession wsSession) {sessionMap.computeIfAbsent(wsSession.group(),g -> new ConcurrentLinkedDeque<>()).add(wsSession);}/*** 移除 session* @param wsSession*/public void removeSession(WsSession wsSession) {Queue<WsSession> wsSessions = sessionMap.get(wsSession.group());if (!CollectionUtils.isEmpty(wsSessions)){//重写 WsSession equals 和 hashCode 方法,不然会移除失败wsSessions.remove(wsSession);if (CollectionUtils.isEmpty(wsSessions)){sessionMap.remove(wsSession.group());}}}/*** 发送消息* @param messageDto*/public void sendGroupTextMessage(MessageDto messageDto){Queue<WsSession> wsSessions = sessionMap.get(messageDto.getGroup());if (!CollectionUtils.isEmpty(wsSessions)){for (WsSession wsSession : wsSessions) {if (wsSession.getId().equals(messageDto.getSessionId())){continue;}wsSession.sendTextMessage(messageDto);}}}/*** session 在线统计* @param groupId* @return*/public Integer getSessionCount(String groupId) {if (StrUtil.isNotBlank(groupId)) {return sessionMap.get(groupId).size();}return sessionMap.values().stream().map(l -> l.size()).collect(Collectors.summingInt(a -> a));}
}

消息队列

这里使用 rabbitmq

MessageDto

消息体

@Data
public class MessageDto {/*** sessionId*/private String sessionId;/*** 组*/private String group;/*** 消息发送者*/private String fromUser;/*** 发送内容*/private String content;
}
MessageClient
@Component
@Slf4j
public class MessageClient {private String routeKey = "bws.key";private String exchange = "bws.exchange";@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(MessageDto messageDto) {try {rabbitTemplate.convertAndSend(exchange, routeKey, JSON.toJSONString(messageDto));} catch (AmqpException e) {log.error("MessageClient.sendMessage: {}", JSON.toJSONString(messageDto), e);}}
}
MessageListener
@Slf4j
@Component
public class MessageListener {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "bws.exchange", type = "topic"), value =@Queue(value = "bws.queue", durable = "true"), key = "bws.key"))public void onMessage(Message message) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("<<<<<<<<< MessageListener.onMessage:{}", messageStr);MessageDto messageDto = JSON.parseObject(messageStr, MessageDto.class);if (!Objects.isNull(messageDto)) {SessionRegistry.getInstance().sendGroupTextMessage(messageDto);} else {log.info("<<<<<<<<< MessageListener.onMessage is null:{}", messageStr);}} catch (Exception e) {log.error("######### MessageListener.onMessage: {}-{}", messageStr, e);}}}

application.properties配置


spring.rabbitmq.host=192.168.x.x
spring.rabbitmq.password=guest
spring.rabbitmq.port=27067
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=my-cluster

测试

websoket链接: ws://127.0.0.1:8080/group/2?username=xxx, websocket客户端测试地址

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225818.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 300最长递增子序列 674最长连续递增序列 718最长重复子数组 | 代码随想录25期训练营day52

动态规划算法10 LeetCode 300 最长递增子序列 2023.12.15 题目链接代码随想录讲解[链接] int lengthOfLIS(vector<int>& nums) {//创建变量result存储最终答案,设默认值为1int result 1;//1确定dp数组&#xff0c;dp[i]表示以nums[i]为结尾的子数组的最长长度ve…

VS Code连接远程Linux服务器调试C程序

1.在 VS Code 上安装扩展 C/C 2.通过 VS Code 连接远程 Linux 服务器 3.通过 VS Code 在远程 Linux 服务器上安装扩展 C/C 4.打开远程 Linux 服务器上的文件夹 【注】本文以 /root/ 为例。 5.创建项目文件夹&#xff0c;并在项目文件夹下创建C程序 6.按 F5&#xff0c;选…

mysql中的int(1)和int(10)的区别

今天偶然发现同事在写sql建表的时候把int类型括号后面的数字写成了1&#xff0c;但是我发现数据库里面的值已经远远超过了1位所能表示的范围&#xff0c;所以括号里面的数字肯定不是表示长度了&#xff08;印象中早期的navivat建表的时候&#xff0c;int类型如果默认不指定长度…

devc++如何建立一个c++项目?devc++提示源文件未编译?

打开devc APP后是这样的界面&#xff1b; 点击文件-> 新建->项目&#xff0c;这一点应该不难&#xff0c;主要是最后这个选择什么&#xff1f; 这样即可。 devc提示源文件未编译&#xff1f; 点击工具->编译选项&#xff1b; 如果不能解决&#xff0c;那就是可能路径…

文物数字化建模纹理贴图

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 1、文物3D数字化建模的特点 文物埋在地下历经千年&#xff0c;由于时…

转动惯量与惯性张量 的推导

从牛顿第二定律推出绕固定轴旋转的转动惯量&#xff0c;再用类似方法从牛顿第二定律推出绕固定点转动的惯性张量 基础定义 角速度 ω \omega ω是一个三维向量&#xff0c;方向表示旋转轴&#xff0c;用右手定则代表旋转方向&#xff0c;长度代表旋转弧度的速度 线速度&#…

WebGL+Three.js入门与实战——给画布换颜色、绘制一个点、三维坐标系

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

为了吃鸡苦练狙击,避免坑队友自己造一个狙击游戏!

引言 一文教会你造一个简易的狙击游戏。 说到狙击&#xff0c;相信大家都不陌生&#xff0c;无论是影视作品还是网络游戏&#xff0c;都经常能看到狙击枪的身影&#xff0c;最深刻的是它能够从百里之外&#xff0c;一枪爆头。 本文将介绍如何在Cocos Creator中造一个简易的狙…

UE5 动画 Sequencer-学习笔记

P2. 课程介绍 资料&#xff1a;https://www.bilibili.com/video/BV1Ag411873f?p2&vd_source707ec8983cc32e6e065d5496a7f79ee6 Sequencer不仅可以做互动动画&#xff0c;还可以导出视频与序列帧 P3-4. 界面介绍 https://www.bilibili.com/video/BV1Ag411873f?p3&spm_…

数据挖掘任务一般流程

数据挖掘是从大量数据中提取有价值信息的过程。它涉及多个步骤&#xff0c;每一步都对整个数据挖掘过程至关重要。以下是数据挖掘任务的一般流程&#xff1a; 业务理解&#xff1a; 确定业务目标。评估当前情况。定义数据挖掘问题。制定一个初步计划来达到这些目标。 数据理…

WPF-附加属性《十二》

非常重要 依赖属性和附加属性&#xff0c;两者是有关系的&#xff0c;也是有些区别的&#xff0c;很多时候&#xff0c;可能会把两者混淆了。 附加属性&#xff08;Attach Property&#xff09; 顾名思义&#xff0c;就是附加上面的属性&#xff0c;自身是没有的&#xff0c;…

人工智能在红斑狼疮应用主要以下4个方面

人工智能&#xff08;Artificial Intelligence, AI&#xff09;在医学领域的应用已取得了一定的进展。红斑狼疮&#xff08;Systemic Lupus Erythematosus, SLE&#xff09;是一种免疫系统性疾病&#xff0c;对该疾病进行诊断和治疗是一个复杂的过程。人工智能可以发挥作用&…

如何在Centos 7环境下安装MySQL并登录

目录 先获取MySQL官方yum源 然后正常使用yum命令下载mysql即可完成MySQL的下载 使用mysql客户端登录mysqld服务端 能够登录mysql客户端后&#xff0c;我们最后还需要做一点配置 先获取MySQL官方yum源&#xff08;包括对yum源的介绍&#xff09; 介绍一下yum源 yum源就是一…

瞳孔检测眼动追踪python实现(基于dlib)

效果展示&#xff1a; 原图&#xff1a;&#xff08;图片来自 b站up 借我300去洗牙&#xff09; dlib实现的特征点检测 瞳孔检测结果 完整代码&#xff1a; # encoding:utf-8import dlib import numpy as np import cv2def rect_to_bb(rect): # 获得人脸矩形的坐标信息x …

服务器感染了.locked、.locked1勒索病毒,如何确保数据文件完整恢复?

尊敬的读者&#xff1a; .locked、.locked1勒索病毒是当前网络安全威胁中备受关注的一种恶意软件。本文将深入介绍.locked、.locked1勒索病毒的特征&#xff0c;有效的数据恢复方法&#xff0c;以及一系列预防措施&#xff0c;以帮助用户更好地保护自己的数字资产。面对复杂的…

深入理解人工智能中的图神经网络:原理、应用与未来展望

导言&#xff1a; 图神经网络&#xff08;Graph Neural Networks, GNNs&#xff09;作为人工智能领域的一项前沿技术&#xff0c;在社交网络分析、推荐系统、生物信息学等多个领域展现出卓越的性能。本文将深入剖析图神经网络的原理、当前应用场景以及未来可能的发展方向。 1.…

swing快速入门(十二)

注释很详细&#xff0c;直接上代码 上一篇 新增内容 1.Box容器和BroadLayout布局管理器的结合用法 2.textArea&#xff08;多行文本域&#xff09; 3.Choice&#xff08;下拉选择栏&#xff09; 4. CheckboxGroup&#xff08;多项单选选择框&#xff09; 5. Checkbox&…

循环神经网络-1

目录 1 数据集构建 1.1 数据集的构建函数 1.2 加载数据并进行数据划分 1.3 构造Dataset类 2 模型构建 2.1 嵌入层 2.2 SRN层 2.3 线性层 2.4 模型汇总 3 模型训练 3.1 训练指定长度的数字预测模型 3.2 多组训练 3.3 损失曲线展示 4 模型评价 总结 参考文献 循环神经网络&…

从零开始:前端架构师的基础建设和架构设计之路

文章目录 一、引言二、前端架构师的职责三、基础建设四、架构设计思想五、总结《前端架构师&#xff1a;基础建设与架构设计思想》编辑推荐内容简介作者简介目录获取方式 一、引言 在现代软件开发中&#xff0c;前端开发已经成为了一个不可或缺的部分。随着互联网的普及和移动…

简洁高效的 NLP 入门指南: 200 行实现 Bert 文本分类 (TensorFlow 版)

简洁高效的 NLP 入门指南: 200 行实现 Bert 文本分类 TensorFlow 版 概述NLP 的不同任务Bert 概述MLM 任务 (Masked Language Modeling)TokenizeMLM 的工作原理为什么使用 MLM NSP 任务 (Next Sentence Prediction)NSP 任务的工作原理NSP 任务栗子NSP 任务的调整和局限性 安装和…