netty与websockt实现聊天

配置websockt:


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;/*** websocket配置*/
@Data
@Configuration
@ConfigurationProperties(prefix = "ws")
public class WsConfig {/*** websockt服务端口,不可和web服务同一个端口*/private Integer port=8779;/*** 心跳超时时间-单位-秒*/private Integer heartTimeout = 60;/*** 默认匹配的路径*/private String url="/";
}

返回实体载体:

import com.alibaba.fastjson2.JSON;
import lombok.Data;import java.io.Serializable;@Data
public class MsgBody implements Serializable {public enum MsgType{/*** 普通文字消息*/text,/*** 图片消息*/img,/*** 文件*/file,}public enum Type{/*** 自己*/self,/*** 别人*/other,}private Type type;private MsgType msgType;/*** 消息主体*/private String msgContent;public String toJson(){return JSON.toJSONString(this);}
}
import com.alibaba.fastjson2.JSON;import java.io.Serializable;
import java.util.LinkedHashMap;/*** websocket返回载体*/
public class WsBean extends LinkedHashMap<String,Object> implements Serializable {/*** 指定调用前端的回调函数*/public enum CallbackEm{/*** 通知回调函数*/notice,/*** 收到消息的回调*/receive_msg,}public WsBean(CallbackEm callbackEm,Object data) {super();this.put("code",callbackEm.name());this.put("data",data);}public static WsBean get(CallbackEm callbackEm){return new WsBean(callbackEm,"");}public static WsBean get(CallbackEm callbackEm,Object data){return new WsBean(callbackEm,data);}public WsBean setData(Object data){this.put("data",data);return this;}public WsBean set(String key,Object value){this.put(key,value);return this;}public String toJson(){return JSON.toJSONString(this);}}

netty通道:

import cn.hutool.core.map.BiMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** 这个类用来处理用户和连接的关联关系*/
@Slf4j
@Component
public class NioWebSocketChannelPool {/*** 用户保持连接*/private final DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 保持连接用户对应的长连接id,此为双向绑定*/private final BiMap<String, ChannelId> bindUserMap = new BiMap<>(new ConcurrentHashMap<>());/*** 新增一个客户端通道*/public void addChannel(Channel channel){channels.add(channel);}/*** 移除一个客户端通道* @param channel*/public void removeChannel(Channel channel){String mapKey = bindUserMap.getKey(channel.id());if (mapKey != null){bindUserMap.remove(mapKey);}channels.remove(channel);}/*** 绑定用户*/public void bindUser(String userId,Channel channel){bindUserMap.put(userId,channel.id());}/*** 向用户推送消息*/public void sendToUser(String userId,WsBean data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public void sendToUser(String userId,MsgBody data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public BiMap<String,ChannelId> getBindUserMap(){return bindUserMap;}/*** 群发推送消息*/public void writeAndFlush(WsBean data){Set<String> onlineIds = getBindUserMap().keySet();onlineIds.forEach(userId->{ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}});}}

netty处理类:


import cn.hutool.core.util.StrUtil;
import com.xx.framework.config.WsConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** 处理端*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class NioWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketChannelPool webSocketChannelPool;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}",ctx.channel().id());webSocketChannelPool.addChannel(ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("客户端端口连接:{}",ctx.channel().id());webSocketChannelPool.removeChannel(ctx.channel());super.channelInactive(ctx);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.channel().flush();super.channelReadComplete(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("客户端请求数据类型:{}",msg.getClass());if (msg instanceof FullHttpRequest){fullHttpRequestHandler(ctx,(FullHttpRequest)msg);}super.channelRead(ctx, msg);}/*** 处理连接请求,客户端websockt发送握手包时会执行第一次请求* @param ctx* @param request*/private void fullHttpRequestHandler(ChannelHandlerContext ctx, FullHttpRequest request) {String uri = request.getUri();Map<String,String> params = getParams(uri);log.info("客户端请求参数:{}",params);/*** 判断请求路径是否跟配置中的一致*/if (wsConfig.getUrl().equals(getBasePath(uri))){/*** 因为有可能携带了参数,导致客户端一致无法返回握手包,因此在校验通过后,重置请求路径*/request.setUri(wsConfig.getUrl());}else{ctx.close();}String userId = params.get("user_id");if (StrUtil.isBlank(userId)){log.info("用户ID为空,无法登录");return;}webSocketChannelPool.bindUser(userId,ctx.channel());}/*** 获取URI中参数以外部分路径* @param uri* @return*/private String getBasePath(String uri) {if (uri == null || uri.isEmpty()){return null;}int idx = uri.indexOf("?");if (idx  == -1){return uri;}return uri.substring(0,idx);}/*** 请路径参数转换成Map对象,如果路径参数出现重复参数名,将以最后的参数值为准* @param uri* @return*/private Map<String, String> getParams(String uri) {Map<String,String> params = new HashMap<>(10);int idx = uri.indexOf("?");if (idx != -1){String[] paramsArr = uri.substring(idx+1).split("&");for (String param:paramsArr){idx = param.indexOf("=");params.put(param.substring(0,idx),param.substring(idx+1));}}return params;}/*** 客户端发送断开请求处理*/private void closeWebSocketFrameHandler(ChannelHandlerContext ctx, CloseWebSocketFrame frame){ctx.close();}/*** 创建连接之后,客户端发送的消息都会在这里处理*/private void textWebSocketFrameHandler(ChannelHandlerContext ctx, TextWebSocketFrame frame){//客户端发送过来的内容不进行业务处理,原样返回,一般不做处理
//        log.info("收到客户端信息-channelId: {}, 消息内容: {}", ctx.channel().id(), frame.text());}/*** 处理客户端心跳包*/private void  pingWebSocketFrameHandler(ChannelHandlerContext ctx, PingWebSocketFrame frame){ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {//根据请求数量类型进厂分发处理if (webSocketFrame instanceof PingWebSocketFrame){PingWebSocketFrame pingWebSocketFrame = (PingWebSocketFrame) webSocketFrame;pingWebSocketFrameHandler(channelHandlerContext,pingWebSocketFrame);}else if (webSocketFrame instanceof TextWebSocketFrame){TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;textWebSocketFrameHandler(channelHandlerContext,textWebSocketFrame);}else if (webSocketFrame instanceof CloseWebSocketFrame){CloseWebSocketFrame closeWebSocketFrame = (CloseWebSocketFrame) webSocketFrame;closeWebSocketFrameHandler(channelHandlerContext,closeWebSocketFrame);}}
}

