Netty底层数据交互源码分析

文章目录

    • 1. 前题回顾
    • 2. 主线流程源码分析
    • 3. Netty底层的零拷贝
    • 4. ByteBuf内存池设计

书接上文

1. 前题回顾

上一篇博客我们分析了Netty服务端启动的底层原理,主要就是将EventLoop里面的线程注册到了Select中,然后调用select方法监听客户端连接,我们这里从这个EventLoop里面线程的run方法开始分析。

2. 主线流程源码分析

进入EventLoop的run方法:

@Overrideprotected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and//    'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and//    'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}

而run方法这里会执行一个select(wakenUp.getAndSet(false));方法,我们进入该方法:

 private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();if (nextWakeupTime != normalizedDeadlineNanos) {nextWakeupTime = normalizedDeadlineNanos;}for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The code exists in an extra method to ensure the method is not too big to inline as this// branch is not very likely to get hit very frequently.selector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}}

上面代码核心的一句就是int selectedKeys = selector.select(timeoutMillis);,这里就调用了Nio中的Selector的select方法。假如现在客户端有连接事件来了,这个方法就会结束阻塞,然后回到run方法,执行下面代码:

if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}}

处理连接事件的核心函数就是processSelectedKeys();,我们进入该方法:

private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}

如果select轮询到的事件selectedKeys不为空,就执行processSelectedKeysOptimized方法,我们进入该方法:

private void processSelectedKeysOptimized() {//遍历selectedKeysfor (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}}

上面代码就是便利所有的selectionKey然后调用 processSelectedKey(k, (AbstractNioChannel) a);函数进行处理。我们进入该方法:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

上面方法就是处理事件的核心代码(注意我们现在是分析的客户端发来连接事件的场景)。上面方法是非常重要的,我们详细分析一下:

  1. 首先执行int readyOps = k.readyOps();函数这里就是拿到当前的事件类型
  2. 然后Netty就会判断事件的类型,判断当前的事件类型是读事件、写事件还是连接事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}
  1. 由于我们这里分析的是客户端连接事件,即SelectionKey.OP_ACCEPT事件,所以这里我们会执行unsafe.read();方法我们进入该方法
//实现是NioMessageUnsafeprivate final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}

首先上面调用的第一个核心函数是doReadMessages(readBuf);,readBuf就是上面声明的一个List集合private final List<Object> readBuf = new ArrayList<Object>();按照这个方法的字面意思我们可以理解为来读客户端发过来的消息的。我们进入该方法:

//由于现在是连接事件,所以用NioServerSocektChannel这个实现类@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {//这里就是获取了一个Nio里面的SocketChannel(这里就是NIO的代码,accept方法返回一个SocketChannel)SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {//下面代码首先将NIO原生的SocketChannel封装为了netty中的NioSocketChannel,然后添加到了前面的list集合中了buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}

上面代码的逻辑是,首先对于连接事件它这里首先获得了一个SocketChannel对象,然后将这个原生的SocketChannel对象封装成了一个NioSocketChannel对象,然后添加到了前面代码声明的List集合中了。这里我们看一下将SocketChannel对象封装为NioSocketChannel对象底层到底做了些什么事:

 public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);config = new NioSocketChannelConfig(this, socket.socket());}
//进入super方法protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {//parent就是当前的NIOServerSocketChannel,SelectionKey标记为读事件super(parent, ch, SelectionKey.OP_READ);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//记录当前的channelthis.ch = ch;//记录这个channel所感兴趣的事件this.readInterestOp = readInterestOp;try {//然后将channel设置为非阻塞ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}throw new ChannelException("Failed to enter non-blocking mode.", e);}}//继续进入super方法protected AbstractChannel(Channel parent) {//指定父管道this.parent = parent;id = newId();unsafe = newUnsafe();//创建一个管道pipeline = newChannelPipeline();}

上面代码的流程就是将我们创建的SocketChannel封装为NioSocketChannel的过程,在这个过程中它记录了我们创建的Channel以及记录了Channel感兴趣的事件,以及创建了channel对应的管道(一种组合设计模式),但是并没有将Channel注册到Selector中。doReadMessages在这就执行完毕了,回到read方法:

