第 5 篇 : 多节点Netty服务端(可扩展)

说明

前面消息互发以及广播都是单机就可以完成测试, 但实际场景中客户端的连接数量很大, 那就需要有一定数量的服务端去支撑, 所以准备虚拟机测试。

1. 虚拟机准备

1.1 准备1个1核1G的虚拟机(160), 配置java环境, 安装redis和minio

1.2 准备6个1核1G的空虚拟机(161到166), 只需要java环境即可

2. 服务端改造

2.1 修改 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.hahashou.netty</groupId><artifactId>server</artifactId><version>1.0-SNAPSHOT</version><name>server</name><description>Netty Server Project For Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.100.Final</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.security</groupId><artifactId>spring-security-crypto</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

2.2 修改 application.yml (每个服务端的id是不一样的)

server:port: 32000spring:redis:host: 192.168.109.160port: 6379password: rootlogging:level:com.hahashou.netty: infonetty:server:# 唯一标识(与hosts文件里对应)id : netty-server-1# 客户端需要连接的端口port: 35000

2.3 config包下增加 NettyStatic类

package com.hahashou.netty.server.config;import io.netty.channel.Channel;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @description: 静态常量* @author: 哼唧兽* @date: 9999/9/21**/
public class NettyStatic {/** key: 用户code; value: channelId */public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);/** key: channelId; value: Channel */public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);public static Map<String, NettyClientHandler> NETTY_CLIENT_HANDLER = new ConcurrentHashMap<>(32);public static Map<NettyClientHandler, NettyClient> NETTY_CLIENT = new ConcurrentHashMap<>(32);
}

2.4 config包下增加 RedisConfig类

package com.hahashou.netty.server.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @description: Redis配置* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());// 使用StringRedisSerializer来序列化和反序列化redis的keyredisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());// 开启事务:redisTemplate.setEnableTransactionSupport(true); 我觉得一般用不到(该操作是为了执行一组命令而设置的)redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate;}@Beanpublic ValueOperations<String, Object> redisOperation(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForValue();}public static String NETTY_SERVER_LOCK = "NETTY_SERVER_LOCK";public static String NETTY_SERVER_LIST = "NETTY_SERVER_LIST";public static String OFFLINE_MESSAGE = "OFFLINE_MESSAGE_";
}

2.5 修改 EventLoopGroupConfig类

package com.hahashou.netty.server.config;import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** @description: Netty线程组* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class EventLoopGroupConfig {private int bossNum = 1;private int workerNum = 4;private int businessNum = 1;private int maxPending = 100000;/** ------------------------------ 服务端 ------------------------------ */@Bean("bossGroup")public NioEventLoopGroup bossGroup() {return new NioEventLoopGroup(bossNum);}@Bean("workerGroup")public NioEventLoopGroup workerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("businessGroup")public EventExecutorGroup businessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(),maxPending, RejectedExecutionHandlers.reject());}/** ------------------------------ 客户端 ------------------------------ */@Bean("clientWorkerGroup")public NioEventLoopGroup clientWorkerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("clientBusinessGroup")public EventExecutorGroup clientBusinessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(), maxPending, RejectedExecutionHandlers.reject());}static class BusinessThreadFactory implements ThreadFactory {private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;BusinessThreadFactory() {SecurityManager securityManager = System.getSecurityManager();group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();namePrefix = "netty-server-";}@Overridepublic Thread newThread(Runnable runnable) {Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}
}

2.6 config包下增加 SpringBean类