netty服务类:

package com.xx.framework.ws;import com.xx.framework.config.WsConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 服务端*/
@Slf4j
@Component
public class NioWebSocketServer implements InitializingBean , DisposableBean, Ordered {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketHandler nioWebSocketHandler;private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ChannelFuture channelFuture;@Overridepublic void destroy() throws Exception {log.info("shutting down netty server....");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}if (channelFuture != null){channelFuture.channel().closeFuture().syncUninterruptibly();}log.info("netty server shutdown");}@Overridepublic void afterPropertiesSet() throws Exception {try{bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.option(ChannelOption.SO_BACKLOG,1024).group(bossGroup,workGroup).channel(NioServerSocketChannel.class).localAddress(wsConfig.getPort()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(8192)).addLast(nioWebSocketHandler).addLast(new WebSocketServerProtocolHandler(wsConfig.getUrl(),null,true));}});channelFuture = serverBootstrap.bind().sync();}finally {if (channelFuture != null && channelFuture.isSuccess()){log.info("netty server startup on port:{} (websockt)with context path '{}'",wsConfig.getPort(),"/");}else{log.info("netty server startup failed");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}}}}@Overridepublic int getOrder() {return 0;}
}

以上代码serviceImpl应用:

/*** 新增聊天记录** @param chatRecord 聊天记录* @return 结果*/@Overridepublic int insertChatRecord(ChatRecord chatRecord) {MsgBody msgBody = new MsgBody();if (StrUtil.isNotBlank(chatRecord.getMsgType())){if (chatRecord.getMsgType().equals(MsgTypeEnum.TEXT.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.text);}else if (chatRecord.getMsgType().equals(MsgTypeEnum.IMG.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.img);}}msgBody.setMsgContent(chatRecord.getMsgContent());chatRecord.setMsgContent(JSON.toJSONString(msgBody));chatRecord.setId(IdUtils.getLongId());chatRecord.setCreateTime(DateUtils.getNowDate());int insert = chatRecordMapper.insertChatRecord(chatRecord);if(insert > 0){msgBody.setType(MsgBody.Type.self);//通知浏览器用户            webSocketChannelPool.sendToUser(chatRecord.getSendUserId().toString(),msgBody);msgBody.setType(MsgBody.Type.other);webSocketChannelPool.sendToUser(chatRecord.getReceiveUserId().toString(),msgBody);}return insert;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/57368.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP获取两个日期之间的所有日期

下面是一个示例代码&#xff0c;用于计算给定开始和结束日期之间的所有日期&#xff1a; <?phpfunction getDatesBetween($start_date, $end_date) {// 初始化结果数组$dates array();// 将开始日期转换为时间戳$current_date strtotime($start_date);$end_date strtot…

Java并发编程第6讲——线程池(万字详解)

Java中的线程池是运用场景最多的并发框架&#xff0c;几乎所有需要异步或并发执行任务的程序都可以使用线程池&#xff0c;本篇文章就详细介绍一下。 一、什么是线程池 定义&#xff1a;线程池是一种用于管理和重用线程的技术&#xff08;池化技术&#xff09;&#xff0c;它主…

微服务中间件--分布式搜索ES

分布式搜索ES 11.分布式搜索 ESa.介绍ESb.IK分词器c.索引库操作 (类似于MYSQL的Table)d.查看、删除、修改 索引库e.文档操作 (类似MYSQL的数据)1) 添加文档2) 查看文档3) 删除文档4) 修改文档 f.RestClient操作索引库1) 创建索引库2) 删除索引库/判断索引库 g.RestClient操作文…

