文章目录
- LimServer
- LimServer
- snakeyaml
- 依赖
- 使用
- 配置类
- 配置文件
- 私有协议解码
- MessageDecoder
- ByteBufToMessageUtils
这个很全: IM即时通讯系统[SpringBoot+Netty]——梳理(总)
IO线程模型
Redis 分布式客户端 Redisson 分布式锁快速入门
LimServer
public class LimServer {private final static Logger logger = LoggerFactory.getLogger(LimServer.class);BootstrapConfig.TcpConfig config;EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimServer(BootstrapConfig.TcpConfig config) {this.config = config;mainGroup = new NioEventLoopGroup(config.getBossThreadSize());subGroup = new NioEventLoopGroup(config.getWorkThreadSize());server = new ServerBootstrap();server.group(mainGroup,subGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new MessageDecoder());ch.pipeline().addLast(new MessageEncoder());
// ch.pipeline().addLast(new IdleStateHandler(
// 0, 0,
// 10));ch.pipeline().addLast(new HeartBeatHandler(config.getHeartBeatTime()));ch.pipeline().addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));}});}public void start(){this.server.bind(this.config.getTcpPort());}}
LimServer
public class LimWebSocketServer {private final static Logger logger = LoggerFactory.getLogger(LimWebSocketServer.class);BootstrapConfig.TcpConfig config;EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimWebSocketServer(BootstrapConfig.TcpConfig config) {this.config = config;mainGroup = new NioEventLoopGroup();subGroup = new NioEventLoopGroup();server = new ServerBootstrap();server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// websocket 基于http协议,所以要有http编解码器pipeline.addLast("http-codec", new HttpServerCodec());// 对写大数据流的支持pipeline.addLast("http-chunked", new ChunkedWriteHandler());// 几乎在netty中的编程,都会使用到此hanlerpipeline. addLast("aggregator", new HttpObjectAggregator(65535));/*** websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws* 本handler会帮你处理一些繁重的复杂的事* 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳* 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同*/pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));pipeline.addLast(new WebSocketMessageDecoder());pipeline.addLast(new WebSocketMessageEncoder());pipeline.addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));}});}public void start(){this.server.bind(this.config.getWebSocketPort());}
}
snakeyaml
依赖
<!-- yaml -->
<dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>${snakeyaml.version}</version>
</dependency>
使用
private static void start(String path){try {Yaml yaml = new Yaml();InputStream inputStream = new FileInputStream(path);BootstrapConfig bootstrapConfig = yaml.loadAs(inputStream, BootstrapConfig.class);new LimServer(bootstrapConfig.getLim()).start();new LimWebSocketServer(bootstrapConfig.getLim()).start();RedisManager.init(bootstrapConfig);MqFactory.init(bootstrapConfig.getLim().getRabbitmq());MessageReciver.init(bootstrapConfig.getLim().getBrokerId()+"");registerZK(bootstrapConfig);}catch (Exception e){e.printStackTrace();System.exit(500);}}
配置类
@Data
public class BootstrapConfig {private TcpConfig lim;@Datapublic static class TcpConfig {private Integer tcpPort;// tcp 绑定的端口号private Integer webSocketPort; // webSocket 绑定的端口号private boolean enableWebSocket; //是否启用webSocketprivate Integer bossThreadSize; // boss线程 默认=1private Integer workThreadSize; //work线程private Long heartBeatTime; //心跳超时时间 单位毫秒private Integer loginModel;/*** redis配置*/private RedisConfig redis;/*** rabbitmq配置*/private Rabbitmq rabbitmq;/*** zk配置*/private ZkConfig zkConfig;/*** brokerId*/private Integer brokerId;private String logicUrl;}@Datapublic static class ZkConfig {/*** zk连接地址*/private String zkAddr;/*** zk连接超时时间*/private Integer zkConnectTimeOut;}@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class RedisConfig {/*** 单机模式:single 哨兵模式:sentinel 集群模式:cluster*/private String mode;/*** 数据库*/private Integer database;/*** 密码*/private String password;/*** 超时时间*/private Integer timeout;/*** 最小空闲数*/private Integer poolMinIdle;/*** 连接超时时间(毫秒)*/private Integer poolConnTimeout;/*** 连接池大小*/private Integer poolSize;/*** redis单机配置*/private RedisSingle single;}/*** redis单机配置*/@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class RedisSingle {/*** 地址*/private String address;}/*** rabbitmq哨兵模式配置*/@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class Rabbitmq {private String host;private Integer port;private String virtualHost;private String userName;private String password;}}
配置文件
lim:tcpPort: 9000webSocketPort: 19000bossThreadSize: 1workThreadSize: 8heartBeatTime: 20000 #心跳超时时间 单位毫秒brokerId: 1000loginModel: 3logicUrl: http://127.0.0.1:8000/v1# * 多端同步模式:1 只允许一端在线,手机/电脑/web 踢掉除了本client+imel的设备# * 2 允许手机/电脑的一台设备 + web在线 踢掉除了本client+imel的非web端设备# * 3 允许手机和电脑单设备 + web 同时在线 踢掉非本client+imel的同端设备# * 4 允许所有端多设备登录 不踢任何设备redis:mode: single # 单机模式:single 哨兵模式:sentinel 集群模式:clusterdatabase: 0password:timeout: 3000 # 超时时间poolMinIdle: 8 #最小空闲数poolConnTimeout: 3000 # 连接超时时间(毫秒)poolSize: 10 # 连接池大小single: #redis单机配置address: 127.0.0.1:6379rabbitmq:host: 127.0.0.1port: 5672virtualHost: /userName: guestpassword: guestzkConfig:zkAddr: 127.0.0.1:2181zkConnectTimeOut: 5000
私有协议解码
MessageDecoder
public class MessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf in, List<Object> out) throws Exception {//请求头(指令// 版本// clientType// 消息解析类型// appId// imei长度// bodylen)+ imei号 + 请求体if(in.readableBytes() < 28){return;}Message message = ByteBufToMessageUtils.transition(in);if(message == null){return;}out.add(message);}
}
ByteBufToMessageUtils
/*** @author: Chackylee* @description: 将ByteBuf转化为Message实体,根据私有协议转换* 私有协议规则,* 4位表示Command表示消息的开始,* 4位表示version* 4位表示clientType* 4位表示messageType* 4位表示appId* 4位表示imei长度* imei* 4位表示数据长度* data* 后续将解码方式加到数据头根据不同的解码方式解码,如pb,json,现在用json字符串* @version: 1.0*/
public class ByteBufToMessageUtils {public static Message transition(ByteBuf in){/** 获取command*/int command = in.readInt();/** 获取version*/int version = in.readInt();/** 获取clientType*/int clientType = in.readInt();/** 获取clientType*/int messageType = in.readInt();/** 获取appId*/int appId = in.readInt();/** 获取imeiLength*/int imeiLength = in.readInt();/** 获取bodyLen*/int bodyLen = in.readInt();if(in.readableBytes() < bodyLen + imeiLength){in.resetReaderIndex();return null;}byte [] imeiData = new byte[imeiLength];in.readBytes(imeiData);String imei = new String(imeiData);byte [] bodyData = new byte[bodyLen];in.readBytes(bodyData);MessageHeader messageHeader = new MessageHeader();messageHeader.setAppId(appId);messageHeader.setClientType(clientType);messageHeader.setCommand(command);messageHeader.setLength(bodyLen);messageHeader.setVersion(version);messageHeader.setMessageType(messageType);messageHeader.setImei(imei);Message message = new Message();message.setMessageHeader(messageHeader);if(messageType == 0x0){String body = new String(bodyData);JSONObject parse = (JSONObject) JSONObject.parse(body);message.setMessagePack(parse);}in.markReaderIndex();return message;}}