说明
前面消息互发以及广播都是单机就可以完成测试, 但实际场景中客户端的连接数量很大, 那就需要有一定数量的服务端去支撑, 所以准备虚拟机测试。
1. 虚拟机准备
1.1 准备1个1核1G的虚拟机(160), 配置java环境, 安装redis和minio
1.2 准备6个1核1G的空虚拟机(161到166), 只需要java环境即可
2. 服务端改造
2.1 修改 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.hahashou.netty</groupId><artifactId>server</artifactId><version>1.0-SNAPSHOT</version><name>server</name><description>Netty Server Project For Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.100.Final</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.security</groupId><artifactId>spring-security-crypto</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
2.2 修改 application.yml (每个服务端的id是不一样的)
server:port: 32000spring:redis:host: 192.168.109.160port: 6379password: rootlogging:level:com.hahashou.netty: infonetty:server:# 唯一标识(与hosts文件里对应)id : netty-server-1# 客户端需要连接的端口port: 35000
2.3 config包下增加 NettyStatic类
package com.hahashou.netty.server.config;import io.netty.channel.Channel;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @description: 静态常量* @author: 哼唧兽* @date: 9999/9/21**/
public class NettyStatic {/** key: 用户code; value: channelId */public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);/** key: channelId; value: Channel */public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);public static Map<String, NettyClientHandler> NETTY_CLIENT_HANDLER = new ConcurrentHashMap<>(32);public static Map<NettyClientHandler, NettyClient> NETTY_CLIENT = new ConcurrentHashMap<>(32);
}
2.4 config包下增加 RedisConfig类
package com.hahashou.netty.server.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @description: Redis配置* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());// 使用StringRedisSerializer来序列化和反序列化redis的keyredisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());// 开启事务:redisTemplate.setEnableTransactionSupport(true); 我觉得一般用不到(该操作是为了执行一组命令而设置的)redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate;}@Beanpublic ValueOperations<String, Object> redisOperation(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForValue();}public static String NETTY_SERVER_LOCK = "NETTY_SERVER_LOCK";public static String NETTY_SERVER_LIST = "NETTY_SERVER_LIST";public static String OFFLINE_MESSAGE = "OFFLINE_MESSAGE_";
}
2.5 修改 EventLoopGroupConfig类
package com.hahashou.netty.server.config;import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** @description: Netty线程组* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class EventLoopGroupConfig {private int bossNum = 1;private int workerNum = 4;private int businessNum = 1;private int maxPending = 100000;/** ------------------------------ 服务端 ------------------------------ */@Bean("bossGroup")public NioEventLoopGroup bossGroup() {return new NioEventLoopGroup(bossNum);}@Bean("workerGroup")public NioEventLoopGroup workerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("businessGroup")public EventExecutorGroup businessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(),maxPending, RejectedExecutionHandlers.reject());}/** ------------------------------ 客户端 ------------------------------ */@Bean("clientWorkerGroup")public NioEventLoopGroup clientWorkerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("clientBusinessGroup")public EventExecutorGroup clientBusinessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(), maxPending, RejectedExecutionHandlers.reject());}static class BusinessThreadFactory implements ThreadFactory {private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;BusinessThreadFactory() {SecurityManager securityManager = System.getSecurityManager();group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();namePrefix = "netty-server-";}@Overridepublic Thread newThread(Runnable runnable) {Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}
}
2.6 config包下增加 SpringBean类
package com.hahashou.netty.server.config;import io.netty.util.HashedWheelTimer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;/*** @description: Spring Bean管理* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class SpringBean {@Beanpublic PasswordEncoder passwordEncoder() {return new BCryptPasswordEncoder();}/*** 最多能new64个, private static final int INSTANCE_COUNT_LIMIT = 64;* @return*/@Beanpublic HashedWheelTimer hashedWheelTimer() {// 默认tick间隔100毫秒, 轮子大小为512return new HashedWheelTimer();}
}
2.7 server包下增加 ApplicationInitial类
package com.hahashou.netty.server;import com.hahashou.netty.server.config.NettyServer;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** @description: 应用初始化* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class ApplicationInitial implements ApplicationRunner {@Resourceprivate HashedWheelTimer hashedWheelTimer;@Resourceprivate NettyServer nettyServer;@Overridepublic void run(ApplicationArguments args) {hashedWheelTimer.newTimeout(nettyServer, 1 , TimeUnit.SECONDS);}
}
2.8 修改 Message类
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Data
public class Message {/** 广播秘钥 */private String secretKey;/** 发送者用户code */private String userCode;/** 中转的服务端Id */private String serverId;/** 接收者用户code */private String friendUserCode;/** 连接时专用 */private String channelId;/** 消息类型 */private Integer type;public enum TypeEnum {TEXT(0, "文字", "", new ArrayList<>()),IMAGE(1, "图片", "image", Arrays.asList("bmp", "gif", "jpeg", "jpg", "png")),VOICE(2, "语音", "voice", Arrays.asList("mp3", "amr", "flac", "wma", "aac")),VIDEO(3, "视频", "video", Arrays.asList("mp4", "avi", "rmvb", "flv", "3gp", "ts", "mkv")),;@Getterprivate Integer key;@Getterprivate String describe;@Getterprivate String bucketName;@Getterprivate List<String> formatList;TypeEnum(int key, String describe, String bucketName, List<String> formatList) {this.key = key;this.describe = describe;this.bucketName = bucketName;this.formatList = formatList;}public static TypeEnum select(String format) {TypeEnum result = null;for (TypeEnum typeEnum : TypeEnum.values()) {if (typeEnum.getFormatList().contains(format)) {result = typeEnum;break;}}return result;}}/** 文字或文件的全路径名称 */private String text;public static ByteBuf transfer(Message message) {return Unpooled.copiedBuffer(JSON.toJSONString(message), CharsetUtil.UTF_8);}/*** 生成指定长度的随机字符串* @param length* @return*/public static String randomString (int length) {if (length > 64) {length = 64;}List<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {list.add(i + "");}for (char i = 'A'; i <= 'Z'; i++) {list.add(String.valueOf(i));}for (char i = 'a'; i <= 'z'; i++) {list.add(String.valueOf(i));}list.add("α");list.add("ω");Collections.shuffle(list);String string = list.toString();return string.replace("[", "").replace("]", "").replace(", ", "").substring(0, length);}
}
2.9 config包下增加 NettyClientHandler类
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Getter@Setterprivate String userCode;@Getter@Setterprivate String hostName;@Getter@Setterprivate int port;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("{}, 作为客户端, 与其他服务端连接", LocalDateTime.now());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {NettyStatic.CHANNEL.remove(ctx.channel().id().asLongText());NettyClientHandler nettyClientHandler = NettyStatic.NETTY_CLIENT_HANDLER.remove(hostName + "@" + port);NettyClient nettyClient = NettyStatic.NETTY_CLIENT.remove(nettyClientHandler);nettyClient = null;nettyClientHandler = null;System.gc();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String channelId = message.getChannelId(),text = message.getText();if (StringUtils.hasText(channelId)) {Channel channel = ctx.channel();message.setUserCode(userCode);NettyStatic.USER_CHANNEL.put(hostName, channelId);NettyStatic.CHANNEL.put(channelId, channel);channel.writeAndFlush(Message.transfer(message));} else if (StringUtils.hasText(text)) {String friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(message.getServerId())) {String queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}// 此时, 已不需要serverIdmessage.setServerId(null);channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}}}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {}
}
2.10 config包下增加 NettyClient类
package com.hahashou.netty.server.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;import javax.annotation.PreDestroy;
import java.net.*;
import java.nio.charset.Charset;/*** @description: Netty-客户端TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class NettyClient {@Getter@Setterprivate NioEventLoopGroup clientWorkerGroup;@Getter@Setterprivate EventExecutorGroup clientBusinessGroup;public void createClient(NettyClientHandler nettyClientHandler) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(clientWorkerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(clientBusinessGroup, nettyClientHandler);}});try {InetAddress inetAddress = InetAddress.getByName(nettyClientHandler.getHostName());SocketAddress socketAddress = new InetSocketAddress(inetAddress, nettyClientHandler.getPort());bootstrap.connect(socketAddress).sync().channel();} catch (UnknownHostException exception) {log.error("请检查hosts文件是否配置正确 : {}", exception.getMessage());} catch (InterruptedException exception) {log.error("客户端中断异常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {clientWorkerGroup.shutdownGracefully().syncUninterruptibly();log.info("客户端关闭成功");}
}
2.11 修改 NettyServer类
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @description: Netty-服务端TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyServer implements TimerTask {@Value("${netty.server.id}")private String serverId;@Value("${netty.server.port}")private int port;@Resourceprivate NioEventLoopGroup bossGroup;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyServerHandler nettyServerHandler;@Resourceprivate NioEventLoopGroup clientWorkerGroup;@Resourceprivate EventExecutorGroup clientBusinessGroup;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate HashedWheelTimer hashedWheelTimer;@Overridepublic void run(Timeout timeout) {Object nettyServerLock = redisOperation.get(RedisConfig.NETTY_SERVER_LOCK);if (nettyServerLock != null) {hashedWheelTimer.newTimeout(this, 10, TimeUnit.SECONDS);return;}try {redisOperation.set(RedisConfig.NETTY_SERVER_LOCK, true);//String hostAddress = InetAddress.getLocalHost().getHostAddress();ServerBootstrap serverBootstrap = new ServerBootstrap();ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyServerHandler);}})// 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true)// 此处有个大坑, 详见文章脱坑指南.bind(port).sync();if (channelFuture.isSuccess()) {log.info("{} 启动成功", serverId);redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}thisNodeHandle(port);channelFuture.channel().closeFuture().sync();} catch (InterruptedException exception) {log.error("{} 启动失败: {}", serverId, exception.getMessage());} finally {redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}}private void thisNodeHandle(int port) {Set<String> nodeList = new HashSet<>();Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);if (nettyServerList != null) {nodeList = new HashSet<>(JSON.parseArray(nettyServerList.toString(), String.class));for (String hostAndPort : nodeList) {String[] split = hostAndPort.split("@");String connectHost = split[0];int connectPort = Integer.parseInt(split[1]);NettyClient nettyClient = new NettyClient();nettyClient.setClientWorkerGroup(clientWorkerGroup);nettyClient.setClientBusinessGroup(clientBusinessGroup);NettyClientHandler nettyClientHandler = new NettyClientHandler();nettyClientHandler.setUserCode(serverId);nettyClientHandler.setHostName(connectHost);nettyClientHandler.setPort(connectPort);nettyClient.createClient(nettyClientHandler);NettyStatic.NETTY_CLIENT_HANDLER.put(connectHost + "@" + connectPort, nettyClientHandler);NettyStatic.NETTY_CLIENT.put(nettyClientHandler, nettyClient);}}nodeList.add(serverId + "@" + port);redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(nodeList));}public void stop() {bossGroup.shutdownGracefully().syncUninterruptibly();workerGroup.shutdownGracefully().syncUninterruptibly();log.info("TCP服务关闭成功");Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);List<String> hostList = JSON.parseArray(nettyServerList.toString(), String.class);hostList.remove(serverId + "@" + port);if (CollectionUtils.isEmpty(hostList)) {redisTemplate.delete(RedisConfig.NETTY_SERVER_LIST);} else {redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(hostList));}}@PreDestroypublic void destroy() {stop();}
}
2.12 修改 NettyServerHandler类
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Value("${netty.server.id}")private String serverId;public static String SERVER_PREFIX = "netty-server-";@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {Channel channel = ctx.channel();String channelId = channel.id().asLongText();log.info("有客户端连接, channelId : {}", channelId);NettyStatic.CHANNEL.put(channelId, channel);Message message = new Message();message.setChannelId(channelId);channel.writeAndFlush(Message.transfer(message));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {String channelId = ctx.channel().id().asLongText();log.info("有客户端断开连接, channelId : {}", channelId);NettyStatic.CHANNEL.remove(channelId);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {if (entry.getValue().equals(channelId)) {redisTemplate.delete(entry.getKey());break;}}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String userCode = message.getUserCode(),channelId = message.getChannelId(),friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {connect(userCode, channelId);} else if (StringUtils.hasText(message.getText())) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();message.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(message);} else {sendAdmin(ctx.channel(), message);}} else {offlineMessage(friendUserCode, message);}}}}/*** 建立连接* @param userCode* @param channelId*/private void connect(String userCode, String channelId) {log.info("{} 连接", userCode);NettyStatic.USER_CHANNEL.put(userCode, channelId);if (!userCode.startsWith(SERVER_PREFIX)) {redisOperation.set(userCode, serverId);}}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 进行转发");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {// 1条message在redis中大概是100B, 1万条算1M, redis.conf的maxmemory设置的是256MList<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}/*** 发送给服务端* @param channel* @param message*/private void sendAdmin(Channel channel, Message message) {message.setUserCode("ADMIN");message.setText(LocalDateTime.now().toString());channel.writeAndFlush(Message.transfer(message));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());}
}
2.13 新建service包, 并新增 ServerService接口
package com.hahashou.netty.server.service;import com.hahashou.netty.server.config.Message;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
public interface ServerService {/*** 发送消息* @param dto*/void send(Message dto);/*** 停止服务(为后续断线重连做准备)*/void stop();
}
2.14 service包下新建impl包, 并新增 ServerServiceImpl类
package com.hahashou.netty.server.service.impl;import com.alibaba.fastjson.JSON;
import com.hahashou.netty.server.config.*;
import com.hahashou.netty.server.service.ServerService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Service
@Slf4j
public class ServerServiceImpl implements ServerService {@Value("${netty.server.id}")private String serverId;@Resourceprivate PasswordEncoder passwordEncoder;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate NettyServer nettyServer;@Overridepublic void send(Message dto) {String friendUserCode = dto.getFriendUserCode();if (StringUtils.hasText(friendUserCode)) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();dto.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(dto);}} else {offlineMessage(friendUserCode, dto);}} else {// 全体广播, 需要校验秘钥(inputSecretKey应该是一个动态值, 通过手机+验证码每次广播时获取, 自行实现)String inputSecretKey = dto.getSecretKey();// encodedPassword生成见main方法String encodedPassword = "$2a$10$J/UEqtme/w2D0TWB4gJKFeSsyc3s8pepr6ahzOsORkC9zpaLSvZbG";if (StringUtils.hasText(inputSecretKey) && passwordEncoder.matches(inputSecretKey, encodedPassword)) {dto.setSecretKey(null);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {String key = entry.getKey();if (key.startsWith(NettyServerHandler.SERVER_PREFIX)) {// 这里可以用http调用其他服务端, 自行补充(信息redis都有)continue;}// 只处理连接本端的客户端String value = entry.getValue();Channel channel = NettyStatic.CHANNEL.get(value);if (channel == null) {offlineMessage(friendUserCode, dto);return;}channel.writeAndFlush(Message.transfer(dto));}}}}public static void main(String[] args) {String text = "uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU";PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();String encode = passwordEncoder.encode(text);log.info(encode);if (passwordEncoder.matches(text, encode)) {log.info("秘钥正确");}}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 进行转发");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void stop() {nettyServer.stop();}
}
2.15 修改 ServerController类
package com.hahashou.netty.server.controller;import com.hahashou.netty.server.config.Message;
import com.hahashou.netty.server.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@RestController
@RequestMapping("/server")
@Slf4j
public class ServerController {@Resourceprivate ServerService serverService;/*** 秘钥记录: uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU* @param dto* @return*/@PostMapping("/send")public String send(@RequestBody Message dto) {serverService.send(dto);return "success";}@GetMapping("/stop")public String stop() {serverService.stop();return "stop netty success";}
}
3. 脱坑指南, 针对 NettyServer类
工具
yum -y install net-tools
netstat -tunlp
防火墙打开时, 当使用 bind(String inetHost, int inetPort) 方法时, 因为inetHost是127.0.0.1, 所以只有本机可以访问35000, 要想让其他机器可以连接到, 需使用 bind(int inetPort) 方法, 下图是前后两次端口占用情况
结论
当使用bind(String inetHost, int inetPort)方法时, 无论防火墙关闭以及启动, 虚拟机均有问题; 但当机器有公网IP, 且防火墙关闭或端口开放时, 通过DNS解析映射是没有问题的, 建议还是用bind(int inetPort)方法
4. 服务端准备
4.1 打包3个服务端的jar包, id分别为netty-server-1、netty-server-2、netty-server-3, 分别放在161到163上
4.2 161、162、163端口开放
firewall-cmd --zone=public --add-port=35000/tcp --permanent
firewall-cmd --zone=public --add-port=32000/tcp --permanent
firewall-cmd --reload
4.3 161、162、163修改hosts
vi /etc/hosts
追加内容
192.168.109.161 netty-server-1
192.168.109.162 netty-server-2
192.168.109.163 netty-server-3
4.4 依次启动161、162、163
java -Dfile.encoding=UTF-8 -jar server-1.0-SNAPSHOT.jar
161
162
163
redis中记录的服务列表
5. 客户端改造
5.1 修改 application.yml
server:port: 32001logging:level:com.hahashou.netty: infospring:servlet:multipart:max-file-size: 128MBmax-request-size: 256MBuserCode: Aa
host: 192.168.109.161minio:endpoint: http://192.168.109.160:9000accessKey: rootsecretKey: root123456
5.2 修改 NettyClient类
package com.hahashou.netty.client.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;/*** @description: Netty-TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ApplicationStartedEvent> {@Value("${host}")private String host;public static int PORT = 35000;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyClientHandler nettyClientHandler;public static Channel CHANNEL;@SneakyThrows@Overridepublic void onApplicationEvent(ApplicationStartedEvent event) {createClient(workerGroup, businessGroup, nettyClientHandler, host, PORT);}public void createClient(NioEventLoopGroup workerGroup, EventExecutorGroup businessGroup,NettyClientHandler nettyClientHandler, String host, int port) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyClientHandler);}});try {CHANNEL = bootstrap.connect(host, port).sync().channel();} catch (InterruptedException exception) {log.error("客户端中断异常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {workerGroup.shutdownGracefully().syncUninterruptibly();log.info("客户端关闭成功");}
}
6. 客户端准备
6.1 准备6个jar包, 修改application.yml, 并根据下述规则放到对应机器上
Aa放在163上, Bb放在164上, Cc放在165上, Dd放在166上, Ee放在161上, Ff放在162上
userCode: Aa
host: 192.168.109.161
userCode: Bb
host: 192.168.109.161
userCode: Cc
host: 192.168.109.162
userCode: Dd
host: 192.168.109.162
userCode: Ee
host: 192.168.109.163
userCode: Ff
host: 192.168.109.163
6.2 161到166端口开放
firewall-cmd --zone=public --add-port=32001/tcp --permanent
firewall-cmd --reload
6.3 启动所有客户端
7. 测试
7.1 两个客户端连同一服务端, 不会出现转发
Aa向Bb发送消息, 且Bb收到后回复Aa
7.2 两个客户端连不同服务端
Aa向Cc发送消息(通过服务端1转发到服务端2), 且Cc收到后回复Aa(通过服务端2转发到服务端1)
Aa向Ee发送消息, 且Ee收到后回复Aa
7.3 广播