http协议与apache

http概念&#xff1a; 互联网&#xff1a;是网络的网络&#xff0c;是所有类型网络的母集 因特网&#xff1a;世界上最大的互联网网络。即因特网概念从属于互联网概念 万维网&#xff1a;万维网并非某种特殊的计算机网络&#xff0c;是一个大规模的、联机式的信息贮藏库&…

C++11---std::bind

下面这段代码解析 std::function<decltype(f(args...))()> func std::bind(std::forward<F>(f), std::forward<Args>(args)...); 这行代码的作用是创建一个 std::function 对象 func&#xff0c;将其绑定到一个可调用对象上。 让我们逐步解释这行代码的各…

长胜证券:沪指探底回升涨0.47%,券商、酿酒板块拉升,传媒板块活跃

24日早盘&#xff0c;沪指盘中震动回落&#xff0c;接近午盘快速拉升走高&#xff1b;深成指、创业板指强势上扬&#xff1b;北向资金今天转向&#xff0c;早盘积极出场&#xff0c;半日净买入近30亿元。 到午间收盘&#xff0c;沪指涨0.47%报3092.88点&#xff0c;深成指涨1.1…

最新AI创作系统ChatGPT源码+详细图文部署教程/支持GPT-4/AI绘画/H5端/Prompt知识库/思维导图生成

一、AI系统 如何搭建部署AI创作ChatGPT系统呢&#xff1f;小编这里写一个详细图文教程吧&#xff01;SparkAi使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到AIGC系统&#xff01; 1.1 程序核心功能 程序已支持ChatGPT3.5/GPT-4提问、AI绘画、Midjourney绘画&#xf…

Django(8)-静态资源引用CSS和图片

除了服务端生成的 HTML 以外&#xff0c;网络应用通常需要一些额外的文件——比如图片&#xff0c;脚本和样式表——来帮助渲染网络页面。在 Django 中&#xff0c;我们把这些文件统称为“静态文件”。 我们使用static文件来存放静态资源&#xff0c;django会在每个 INSTALLED…

Vue——axios的二次封装

文章目录 一、请求和传递参数1、get 请求2、post 请求3、axios 请求配置 二、axios 的二次封装1、配置拦截器2、发送请求 三、API 的解耦1、配置文件对应的请求2、获取请求的数据 四、总结 一、请求和传递参数 在 Vue 中&#xff0c;发送请求一般在 created 钩子中&#xff0c…

LiveGBS伴侣

