spring boot 实现直播聊天室(二)

spring boot 实现直播聊天室(二)

技术方案:

  • spring boot
  • netty
  • rabbitmq

目录结构

在这里插入图片描述

引入依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.96.Final</version>
</dependency>

SimpleNettyWebsocketServer

netty server 启动类

@Slf4j
public class SimpleNettyWebsocketServer {private SimpleWsHandler simpleWsHandler;public SimpleNettyWebsocketServer(SimpleWsHandler simpleWsHandler) {this.simpleWsHandler = simpleWsHandler;}public void start(int port) throws InterruptedException {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup work = new NioEventLoopGroup(2);try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//HTTP协议编解码器,用于处理HTTP请求和响应的编码和解码。其主要作用是将HTTP请求和响应消息转换为Netty的ByteBuf对象,并将其传递到下一个处理器进行处理。pipeline.addLast(new HttpServerCodec());//用于HTTP服务端,将来自客户端的HTTP请求和响应消息聚合成一个完整的消息,以便后续的处理。pipeline.addLast(new HttpObjectAggregator(65535));pipeline.addLast(new IdleStateHandler(30,0,0));//处理请求参数pipeline.addLast(new SimpleWsHttpHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/n/ws"));pipeline.addLast(simpleWsHandler);}});Channel channel = bootstrap.bind(port).sync().channel();log.info("server start at port: {}", port);channel.closeFuture().sync();} finally {boss.shutdownGracefully();work.shutdownGracefully();}}
}

NettyUtil: 工具类

public class NettyUtil {public static AttributeKey<String> G_U = AttributeKey.valueOf("GU");/*** 设置上下文参数* @param channel* @param attributeKey* @param data* @param <T>*/public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {Attribute<T> attr = channel.attr(attributeKey);if (attr != null) {attr.set(data);}}/*** 获取上下文参数 * @param channel* @param attributeKey* @return* @param <T>*/public static <T> T getAttr(Channel channel, AttributeKey<T> attributeKey) {return channel.attr(attributeKey).get();}/*** 根据 渠道获取 session* @param channel* @return*/public static NettySimpleSession getSession(Channel channel) {String attr = channel.attr(G_U).get();if (StrUtil.isNotBlank(attr)){String[] split = attr.split(",");String groupId = split[0];String username = split[1];return new NettySimpleSession(channel.id().toString(),groupId,username,channel);}return null;}
}

处理handler

SimpleWsHttpHandler

处理 websocket 协议升级时地址请求参数 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom, 解析groupId 和 username ,并设置这个属性到上下文

/*** @Date: 2023/12/13 9:53* 提取参数*/
@Slf4j
public class SimpleWsHttpHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof  FullHttpRequest request){//ws://localhost:8080/n/ws?groupId=xx&username=tomString decode = URLDecoder.decode(request.uri(), StandardCharsets.UTF_8);log.info("raw request url: {}", decode);Map<String, String> queryMap = getParams(decode);String groupId = MapUtil.getStr(queryMap, "groupId", null);String username = MapUtil.getStr(queryMap, "username", null);if (StrUtil.isNotBlank(groupId) && StrUtil.isNotBlank(username)) {NettyUtil.setAttr(ctx.channel(), NettyUtil.G_U, groupId.concat(",").concat(username));}//去掉参数 ===>  ws://localhost:8080/n/wsrequest.setUri(request.uri().substring(0,request.uri().indexOf("?")));ctx.pipeline().remove(this);ctx.fireChannelRead(request);}else{ctx.fireChannelRead(msg);}}/*** 解析 queryString* @param uri* @return*/public static Map<String, String> getParams(String uri) {Map<String, String> params = new HashMap<>(10);int idx = uri.indexOf("?");if (idx != -1) {String[] paramsArr = uri.substring(idx + 1).split("&");for (String param : paramsArr) {idx = param.indexOf("=");params.put(param.substring(0, idx), param.substring(idx + 1));}}return params;}
}
SimpleWsHandler

处理消息

