归档
- GitHub: Netty-读写原理
读原理
-
参考:选择器-监听-原理 sign_o_002
-
主要看
NioByteUnsafe#read()
- 相当于读取底层信道中的字节
-
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe
- 下游链处理参考:处理器链-读流程
// Excerpt of io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.
// "..." marks code that the notes deliberately elide.
protected class NioByteUnsafe extends AbstractNioUnsafe {
    /**
     * Read loop for a byte channel: repeatedly allocates a ByteBuf, fills it through
     * doReadBytes(), and fires a channelRead event per buffer. The loop ends when a read
     * returns no bytes or allocHandle.continueReading() is false; afterwards a
     * channelReadComplete event is fired and, if flagged, the read side is closed.
     */
    @Override
    public final void read() {
        ... // validation
        final ChannelPipeline pipeline = pipeline();
        /**
         * allocator is used to create ByteBuf instances;
         *   observed in testing as an io.netty.buffer.PooledByteBufAllocator instance.
         * allocHandle is used to compute the (initial) ByteBuf size;
         *   observed in testing as an io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl instance.
         */
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // sign_m_001
        allocHandle.reset(config); // reset the handle in case it is the cached one
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator); // allocate a ByteBuf
                allocHandle.lastBytesRead(doReadBytes(byteBuf)); // sign_m_010 read the channel content
                if (allocHandle.lastBytesRead() <= 0) {
                    ... // nothing to read: handling such as setting close to true
                    break;
                }
                allocHandle.incMessagesRead(1);
                pipeline.fireChannelRead(byteBuf); // sign_o_010 propagate the bytes that were read
                byteBuf = null; // drop the local reference once the buffer has been handed to the pipeline
            } while (allocHandle.continueReading());
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete(); // sign_o_020 fire the read-complete event
            if (close) {
                closeOnRead(pipeline); // close the read side
            }
        } ... // catch finally
    }
}
io.netty.channel.AbstractChannel.AbstractUnsafe
// Excerpt of io.netty.channel.AbstractChannel.AbstractUnsafe (receive-buffer handle part only).
protected abstract class AbstractUnsafe implements Unsafe {
    // Lazily created handle, reused by later calls to recvBufAllocHandle().
    private RecvByteBufAllocator.Handle recvHandle;