package com.hahashou.netty.server.config;import io.netty.util.HashedWheelTimer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;/*** @description: Spring Bean管理* @author: 哼唧兽* @date: 9999/9/21**/
@Configuration
public class SpringBean {@Beanpublic PasswordEncoder passwordEncoder() {return new BCryptPasswordEncoder();}/*** 最多能new64个, private static final int INSTANCE_COUNT_LIMIT = 64;* @return*/@Beanpublic HashedWheelTimer hashedWheelTimer() {// 默认tick间隔100毫秒, 轮子大小为512return new HashedWheelTimer();}
}

2.7 server包下增加 ApplicationInitial类

package com.hahashou.netty.server;import com.hahashou.netty.server.config.NettyServer;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** @description: 应用初始化* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class ApplicationInitial implements ApplicationRunner {@Resourceprivate HashedWheelTimer hashedWheelTimer;@Resourceprivate NettyServer nettyServer;@Overridepublic void run(ApplicationArguments args) {hashedWheelTimer.newTimeout(nettyServer, 1 , TimeUnit.SECONDS);}
}

2.8 修改 Message类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Data
public class Message {/** 广播秘钥 */private String secretKey;/** 发送者用户code */private String userCode;/** 中转的服务端Id */private String serverId;/** 接收者用户code */private String friendUserCode;/** 连接时专用 */private String channelId;/** 消息类型 */private Integer type;public enum TypeEnum {TEXT(0, "文字", "", new ArrayList<>()),IMAGE(1, "图片", "image", Arrays.asList("bmp", "gif", "jpeg", "jpg", "png")),VOICE(2, "语音", "voice", Arrays.asList("mp3", "amr", "flac", "wma", "aac")),VIDEO(3, "视频", "video", Arrays.asList("mp4", "avi", "rmvb", "flv", "3gp", "ts", "mkv")),;@Getterprivate Integer key;@Getterprivate String describe;@Getterprivate String bucketName;@Getterprivate List<String> formatList;TypeEnum(int key, String describe, String bucketName, List<String> formatList) {this.key = key;this.describe = describe;this.bucketName = bucketName;this.formatList = formatList;}public static TypeEnum select(String format) {TypeEnum result = null;for (TypeEnum typeEnum : TypeEnum.values()) {if (typeEnum.getFormatList().contains(format)) {result = typeEnum;break;}}return result;}}/** 文字或文件的全路径名称 */private String text;public static ByteBuf transfer(Message message) {return Unpooled.copiedBuffer(JSON.toJSONString(message), CharsetUtil.UTF_8);}/*** 生成指定长度的随机字符串* @param length* @return*/public static String randomString (int length) {if (length > 64) {length = 64;}List<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {list.add(i + "");}for (char i = 'A'; i <= 'Z'; i++) {list.add(String.valueOf(i));}for (char i = 'a'; i <= 'z'; i++) {list.add(String.valueOf(i));}list.add("α");list.add("ω");Collections.shuffle(list);String string = list.toString();return string.replace("[", "").replace("]", "").replace(", ", "").substring(0, length);}
}

2.9 config包下增加 NettyClientHandler类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Getter@Setterprivate String userCode;@Getter@Setterprivate String hostName;@Getter@Setterprivate int port;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("{}, 作为客户端, 与其他服务端连接", LocalDateTime.now());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {NettyStatic.CHANNEL.remove(ctx.channel().id().asLongText());NettyClientHandler nettyClientHandler = NettyStatic.NETTY_CLIENT_HANDLER.remove(hostName + "@" + port);NettyClient nettyClient = NettyStatic.NETTY_CLIENT.remove(nettyClientHandler);nettyClient = null;nettyClientHandler = null;System.gc();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String channelId = message.getChannelId(),text = message.getText();if (StringUtils.hasText(channelId)) {Channel channel = ctx.channel();message.setUserCode(userCode);NettyStatic.USER_CHANNEL.put(hostName, channelId);NettyStatic.CHANNEL.put(channelId, channel);channel.writeAndFlush(Message.transfer(message));} else if (StringUtils.hasText(text)) {String friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(message.getServerId())) {String queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}// 此时, 已不需要serverIdmessage.setServerId(null);channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}}}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {}
}

2.10 config包下增加 NettyClient类

package com.hahashou.netty.server.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;import javax.annotation.PreDestroy;
import java.net.*;
import java.nio.charset.Charset;/*** @description: Netty-客户端TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Slf4j
public class NettyClient {@Getter@Setterprivate NioEventLoopGroup clientWorkerGroup;@Getter@Setterprivate EventExecutorGroup clientBusinessGroup;public void createClient(NettyClientHandler nettyClientHandler) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(clientWorkerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(clientBusinessGroup, nettyClientHandler);}});try {InetAddress inetAddress = InetAddress.getByName(nettyClientHandler.getHostName());SocketAddress socketAddress = new InetSocketAddress(inetAddress, nettyClientHandler.getPort());bootstrap.connect(socketAddress).sync().channel();} catch (UnknownHostException exception) {log.error("请检查hosts文件是否配置正确 : {}", exception.getMessage());} catch (InterruptedException exception) {log.error("客户端中断异常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {clientWorkerGroup.shutdownGracefully().syncUninterruptibly();log.info("客户端关闭成功");}
}

2.11 修改 NettyServer类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @description: Netty-服务端TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyServer implements TimerTask {@Value("${netty.server.id}")private String serverId;@Value("${netty.server.port}")private int port;@Resourceprivate NioEventLoopGroup bossGroup;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyServerHandler nettyServerHandler;@Resourceprivate NioEventLoopGroup clientWorkerGroup;@Resourceprivate EventExecutorGroup clientBusinessGroup;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate HashedWheelTimer hashedWheelTimer;@Overridepublic void run(Timeout timeout) {Object nettyServerLock = redisOperation.get(RedisConfig.NETTY_SERVER_LOCK);if (nettyServerLock != null) {hashedWheelTimer.newTimeout(this, 10, TimeUnit.SECONDS);return;}try {redisOperation.set(RedisConfig.NETTY_SERVER_LOCK, true);//String hostAddress = InetAddress.getLocalHost().getHostAddress();ServerBootstrap serverBootstrap = new ServerBootstrap();ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyServerHandler);}})// 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true)// 此处有个大坑, 详见文章脱坑指南.bind(port).sync();if (channelFuture.isSuccess()) {log.info("{} 启动成功", serverId);redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}thisNodeHandle(port);channelFuture.channel().closeFuture().sync();} catch (InterruptedException exception) {log.error("{} 启动失败: {}", serverId, exception.getMessage());} finally {redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}}private void thisNodeHandle(int port) {Set<String> nodeList = new HashSet<>();Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);if (nettyServerList != null) {nodeList = new HashSet<>(JSON.parseArray(nettyServerList.toString(), String.class));for (String hostAndPort : nodeList) {String[] split = hostAndPort.split("@");String connectHost = split[0];int connectPort = Integer.parseInt(split[1]);NettyClient nettyClient = new NettyClient();nettyClient.setClientWorkerGroup(clientWorkerGroup);nettyClient.setClientBusinessGroup(clientBusinessGroup);NettyClientHandler nettyClientHandler = new NettyClientHandler();nettyClientHandler.setUserCode(serverId);nettyClientHandler.setHostName(connectHost);nettyClientHandler.setPort(connectPort);nettyClient.createClient(nettyClientHandler);NettyStatic.NETTY_CLIENT_HANDLER.put(connectHost + "@" + connectPort, nettyClientHandler);NettyStatic.NETTY_CLIENT.put(nettyClientHandler, nettyClient);}}nodeList.add(serverId + "@" + port);redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(nodeList));}public void stop() {bossGroup.shutdownGracefully().syncUninterruptibly();workerGroup.shutdownGracefully().syncUninterruptibly();log.info("TCP服务关闭成功");Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);List<String> hostList = JSON.parseArray(nettyServerList.toString(), String.class);hostList.remove(serverId + "@" + port);if (CollectionUtils.isEmpty(hostList)) {redisTemplate.delete(RedisConfig.NETTY_SERVER_LIST);} else {redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(hostList));}}@PreDestroypublic void destroy() {stop();}
}

2.12 修改 NettyServerHandler类

package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Value("${netty.server.id}")private String serverId;public static String SERVER_PREFIX = "netty-server-";@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {Channel channel = ctx.channel();String channelId = channel.id().asLongText();log.info("有客户端连接, channelId : {}", channelId);NettyStatic.CHANNEL.put(channelId, channel);Message message = new Message();message.setChannelId(channelId);channel.writeAndFlush(Message.transfer(message));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {String channelId = ctx.channel().id().asLongText();log.info("有客户端断开连接, channelId : {}", channelId);NettyStatic.CHANNEL.remove(channelId);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {if (entry.getValue().equals(channelId)) {redisTemplate.delete(entry.getKey());break;}}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String userCode = message.getUserCode(),channelId = message.getChannelId(),friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {connect(userCode, channelId);} else if (StringUtils.hasText(message.getText())) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();message.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(message);} else {sendAdmin(ctx.channel(), message);}} else {offlineMessage(friendUserCode, message);}}}}/*** 建立连接* @param userCode* @param channelId*/private void connect(String userCode, String channelId) {log.info("{} 连接", userCode);NettyStatic.USER_CHANNEL.put(userCode, channelId);if (!userCode.startsWith(SERVER_PREFIX)) {redisOperation.set(userCode, serverId);}}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 进行转发");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {// 1条message在redis中大概是100B, 1万条算1M, redis.conf的maxmemory设置的是256MList<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}/*** 发送给服务端* @param channel* @param message*/private void sendAdmin(Channel channel, Message message) {message.setUserCode("ADMIN");message.setText(LocalDateTime.now().toString());channel.writeAndFlush(Message.transfer(message));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());}
}