@Slf4j
@ChannelHandler.Sharable
public class SimpleWsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Autowiredprivate PushService pushService;/*** 在新的 Channel 被添加到 ChannelPipeline 中时被调用。这通常发生在连接建立时,即 Channel 已经被成功绑定并注册到 EventLoop 中。* 在连接建立时被调用一次** @param ctx* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {NettySimpleSession session = NettyUtil.getSession(ctx.channel());if (session == null) {log.info("handlerAdded channel id: {}", ctx.channel().id());} else {log.info("handlerAdded channel group-username: {}-{}", session.group(), session.identity());}}/*** 连接断开时,Netty 会自动触发 channelInactive 事件,并将该事件交给事件处理器进行处理* 在 channelInactive 事件的处理过程中,会调用 handlerRemoved 方法** @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {NettySimpleSession session = NettyUtil.getSession(ctx.channel());if (session!=null){log.info("handlerRemoved channel group-username: {}-{}", session.group(), session.identity());}offline(ctx.channel());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//todo msg 可以是json字符串,这里仅仅只是纯文本NettySimpleSession session = NettyUtil.getSession(ctx.channel());if (session!=null){MessageDto messageDto = new MessageDto();messageDto.setSessionId(session.getId());messageDto.setGroup(session.group());messageDto.setFromUser(session.identity());messageDto.setContent(msg.text());pushService.pushGroupMessage(messageDto);}else {log.info("channelRead0 session is null channel id: {}-{}", ctx.channel().id(),msg.text());}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("SimpleWsHandler 客户端异常断开 {}", cause.getMessage());//todo offlineoffline(ctx.channel());}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent idleStateEvent) {if (idleStateEvent.state().equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) {log.info("SimpleWsIdleHandler channelIdle 5 秒未收到客户端消息,强制关闭: {}", ctx.channel().id());//todo offlineoffline(ctx.channel());}} else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {String attr = NettyUtil.getAttr(ctx.channel(), NettyUtil.G_U);if (StrUtil.isBlank(attr)) {ctx.writeAndFlush("参数异常");offline(ctx.channel());} else {//todo 可以做用户认证等等//记录用户登陆sessionNettySimpleSession session = NettyUtil.getSession(ctx.channel());Assert.notNull(session, "session 不能为空");SessionRegistry.getInstance().addSession(session);}}super.userEventTriggered(ctx,evt);}/*** 用户下线,处理失效 session* @param channel*/public void offline(Channel channel){NettySimpleSession session = NettyUtil.getSession(channel);if (session!=null){SessionRegistry.getInstance().removeSession(session);}channel.close();}}

PushService

推送服务抽取

public interface PushService {/*** 组推送* @param messageDto*/void pushGroupMessage(MessageDto messageDto);}@Service
public class PushServiceImpl implements PushService {@Autowiredprivate MessageClient messagingClient;@Overridepublic void pushGroupMessage(MessageDto messageDto) {messagingClient.sendMessage(messageDto);}
}

NettySimpleSession

netty session 封装

public class NettySimpleSession extends AbstractWsSession {private Channel channel;public NettySimpleSession(String id, String group, String identity, Channel channel) {super(id, group, identity);this.channel = channel;}@Overridepublic void sendTextMessage(MessageDto messageDto) {String content = messageDto.getFromUser() + " say: " + messageDto.getContent();// 不能直接 write content, channel.writeAndFlush(content);// 要封装成 websocketFrame,不然不能编解码!!!channel.writeAndFlush(new TextWebSocketFrame(content));}
}

启动类

@Slf4j
@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}@Beanpublic SimpleWsHandler simpleWsHandler(){return new SimpleWsHandler();}@PostConstructpublic void init() {new Thread(() -> {log.info(">>>>>>>> start netty ws server....");try {new SimpleNettyWebsocketServer(simpleWsHandler()).start(8881);} catch (InterruptedException e) {log.info(">>>>>>>> SimpleNettyWebsocketServer start error", e);}}).start();}}

其他代码参考 spring boot 实现直播聊天室

测试

websocket 地址 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/223046.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安装DevEco Studio