【1】LiveGBS 简介 LiveGBS是一套支持国标(GB28181)流媒体服务软件。 国标无插件;提供用户管理及Web可视化页面管理&#xff1b; 提供设备状态管理&#xff0c;可实时查看设备是否掉线等信息&#xff1b; 实时流媒体处理&#xff0c;PS&#xff08;TS&#xff09;转ES&…

githubssh配置

GitHub SSH配置是用来将本地计算机与GitHub服务器之间建立安全连接的一种方法。它允许用户通过SSH密钥进行身份验证&#xff0c;从而实现无需每次都输入用户名和密码的登录过程。 以下是在Windows环境下配置GitHub SSH的步骤&#xff1a; 首先&#xff0c;在本地计算机上打开…

GFPGAN 集成Flask 接口化改造

GFPGAN是一款腾讯开源的人脸高清修复模型&#xff0c;基于github上提供的demo&#xff0c;可以简单的集成Flask以实现功能接口化。 GFPGAN的安装&#xff0c;Flask的安装请参见其他文章。 如若使用POSTMAN进行测试&#xff0c;需使用POST方式&#xff0c;form-data的请求体&am…

5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT

导读&#xff1a;原文《5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT》&#xff08;获取来源见文尾&#xff09;&#xff0c;本文精选其中精华及架构部分&#xff0c;逻辑清晰、内容完整&#xff0c;为快速形成售前方案提供参考。以下是部分内容&#xff0c; 喜…

TCP协议的重点知识点

TCP协议的重点知识点 TCP(传输控制协议)是一种面向连接、可靠的数据传输协议,工作在传输层,提供可靠的字节流服务。它是互联网协议栈中最重要、最复杂的协议之一,也是面试中常被问到的知识点。本文将详细介绍TCP协议的各个重要概念。 TCP基本特性 TCP主要具有以下基本特性: …

云原生周刊:CNCF 宣布 KEDA 毕业 | 2023.8.28

开源项目推荐 KDash KDash 是一个用 Rust 构建的简单快速的 Kubernetes 仪表板。它提供了一个终端界面&#xff0c;用于监视和管理 Kubernetes 集群。该仪表板具有多种功能&#xff0c;包括节点指标、资源监视、自定义资源定义、容器日志流式传输、上下文切换等。它还支持不同…

Django(9)-表单处理

django支持使用类创建表单实例 polls/forms.py from django import forms class NameForm(forms.Form):your_nameforms.CharField(label"Your name",max_length100)这个类创建了一个属性&#xff0c;定义了一个文本域&#xff0c;和它的label和最大长度。 polls/vi…

浅析Linux SCSI子系统:设备管理

文章目录 概述设备管理数据结构scsi_host_template&#xff1a;SCSI主机适配器模板scsi_host&#xff1a;SCSI主机适配器主机适配器支持DIF scsi_target&#xff1a;SCSI目标节点scsi_device&#xff1a;SCSI设备 添加主机适配器构建sysfs目录 添加SCSI设备挂载LunIO请求队列初…

二、Tomcat 安装集

一、Tomcat—Docker 1. 拉取镜像 # 1、拉取镜像&#xff08;tomcat版本8&#xff0c;jre版本8&#xff09;。 docker pull tomcat:8-jre82. 启动容器 # 2、启动一个tomcat容器。 docker run -id --name tomcat -p 8080:8080 镜像ID # 3、宿主机里新建/root/tomcat目录&#x…

从零开始的Hadoop学习(三)| 集群分发脚本xsync

1. Hadoop目录结构 bin目录&#xff1a;存放对Hadoop相关服务&#xff08;hdfs&#xff0c;yarn&#xff0c;mapred&#xff09;进行操作的脚本etc目录&#xff1a;Hadoop的配置文件目录&#xff0c;存放Hadoop的配置文件lib目录&#xff1a;存放Hadoop的本地库&#xff08;对…

华为云Stack的学习(三)

四、华为云Stack公共组件 1.华为云Stack公共负载均衡方案介绍 1.1 LVS原理 LVS是四层负载均衡&#xff0c;建立在OSI模型的传输层之上&#xff0c;所以效率非常高。 LVS有两种转发模式&#xff1a; NAT模式的转发主要通过修改IP地址&#xff08;位于OSI模型的第三层网络层&…