文章目录
- 四. 优化与源码
- 1. 优化
- 1.1 扩展序列化算法
- jdk序列化与反序列化
- Serializer & Algorithm
- Config
- application.properties
- MessageCodecSharable
- Message(抽象类)
- 测试
- 序列化测试
- 反序列化测试
- 1.2 参数调优
- 1)CONNECT_TIMEOUT_MILLIS
- 2)SO_BACKLOG
- 三次握手与accept
- 课堂测试
- TestBacklogServer
- TestBacklogClient
- debug的位置
- 报错
- 笔记测试
- 3)ulimit -n
- 4)TCP_NODELAY
- 5)SO_SNDBUF & SO_RCVBUF
- 6)ALLOCATOR
- ByteBufUtil
- 测试
- TestByteBuf
- TestByteBufClient
- 测试结果
- 7)RCVBUF_ALLOCATOR
- 测试
- 源码要点
- 1.3 RPC 框架
- 1)准备工作
- 修改Message
- RpcRequestMessage
- RpcResponseMessage
- RpcServer服务器架子
- RpcClient客户端架子
- ServicesFactory
- 2)服务器 handler
- RpcRequestMessageHandler
- 3)RpcClient第一版
- 4)客户端 handler 第一版
- ==流程分析==
- Gson问题
- 5)RpcClientManager
- SequenceIdGenerator
- ==流程分析==
- 6)RpcResponseMessageHandler
- 2. 源码分析
- 2.1 启动剖析
- 2.2 NioEventLoop 剖析
- 注意
- 2.3 accept 剖析
- 2.4 read 剖析
四. 优化与源码
1. 优化
1.1 扩展序列化算法
序列化,反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
- 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
jdk序列化与反序列化
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
// 反序列化(将 字节数组 转换为 java对象, jdk反序列化不需要提供类型信息)
byte[] body = new byte[bodyLength];
byteBuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);// 序列化(将 java对象 转换为 字节数组)
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
Serializer & Algorithm
- 为了支持更多序列化算法,抽象一个 Serializer 接口
- 提供 Serializer 接口 两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中(此Algorithm枚举类写在Serializer接口中,作为Serializer接口类中的成员内部类)
public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);enum Algorithm implements Serializer {// 枚举类特点// 1. 如下枚举类对象因为实现了Serializer接口, 因此, 须实现serialize方法 & deserialize方法// 2. 枚举类中定义的枚举对象有特定的顺序, 枚举类对象调用ordinal()方法可获取到枚举类对象的顺序值// (顺序值从0开始)// 3. 调用 枚举类.valueOf(枚举类对象字符串形式), 可获取到对应的枚举类对象// (如Serializer.Algorithm.valueOf(Java))// 4. 调用 枚举类.values(), 可获取到所有的枚举类对象的数组// Java 实现Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));Object object = in.readObject();return (T) object;} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream out = new ByteArrayOutputStream();new ObjectOutputStream(out).writeObject(object);return out.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);}}}, // Json 实现(引入了 Gson 依赖)Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {// Gson反序列化, 需要提供具体的反序列化类return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {// 指定json字符串转为字节数组时(使用Gson), 所使用的编码集return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};}}
Config
增加配置类和配置文件,将序列化算法配置到类路径下的application.properties文件中
public abstract class Config {static Properties properties;static {/* 加载类路径下的application.properties文件 */try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}// 读取配置文件中 server.portpublic static int getServerPort() {String value = properties.getProperty("server.port");if(value == null) {return 8080;} else {return Integer.parseInt(value);}}// 读取配置文件中 serializer.algorithmpublic static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if(value == null) {// 默认使用jdk序列化return Serializer.Algorithm.Java;} else {// 传入枚举类字面形式字符串作为参数, 以返回对应的枚举类对象, 可配置为 Java 或 Gsonreturn Serializer.Algorithm.valueOf(value);}}
}
application.properties
配置文件
# 配置序列化算法, 配置的值就是枚举类对象的字面形式字符串
serializer.algorithm=Json
MessageCodecSharable
修改编解码器
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1//(获取配置文件中的序列化对象 的 索引)out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组//(使用配置文件指定的算法 将 java对象 转换为 字节数组)byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();// 获取待解码的数据中 指定的序列化算法 标识byte serializerAlgorithm = in.readByte(); // 0 或 1byte messageType = in.readByte(); // 0,1,2...int sequenceId = in.readInt();in.readByte();int length = in.readInt();// 这个长度是通过解析ByteBuf字节数据消息来的byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);// 确定具体消息类型Class<? extends Message> messageClass = Message.getMessageClass(messageType);// 获取待解码的数据反序列化算法(枚举类的顺序, 从0开始)Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];// 使用指定的反序列化算法Message message = algorithm.deserialize(messageClass, bytes);out.add(message);}
}
Message(抽象类)
其中确定具体消息类型,可以根据 消息类型字节
获取到对应的 消息 class
(下面就是为了让Gson能够根据消息类型 获取到 具体的Message实现类,
从而 让Gson能够把Json字符串编码得到的字节数组转为 具体的Message对象,
而不能简单的指定Message这个抽象类作为Gson的反序列化目标类)
@Data
public abstract class Message implements Serializable {/*** 根据消息类型字节,获得对应的消息 class* @param messageType 消息类型字节* @return 消息 class*/public static Class<? extends Message> getMessageClass(int messageType) {return messageClasses.get(messageType);}private int sequenceId;private int messageType;public abstract int getMessageType();public static final int LoginRequestMessage = 0;public static final int LoginResponseMessage = 1;public static final int ChatRequestMessage = 2;public static final int ChatResponseMessage = 3;public static final int GroupCreateRequestMessage = 4;public static final int GroupCreateResponseMessage = 5;public static final int GroupJoinRequestMessage = 6;public static final int GroupJoinResponseMessage = 7;public static final int GroupQuitRequestMessage = 8;public static final int GroupQuitResponseMessage = 9;public static final int GroupChatRequestMessage = 10;public static final int GroupChatResponseMessage = 11;public static final int GroupMembersRequestMessage = 12;public static final int GroupMembersResponseMessage = 13;public static final int PingMessage = 14;public static final int PongMessage = 15;private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();static {messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);}
}
测试
序列化测试
将application.properties的序列化算法分别配置为Java、Json
public class TestSerializer {public static void main(String[] args) {MessageCodecSharable CODEC = new MessageCodecSharable();LoggingHandler LOGGING = new LoggingHandler();/* 在自定义编解码器的前后, 各配置1个日志处理器 */EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");/* 写 1个 出站消息, 将会经过 编码处理 */channel.writeOutbound(message);}}
从下面的日志输出可以看到,使用Json的方式,所传输的数据更小,并且易读
使用java序列化日志输出
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123)
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 224B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 d0 |................|
|00000010| ac ed 00 05 73 72 00 2a 63 6f 6d 2e 7a 7a 68 75 |....sr.*com.zzhu|
|00000020| 61 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e 4c |a.chat.message.L|
|00000030| 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 |oginRequestMessa|
|00000040| 67 65 62 ad c8 8d f0 30 e5 ae 02 00 02 4c 00 08 |geb....0.....L..|
|00000050| 70 61 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 |passwordt..Ljava|
|00000060| 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 |/lang/String;L..|
|00000070| 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 |usernameq.~..xr.|
|00000080| 1e 63 6f 6d 2e 7a 7a 68 75 61 2e 63 68 61 74 2e |.com.zzhua.chat.|
|00000090| 6d 65 73 73 61 67 65 2e 4d 65 73 73 61 67 65 c3 |message.Message.|
|000000a0| fd 8f 9c bb 24 c3 cd 02 00 02 49 00 0b 6d 65 73 |....$.....I..mes|
|000000b0| 73 61 67 65 54 79 70 65 49 00 0a 73 65 71 75 65 |sageTypeI..seque|
|000000c0| 6e 63 65 49 64 78 70 00 00 00 00 00 00 00 00 74 |nceIdxp........t|
|000000d0| 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 6e |..123t..zhangsan|
+--------+-------------------------------------------------+----------------+
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
使用Json序列化日志输出
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123)
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 87B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 47 |...............G|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
|00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
|00000030| 22 3a 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 |":"123","sequenc|
|00000040| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
|00000050| 79 70 65 22 3a 30 7d |ype":0} |
+--------+-------------------------------------------------+----------------+
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
反序列化测试
public class TestSerializer {public static void main(String[] args) {MessageCodecSharable CODEC = new MessageCodecSharable();LoggingHandler LOGGING = new LoggingHandler();/* 在自定义编解码器的前后, 各配置1个日志处理器 */EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");/* 先使用自定义的方法, 把消息写到ByteBuf中 */ByteBuf buf = messageToByteBuf(message);/* 写入1个入站消息, 将会经过解码处理 */channel.writeInbound(buf);}public static ByteBuf messageToByteBuf(Message msg) {int algorithm = Config.getSerializerAlgorithm().ordinal();ByteBuf out = ByteBufAllocator.DEFAULT.buffer();out.writeBytes(new byte[]{1, 2, 3, 4});out.writeByte(1);out.writeByte(algorithm);out.writeByte(msg.getMessageType());out.writeInt(msg.getSequenceId());out.writeByte(0xff);byte[] bytes = Serializer.Algorithm.values()[algorithm].serialize(msg);out.writeInt(bytes.length);out.writeBytes(bytes);return out;}
}
java反序列化日志输出
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 224B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 d0 |................|
|00000010| ac ed 00 05 73 72 00 2a 63 6f 6d 2e 7a 7a 68 75 |....sr.*com.zzhu|
|00000020| 61 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e 4c |a.chat.message.L|
|00000030| 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 |oginRequestMessa|
|00000040| 67 65 62 ad c8 8d f0 30 e5 ae 02 00 02 4c 00 08 |geb....0.....L..|
|00000050| 70 61 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 |passwordt..Ljava|
|00000060| 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 |/lang/String;L..|
|00000070| 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 |usernameq.~..xr.|
|00000080| 1e 63 6f 6d 2e 7a 7a 68 75 61 2e 63 68 61 74 2e |.com.zzhua.chat.|
|00000090| 6d 65 73 73 61 67 65 2e 4d 65 73 73 61 67 65 c3 |message.Message.|
|000000a0| fd 8f 9c bb 24 c3 cd 02 00 02 49 00 0b 6d 65 73 |....$.....I..mes|
|000000b0| 73 61 67 65 54 79 70 65 49 00 0a 73 65 71 75 65 |sageTypeI..seque|
|000000c0| 6e 63 65 49 64 78 70 00 00 00 00 00 00 00 00 74 |nceIdxp........t|
|000000d0| 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 6e |..123t..zhangsan|
+--------+-------------------------------------------------+----------------+
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123)
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
json反序列化日志输出
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 87B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 47 |...............G|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
|00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
|00000030| 22 3a 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 |":"123","sequenc|
|00000040| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
|00000050| 79 70 65 22 3a 30 7d |ype":0} |
+--------+-------------------------------------------------+----------------+
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123)
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
1.2 参数调优
1)CONNECT_TIMEOUT_MILLIS
-
属于 SocketChannal 参数
-
用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
-
SO_TIMEOUT 主要用在阻塞 IO(即BIO),阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间(这就是SO_TIMEOUT与CONNECT_TIMEOUT_MILLIS的区别)
@Slf4j
public class TestConnectionTimeout {public static void main(String[] args) {// 1. 客户端通过 new Bootstrap().option() 方法配置参数 给 SocketChannel 配置参数// 2. 服务器端// new ServerBootstrap().option() // 是给 ServerSocketChannel 配置参数// new ServerBootstrap().childOption() // 是给 SocketChannel 配置参数NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000).channel(NioSocketChannel.class).handler(new LoggingHandler());// 此处仅发出建立连接的命令, 而不会等建立连接后才返回,ChannelFuture future = bootstrap.connect("127.0.0.1", 8080); // 调用future.sync()可以等到建立连接后停止阻塞, 程序才会向下运行//(可使用idea的右键该future变量选择markObject标记此future变量)future.sync().channel().closeFuture().sync(); // 断点1(注意断点的模式选择Thread)} catch (Exception e) {e.printStackTrace();log.debug("timeout");} finally {group.shutdownGracefully();}}
}
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
@Override
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {// ...// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {// eventLoop开启了1个定时任务connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2// Promise用于在2个线程间交换数据, 可以通知另外1个线程是否成功或失败//(通过idea调试工具, 发现此处的connectPromise正好就是上面标记的future)//(此处通过Promise传入异常, 唤醒主线程, 通知主线程失败, 主线程的sync就停止阻塞了, // 但是主线程的future.sync()调用等来的是个异常, 所以不会调用后面的channel()方法, // 而是直接抛出了异常, 进入到了catch块)if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}// ...
}
2)SO_BACKLOG
- 属于 ServerSocketChannal 参数
三次握手与accept
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中
- 3次握手是发生在在accept前面的;
- 为什么要放入1个全连接队列呢?因为accept接受连接的能力是有限的,放入队列就是为了接受完1个连接之后,再从队列里面取出来接受下一个连接,这个accept就可以理解为nio中的accept接受连接的方法。
- 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
- sync queue - 半连接队列
- 半连接队列就是还没有完成3次握手的连接信息先放入半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- accept queue - 全连接队列
- 已经完成3次握手的连接信息放入全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
- 如果 accpet queue 队列满了,再有一个客户端再想连,server 将发送一个拒绝连接的错误信息到 client
netty 中可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
public class DefaultServerSocketChannelConfig extends DefaultChannelConfigimplements ServerSocketChannelConfig {private volatile int backlog = NetUtil.SOMAXCONN;// ...
}
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
课堂测试
为了测试SO_BACKLOG参数的作用,使用ServerBootstrap().option(…)设置SocketChannel的参数为2(取系统的参数与应用参数的最小值,这里肯定就是2了,2比较小),然后在NioEventLoop#processSelectedKey中打上debug,并且开3个客户端与服务端建立连接,发现第三个客户端就报错了:拒绝连接
TestBacklogServer
public class TestBacklogServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).option(ChannelOption.SO_BACKLOG, 2) // 全队列满了.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());}}).bind(8080);}
}
TestBacklogClient
@Slf4j
public class TestBacklogClient {public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
debug的位置
public final class NioEventLoop extends SingleThreadEventLoop {// ...private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {// ...// 在此处打上debug(使用Thread模式), 这里当触发了accept事件, 下面的read()方法就相当于accept接受连接// (这里打上debug之后,可短请求建立连接的过程就都卡在 全连接队列里面了)if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}// ...}// ...
}
报错
当连接第三个客户端时,客户端报错如下:
21:28:41 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] REGISTERED
21:28:41 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] CONNECT: /127.0.0.1:8080
21:28:43 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] CLOSE
21:28:43 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] UNREGISTERED
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further informationat sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)at java.lang.Thread.run(Thread.java:748)
笔记测试
oio 中更容易说明,不用 debug 模式
public class Server {public static void main(String[] args) throws IOException {ServerSocket ss = new ServerSocket(8888, 2);Socket accept = ss.accept();System.out.println(accept);System.in.read();}
}
客户端启动 4 个
public class Client {public static void main(String[] args) throws IOException {try {Socket s = new Socket();System.out.println(new Date()+" connecting...");s.connect(new InetSocketAddress("localhost", 8888),1000);System.out.println(new Date()+" connected...");s.getOutputStream().write(1);System.in.read();} catch (IOException e) {System.out.println(new Date()+" connecting timeout...");e.printStackTrace();}}
}
第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中
Tue Apr 21 20:30:28 CST 2020 connecting...
Tue Apr 21 20:30:28 CST 2020 connected...
第 4 个客户端连接时
Tue Apr 21 20:53:58 CST 2020 connecting...
Tue Apr 21 20:53:59 CST 2020 connecting timeout...
java.net.SocketTimeoutException: connect timed out
3)ulimit -n
- 属于操作系统参数
- 它是限制一个进程能够同时打开的最大的文件描述符的数量(是为了保护系统,防止1个进程打开文件或socket太多了)
4)TCP_NODELAY
- 属于 SocketChannal 参数(在服务端使用childOption(…)来设置)
- Nagle算法会当发送的数据包攒够了一批了,才会把数据包发送出去,所以可能造成接收方不能及时的收到消息。netty中默认该设置为false(即开启了Nagle算法),建议为了保证及时,设置为true
5)SO_SNDBUF & SO_RCVBUF
- SO_SNDBUF 是发送缓冲区,SO_RCVBUF是接收缓冲区,它两决定了滑动窗口的上限,建议不要调整,操作系统会自动调整
- SO_SNDBUF 属于 SocketChannal 参数
- SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
6)ALLOCATOR
- 属于 SocketChannal 参数
- 用来分配 ByteBuf, ctx.alloc()
追踪配置参数过程:由ChannelConfig 找到默认的实现类 DefaultChannelConfig,在DefaultChannelConfig的成员变量ByteBufAllocator allocator中默认赋值为ByteBufAllocator.DEFAULT,从而找到ByteBufUtil.DEFAULT_ALLOCATOR
ByteBufUtil
public final class ByteBufUtil {static final int WRITE_CHUNK_SIZE = 8192;static final ByteBufAllocator DEFAULT_ALLOCATOR;// ...static {// 系统属性配置: io.netty.allocator.typeString allocType = SystemPropertyUtil.get("io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");allocType = allocType.toLowerCase(Locale.US).trim();ByteBufAllocator alloc;if ("unpooled".equals(allocType)) {// 从这里点击DEFAULT进去可看到io.netty.noPreferDirect系统属性可配置是否不优先使用直接内存,默认false// (即优先使用直接内存)alloc = UnpooledByteBufAllocator.DEFAULT;logger.debug("-Dio.netty.allocator.type: {}", allocType);} else if ("pooled".equals(allocType)) {alloc = PooledByteBufAllocator.DEFAULT;logger.debug("-Dio.netty.allocator.type: {}", allocType);} else {alloc = PooledByteBufAllocator.DEFAULT;logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);}DEFAULT_ALLOCATOR = alloc;THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0);logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);}// ...
}
Idea中为java程序添加启动参数(含:VM options、Program arguments、Environment variable
测试
如下配置系统属性,查看服务端分配的ByteBuf类型(注意:是使用ctx.alloc()方法时,分配得到的ByteBuf类型哦!)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jqxEwsDA-1690809538413)(assets/image-20230730230801541.png)]
TestByteBuf
@Slf4j
public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, // 前面没有设置任何的解码器, 因此这里的msg就是ByteBuf,// 注意:这里的msg会强制使用直接内存,不受所设置的参数控制,// 因为netty认为直接内存对于网络io的效率更高,Object msg) throws Exception {// 注意: 这里是从ctx中分配的ByteBufByteBuf buf = ctx.alloc().buffer();log.debug("alloc buf {}", buf);}});}}).bind(8080);}
}
TestByteBufClient
@Slf4j
public class TestByteBufClient {public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("hello!".getBytes()));}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
测试结果
当不按照图上配置时,即默认的情况如下:使用的是 PooledUnsafeDirectByteBuf
,即默认为池化的直接内存
23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] REGISTERED
23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] ACTIVE
23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] READ: 6B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 21 |hello! |
+--------+-------------------------------------------------+----------------+
23:15:16 [DEBUG] [nioEventLoopGroup-2-2] c.z.s.TestByteBuf - alloc buf PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] READ COMPLETE
当按照图上配置时,使用的是:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf
,即非池化堆内存
23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] REGISTERED
23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] ACTIVE
23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] READ: 6B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 21 |hello! |
+--------+-------------------------------------------------+----------------+
23:17:34 [DEBUG] [nioEventLoopGroup-2-2] c.z.s.TestByteBuf - alloc buf UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 256)
23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] READ COMPLETE
7)RCVBUF_ALLOCATOR
- 属于 SocketChannal 参数
- 控制 netty 接收缓冲区大小
- 负责入站数据ByteBuf的初始分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 上面的提到的 allocator 决定
测试
客户端向服务端发送消息后,服务端收到这个消息,初始默认封装为ByteBuf,这个初始的ByteBuf设置其实就是由RCVBUF_ALLOCATOR设置决定的(具体规则上面有写)
服务端
@Slf4j
public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {/*ByteBuf buf = ctx.alloc().buffer();log.debug("alloc buf {}", buf);*/log.info("msg: {}", msg);}});}}).bind(8080);}
}
客户端
@Slf4j
public class TestBacklogClient {public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("hello!".getBytes()));}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
输出如下(测试仍然是设置了-Dio.netty.allocator.type=unpooled -Dio.netty.noPreferDirect=true系统属性,但是显然第二个属性设置对此是无效的)
23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] REGISTERED
23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] ACTIVE
23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] READ: 6B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 21 |hello! |
+--------+-------------------------------------------------+----------------+
23:50:44 [INFO ] [nioEventLoopGroup-2-2] c.z.s.TestByteBuf - msg: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(ridx: 0, widx: 6, cap: 1024)
23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] READ COMPLETE
源码要点
跟源码时候,注意以下几个关键的类:
-
AbstractNioByteChannel#read()方法
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EWouPHAv-1690809538416)(assets/image-20230730234754420.png)]
-
DefaultMaxMessageRecvByteBufAllocator#allocate(ByteBufAllocator alloc)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e0jhsMzy-1690809538416)(assets/image-20230730234813577.png)]
-
AdaptiveRecvByteBufAllocator默认无参构造
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-My61lpdG-1690809538417)(assets/image-20230730234824342.png)]
1.3 RPC 框架
1)准备工作
这些代码可以认为是现成的,无需从头编写练习
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
修改Message
@Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}
RpcRequestMessage
请求消息
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {/*** 调用的接口全限定名,服务端根据它找到实现*/private String interfaceName;/*** 调用接口中的方法名*/private String methodName;/*** 方法返回类型*/private Class<?> returnType;/*** 方法参数类型数组*/private Class[] parameterTypes;/*** 方法参数值数组*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}
RpcResponseMessage
响应消息
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 异常值*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}
RpcServer服务器架子
@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 请求消息处理器,待实现RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
RpcClient客户端架子
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
ServicesFactory
服务器端的 service 获取
public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Service")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}
相关配置 application.properties
serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
2)服务器 handler
RpcRequestMessageHandler
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {// 获取真正的实现对象Object service = ServicesFactory.getService(Class.forName(message.getInterfaceName()));// 获取要调用的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());// 调用方法Object invoke = method.invoke(service, message.getParameterValue());// 调用成功response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();// 调用异常response.setExceptionValue(e);}// 返回结果ctx.writeAndFlush(response);}
}
3)RpcClient第一版
只发消息
@Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});// 客户端请求与服务端建立连接Channel channel = bootstrap.connect("localhost", 8080).sync().channel();// 客户端发送rpc请求消息//(这个writeAndFlush方法也是命令调用, 发出写出消息命令后, 就会立即返回, 真正写出消息是交给其它线程完成)ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage( 1,"cn.itcast.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"}))// 这里可以使用addListener方法对上面这个writeAndFlush作调试//(也可以调用future.sync()对future在当前线程作同步调用)//(这里的promise就是上面的future, 是同一个对象).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});channel.closeFuture().sync(); // 调用sync()方法, 同步阻塞, 当channel关闭时, 才会向下运行} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
4)客户端 handler 第一版
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}
流程分析
客户端请求与服务端建立连接,然后客户端发送了1个rpc请求消息,这个请求消息就找到了客户端的所有出站处理器,从下往上找取依次执行(首先是自定义编解码器对消息编码、日志处理器记录日志),最后就发给了服务端。消息到达服务器端,服务端拿到消息后,进行入站处理器依次处理(首先是粘包半包处理、日志处理器记录日志、自定义编解码器对消息解码、服务端自定义rpc请求处理器),在服务端自定义rpc请求处理器中通过获取解码后的消息,获取到反射相关的信息,找到 要调用的目标对象和方法,进行反射调用,获得反射调用结果,然后把调用结果(成功就是结果,失败就是异常封装到rpc响应消息对象中)通过ctx写回给客户端,服务端写回响应消息之前,又会依次经过服务端出站处理器(首先是自定义编解码器对消息编码、日志处理器记录日志),最后就发给了客户端。消息到达客户端,客户端拿到消息后,再做入站处理器依次处理(首先是粘包半包处理、日志处理器记录日志、自定义编解码器对消息解码、客户端rpc响应处理器),其中客户端rpc响应处理器最后对消息进行了打印。
Gson问题
上面的自定义编解码器,在序列化 和反序列化 Class类的时候,会报错。因此,需要往Gson对象中注册类型适配器,使用注册过类型适配器的Gson对象去做转换。下面创建了ClassCodec对象(实现了JsonSerializer<Class<?>>
, JsonDeserializer<Class<?>>
接口),并注册到了Gson中,以支持Class类的json转换操作。
/*** 用于扩展序列化、反序列化算法*/
public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);enum Algorithm implements Serializer {Java {// ...java序列化和反序列化不作修改, 省略},Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}@Overridepublic <T> byte[] serialize(T object) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}}}class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {@Overridepublic Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {String str = json.getAsString();return Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}@Override // String.classpublic JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {// class -> jsonreturn new JsonPrimitive(src.getName());}}
}
5)RpcClientManager
-
获得channel:使用netty的Bootstrap客户端启动器连接netty服务端,获得channel,并将此channel使用单例模式设计成唯一实例
-
使用代理发起rpc请求:使用动态代理,屏蔽发起rpc请求是封装消息的过程(把这个复杂的过程封装到jdk的InvocationHandler中),然后使用channel写出消息
-
两个线程通信拿结果的问题:发起rpc请求的线程是main线程,获得结果的线程是netty的nio线程(netty中的eventLoop),因此,需要把nio线程获得的结果给到main线程的调用处(提示:使用Promise可以做这件事)。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zzI0YCUf-1690809538418)(assets/image-20230731121531720.png)]
@Slf4j
public class RpcClientManager {public static void main(String[] args) {HelloService service = getProxyService(HelloService.class);System.out.println(service.sayHello("zhangsan"));// System.out.println(service.sayHello("lisi"));// System.out.println(service.sayHello("wangwu"));}// 创建代理类public static <T> T getProxyService(Class<T> serviceClass) {ClassLoader loader = serviceClass.getClassLoader();Class<?>[] interfaces = new Class[]{serviceClass};Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {// 1. 将方法调用转换为 消息对象int sequenceId = SequenceIdGenerator.nextId(); RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空 Promise 对象,来接收结果 指定 promise 对象 异步接收结果线程DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);//promise.addListener(future -> { // => 这个回调方法就是由创建DefaultPromise时构造方法中所传入的// 线程 // 的 异步接收结果线程 来执行。但是这里没有使用这种用法, 这里// }); // 使用的是下面在调用线程上(main线程)作同步等待// 4. 等待 promise 结果// (与promise.sync()的区别在于:promise.await()不会抛出异常;而promise.sync()会抛出异常)promise.await(); if(promise.isSuccess()) {// 调用正常return promise.getNow();} else {// 调用失败throw new RuntimeException(promise.cause());}});return (T) o;}private static Channel channel = null;private static final Object LOCK = new Object();// 获取唯一的 channel 对象public static Channel getChannel() {if (channel != null) {return channel;}synchronized (LOCK) {if (channel != null) {return channel;}initChannel();return channel;}}// 初始化 channel 方法private static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();// 当channel关闭时, 执行的异步任务(由原来的同步, 改成了异步)channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (Exception e) {log.error("client error", e);}}
}
SequenceIdGenerator
public abstract class SequenceIdGenerator {private static final AtomicInteger id = new AtomicInteger();public static int nextId() {return id.incrementAndGet();}
}
流程分析
动态代理执行流程分析: main线程调用代理对象的方法并传入方法参数,触发代理对象的InvocationHandler#invoke方法执行,使用channel写出消息(发出rpc请求),但是不能立即得到响应结果,因此创建1个Promise对象来接收响应消息,而Promise放在ConcurrentHashMap中(Promise与rpc请求绑定的sequenceId绑定到一起保存在这个Map中),然后调用promise.await()方法等待结果。当响应回来后,在RpcResponseMessageHandler中(由nio线程执行,即eventLoop)根据响应的sequenceId从ConcurrentHashMap中找到对应的Promise,将结果设置到此Promise对象中,一旦Promise对象中被填入结果(不管是成功还是失败),main线程中调用promise.await()方法就会停止阻塞而向下运行,然后获取响应结果,将结果返回给代理对象的调用者。
6)RpcResponseMessageHandler
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {// 序号 用来接收结果的 promise 对象public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);// 根据响应消息的的sequenceId 拿到对应的 空promise, 并且移除该Promise对象Promise<Object> promise = PROMISES.remove(msg.getSequenceId());if (promise != null) {Object returnValue = msg.getReturnValue();Exception exceptionValue = msg.getExceptionValue();// 当promise被填充值后, 主线程使用promise.await()方法就会停止阻塞向下运行if(exceptionValue != null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}}
}
2. 源码分析
2.1 启动剖析
我们就来看看 netty 中对下面的代码是怎样进行处理的
//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open(); //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//4 启动 nio boss 线程执行接下来的操作//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
入口 io.netty.bootstrap.ServerBootstrap#bind
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {// 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}// 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分// 2.1 如果已经完成if (regFuture.isDone()) {ChannelPromise promise = channel.newPromise();// 3.1 立刻调用 doBind0doBind0(regFuture, channel, localAddress, promise);return promise;} // 2.2 还没有完成else {final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 3.2 回调 doBind0regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// 处理异常...promise.setFailure(cause);} else {promise.registered();// 3. 由注册线程去执行 doBind0doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();// 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializerinit(channel);} catch (Throwable t) {// 处理异常...return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {// 处理异常...}return regFuture;
}
关键代码 io.netty.bootstrap.ServerBootstrap#init
// 这里 channel 实际上是 NioServerSocketChannel
void init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// 为 NioServerSocketChannel 添加初始化器p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}// 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannelch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 一些检查,略...AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行// 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程// 这行代码完成的事实是 main -> nio boss 线程的切换eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 日志记录...closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;// 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChanneldoRegister();neverRegistered = false;registered = true;// 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannelpipeline.invokeHandlerAddedIfNeeded();// 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0safeSetSuccess(promise);pipeline.fireChannelRegistered();// 对应 server socket channel 还未绑定,isActive 为 falseif (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
关键代码 io.netty.channel.ChannelInitializer#initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.try {// 1.2.2.1 执行初始化initChannel((C) ctx.channel());} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {// 1.2.2.2 移除初始化器ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {pipeline.remove(this);}}return true;}return false;
}
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0
// 3.1 或 3.2 执行 doBind0
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {// 记录日志...}boolean wasActive = isActive();try {// 3.3 执行端口绑定doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {// 3.4 触发 active 事件pipeline.fireChannelActive();}});}safeSetSuccess(promise);
}
3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();// 触发 read (NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)readIfIsAutoRead();
}
关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();// readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}
}
2.2 NioEventLoop 剖析
NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),
提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();// 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列addTask(task);if (!inEventLoop) {// inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThreadstartThread();if (isShutdown()) {// 如果已经 shutdown,做拒绝逻辑,代码略...}}if (!addTaskWakesUp && wakesUpForTask(task)) {// 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程wakeup(inEventLoop);}
}
唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup
@Override
protected void wakeup(boolean inEventLoop) {if (!inEventLoop && wakenUp.compareAndSet(false, true)) {selector.wakeup();}
}
启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {// 将线程池的当前线程保存在成员变量中,以便后续使用thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {// 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 清理工作,代码略...}}});
}
io.netty.channel.nio.NioEventLoop#run
主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件
protected void run() {for (;;) {try {try {// calculateStrategy 的逻辑如下:// 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch// 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:// 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒// 进行 select 阻塞,并设置唤醒状态为 falseboolean oldWakenUp = wakenUp.getAndSet(false);// 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup// 下面的 select 方法不会阻塞// 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?// 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时// 才能执行,让 select 方法无谓阻塞select(oldWakenUp);if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;// ioRatio 默认是 50final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// ioRatio 为 100 时,总是运行完所有非 IO 任务runAllTasks();}} else { final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// 记录 io 事件处理耗时final long ioTime = System.nanoTime() - ioStartTime;// 运行非 IO 任务,一旦超时会退出 runAllTasksrunAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}
注意
这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:
- 由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
- 由 EventLoop 自己调用,会本次的 wakeup 会取消下一次的 select 操作
参考下图
io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();// 计算等待时间// * 没有 scheduledTask,超时时间为 1s// * 有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;// 如果超时,退出循环if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 如果期间又有 task 退出循环,如果没这个判断,那么任务就会等到下次 select 超时时才能被执行// wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeupif (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}// select 有限时阻塞// 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%int selectedKeys = selector.select(timeoutMillis);// 计数加 1selectCnt ++;// 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {// 线程被打断,退出循环// 记录日志selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// 如果超时,计数重置为 1,下次循环就会 breakselectCnt = 1;} // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512// 这是为了解决 nio 空轮询 bugelse if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 重建 selectorselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {// 记录日志}} catch (CancelledKeyException e) {// 记录日志}
}
处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {if (selectedKeys != null) {// 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}
}
io.netty.channel.nio.NioEventLoop#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 当 key 取消或关闭时会导致这个 key 无效if (!k.isValid()) {// 无效时处理...return;}try {int readyOps = k.readyOps();// 连接事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// 可写事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}// 可读或可接入事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read// 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#readunsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}
2.3 accept 剖析
nio 中如下代码,在 netty 中的流程
//1 阻塞直到事件发生
selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) { //2 拿到一个事件SelectionKey key = iter.next();//3 如果是 accept 事件if (key.isAcceptable()) {//4 执行 acceptSocketChannel channel = serverSocketChannel.accept();channel.configureBlocking(false);//5 关注 read 事件channel.register(selector, SelectionKey.OP_READ);}// ...
}
先来看可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf// readBuf 是一个 ArrayList 用来缓存消息int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}// localRead 为 1,就一条消息,即接收一个客户端连接allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelReadpipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {if (!readPending && !config.isAutoRead()) {removeReadOp();}}
}
关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 这时的 msg 是 NioSocketChannelfinal Channel child = (Channel) msg;// NioSocketChannel 添加 childHandler 即初始化器child.pipeline().addLast(childHandler);// 设置选项setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {// 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
}
又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register
方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 一些检查,略...AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 这行代码完成的事实是 nio boss -> nio worker 线程的切换eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 日志记录...closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tailpipeline.invokeHandlerAddedIfNeeded();// 执行后就是 head -> logging handler -> my handler -> tailsafeSetSuccess(promise);pipeline.fireChannelRegistered();if (isActive()) {if (firstRegistration) {// 触发 pipeline 上 active 事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();// 触发 read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)readIfIsAutoRead();
}
io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;// 这时候 interestOps 是 0final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {// 关注 read 事件selectionKey.interestOps(interestOps | readInterestOp);}
}
2.4 read 剖析
再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete
public final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();// io.netty.allocator.type 决定 allocator 的实现final ByteBufAllocator allocator = config.getAllocator();// 用来分配 byteBuf,确定单次读取大小final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {byteBuf = allocHandle.allocate(allocator);// 读取allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理 NioSocketChannel 上的 handlerpipeline.fireChannelRead(byteBuf);byteBuf = null;} // 是否要继续循环while (allocHandle.continueReading());allocHandle.readComplete();// 触发 read complete 事件pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {if (!readPending && !config.isAutoRead()) {removeReadOp();}}
}
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {return // 一般为 trueconfig.isAutoRead() &&// respectMaybeMoreData 默认为 true// maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&// 小于最大次数,maxMessagePerRead 默认 16totalMessages < maxMessagePerRead &&// 实际读到了数据totalBytesRead > 0;
}