最近想学学WebSocket做一个实时通讯的练手项目
主要用到的技术栈是WebSocket Netty Vue Pinia MySQL SpringBoot,实现一个持久化数据,单一群聊,支持多用户的聊天界面
下面是实现的过程
后端
SpringBoot启动的时候会占用一个端口,而Netty也会占用一个端口,这两个端口不能重复,并且因为Netty启动后会阻塞当前线程,因此需要另开一个线程防止阻塞住SpringBoot
1. 编写Netty服务器
个人认为,Netty最关键的就是channel,可以代表一个客户端
我在这使用的是@PostConstruct注解,在Bean初始化后调用里面的方法,新开一个线程运行Netty,因为希望Netty受Spring管理,所以加上了spring的注解,也可以直接在启动类里注入Netty然后手动启动
@Service
public class NettyService {private EventLoopGroup bossGroup = new NioEventLoopGroup(1);private EventLoopGroup workGroup = new NioEventLoopGroup();@Autowiredprivate WebSocketHandler webSocketHandler;@Autowiredprivate HeartBeatHandler heartBeatHandler;@PostConstructpublic void initNetty() throws BaseException {new Thread(()->{try {start();} catch (Exception e) {throw new RuntimeException(e);}}).start();}@PreDestroypublic void destroy() throws BaseException {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}@Asyncpublic void start() throws BaseException {try {ChannelFuture channelFuture = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline()
// http解码编码器.addLast(new HttpServerCodec())
// 处理完整的 HTTP 消息.addLast(new HttpObjectAggregator(64 * 1024))
// 心跳检测时长.addLast(new IdleStateHandler(300, 0, 0, TimeUnit.SECONDS))
// 心跳检测处理器.addLast(heartBeatHandler)
// 支持ws协议(自定义).addLast(new WebSocketServerProtocolHandler("/ws",null,true,64*1024,true,true,10000))
// ws请求处理器(自定义).addLast(webSocketHandler);}}).bind(8081).sync();System.out.println("Netty启动成功");ChannelFuture future = channelFuture.channel().closeFuture().sync();}catch (InterruptedException e){throw new InterruptedException ();}finally {
//优雅关闭bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}
}
服务器类只是指明一些基本信息,包含处理器类,支持的协议等等,具体的处理逻辑需要再自定义类来实现
2. 心跳检测处理器
心跳检测是指 服务器无法主动确定客户端的状态(用户可能关闭了网页,但是服务端没办法知道),为了确定客户端是否在线,需要客户端定时发送一条消息,消息内容不重要,重要的是发送消息代表该客户端仍然在线,当客户端长时间没有发送数据时,代表客户端已经下线
package org.example.payroll_management.websocket.netty.handler;@Component
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelDuplexHandler {@Autowiredprivate ChannelContext channelContext;private static final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){// 心跳检测超时IdleStateEvent e = (IdleStateEvent) evt;logger.info("心跳检测超时");if (e.state() == IdleState.READER_IDLE){Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));Integer userId = attr.get();// 读超时,当前已经下线,主动断开连接ChannelContext.removeChannel(userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE){ctx.writeAndFlush("心跳检测");}}super.userEventTriggered(ctx, evt);}
}
3. webSocket处理器
当客户端发送消息,消息的内容会发送当webSocket处理器中,可以对对应的方法进行处理,我这里偷懒了,就做了一个群组,全部用户只能在同一群中聊天,不过创建多个群组,或单对单聊天也不复杂,只需要将群组的ID进行保存就可以
这里就产生第一个问题了,就是SpringMVC的拦截器不会拦截其他端口的请求,解决方法是将token放置到请求参数中,在userEventTriggered方法中重新进行一次token检验
第二个问题,我是在拦截器中通过ThreadLocal保存用户ID,不走拦截器在其他地方拿不到用户ID,解决方法是,在userEventTriggered方法中重新保存,或者channel中可以保存附件(自身携带的数据),直接将id保存到附件中
第三个问题,消息的持久化,当用户重新打开界面时,肯定希望消息仍然存在,鉴于webSocket的实时性,数据持久化肯定不能在同一个线程中完成,我在这使用BlockingQueue+线程池完成对消息的异步保存,或者也可以用mq实现
不过用的Executors.newSingleThreadExecutor();可能会产生OOM的问题,后面可以自定义一个线程池,当任务满了之后,指定拒绝策略为抛出异常,再通过全局异常捕捉拿到对应的数据保存到数据库中,不过俺这种小项目应该不会产生这种问题
第四个问题,消息内容,这个需要前后端统一一下,确定一下传输格式就OK了,然后从JSON中取出数据处理
最后就是在线用户统计,这个没什么好说的,里面有对应的方法,当退出时,直接把channel踢出去就可以了
package org.example.payroll_management.websocket.netty.handler;@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Autowiredprivate ChannelContext channelContext;@Autowiredprivate MessageMapper messageMapper;@Autowiredprivate UserService userService;private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);private static final BlockingQueue<WebSocketMessageDto> blockingQueue = new ArrayBlockingQueue(1024 * 1024);private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();// 提交线程@PostConstructprivate void init(){EXECUTOR_SERVICE.submit(new MessageHandler());}private class MessageHandler implements Runnable{// 异步保存@Overridepublic void run() {while(true){WebSocketMessageDto message = null;try {message = blockingQueue.take();logger.info("消息持久化");} catch (InterruptedException e) {throw new RuntimeException(e);}Integer success = messageMapper.saveMessage(message);if (success < 1){try {throw new BaseException("保存信息失败");} catch (BaseException e) {throw new RuntimeException(e);}}}}}// 当读事件发生时(有客户端发送消息)@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {Channel channel = channelHandlerContext.channel();// 收到的消息String text = textWebSocketFrame.text();Attribute<Integer> attr = channelHandlerContext.channel().attr(AttributeKey.valueOf(channelHandlerContext.channel().id().toString()));Integer userId = attr.get();logger.info("接收到用户ID为 {} 的消息: {}",userId,text);// TODO 将text转成JSON,提取里面的数据WebSocketMessageDto webSocketMessage = JSONUtil.toBean(text, WebSocketMessageDto.class);if (webSocketMessage.getType().equals("心跳检测")){logger.info("{}发送心跳检测",userId);}else if (webSocketMessage.getType().equals("群发")){ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);WebSocketMessageDto messageDto = JSONUtil.toBean(text, WebSocketMessageDto.class);WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();webSocketMessageDto.setType("群发");webSocketMessageDto.setText(messageDto.getText());webSocketMessageDto.setReceiver("all");webSocketMessageDto.setSender(String.valueOf(userId));webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));blockingQueue.add(webSocketMessageDto);channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonPrettyStr(webSocketMessageDto)));}else{channel.writeAndFlush("请发送正确的格式");}}// 建立连接后触发(有客户端建立连接请求)@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("建立连接");super.channelActive(ctx);}// 连接断开后触发(有客户端关闭连接请求)@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));Integer userId = attr.get();logger.info("用户ID:{} 断开连接",userId);ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);channelGroup.remove(ctx.channel());ChannelContext.removeChannel(userId);WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();webSocketMessageDto.setType("用户变更");List<OnLineUserVo> onlineUser = userService.getOnlineUser();webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));webSocketMessageDto.setReceiver("all");webSocketMessageDto.setSender("0");webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));super.channelInactive(ctx);}// 建立连接后触发(客户端完成连接)@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete){WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String uri = handshakeComplete.requestUri();logger.info("uri: {}",uri);String token = getToken(uri);if (token == null){logger.warn("Token校验失败");ctx.close();throw new BaseException("Token校验失败");}logger.info("token: {}",token);Integer userId = null;try{Claims claims = JwtUtil.extractClaims(token);userId = Integer.valueOf((String) claims.get("userId"));}catch (Exception e){logger.warn("Token校验失败");ctx.close();throw new BaseException("Token校验失败");}// 向channel中的附件中添加用户IDchannelContext.addContext(userId,ctx.channel());ChannelContext.setChannel(userId,ctx.channel());ChannelContext.setChannelGroup(null,ctx.channel());ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();webSocketMessageDto.setType("用户变更");List<OnLineUserVo> onlineUser = userService.getOnlineUser();webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));webSocketMessageDto.setReceiver("all");webSocketMessageDto.setSender("0");webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));}super.userEventTriggered(ctx, evt);}private String getToken(String uri){if (uri.isEmpty()){return null;}if(!uri.contains("token")){return null;}String[] split = uri.split("\\?");if (split.length!=2){return null;}String[] split1 = split[1].split("=");if (split1.length!=2){return null;}return split1[1];}
}
4. 工具类
主要用来保存用户信息的
不要问我为什么又有static又有普通方法,问就是懒得改,这里我直接保存的同一个群组,如果需要多群组的话,就需要建立SQL数据了
package org.example.payroll_management.websocket;@Component
public class ChannelContext {private static final Map<Integer, Channel> USER_CHANNEL_MAP = new ConcurrentHashMap<>();private static final Map<Integer, ChannelGroup> USER_CHANNELGROUP_MAP = new ConcurrentHashMap<>();private static final Integer GROUP_ID = 10086;private static final Logger logger = LoggerFactory.getLogger(ChannelContext.class);public void addContext(Integer userId,Channel channel){String channelId = channel.id().toString();AttributeKey attributeKey = null;if (AttributeKey.exists(channelId)){attributeKey = AttributeKey.valueOf(channelId);} else{attributeKey = AttributeKey.newInstance(channelId);}channel.attr(attributeKey).set(userId);}public static List<Integer> getAllUserId(){return new ArrayList<>(USER_CHANNEL_MAP.keySet());}public static void setChannel(Integer userId,Channel channel){USER_CHANNEL_MAP.put(userId,channel);}public static Channel getChannel(Integer userId){return USER_CHANNEL_MAP.get(userId);}public static void removeChannel(Integer userId){USER_CHANNEL_MAP.remove(userId);}public static void setChannelGroup(Integer groupId,Channel channel){if(groupId == null){groupId = GROUP_ID;}ChannelGroup channelGroup = USER_CHANNELGROUP_MAP.get(groupId);if (channelGroup == null){channelGroup =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);USER_CHANNELGROUP_MAP.put(GROUP_ID, channelGroup);}if (channel == null){return ;}channelGroup.add(channel);logger.info("向group中添加channel,ChannelGroup已有Channel数量:{}",channelGroup.size());}public static ChannelGroup getChannelGroup(Integer groupId){if (groupId == null){groupId = GROUP_ID;}return USER_CHANNELGROUP_MAP.get(groupId);}public static void removeChannelGroup(Integer groupId){if (groupId == null){groupId = GROUP_ID;}USER_CHANNELGROUP_MAP.remove(groupId);}
}
写到这里,Netty服务就搭建完成了,后面就可以等着前端的请求建立了
前端
前端我使用的vue,因为我希望当用户登录后自动建立ws连接,所以我在登录成功后添加上了ws建立请求,然后我发现,如果用户关闭网页后重新打开,因为跳过了登录界面,ws请求不会自动建立,所以需要一套全局的ws请求
不过我前端不是很好(其实后端也一般),所以很多地方肯定有更优的写法
1. pinia
使用pinia保存ws请求,方便在其他组件中调用
定义WebSocket实例(ws)和一个请求建立判断(wsConnect)
后面就可以通过ws接收服务的消息
import { defineStore } from 'pinia'export const useWebSocketStore = defineStore('webSocket', {state() {return {ws: null,wsConnect: false,}},actions: {wsInit() {if (this.ws === null) {const token = localStorage.getItem("token")if (token === null) return;this.ws = new WebSocket(`ws://localhost:8081/ws?token=${token}`)this.ws.onopen = () => {this.wsConnect = true;console.log("ws协议建立成功")// 发送心跳const intervalId = setInterval(() => {if (!this.wsConnect) {clearInterval(intervalId)}const webSocketMessageDto = {type: "心跳检测"}this.sendMessage(JSON.stringify(webSocketMessageDto));}, 1000 * 3 * 60);}this.ws.onclose = () => {this.ws = null;this.wsConnect = false;}}},sendMessage(message) {if (message == null || message == '') {return;}if (!this.wsConnect) {console.log("ws协议没有建立")this.wsInit();}this.ws.send(message);},wsClose() {if (this.wsConnect) {this.ws.close();this.wsConnect = false;}}}
})
然后再app.vue中循环建立连接(建立请求重试)
const wsConnect = function () {const token = localStorage.getItem("token")if (token === null) {return;}try {if (!webSocket.wsConnect) {console.log("尝试建立ws请求")webSocket.wsInit();} else {return;}} catch {wsConnect();}}
2. 聊天组件
界面相信大伙都会画,主要说一下我遇到的问题
第一个 上拉刷新,也就是加载历史记录的功能,我用的element-plus UI,也不知道是不是我的问题,UI里面的无限滚动不是重复发送请求就是无限发送请求,而且好像没有上拉加载的功能。于是我用了IntersectionObserver来解决,在页面底部加上一个div,当观察到这个div时,触发请求
第二个 滚动条到达顶部时,请求数据并放置数据,滚动条会自动滚动到顶部,并且由于观察的元素始终在顶端导致无限请求,这个其实也不是什么大问题,因为聊天的消息是有限的,没有数据之后我设置了停止观察,主要是用户体验不是很好。这是我是添加了display: flex; flex-direction: column-reverse;解决这个问题的(flex很神奇吧)。大致原理好像是垂直翻转了(例如上面我将观察元素放到div第一个子元素位置,添加flex后观察元素会到最后一个子元素位置上),也就是说当滚动条在最底部时,添加数据后,滚动条会自动滚动到最底部,不过这样体验感非常的不错
不要问我为什么数据要加 || 问就是数据懒得统一了
<style lang="scss" scoped>.chatBox {border-radius: 20px;box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;width: 1200px;height: 600px;background-color: white;display: flex;.chat {width: 1000px;height: inherit;.chatBackground {height: 500px;overflow: auto;display: flex;flex-direction: column-reverse;.loading {text-align: center;font-size: 12px;margin-top: 20px;color: gray;}.chatItem {width: 100%;padding-bottom: 20px;.avatar {margin-left: 20px;display: flex;align-items: center;.username {margin-left: 10px;color: rgb(153, 153, 153);font-size: 13px;}}.chatItemMessage {margin-left: 60px;padding: 10px;font-size: 14px;width: 200px;word-break: break-all;max-width: 400px;line-height: 25px;width: fit-content;border-radius: 10px;height: auto;/* background-color: skyblue; */box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;}.sendDate {font-size: 12px;margin-top: 10px;margin-left: 60px;color: rgb(187, 187, 187);}}}.chatBottom {height: 100px;background-color: #F3F3F3;border-radius: 20px;display: flex;box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;.messageInput {border-radius: 20px;width: 400px;height: 40px;}}}.userList {width: 200px;height: inherit;border-radius: 20px;box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;.user {width: inherit;height: 50px;line-height: 50px;text-indent: 2em;border-radius: 20px;transition: all 0.5s ease;}}}.user:hover {box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;transform: translateX(-5px) translateY(-5px);}
</style><template>{{hasMessage}}<div class="chatBox"><div class="chat"><div class="chatBackground" ref="chatBackgroundRef"><div class="chatItem" v-for="i in messageList"><div class="avatar"><el-avatar :size="40" :src="imageUrl" /><div class="username">{{i.username || i.userId}}</div></div><div class="chatItemMessage">{{i.text || i.content}}</div><div class="sendDate">{{i.date || i.sendDate}}</div></div><div class="loading" ref="loading">显示更多内容</div></div><div class="chatBottom"><el-input class="messageInput" v-model="message" placeholder="消息内容"></el-input><el-button @click="sendMessage">发送消息</el-button></div></div><!-- 做成无限滚动 --><div class="userList"><div v-for="user in userList"><div class="user">{{user.userName}}</div></div></div></div>
</template><script setup>import { ref, onMounted, nextTick } from 'vue'import request from '@/utils/request.js'import { useWebSocketStore } from '@/stores/useWebSocketStore'import imageUrl from '@/assets/默认头像.jpg'const webSocketStore = useWebSocketStore();const chatBackgroundRef = ref(null)const userList = ref([])const message = ref('')const messageList = ref([])const loading = ref(null)const page = ref(1);const size = 10;const hasMessage = ref(true);const observer = new IntersectionObserver((entries, observer) => {entries.forEach(async entry => {if (entry.isIntersecting) {observer.unobserve(entry.target)await pageQueryMessage();}})})onMounted(() => {observer.observe(loading.value)getOnlineUserList();if (!webSocketStore.wsConnect) {webSocketStore.wsInit();}const ws = webSocketStore.ws;ws.onmessage = async (e) => {// console.log(e);const webSocketMessage = JSON.parse(e.data);const messageObj = {username: webSocketMessage.sender,text: webSocketMessage.text,date: webSocketMessage.sendDate,type: webSocketMessage.type}console.log("###")// console.log(JSON.parse(messageObj.text))if (messageObj.type === "群发") {messageList.value.unshift(messageObj)} else if (messageObj.type === "用户变更") {userList.value = JSON.parse(messageObj.text)}await nextTick();// 当发送新消息时,自动滚动到页面最底部,可以替换成消息提示的样式// chatBackgroundRef.value.scrollTop = chatBackgroundRef.value.scrollHeight;console.log(webSocketMessage)}})const pageQueryMessage = function () {request({url: '/api/message/pageQueryMessage',method: 'post',data: {page: page.value,size: size}}).then((res) => {console.log(res)if (res.data.data.length === 0) {hasMessage.value = false;}else {observer.observe(loading.value)page.value = page.value + 1;messageList.value.push(...res.data.data)}})}function getOnlineUserList() {request({url: '/api/user/getOnlineUser',method: 'get'}).then((res) => {console.log(res)userList.value = res.data.data;})}const sendMessage = function () {if (!webSocketStore.wsConnect) {webSocketStore.wsInit();}const webSocketMessageDto = {type: "群发",text: message.value}webSocketStore.sendMessage(JSON.stringify(webSocketMessageDto));}</script>
这样就实现了一个简易的聊天数据持久化,支持在线聊天的界面,总的来说WebSocket用起来还是十分方便的
后面我看看能不能做下上传图片,上传文件之类的功能