2.13 新建service包, 并新增 ServerService接口

package com.hahashou.netty.server.service;import com.hahashou.netty.server.config.Message;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
public interface ServerService {/*** 发送消息* @param dto*/void send(Message dto);/*** 停止服务(为后续断线重连做准备)*/void stop();
}

2.14 service包下新建impl包, 并新增 ServerServiceImpl类

package com.hahashou.netty.server.service.impl;import com.alibaba.fastjson.JSON;
import com.hahashou.netty.server.config.*;
import com.hahashou.netty.server.service.ServerService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@Service
@Slf4j
public class ServerServiceImpl implements ServerService {@Value("${netty.server.id}")private String serverId;@Resourceprivate PasswordEncoder passwordEncoder;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate NettyServer nettyServer;@Overridepublic void send(Message dto) {String friendUserCode = dto.getFriendUserCode();if (StringUtils.hasText(friendUserCode)) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();dto.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(dto);}} else {offlineMessage(friendUserCode, dto);}} else {// 全体广播, 需要校验秘钥(inputSecretKey应该是一个动态值, 通过手机+验证码每次广播时获取, 自行实现)String inputSecretKey = dto.getSecretKey();// encodedPassword生成见main方法String encodedPassword = "$2a$10$J/UEqtme/w2D0TWB4gJKFeSsyc3s8pepr6ahzOsORkC9zpaLSvZbG";if (StringUtils.hasText(inputSecretKey) && passwordEncoder.matches(inputSecretKey, encodedPassword)) {dto.setSecretKey(null);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {String key = entry.getKey();if (key.startsWith(NettyServerHandler.SERVER_PREFIX)) {// 这里可以用http调用其他服务端, 自行补充(信息redis都有)continue;}// 只处理连接本端的客户端String value = entry.getValue();Channel channel = NettyStatic.CHANNEL.get(value);if (channel == null) {offlineMessage(friendUserCode, dto);return;}channel.writeAndFlush(Message.transfer(dto));}}}}public static void main(String[] args) {String text = "uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU";PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();String encode = passwordEncoder.encode(text);log.info(encode);if (passwordEncoder.matches(text, encode)) {log.info("秘钥正确");}}/*** 发送给其他客户端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 进行转发");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 离线消息存储Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void stop() {nettyServer.stop();}
}