下载 首先进入鸿蒙开发者官网&#xff0c;顶部导航栏选择开发->DevEco Studio 根据操作系统下载不同版本&#xff0c;其中Mac(X86)为英特尔芯片&#xff0c;Mac(ARM)为M芯片。 安装 下载完毕后&#xff0c;开始安装。 点击Agree 首次使用&#xff0c;请选择Do not impor…

Vue 详细教程

Vue实战 1. Vue 引言 渐进式 JavaScript 框架 --摘自官网 官网地址&#xff1a;Vue.js - 渐进式 JavaScript 框架 | Vue.js # 渐进式 1. 易用 html css javascript 2. 高效 开发前端页面 非常高效 3. 灵活 开发灵活 多样性 # 总结 Vue 是一个javascript 框架 js 简化页面js操作…

Pr自动从视频脚本剪辑视频FirstCut插件免费下载

FirstCut 插件将自动从视频脚本中剪辑视频&#xff0c;在例如新闻、采访、自媒体视频等带有配音或字幕内容的视频制作中提高了粗剪效率。 使用 FirstCut&#xff0c;大大缩短了粗剪的时间&#xff0c;而不是转到每个视频文件并找到 IN 点和 OUT 点&#xff0c;然后将其插入到序…

jmeter,读取CSV文件数据的循环控制

1、构造csv数据 保存文件时需要注意文件的编码格式 id,name,limit,status,address,start_time 100,小米100,1000,1,某某会展中心101,2023/8/20 14:20 101,小米101,1001,1,某某会展中心102,2023/8/21 14:20 2、在线程组下添加【CSV数据文件设置】元件 3、CSV文件数据的循环控…

数据库动态视图和存储过程报表数据管理功能设计

需求&#xff1a;需要将ERP的报表数据挪到OA中&#xff0c;但是OA表单设计不支持存储过程动态传参&#xff0c;所以需要设计一个系统&#xff0c;可以手动配置&#xff0c;动态显示原本ERP的报表数据&#xff0c;ERP报表是存在数据库的视图和存储过程中 思路&#xff1a;因为E…

c#按照时间进行数据存储(不用数据库)

概要介绍 按照日期生成文件夹&#xff0c;按照时间生成文件名&#xff0c;存储字符串。 可以用于简单数据记录&#xff08;如果数据存储考虑格式文本&#xff0c;保存为csv格式&#xff09; 实现效果 调用方法 SaveText.saveStr("测试字符串"DateTime.Now.ToStrin…

高效排队,紧急响应:RabbitMQ Priority Queue全面指南【RabbitMQ 九】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 高效排队&#xff0c;紧急响应&#xff1a;RabbitMQ Priority Queue全面指南 引言前言第一&#xff1a;初识RabbitMQ Priority Queue插件插件的背景和目的&#xff1a;为什么需要消息优先级&#xff1…

我的NPI项目之Android 安全系列 -- Google Wallet and Secure Element(SE)

随着电子支付的兴起&#xff0c;越来越多的支付方式出现在我们的生活中。其中就有基于NFC的“碰一碰”的支付&#xff0c;支付宝的“扫一扫”支付&#xff0c;我们还知道有Google Pay(Wallet), Apple Pay(Wallet)。作为Android BSP的开发者&#xff0c;我比较关心的是Google Pa…

Processon的使用以及流程图的绘制

目录 一、ProcessOn 1.2 官方网站 门诊流程图 会议OA流程图 药库采购入库流程图 ​住院流程图 二、Axure自定义元件库 2.1 新建元件库 2.2 自定义元件 2.3 添加元件库 一、ProcessOn ProcessOn是一款在线的流程图、思维导图、组织结构图、网络拓扑图等多种图表类型…

2020年第九届数学建模国际赛小美赛A题自由泳解题全过程文档及程序

2020年第九届数学建模国际赛小美赛 A题 自由泳 原题再现&#xff1a; 在所有常见的游泳泳姿中&#xff0c;哪一种最快&#xff1f;哪个冲程推力最大&#xff1f;在自由泳项目中&#xff0c;游泳者可以选择他们的泳姿&#xff0c;他们通常选择前面的爬行。然而&#xff0c;游泳…