public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}

然后执行下面的重要代码:

int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}

首先这里就是readBuf也就是前面的list的大小(前面我们知道这里面存储的实际是NioSocketChannel,这个list大小就是客户端连接的数量),然后遍历所有的NioSocketChannel,然后调用pipeline.fireChannelRead方法。首先我们看看这个pipeline是什么:

final ChannelPipeline pipeline = pipeline();

思考这个pipeline是什么?

这个pipeline其实就是NioServerSocketChannelpipeline,所以这里调用的是ServerSocketChannel对应的fireChannelRead,也就是执行ServerSocketChannel对应的pipieline里面的Handler逻辑(调用channelread方法)。

那在这里调用做了什么是,这里需要回到上一篇博客,在服务端创建过程中,向其pipiline中加入了一个ServerBootStrapAcceptor的handler,所里这里执行的是这个handler

ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});

然后调用它的channelRead方法:

        @Override@SuppressWarnings("unchecked")//msg在这里就是前面创建的NioSocketChannelpublic void channelRead(ChannelHandlerContext ctx, Object msg) {//然后就获得了该ServerSocketChannel创建的子NioSocketChannelfinal Channel child = (Channel) msg;//然后获得子NioSocketChannel对应的pipeline(此时就是socketChannel对应的pipeline)//childHandler就是我们在写netty服务端程序时加入的一个通道初始化对象】/**.childHandler((ChannelInitializer)(ch)->{//对workerGroup的SocketChannel设置处理器ch.pipeline().addLast(new NettyServerHandler());});**///这里的代码就是向socketchannel的pipeline中加入了通道初始化对象的handlerchild.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {//childGroup就是初始化netty服务端事创建的workEventLoopGroupchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}

childGroup.register(child)就将socketChannel注册到了WorkEventLoopGroup中,这里的逻辑和BooEventLoopGroup注册ServerSocketChannel中的逻辑是一样的(底层主要是将channel注册到selector中)。我们进入该方法:

//MultithreadEventLoopGroup的方法@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

next就是获得workerEventLoopGroup的下一个EventLoopGroup,然后调用EventLoopGroupregister方法。我们进入该方法:

 @Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}

然后这里的逻辑和ServerSocketChennel的注册逻辑是几乎一样的,这里我们知道在NioServerSocketChannel会被注册到对应的EventLoop的Selector上。这里同样是需要把NioSocketChannel注册到某个EventLoop的Selector上,这个逻辑就是在该方法中实现的。

在这里插入图片描述
到此SocketChannel的创建和注册就分析完了。现在Netty客户端就可以向服务端发送数据了,现在我们开始分析Netty服务端是如何处理这个流程的。这里我们同样需要回到上面的processSelectedKey方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}

然后这里读事件是OP_READ事件,所以这里还是要调用unsafe.read();,继续进入read方法(实现类现在是NioByteUnsafe类):

     @Overridepublic final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {//首先分配一个byteBufbyteBuf = allocHandle.allocate(allocator);//将收到的数据读入ByteBufallocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}

上面首先 byteBuf = allocHandle.allocate(allocator);这就生成了一个ByteBuf(我们知道Netty底层数据交互都是给予ByteBuf实现的,这句代码底层涉及了0拷贝,我们这里后面在分析)。 allocHandle.lastBytesRead(doReadBytes(byteBuf));这句代码就将收到的客户端数据读取到了byteBuf中,首先我们先看doReadBytes(byteBuf)函数:

  @Overrideprotected int doReadBytes(ByteBuf byteBuf) throws Exception {final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.attemptedBytesRead(byteBuf.writableBytes());return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());}

byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());这里就是将channel中的数据,写到ByteBuf中。回到read()方法。下面就是调用pipeline.fireChannelRead(byteBuf);这里就是将ByteBuf中的数据经过pipeline中所有的Handler进行处理。到此,结合上一部分的源码分析,到此Netty所有的主线源码就分析完了。

3. Netty底层的零拷贝