    // sign_m_001
    /**
     * Returns the receive-buffer allocator handle, creating it from the channel config's
     * RecvByteBufAllocator on first use.
     */
    @Override
    public RecvByteBufAllocator.Handle recvBufAllocHandle() {
        if (recvHandle == null) {
            recvHandle = config().getRecvByteBufAllocator().newHandle();
        }
        return recvHandle; // effectively a cache
    }
}
io.netty.channel.socket.nio.NioSocketChannel
// Excerpt of io.netty.channel.socket.nio.NioSocketChannel (read side).
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    // sign_m_010 read the channel content
    /**
     * Records the buffer's writable byte count as the attempted read size on the allocator
     * handle, then writes bytes from the Java channel into the buffer and returns the result
     * of that writeBytes call.
     */
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        /**
         * byteBuf was observed to be an io.netty.buffer.PooledUnsafeDirectByteBuf instance.
         * sign_m_020 read from the underlying Java channel
         */
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }
}
io.netty.buffer.AbstractByteBuf
// Excerpt of io.netty.buffer.AbstractByteBuf.
public abstract class AbstractByteBuf extends ByteBuf {
    // sign_m_020 read the underlying channel content
    /**
     * Transfers up to {@code length} bytes from the channel into this buffer starting at
     * writerIndex, and advances writerIndex by the number of bytes actually transferred.
     *
     * @return the value returned by setBytes; writerIndex is only moved when it is positive
     */
    @Override
    public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
        ensureWritable(length);
        int writtenBytes = setBytes(writerIndex, in, length); // sign_m_030 write the content
        if (writtenBytes > 0) {
            writerIndex += writtenBytes; // advance the writer index
        }
        return writtenBytes;
    }
}
io.netty.buffer.PooledByteBuf
// Excerpt of io.netty.buffer.PooledByteBuf.
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
    // sign_m_030 write the content
    /**
     * Reads from the channel into the internal NIO buffer view covering [index, index + length).
     *
     * @return the result of in.read(...), or -1 when a ClosedChannelException is caught
     */
    @Override
    public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
        try {
            /**
             * internalNioBuffer(index, length) returns a ByteBuffer instance;
             * the channel content is read into that ByteBuffer.
             */
            return in.read(internalNioBuffer(index, length));
        } catch (ClosedChannelException ignored) {
            // a closed channel is reported to the caller as -1 rather than as an exception
            return -1;
        }
    }

    // Returns a ByteBuffer instance: checks the index range, then delegates without duplicating.
    @Override
    public final ByteBuffer internalNioBuffer(int index, int length) {
        checkIndex(index, length);
        return _internalNioBuffer(index, length, false);
    }

    // Returns a ByteBuffer instance whose position/limit delimit the requested window.
    final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
        index = idx(index); // NOTE(review): idx() is not shown here; presumably maps to an offset in the backing memory
        ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
        buffer.limit(index + length).position(index);
        return buffer;
    }

    // Returns the internal ByteBuffer instance, creating it on first use and clearing it on reuse.
    protected final ByteBuffer internalNioBuffer() {
        ByteBuffer tmpNioBuf = this.tmpNioBuf;
        if (tmpNioBuf == null) { // no internal ByteBuffer yet, so create one
            this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory); // sign_m_040 create the ByteBuffer instance
        } else {
            tmpNioBuf.clear();
        }
        return tmpNioBuf;
    }
}
io.netty.buffer.PooledUnsafeDirectByteBuf
// Excerpt of io.netty.buffer.PooledUnsafeDirectByteBuf.
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    // sign_m_040 create the ByteBuffer instance
    /** Creates the internal NIO buffer as a duplicate of the given backing memory. */
    @Override
    protected ByteBuffer newInternalNioBuffer(ByteBuffer memory) {
        /**
         * memory was observed to be a java.nio.DirectByteBuffer instance.
         */
        return memory.duplicate(); // create a duplicate
    }
}
写原理
- 主要有两部分:
HeadContext#write()
写入消息到缓冲区
HeadContext#flush()
推送消息到对端
写入
-
调用参考:简单示例-客户端 sign_u_001
- 调用点:
channel.write(msg); // sign_u_001 写入消息
- 调用点:
-
上游链处理参考:处理器链-写流程
-
io.netty.channel.DefaultChannelPipeline.HeadContext
- 最终到头节点写入
// Excerpt of io.netty.channel.DefaultChannelPipeline.HeadContext (write part only).
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    // sign_o_030 write the message
    /** Delegates the write to the channel's unsafe. */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        /**
         * msg was observed to be an io.netty.buffer.PooledUnsafeDirectByteBuf instance.
         * unsafe was observed to be an
         *   io.netty.channel.socket.nio.NioSocketChannel.NioSocketChannelUnsafe instance.
         * sign_m_100 write the message to the channel
         */
        unsafe.write(msg, promise);
    }
}
io.netty.channel.AbstractChannel.AbstractUnsafe
// Excerpt of io.netty.channel.AbstractChannel.AbstractUnsafe (write part only).
// "..." marks code that the notes deliberately elide.
protected abstract class AbstractUnsafe implements Unsafe {
    // sign_m_100 write the message to the channel
    /**
     * Filters the outbound message, estimates its size (negative estimates are clamped to 0),
     * and adds it to the outbound buffer. Nothing in this excerpt writes to the socket.
     */
    @Override
    public final void write(Object msg, ChannelPromise promise) {
        ... // validation omitted
        int size;
        try {
            msg = filterOutboundMessage(msg); // sign_m_110 wrap/convert the message
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } ... // catch
        outboundBuffer.addMessage(msg, size, promise); // add the message to the buffer of data to send
    }
}
io.netty.channel.nio.AbstractNioByteChannel
// Excerpt of io.netty.channel.nio.AbstractNioByteChannel.
// "..." marks code that the notes deliberately elide.
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    // sign_m_110 wrap/convert the message
    /**
     * For a ByteBuf message: returns it unchanged when it is already direct, otherwise returns
     * the result of newDirectBuffer(buf). Handling of other message types is elided.
     */
    @Override
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg; // msg is a PooledUnsafeDirectByteBuf instance here, so return it as is
            }
            return newDirectBuffer(buf);
        }
        ... // other handling omitted
    }
}
推送
-
参考:简单示例-客户端 sign_u_002
- 调用点:
channel.flush(); // sign_u_002 推送消息
- 调用点:
-
io.netty.channel.DefaultChannelPipeline.HeadContext
- 最终是头节点推送
// Excerpt of io.netty.channel.DefaultChannelPipeline.HeadContext (flush part only).
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    /** Delegates the flush to the channel's unsafe. */
    @Override
    public void flush(ChannelHandlerContext ctx) {
        /**
         * unsafe was observed to be an
         *   io.netty.channel.socket.nio.NioSocketChannel.NioSocketChannelUnsafe instance.
         * sign_m_120 flush the messages
         */
        unsafe.flush();
    }
}
io.netty.channel.AbstractChannel.AbstractUnsafe
// Excerpt of io.netty.channel.AbstractChannel.AbstractUnsafe (flush part only).
// "..." marks code that the notes deliberately elide.
protected abstract class AbstractUnsafe implements Unsafe {
    // sign_m_120 flush the messages
    /** Marks the pending messages as flushed via addFlush(), then calls flush0(). */
    @Override
    public final void flush() {
        ... // validation omitted
        outboundBuffer.addFlush();
        flush0(); // sign_m_121 flush the messages
    }

    // sign_m_121 flush the messages
    // (the subclass AbstractNioUnsafe overrides this to add checks, but ultimately still calls this method)
    /** Hands the outbound buffer to doWrite(); error handling is elided in this excerpt. */
    protected void flush0() {
        ... // validation omitted
        try {
            doWrite(outboundBuffer); // sign_m_130 write to the channel
        } ... // catch finally
    }
}
io.netty.channel.socket.nio.NioSocketChannel
// Excerpt of io.netty.channel.socket.nio.NioSocketChannel (write side).
// "..." marks code that the notes deliberately elide.
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    // sign_m_130 write to the channel
    /**
     * Drains the outbound buffer into the Java SocketChannel. Each loop iteration obtains the
     * pending NIO buffers and, in the single-buffer case shown here, writes that buffer,
     * removes the written bytes from the outbound buffer, and decrements the spin budget.
     * The loop runs while writeSpinCount is positive; the other cases are elided.
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            ... // validation omitted
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();
            switch (nioBufferCnt) {
                case 0: ... // omitted
                case 1: {
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining(); // not used in the lines shown; NOTE(review): presumably used by the elided code below
                    final int localWrittenBytes = ch.write(buffer); // write the message through the underlying Java channel
                    ... // other handling omitted
                    in.removeBytes(localWrittenBytes); // remove the bytes that have been sent
                    --writeSpinCount;
                    break;
                }
                default: ... // omitted
            }
        } while (writeSpinCount > 0);
        ... // omitted
    }
}