文章目录
- 前言
- 技术积累
- 什么是netty
- netty如何实现IM
- 如何实现IM集群
- 实战演示
- 基础配置
- netty搭建IM集群
- redis发布订阅
- 实战测试
前言
在前面的博文中我们分享了原生websoket集群搭建,也用redis 发布订阅实现了集群消息正常有序分发。但是有不少同学希望风向一期netty实现websoket,可以实现对聊方案。其实前面分享的websocket集群解决方案完全可以实现im对聊的,只是性能处理上没有nettty那么好。今天我们就分享一起使用高性能NIO框架netty实现IM集群对聊方案,各位看官敬请鉴赏。
技术积累
什么是netty
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
netty的核心是支持零拷贝的bytebuf缓冲对象、通用通信api和可扩展的事件模型;它支持多种传输服务并且支持HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议,也支持自定义协议。
netty的模型是基于reactor多线程模型,其中mainReactor用于接收客户端请求并转发给subReactor。SubReactor负责通道的读写请求,非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
netty如何实现IM
netty支持websocket通讯协议,那么我们就可以用它来实现websoket,实现后端服务主动向前端推送消息的功能。
比如AB用户分别注册到websoket后台,A用户向B用户发送消息,后端接收到A用户消息后判断消息接收者是B用户,然后后端逻辑直接调用B用户websoket连接进行推送即可。
如何实现IM集群
由于websocket 长连接是注册到后端服务本地缓存的,而且这个信道回话是不能被其他中间件缓存的,你们我们就只能在缓存目标用户的服务上拿到会话进行推送消息。
之前博文中也讲到过可以使用消息广播的形式找到目标回话服务,比如Redis的发布订阅、其他Mq等等。当A用户注册到C后端服务上,B服务注册到D后端服务上,这个时候如果A向B发送消息,则需要在C后端服务上增肌广播逻辑,让其他服务感知并监听到消息体,其他服务D收到消息后会验证是否这个用户会话缓存在本地,如果存在则向B前端对送消息,不再则不予处理。这样,就完成了IM集群回话的整个链路流程。
实战演示
本次实战我们简单使用netty实现IM集群通讯即可,如果需要用于生成环境需要增加一些防卫式程序设计,比如Redis发布监听冗余检验后错误处理等等。
基础配置
maven依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
<!--netty-->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version>
</dependency>
<!-- 整合thymeleaf前端页面 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version>
</dependency>
application配置文件
server:port: 9999
spring:profiles:active: devmvc:pathmatch:# Springfox使用的路径匹配是基于AntPathMatcher的,而Spring Boot 2.6.X使用的是PathPatternMatchermatching-strategy: ant_path_matcherredis:host: 127.0.0.1port: 6379thymeleaf:mode: HTMLencoding: UTF-8content-type: text/htmlcache: falseprefix: classpath:/templates/
websocket演示html
<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>websocket通讯</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>var socket;function openSocket() {if (typeof (WebSocket) == "undefined") {console.log("您的浏览器不支持WebSocket");} else {console.log("您的浏览器支持WebSocket");//实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接//等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");//var socketUrl="${request.contextPath}/im/"+$("#userId").val();//var socketUrl = "ws://192.168.112.10:7777/ws/" + $("#userId").val();//var socketUrl = "wss://192.168.112.10/ws/"+ $("#userId").val();//var socketUrl = "wss://192.168.112.10:8899"var socketUrl = "ws://127.0.0.1:88?userId="+$("#userId").val();socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");console.log(socketUrl);if (socket != null) {socket.close();socket = null;}socket = new WebSocket(socketUrl);//打开事件socket.onopen = function () {console.log("websocket已打开");//socket.send("这是来自客户端的消息" + location.href + new Date());};//获得消息事件socket.onmessage = function (msg) {console.log("接收消息为:"+msg.data);};//关闭事件socket.onclose = function () {console.log("websocket已关闭");};//发生了错误事件socket.onerror = function () {console.log("websocket发生了错误");}}}function sendMessage() {if (typeof (WebSocket) == "undefined") {console.log("您的浏览器不支持WebSocket");} else {console.log("您的浏览器支持WebSocket");console.log('发送消息为:{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');socket.send('{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');}}</script>
<body>
<p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【toUserId】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:<div><button onclick="openSocket()">开启socket</button></div>
<p>【操作】:<div><button onclick="sendMessage()">发送消息</button></div>
</body>
</html>
netty搭建IM集群
创建消息体
Message.java
import lombok.Data;/*** Message* @author senfel* @version 1.0* @date 2024/5/17 14:39*/
@Data
public class Message {/*** 消息编码*/private String code;/*** 来自(保证唯一)*/private String fromUserId;/*** 去自(保证唯一)*/private String toUserId;/*** 内容*/private String contentText;}
创建netty websocket连接池
NettyWebSocketPool.java
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;/*** NettyWebSocketPool* @author senfel* @version 1.0* @date 2024/5/23 10:19*/
public class NettyWebSocketPool {/*** 通道连接池*/public static final ConcurrentHashMap<String, Channel> CHANNELS = new ConcurrentHashMap<>();
}
创建websocket处理器
WebsocketServerHandler.java
import com.example.ccedemo.im.CommonConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;/*** WebsocketServerHandler* @author senfel* @version 1.0* @date 2024/5/22 10:57*/
@Slf4j
public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> {private final static ThreadLocal<String> USER_LIST = new ThreadLocal<>();private WebSocketServerHandshaker handshaker;private StringRedisTemplate stringRedisTemplate;public WebsocketServerHandler() {}public WebsocketServerHandler(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic void channelActive(ChannelHandlerContext ctx) {InetSocketAddress reAddr = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = reAddr.getAddress().getHostAddress();String clientPort = String.valueOf(reAddr.getPort());log.debug("有新的客户端接入:{}:{}", clientIp, clientPort);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {handleHttpRequest(ctx, (FullHttpRequest) msg);} else if (msg instanceof WebSocketFrame) {handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {if (msg instanceof CloseWebSocketFrame) {disconnectCurrentUser();handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());return;}if (msg instanceof PingWebSocketFrame) {log.info("websocket ping message");ctx.channel().write(new PingWebSocketFrame(msg.content().retain()));} else if (msg instanceof TextWebSocketFrame) {// websocket消息解压成字符串让下一个handler处理String text = ((TextWebSocketFrame) msg).text();log.info("请求数据|{}", text);// 如果不调用这个方法后面的handler就获取不到数据ctx.fireChannelRead(text);} else {log.error("不支持的消息格式");throw new UnsupportedOperationException("不支持的消息格式");}}private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {if (!req.decoderResult().isSuccess()|| (!"websocket".equalsIgnoreCase(req.headers().get(HttpHeaderNames.UPGRADE)))) {sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory("ws://" + req.headers().get(HttpHeaderNames.HOST), null, false);handshaker = wsShakerFactory.newHandshaker(req);if (handshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());} else {String uri = req.uri();Map<String, String> paramMap = null;//如果url包含参数,需要处理if (uri.contains(CommonConstants.QUESTION)) {paramMap = getUrlParams(uri);String newUri = uri.substring(0, uri.indexOf(CommonConstants.QUESTION));req.setUri(newUri);}//缓存当前连接assert paramMap != null;String channelId = "userId:"+paramMap.get("userId");log.info("缓存用户通道信息:{}",ctx.channel().localAddress());log.info("缓存用户通道信息:{}",ctx.channel().remoteAddress());NettyWebSocketPool.CHANNELS.put(channelId, ctx.channel());USER_LIST.set(channelId);//写入在线用户stringRedisTemplate.opsForValue().set(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+channelId, channelId);//建立websocket连接握手handshaker.handshake(ctx.channel(), req);}}private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, DefaultFullHttpResponse response) {if (response.status().code() != HttpResponseStatus.OK.code()) {ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);response.content().writeBytes(buf);buf.release();}ChannelFuture cf = ctx.channel().writeAndFlush(response);if (!HttpUtil.isKeepAlive(msg) || response.status().code() != HttpResponseStatus.OK.code()) {cf.addListener(ChannelFutureListener.CLOSE);}}/*** url 参数切割* @param url* @return*/private Map<String, String> getUrlParams(String url) {Map<String, String> map = new HashMap<>(4);url = url.replace("?", ";");if (!url.contains(";")) {return map;}if (url.split(";").length > 0) {String[] arr = url.split(";")[1].split("&");for (String s : arr) {String[] data = s.split("=");if (data.length > 1) {map.put(data[0], data[1]);}}return map;} else {return map;}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("消息处理异常:{}", USER_LIST.get(), cause);disconnectCurrentUser();ctx.close();}/*** 状态触发 检测是否处于空闲状态 间隔时间 60s* @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {log.info("握手完成,连接地址为:{}", ctx.channel().remoteAddress());} else if (evt instanceof IdleStateEvent) {if (!StringUtils.isEmpty(USER_LIST.get())) {//断开连接disconnectCurrentUser();ctx.disconnect();}} else {super.userEventTriggered(ctx, evt);}}/*** disconnectCurrentUser* @author senfel* @date 2024/5/23 15:13* @return void*/private void disconnectCurrentUser() {log.info("谁断开了连接:{}",USER_LIST.get());log.info("userEventTriggered 触发,断开连接");NettyWebSocketPool.CHANNELS.remove(USER_LIST.get());stringRedisTemplate.delete(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+USER_LIST.get());USER_LIST.remove();}
}
创建websocket输入输出处理器
UserWebsocketInHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** UserWebsocketInHandler* 入站处理器:获取请求数据,完成业务处理,推送消息给浏览器* @author senfel* @version 1.0* @date 2024/5/22 11:10*/
@Slf4j
public class UserWebsocketInHandler extends SimpleChannelInboundHandler<String> {private StringRedisTemplate stringRedisTemplate;public UserWebsocketInHandler(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {log.error(Thread.currentThread().getName() + "|" + msg);String pattern ="\\{.*\\}|\\[.*\\]";Pattern r= Pattern.compile(pattern);Matcher m =r.matcher(msg);if(m.matches()){stringRedisTemplate.convertAndSend("nettyWebsocketMsgPush",msg);}else {ctx.writeAndFlush(new TextWebSocketFrame(msg));}}
}
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;/*** UserWebsocketOutHandler* 出站处理器:判断数据是否需要进行封装* @author senfel* @version 1.0* @date 2024/5/22 11:10*/public class UserWebsocketOutHandler extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {if(msg instanceof String) {ctx.write(new TextWebSocketFrame((String) msg), promise);} else {super.write(ctx, msg, promise);}}
}
创建netty服务端
NettyWebsocketServer.java
import com.example.ccedemo.im.NettyServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** NettyWebsocketServer* @author senfel* @version 1.0* @date 2024/5/22 11:03*/
@Slf4j
public class NettyWebsocketServer implements Runnable {private StringRedisTemplate stringRedisTemplate;/*** 服务端IP地址*/private String ip;/*** 服务端端口号*/private int port;public NettyWebsocketServer(String ip, int port, StringRedisTemplate stringRedisTemplate) {this.ip = ip;this.port = port;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic void run() {// 指定boss线程数:主要负责接收连接请求,一般设置为1就可以final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger index = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet()));}});// 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数final int totalThread = Runtime.getRuntime().availableProcessors();final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() {private AtomicInteger index = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet()));}});// 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应final int jobThreads = 1024;final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() {private AtomicInteger index = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet()));}});// 日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用final LoggingHandler LOGGING_HANDLER = new LoggingHandler();// 指定服务端bootstrapServerBootstrap server = new ServerBootstrap();server.group(boss, worker)// 指定通道类型.channel(NioServerSocketChannel.class)// 指定全连接队列大小:windows下默认是200,linux/mac下默认是128.option(ChannelOption.SO_BACKLOG, 2048)// 维持链接的活跃,清除死链接.childOption(ChannelOption.SO_KEEPALIVE, true)// 关闭延迟发送.childOption(ChannelOption.TCP_NODELAY, true)// 添加handler处理链.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();// 日志处理pipeline.addLast(LOGGING_HANDLER);// 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理)pipeline.addLast(new IdleStateHandler(60, 60, 60, TimeUnit.SECONDS));// 处理http请求的编解码器pipeline.addLast(job, "httpServerCodec", new HttpServerCodec());pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler());pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536));// 处理websocket的编解码器pipeline.addLast(job, "websocketHandler", new WebsocketServerHandler(stringRedisTemplate));// 自定义处理器pipeline.addLast(job, "userOutHandler", new UserWebsocketOutHandler());pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler(stringRedisTemplate));}});try {// 服务端绑定对外服务地址ChannelFuture future = server.bind(ip, port).sync();log.info(NettyServer.class + " 启动正在监听: " + future.channel().localAddress());// 等待服务关闭,关闭后释放相关资源future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();job.shutdownGracefully();}}
}
redis发布订阅
redis配置消息监听
RedisListenerConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;/*** RedisListenerConfig* @author senfel* @version 1.0* @date 2024/5/24 16:26*/
@Configuration
public class RedisListenerConfig {@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(new LiveRedisKeysExpireListener(), new PatternTopic("nettyWebsocketMsgPush"));return container;}
}
redis监听消息处理
LiveRedisKeysExpireListener.java
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.Objects;/*** LiveRedisKeysExpireListener* @author senfel* @version 1.0* @date 2024/5/24 16:25*/
public class LiveRedisKeysExpireListener implements MessageListener {@Overridepublic void onMessage(Message msg, byte[] bytes) {System.out.println("监听到需要进行负载转发的消息:" + msg.toString());com.example.ccedemo.nettydemo.Message message = JSONObject.parseObject(msg.toString(), com.example.ccedemo.nettydemo.Message.class);Channel channel = NettyWebSocketPool.CHANNELS.get("userId:" + message.getToUserId());if(Objects.nonNull(channel)){channel.writeAndFlush(new TextWebSocketFrame(msg.toString()));}}
}
实战测试
浏览器分别打开两个无痕页面
http://127.0.0.1:9999/websocket/page
模拟对话两个页面的用户互斥
分别发送消息实现对聊