2.15 修改 ServerController类

package com.hahashou.netty.server.controller;import com.hahashou.netty.server.config.Message;
import com.hahashou.netty.server.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @description:* @author: 哼唧兽* @date: 9999/9/21**/
@RestController
@RequestMapping("/server")
@Slf4j
public class ServerController {@Resourceprivate ServerService serverService;/*** 秘钥记录: uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU* @param dto* @return*/@PostMapping("/send")public String send(@RequestBody Message dto) {serverService.send(dto);return "success";}@GetMapping("/stop")public String stop() {serverService.stop();return "stop netty success";}
}

3. 脱坑指南, 针对 NettyServer类

工具

yum -y install net-tools
netstat -tunlp

防火墙打开时, 当使用 bind(String inetHost, int inetPort) 方法时, 因为inetHost是127.0.0.1, 所以只有本机可以访问35000, 要想让其他机器可以连接到, 需使用 bind(int inetPort) 方法, 下图是前后两次端口占用情况
端口占用情况
结论
当使用bind(String inetHost, int inetPort)方法时, 无论防火墙关闭以及启动, 虚拟机均有问题; 但当机器有公网IP, 且防火墙关闭或端口开放时, 通过DNS解析映射是没有问题的, 建议还是用bind(int inetPort)方法

4. 服务端准备