Java基础面试题小结

基础面试题 Java语言简介 Java是1995年由sun公司推出的一门高级语言&#xff0c;该语言具备如下特点: 简单易学&#xff0c;相较于C语言和C&#xff0c;没有指针的概念&#xff0c;所以操作和使用是会相对容易一些。平台无关性&#xff0c;即Java程序可以通过Java虚拟机在不…

MongoDB 与 Python 的交互

文章目录 第1关&#xff1a;MongoDB 与 Python 的交互 第1关&#xff1a;MongoDB 与 Python 的交互 编程要求 根据提示&#xff0c;在右侧编辑器 Begin-End 处补充代码&#xff0c;完成右侧程序。 测试说明 点击评测&#xff0c;平台会对你编写的代码进行测试。 import pymo…

tomcat优化

目录 一.tomcat的优化 二.nginxtomcat负载均衡、动静分离 三.nginx的反向代理类型 四.nginx的调度算法&#xff08;调度策略、负载均衡模式&#xff09; 五.nginx反向代理如何实现会话保持 一.tomcat的优化 tomcat的优化分为&#xff1a;系统优化&#xff0c;配置文件参数…

GoLong的学习之路,进阶,微服务之序列化协议,Protocol Buffers V3

这章是接上一章&#xff0c;使用RPC包&#xff0c;序列化中没有详细去讲&#xff0c;因为这一块需要看的和学习的地方很多。并且这一块是RPC中可以说是最重要的一块&#xff0c;也是性能的重要影响因子。今天这篇主要会讲其使用方式。 文章目录 Protocol Buffers V3 背景以及概…

RHEL7.5编译openssl1.1.1w源码包到rpm包

openssl1.1.1w下载地址 https://www.openssl.org/source/ 安装依赖包 yum -y install curl which make gcc perl perl-WWW-Curl rpm-build wget http://mirrors.aliyun.com/centos-vault/7.5.1804/os/x86_64/Packages/perl-WWW-Curl-4.15-13.el7.x86_64.rpm rpm -ivh pe…

JVM之堆学习

一、Java虚拟机内存结构图 二、堆的介绍 1. 前面学习的程序计数器&#xff0c;虚拟机栈和本地方法栈都是线程私有的&#xff0c;堆是线程共享的&#xff1b; 2. 通过 new 关键字&#xff0c;创建的对象都会使用堆内存&#xff0c;其特点是&#xff1a; 它是线程共享的&#x…

【每日一题】【面试经典150 | 动态规划】爬楼梯

Tag 【动态规划】【数组】 题目来源 70. 爬楼梯 题目解读 有过刷题「动态规划」刷题经验的读者都知道&#xff0c;爬楼梯问题是一种最典型也是最简单的动态规划问题了。 题目描述为&#xff1a;你每次可以爬 1 或者 2 个台阶&#xff0c;问爬上 n 阶有多少种方式。 解题思路…

智能优化算法应用:基于探路者算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于探路者算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于探路者算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.探路者算法4.实验参数设定5.算法结果6.参考文…

开源书籍—鸢尾花书:从加减乘除到机器学习系列 | 开源专题 No.50

Visualize-ML/Book1_Python-For-Beginners Stars: 2.4k License: NOASSERTION 《编程不难》是一本名为鸢尾花书的开源项目&#xff0c;它从基础的加减乘除开始&#xff0c;逐步引导读者进入机器学习领域。该项目提供了 PDF 草稿和 Jupyter 笔记&#xff0c;并经过至少两轮修改…

LED恒流调节器FP7125,应用LED街道照明、调光电源、汽车大灯、T5T8日光灯

目录 一、FP7125概述 二、FP7125功能 三、应用领域 近年来&#xff0c;随着人们环保意识的不断增强&#xff0c;LED照明产品逐渐成为照明行业的主流。而作为LED照明产品中的重要配件&#xff0c;LED恒流调节器FP7125的出现为LED照明带来了全新的发展机遇。 一、FP7125概述 FP…