了解Netty的零拷贝之前,我们需要对Netty的直接内存有所了解,直接内存并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,某些情况下这部分内存也会被频繁的使用,而且也可能导致OOM问题,Java里面的DirectByteBuffer可以分配一块直接内存(堆外内存),元空间对应的内存也被称为直接内存,它们对应的是机器的物理内存。

ByteBuf类有两个方法,一个是allocate方法,它分配的内存就是直接在Java堆上面分配的,另一个方法是allocateDirect方法,它就是分配的直接内存。allocate我们可以想到它底层一定是new了一个数组,我们重点分析allocateDirect底层在干什么,我们进入该方法:

public static ByteBuffer allocateDirect(int capacity) {return new DirectByteBuffer(capacity);}

可以发现它底层就是创建了一个NIO的DirectByteBuffer对象。我们继续进入该对象:

DirectByteBuffer(int cap) {                   // package-privatesuper(-1, 0, cap, cap);boolean pa = VM.isDirectMemoryPageAligned();int ps = Bits.pageSize();long size = Math.max(1L, (long)cap + (pa ? ps : 0));Bits.reserveMemory(size, cap);long base = 0;try {base = unsafe.allocateMemory(size);} catch (OutOfMemoryError x) {Bits.unreserveMemory(size, cap);throw x;}unsafe.setMemory(base, size, (byte) 0);if (pa && (base % ps != 0)) {// Round up to page boundaryaddress = base + ps - (base & (ps - 1));} else {address = base;}cleaner = Cleaner.create(this, new Deallocator(base, size, cap));att = null;}

base = unsafe.allocateMemory(size);这里就是真正分配直接内存的逻辑,allocateMemory是一个本地方法,它底层就是用malloc分配了一块内存。又遇见创建的是堆外内存,我们创建的DirectByteBuffer是有这块堆外内存的引用的,当方法执行完毕,DirectByteBuffer会被JVM回收掉,所以对堆外内存的引用也会回收掉,而这个堆外内存也会被回收掉。

使用直接内存的优缺点如下:

  • 优点:不占用堆空间,减少了发生GC的可能,java虚拟机上,本地IO会直接操作直接内存(直接内存->系统调用->硬盘/网卡),而非直接内存则需要二次拷贝(堆内存->直接内存->系统调用->硬盘/网卡)
  • 缺点:初始化分配比较慢,没有JVM直接帮助管理内存,容易发生内存溢出。为例避免一致没有FULL GC,最终导致直接内存把物理内存耗完了。(我们可以通过-XX:MaxDirectMemorySize来指定直接内存的大小,当达到阈值是,会调用system.gc()来进行一次FULL GC,简介把那些没有被使用的直接内存回收掉)

在这里插入图片描述
对于上图,上面一部分不使用直接内存,首先客户端client发送数据过来,socket缓存区接受到数据,然后操作系统会把Socket缓存区的数据拷贝到直接内存(DMA控制器),这是第一次拷贝,然后JDK把直接内存的数据拷贝到堆内存,这是第二次拷贝。对于操作直接内存的就只需要前面的第一次拷贝即可。

Netty的接收和发送ByteBuf采用Direct BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。 我们回到Read方法,解决上面遗留的点:

byteBuf = allocHandle.allocate(allocator)

进入allocate方法:

  @Overridepublic ByteBuf allocate(ByteBufAllocator alloc) {return alloc.ioBuffer(guess());}@Overridepublic ByteBuf ioBuffer(int initialCapacity) {if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {return directBuffer(initialCapacity);}return heapBuffer(initialCapacity);}

可以看到了 directBuffer(initialCapacity);,可以看出netty默认使用的是直接内存。

4. ByteBuf内存池设计

随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量的工作。但是对于缓存区Buffer(相当于一个内存块),情况缺稍有不同,特别是对于堆外内存直接内存的分配和回收,是一件非常耗时的操作。为了尽量避免重用缓存区,Netty提供了基于ByteBuf内存池的缓存区重用机制。需要的时候直接从池值中获取ByteBuf就行,使用完毕后会放回池子中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9696.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入了解模拟和存根:提高单元测试质量的关键技术

一、引言 在进行单元测试时&#xff0c;我们经常会遇到对外部资源的依赖&#xff0c;如数据库、网络接口等。模拟&#xff08;Mocking&#xff09;和存根&#xff08;Stubbing&#xff09;是两种帮助我们模拟这些外部资源&#xff0c;使我们能够在隔离环境中测试单元的方法。在…

分布式任务调度框架xxl-job使用手册

官网地址和文档地址&#xff1a;https://www.xuxueli.com/xxl-job/ 一、快速入门 1.1 下载源码 https://github.com/xuxueli/xxl-job https://gitee.com/xuxueli0323/xxl-job 下载完成后有以下模块 1.2 初始化数据库 官方指定mysql8.0&#xff0c;但我是mysql5.7 执行/xxl…

PyQt6--Python桌面开发(6.QLineEdit单行文本框)

QLineEdit单行文本框 import sys import time from PyQt6.QtGui import QValidator,QIntValidator from PyQt6.QtWidgets import QApplication,QLabel,QLineEdit from PyQt6 import uicif __name__ __main__:appQApplication(sys.argv)uiuic.loadUi("./QLine单行文本框.u…

Qt 6.7功能介绍

Qt 6.7为我们所有喜欢在构建现代应用程序和用户体验时获得乐趣的人提供了许多大大小小的改进。一些新增内容作为技术预览发布&#xff0c;接下来我们就一起来看看吧&#xff1a; 将C20与Qt一起使用 对于许多编译器工具链来说&#xff0c;C20仍然是可选的和实验性的&#xff0c;…

台服dnf局域网搭建,学习用笔记

台服dnf局域网搭建 前置条件虚拟机初始化上传安装脚本以及其他文件至虚拟机密钥publickey.pem客户端配置如果IP地址填写有误&#xff0c;批量修改IP地址 前置条件 安装有vmvarecentos7.6镜像&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.6.1810/isos/x86…

Python注意事项【自我维护版】

各位大佬好 &#xff0c;这里是阿川的博客 &#xff0c; 祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 本篇博客在之前的博客上进行的维护 创建Python…

Day7 字符串和常用数据结构

文章目录 字符串和常用数据结构使用字符串使用列表生成式和生成器使用元组使用集合使用字典练习练习1&#xff1a;在屏幕上显示跑马灯文字。练习2&#xff1a;设计一个函数产生指定长度的验证码&#xff0c;验证码由大小写字母和数字构成。练习3&#xff1a;设计一个函数返回给…

linux下使用jexus部署aspnet站点

1.运行环境 Centos 7 安装dos2unix工具 yum install dos2unix 安装jexus curl https://jexus.org/release/x64/install.sh|sudo sh2.网站部署 2.1. 将windows下的网站发布包Msc_qingdao_admin.zip上传到linux中&#xff0c; 然后解压后放入/var/www(没有则创建)目录下 r…

福昕PDF阅读器取消手型工具鼠标点击翻页

前言&#xff1a; 本文介绍如何关闭福昕PDF阅读器取消手型工具鼠标点击翻页&#xff0c;因为这样真的很容易误触发PDF翻页&#xff0c;使用起来让人窝火。 引用&#xff1a; NA 正文&#xff1a; 新版的福昕PDF阅读器默认打开了“使用手型工具阅读文章”这个勾选项&#x…

超全MySQL锁机制介绍

前言 MySQL作为关系型数据库管理系统中的佼佼者&#xff0c;为了保证数据的一致性和完整性&#xff0c;在并发控制方面采用了锁机制。锁机制是数据库管理系统用于控制对共享资源的访问&#xff0c;避免多个事务同时修改同一数据造成的数据不一致问题。了解MySQL的锁机制对于数…

中信证券:量子产业蓄势待发,看好相关投资机会!

在1994年&#xff0c;数学家彼得肖尔&#xff08;Peter Shor&#xff09;首次提出了现在广为人知的肖尔算法&#xff0c;那时许多人认为量子计算机的概念遥不可及、纯属幻想。然而&#xff0c;到了2024年&#xff0c;全球正深入探讨量子科技在现实世界的应用&#xff0c;以及所…

pytorch技术栈

张量&#xff08;Tensors&#xff09;&#xff1a;PyTorch的核心数据结构&#xff0c;用于存储和操作多维数组。 自动微分&#xff08;Autograd&#xff09;&#xff1a;PyTorch的自动微分引擎&#xff0c;可以自动计算梯度&#xff0c;这对于训练神经网络至关重要。 数据加载…

Git 如何管理标签命令(tag)

1.查看本地仓库tag --1.查看本地仓库tag UserDESKTOP-2NRT2ST MINGW64 /e/GITROOT/STARiBOSS/STARiBOSS-5GCA (gw_frontend_master) $ git tag 1stBossUpgrade V10.0.1_20220224_test V10.0.1_20220301_test tag-gwfrontend-V1.0.12-230625 tag-gw_frontend-23.08.29 tag-gw_f…

45.乐理基础-音符的组合方式-复附点

复附点&#xff1a; 复附点顾名思义就是两个附点 复附点表示的音符&#xff0c;有多少拍&#xff1f;下面拿 复附点四分音符举例&#xff0c;可以把整个音符看成三部分&#xff0c;第一部分是原本的四分音符&#xff0c;第二部分是第一个附点&#xff0c;第三部分是第二个附点&…

vue cmd执行报错 ‘vue‘ 不是内部或外部命令

使用vue脚手架快速搭建项目&#xff0c;在cmd中执行&#xff1a;vue init webpack vue-demo&#xff0c;报错&#xff1a; vue 不是内部或外部命令,也不是可运行的程序 或批处理文件。 解决方法&#xff0c;执行如下的命令 npm config list 注意&#xff1a;找到prefix等号后…

python之并发编程

python之并发编程 线程的创建方式线程的创建方式(方法包装)线程的创建方式(类包装)join()【让主线程等待子线程结束】守护线程【主线程结束&#xff0c;子线程就结束】 锁多线程操作同一个对象(未使用线程同步)多线程操作同一个对象(增加互斥锁&#xff0c;使用线程同步)死锁案…

ChatGLM 本地部署指南(问题解决)

硬件要求&#xff08;模型推理&#xff09;&#xff1a; INT4 &#xff1a; RTX3090*1&#xff0c;显存24GB&#xff0c;内存32GB&#xff0c;系统盘200GB 如果你没有 GPU 硬件的话&#xff0c;也可以在 CPU 上进行推理&#xff0c;但是推理速度会更慢。 模型微调硬件要求更高。…

【双碳系列】碳中和、碳排放、温室气体、弹手指、碳储量、碳循环及leap、cge、dice、openLCA模型

气候变化是当前人类生存和发展所面临的共同挑战&#xff0c;受到世界各国人民和政府的高度关注 ①“双碳”目标下资源环境中的可计算一般均衡&#xff08;CGE&#xff09;模型实践技术应用 可计算一般均衡模型&#xff08;CGE模型&#xff09;由于其能够模拟宏观经济系统运行…

在论文写作中使用 LaTeX 生成算法伪代码

最近在论文写作中&#xff0c;我需要表示算法的逻辑。由于 Word 没有较好的模板&#xff0c;因此我选择使用 LaTeX 来生成算法伪代码&#xff0c;然后将其截图或转换为 SVG 格式&#xff0c;贴入论文中。 关于 LaTeX 的伪代码写作技巧&#xff0c;可以参考这篇文章&#xff1a…

OpenBayes 一周速览|Apple 开源大模型 OpenELM 上线;字节发布 COCONut 首个全景图像分割数据集,入选 CVPR2024

公共资源速递 This Weekly Snapshots &#xff01; 5 个数据集&#xff1a; * COCONut 大规模图像分割数据集 * THUCNews 新闻数据集 * DuConv 对话数据集 * 安徽电信知道问答数据集 * Sentiment Analysis 中文情感分析数据集 2 个模型&#xff1a; * OpenELM-3B-Inst…