4.1 打包3个服务端的jar包, id分别为netty-server-1、netty-server-2、netty-server-3, 分别放在161到163上

4.2 161、162、163端口开放

firewall-cmd --zone=public --add-port=35000/tcp --permanent
firewall-cmd --zone=public --add-port=32000/tcp --permanent
firewall-cmd --reload

4.3 161、162、163修改hosts

vi /etc/hosts

追加内容

192.168.109.161 netty-server-1
192.168.109.162 netty-server-2
192.168.109.163 netty-server-3

4.4 依次启动161、162、163

java -Dfile.encoding=UTF-8 -jar server-1.0-SNAPSHOT.jar

161
服务端1启动
162
服务端2启动
163
服务端3启动
redis中记录的服务列表
redis中记录的服务列表

5. 客户端改造

5.1 修改 application.yml

server:port: 32001logging:level:com.hahashou.netty: infospring:servlet:multipart:max-file-size: 128MBmax-request-size: 256MBuserCode: Aa
host: 192.168.109.161minio:endpoint: http://192.168.109.160:9000accessKey: rootsecretKey: root123456

5.2 修改 NettyClient类

package com.hahashou.netty.client.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;/*** @description: Netty-TCP服务* @author: 哼唧兽* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ApplicationStartedEvent> {@Value("${host}")private String host;public static int PORT = 35000;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyClientHandler nettyClientHandler;public static Channel CHANNEL;@SneakyThrows@Overridepublic void onApplicationEvent(ApplicationStartedEvent event) {createClient(workerGroup, businessGroup, nettyClientHandler, host, PORT);}public void createClient(NioEventLoopGroup workerGroup, EventExecutorGroup businessGroup,NettyClientHandler nettyClientHandler, String host, int port) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyClientHandler);}});try {CHANNEL = bootstrap.connect(host, port).sync().channel();} catch (InterruptedException exception) {log.error("客户端中断异常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {workerGroup.shutdownGracefully().syncUninterruptibly();log.info("客户端关闭成功");}
}

6. 客户端准备

6.1 准备6个jar包, 修改application.yml, 并根据下述规则放到对应机器上

Aa放在163上, Bb放在164上, Cc放在165上, Dd放在166上, Ee放在161上, Ff放在162上

userCode: Aa
host: 192.168.109.161
userCode: Bb
host: 192.168.109.161
userCode: Cc
host: 192.168.109.162
userCode: Dd
host: 192.168.109.162
userCode: Ee
host: 192.168.109.163
userCode: Ff
host: 192.168.109.163

6.2 161到166端口开放

firewall-cmd --zone=public --add-port=32001/tcp --permanent
firewall-cmd --reload

6.3 启动所有客户端

AB连接
CD连接
EF连接

7. 测试

请求参数

7.1 两个客户端连同一服务端, 不会出现转发

Aa向Bb发送消息, 且Bb收到后回复Aa
Aa向Bb
Bb向Aa

7.2 两个客户端连不同服务端

Aa向Cc发送消息(通过服务端1转发到服务端2), 且Cc收到后回复Aa(通过服务端2转发到服务端1)
A到C的转发
Aa向CcC到A的转发
Cc向Aa
Aa向Ee发送消息, 且Ee收到后回复Aa
Aa向Ee
Ee向Aa

7.3 广播

广播请求参数
收到广播

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/12431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Elasticsearch的使用

Elasticsearch 1、认识和安装 Elasticsearch的官方网站如下&#xff1a; https://www.elastic.co/cn/elasticsearch Elasticsearch是由elastic公司开发的一套搜索引擎技术&#xff0c;它是elastic技术栈中的一部分。完整的技术栈包括&#xff1a; Elasticsearch&#xff1…

2024CCPC全国邀请赛(郑州)暨河南省赛

2024CCPC全国邀请赛&#xff08;郑州站&#xff09;暨河南省赛 一铜一银&#xff0c;虽不是线下第一次参赛但是第一次拿xcpc奖牌&#xff0c;还有个国赛奖真是不戳。感谢学长&#xff0c;感谢队友&#xff01; 虽然遗憾没有冲到省赛金&#xff0c;不过还有icpc商丘&#xff08…

Milvus Cloud:打造向量数据库的Airtable级体验

向量数据库Milvus Cloud是一种用于处理和存储向量数据的数据库,它通常用于机器学习、图像和视频检索、自然语言处理等领域。要将其升级为类似Airtable那样易用且一体化的系统,需要考虑以下几个关键方面: 1. 用户界面(UI)设计 Airtable之所以用户友好,很大程度上归功于其直…

类图及类的关系

类图&#xff08;Class Diagram&#xff09;是UML&#xff08;Unified Modeling Language&#xff0c;统一建模语言&#xff09;中的一种图&#xff0c;用于描述系统中类的静态结构&#xff0c;包括类的属性、方法以及类之间的关系。 一、类 类&#xff08;Class&#xff09;…

海外仓混合订单拣货策略:人工与海外仓系统的最佳搭配模式

根据订单高效拣货是任何海外仓都要面对的问题。只有当订单可以被高效&#xff0c;准确的拣货之后&#xff0c;才能继续走下面的物流流程&#xff0c;所以尽可能的缩短拣货时间&#xff0c;提升拣货精准度&#xff0c;才是提升订单交付率的最佳方法。 海外仓企业都在不断寻找&am…

Android动态布局framelayout

功能说明 最近碰到一个需求&#xff0c;要求在网页端拖控件&#xff0c;动态配置app控件的模块&#xff0c;大小和位置&#xff0c;显示不同的功能&#xff0c;然后在app大屏展示。 技术难点&#xff1a; 1.动态控件位置和大小难调&#xff0c;会出现布局混乱&#xff0c;位置错…

129.哈希表:有效的字母异位词(力扣)

242. 有效的字母异位词 - 力扣&#xff08;LeetCode&#xff09; 题目描述 代码解决以及思路 这个方法的时间复杂度为O(N)&#xff0c;其中N是字符串的长度&#xff0c;空间复杂度为O(1)&#xff08;因为辅助数组的大小是固定的26&#xff09;。 class Solution { public:bo…

初识C++ · string的使用(2)

目录 1 Modifiers部分 1.1 assign的使用 1.2 insert的使用 1.3 erase的使用 1.4 replace的使用 2 capacity部分 2.1 max_size的使用 2.2 capacity的使用 2.3 reserve的使用 2.4 shrink_to_fit简介 2.5 resize的使用 2.6 clear的使用 3 String operations部分 3.1 …

[数据结构1.0]快速排序

最近学习了快速排序&#xff0c;鼠鼠俺来做笔记了&#xff01; 本篇博客用排升序为例介绍快速排序&#xff01; 1.快速排序 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法&#xff0c;其基本思想为&#xff1a;任取待排序元素序列中的某元素作为基准值&#x…

Java面试八股之一个char类型变量能不能存储一个中文字符

Java中一个char类型变量能不能存储一个中文字符&#xff1f;为什么&#xff1f; Java中一个char类型变量可以存储一个中文字符。原因如下&#xff1a; Unicode编码支持&#xff1a;Java语言采用Unicode字符集作为其内建字符编码方式。Unicode是一种广泛接受的字符编码标准&am…

两小时看完花书(深度学习入门篇)

1.深度学习花书前言 机器学习早期的时候十分依赖于已有的知识库和人为的逻辑规则&#xff0c;需要人们花大量的时间去制定合理的逻辑判定&#xff0c;可以说是有多少人工&#xff0c;就有多少智能。后来逐渐发展出一些简单的机器学习方法例如logistic regression、naive bayes等…

什么是CCRC?做什么用的?

CCRC&#xff08;中国网络安全审查认证和市场监管大数据中心&#xff09;原名为中国网络安全审查技术与认证中心&#xff0c;也被称为中国信息安全认证中心&#xff08;ISCCC&#xff09;。 该中心是经中央机构编制委员会办公室批准成立的&#xff0c;其主要职责是依据国家法律…

超级简单的地图操作工具开发可疑应急,地图画点,画线,画区域,获取地图经纬度等

使用echars的地图画点,画线,画区域,获取地图经纬度等 解压密码:10086007 地图也是用临时的bmap.js和china.js纯离线二选一 一共就这么多文件 画点,画线,画区域 点击地图获取经纬度-打印到控制台,这样就能渲染航迹,多变形,结合其他算法算圆等等操作 下载资源:https://download…

代码随想录训练营Day28:贪心算法06

1.738单调递增的数字 贪心策略&#xff1a;如果strNum[i]<strNum[i-1]那么strNum[i] 9,strNum[i-1]--;//比如87对应的最大的单调递增的就是79. 具体实现&#xff1a; 对于遇到小于的情况&#xff1a;如果strNum[i]<strNum[i-1]那么strNum[i] 9,strNum[i-1]--;遍历顺…

linux phpstudy 重启命令

[rootLinuxWeb phpstudy]# ./system/phpstudyctl restart 查看命令 1) phpstudy -start 启动小皮面板 2) phpstudy -stop 停止小皮面板 3) phpstudy -restart 重启小皮面板 4) phpstudy -status 查询面板状态 5) phpstudy -in…

OFDM802.11a的FPGA实现(十五)短训练序列:STS(含Matlab和verilog代码)

原文链接&#xff08;相关文章合集&#xff09;&#xff1a;OFDM 802.11a的xilinx FPGA实现 1.前言 在之前已经完成了data域数据的处理&#xff0c;在构建整个802.11a OFDM数据帧的时候&#xff0c;还剩下前导码和signal域的数据帧&#xff0c;这两部分的内容。 PLCP的前导部分…

Nodejs笔记2

模块化 模块化初体验 模块暴露数据 导入模块 fs 写绝对路径 require写相对路径不会受到影响 ./../不能省略 js 和json文件后缀可以省略 如果存在 命名相同的js和json文件&#xff0c;优先导入js文件 导入文件夹时的情况 require导入模块的基本流程 commonJS模块…

其它高阶数据结构①_并查集(概念+代码+两道OJ)

目录 1. 并查集的概念 2. 并查集的实现 3. 并查集的应用 3.1 力扣LCR 116. 省份数量 解析代码1 解析代码2 3.2 力扣990. 等式方程的可满足性 解析代码 本篇完。 写在前面&#xff1a; 此高阶数据结构系列&#xff0c;虽然放在⑤数据结构与算法专栏&#xff0c;但还是作…

【数据可视化01】matplotlib实例介绍4之六边形分箱图

目录 一、引言二、实例介绍 一、引言 hexbin是一个二维直方图&#xff0c;其中箱子是六边形&#xff0c;颜色表示每个箱子内的数据点数。 二、实例介绍 import matplotlib.pyplot as plt import numpy as np# Fixing random state for reproducibility np.random.seed(19680…

服务器利用率的神器脚本

在服务器管理的过程中&#xff0c;了解服务器的各项性能指标是至关重要的。无论是CPU的负载情况&#xff0c;内存使用情况&#xff0c;还是硬盘的存储空间以及TCP连接状态&#xff0c;这些都是我们判断服务器健康状态和性能的重要依据。然而&#xff0c;手动一项项去检查这些指…