【从入门到放弃-Java】并发编程-NIO-Channel

前言

上篇[【从入门到放弃-Java】并发编程-NIO使用]()简单介绍了nio的基础使用,本篇将深入源码分析nio中channel的实现。

简介

channel即通道,可以用来读、写数据,它是全双工的可以同时用来读写操作。这也是它与stream流的最大区别。

channel需要与buffer配合使用,channel通道的一端是buffer,一端是数据源实体,如文件、socket等。在nio中,通过channel的不同实现来处理 不同实体与数据buffer中的数据传输。

channel接口:

package java.nio.channels;import java.io.IOException;
import java.io.Closeable;/*** A nexus for I/O operations.** <p> A channel represents an open connection to an entity such as a hardware* device, a file, a network socket, or a program component that is capable of* performing one or more distinct I/O operations, for example reading or* writing.** <p> A channel is either open or closed.  A channel is open upon creation,* and once closed it remains closed.  Once a channel is closed, any attempt to* invoke an I/O operation upon it will cause a {@link ClosedChannelException}* to be thrown.  Whether or not a channel is open may be tested by invoking* its {@link #isOpen isOpen} method.** <p> Channels are, in general, intended to be safe for multithreaded access* as described in the specifications of the interfaces and classes that extend* and implement this interface.*** @author Mark Reinhold* @author JSR-51 Expert Group* @since 1.4*/public interface Channel extends Closeable {/*** Tells whether or not this channel is open.** @return <tt>true</tt> if, and only if, this channel is open*/public boolean isOpen();/*** Closes this channel.** <p> After a channel is closed, any further attempt to invoke I/O* operations upon it will cause a {@link ClosedChannelException} to be* thrown.** <p> If this channel is already closed then invoking this method has no* effect.** <p> This method may be invoked at any time.  If some other thread has* already invoked it, however, then another invocation will block until* the first invocation is complete, after which it will return without* effect. </p>** @throws  IOException  If an I/O error occurs*/public void close() throws IOException;}

常见的channel实现有:

  • FileChannel:文件读写数据通道
  • SocketChannel:TCP读写网络数据通道
  • ServerSocketChannel:服务端网络数据读写通道,可以监听TCP连接。对每一个新进来的连接都会创建一个SocketChannel。
  • DatagramChannel:UDP读写网络数据通道

FileChannel


FileChannel是一个抽象类,它继承了AbstractInterruptibleChannel类,并实现了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel接口。
具体的实现类主要是sun.nio.ch.FileChannelImpl。下面详细分析下FileChannelImpl中每个方法的具体实现。

open

private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) {//主要记载操作系统维护的文件描述符this.fd = var1;//是否可读this.readable = var3;//是否可写this.writable = var4;//是否以追加的方式打开this.append = var5;this.parent = var6;this.path = var2;//底层使用native的read和write来处理文件的this.nd = new FileDispatcherImpl(var5);
}//FileInputStream::getChannel 调用 FileChannelImpl.open(fd, path, true, false, this) 获取只读channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {return new FileChannelImpl(var0, var1, var2, var3, false, var4);
}//FileOutputStream::getChannel 调用 FileChannelImpl.open(fd, path, false, true, append, this) 获取只写channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) {return new FileChannelImpl(var0, var1, var2, var3, var4, var5);
}
private FileChannelImpl(FileDescriptor fd, String path, boolean readable,boolean writable, boolean direct, Object parent)
{this.fd = fd;//是否可读this.readable = readable;//是否可写this.writable = writable;//对于从流创建的channel,在结束时要做不同的清理动作,(openJDK中才有,sun的jdk中没有)this.parent = parent;//源文件的paththis.path = path;//是否使用DirectIOthis.direct = direct;this.nd = new FileDispatcherImpl();if (direct) {assert path != null;this.alignment = nd.setDirectIO(fd, path);} else {this.alignment = -1;}//当parent不存在时,则注册一个cleaner,否则交由parent做清理动作。// Register a cleaning action if and only if there is no parent// as the parent will take care of closing the file descriptor.// FileChannel is used by the LambdaMetaFactory so a lambda cannot// be used here hence we use a nested class instead.this.closer = parent != null ? null :CleanerFactory.cleaner().register(this, new Closer(fd));
}// Used by FileInputStream.getChannel(), FileOutputStream.getChannel
// and RandomAccessFile.getChannel()
public static FileChannel open(FileDescriptor fd, String path,boolean readable, boolean writable,boolean direct, Object parent)
{return new FileChannelImpl(fd, path, readable, writable, direct, parent);
}
  • open方法主要是返回一个新new的FileChannelImpl对象,初始化时设置fileDescriptor、readable、writable、append、parent、path等属性,看变量名很容易理解,在此不赘述变量含义。

read

//实现自SeekableByteChannel接口的方法,将文件中的内容读取到给定的byteBuffer中
public int read(ByteBuffer dst) throws IOException {//保证读写时,channel处于开启状态ensureOpen();//判断是否可读if (!readable)throw new NonReadableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);int n = 0;int ti = -1;try {//开始阻塞,并注册为Interruptible,可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号ti = threads.add();if (!isOpen())return 0;do {//当未被系统中断(即读取完毕)或channel未被关闭,则一直读,将内容写入到byteBuffer(dst)中n = IOUtil.read(fd, dst, -1, direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束,释放锁endBlocking(n > 0);assert IOStatus.check(n);}}
}//实现自ScatteringByteChannel接口的方法,将文件中的内容依次读取到给定的byteBuffer数组中。
public long read(ByteBuffer[] dsts, int offset, int length)throws IOException
{if ((offset < 0) || (length < 0) || (offset > dsts.length - length))throw new IndexOutOfBoundsException();//保证读写时,channel处于开启状态ensureOpen();//判断是否可读if (!readable)throw new NonReadableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);long n = 0;int ti = -1;try {//开始阻塞,并注册为Interruptible,可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号ti = threads.add();if (!isOpen())return 0;do {//当未被系统中断(即读取完毕)或channel未被关闭,则一直读,将内容写入到byteBuffer(dst)中n = IOUtil.read(fd, dsts, offset, length,direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束,释放锁endBlocking(n > 0);assert IOStatus.check(n);}}
}

write

//实现自SeekableByteChannel接口的方法,将byteBuffer中的内容写入到文件中
public int write(ByteBuffer src) throws IOException {//保证写时,channel处于开启状态ensureOpen();//判断是否可写if (!writable)throw new NonWritableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);int n = 0;int ti = -1;try {//开始阻塞,并注册为Interruptible,可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号ti = threads.add();if (!isOpen())return 0;do {//当未被系统中断(即写入完毕)或channel未被关闭,则一直写,将内容写入到文件中n = IOUtil.write(fd, src, -1, direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束,释放锁assert IOStatus.check(n);}}
}//实现自GatheringByteChannel接口的方法,将byteBuffer数组中的内容依次写入到文件中
public long write(ByteBuffer[] srcs, int offset, int length)throws IOException
{if ((offset < 0) || (length < 0) || (offset > srcs.length - length))throw new IndexOutOfBoundsException();//保证写时,channel处于开启状态ensureOpen();//判断是否可写if (!writable)throw new NonWritableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);long n = 0;int ti = -1;try {//开始阻塞,并注册为Interruptible,可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中,并返回索引,方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合,方便管理,用来发送信号ti = threads.add();if (!isOpen())return 0;do {//当未被系统中断(即写入完毕)或channel未被关闭,则一直写,将内容写入到文件中n = IOUtil.write(fd, srcs, offset, length,direct, alignment, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束,释放锁assert IOStatus.check(n);}}
}

position

//实现自SeekableByteChannel接口的方法,获取当前channel的position
public long position() throws IOException {ensureOpen();synchronized (positionLock) {long p = -1;int ti = -1;try {beginBlocking();ti = threads.add();if (!isOpen())return 0;boolean append = fdAccess.getAppend(fd);do {//append模式下,position在channel的末尾// in append-mode then position is advanced to end before writingp = (append) ? nd.size(fd) : nd.seek(fd, -1);} while ((p == IOStatus.INTERRUPTED) && isOpen());return IOStatus.normalize(p);} finally {threads.remove(ti);endBlocking(p > -1);assert IOStatus.check(p);}}
}//实现自SeekableByteChannel接口的方法,设置当前channel的position为newPosition
public FileChannel position(long newPosition) throws IOException {ensureOpen();if (newPosition < 0)throw new IllegalArgumentException();synchronized (positionLock) {long p = -1;int ti = -1;try {beginBlocking();ti = threads.add();if (!isOpen())return null;do {//设置当前position为newPositionp = nd.seek(fd, newPosition);} while ((p == IOStatus.INTERRUPTED) && isOpen());return this;} finally {threads.remove(ti);endBlocking(p > -1);assert IOStatus.check(p);}}
}

size

实现自SeekableByteChannel接口的方法,返回当前实体(文件)的大小

truncate

实现自SeekableByteChannel接口的方法,用来截取文件至newSize大小

force

实现自SeekableByteChannel接口的方法,用来将channel中尚未写入磁盘的数据强制落盘

transferTo

将fileChannel中的数据传递至另一个channel

transferFrom

从其它channel读取数据至fileChannel

SocketChannel

open

/*** Opens a socket channel.** <p> The new channel is created by invoking the {@link* java.nio.channels.spi.SelectorProvider#openSocketChannel* openSocketChannel} method of the system-wide default {@link* java.nio.channels.spi.SelectorProvider} object.  </p>** @return  A new socket channel** @throws  IOException*          If an I/O error occurs*/
public static SocketChannel open() throws IOException {return SelectorProvider.provider().openSocketChannel();
}

open方法是调用SelectorProvider中实现了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法,底层实际是new SocketChannelImpl,调用native方法创建socket

connect

public boolean connect(SocketAddress sa) throws IOException {//校验Address是否合法InetSocketAddress isa = Net.checkAddress(sa);//获取系统安全管理器SecurityManager sm = System.getSecurityManager();if (sm != null)//校验IP和端口是否被允许连接sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());InetAddress ia = isa.getAddress();//如果是本机地址,则获取本机的hostif (ia.isAnyLocalAddress())ia = InetAddress.getLocalHost();try {//加读锁readLock.lock();try {//加写锁writeLock.lock();try {int n = 0;//是否阻塞boolean blocking = isBlocking();try {//开启connect前的校验并设置为ST_CONNECTIONPENDING,如果blocking是true 即阻塞模式,则记录当前线程的ID,以便接收信号处理。beginConnect(blocking, isa);do {//调用native connect方法n = Net.connect(fd, ia, isa.getPort());} while (n == IOStatus.INTERRUPTED && isOpen());} finally {//结束连接endConnect(blocking, (n > 0));}assert IOStatus.check(n);return n > 0;} finally {//释放写锁writeLock.unlock();}} finally {//释放读锁readLock.unlock();}} catch (IOException ioe) {// connect failed, close the channelclose();throw SocketExceptions.of(ioe, isa);}
}

configureBlocking

实现自SelectableChannel的接口方法,调用native方法设置socket的阻塞状态

register

在AbstractSelectableChannel中定义,注册要监听的事件。

public final SelectionKey register(Selector sel, int ops, Object att)throws ClosedChannelException
{if ((ops & ~validOps()) != 0)throw new IllegalArgumentException();if (!isOpen())throw new ClosedChannelException();synchronized (regLock) {if (isBlocking())throw new IllegalBlockingModeException();synchronized (keyLock) {// re-check if channel has been closedif (!isOpen())throw new ClosedChannelException();SelectionKey k = findKey(sel);if (k != null) {k.attach(att);k.interestOps(ops);} else {// 向Selector中注册事件// New registrationk = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}return k;}}
}

read

//实现自ReadableByteChannel接口的方法,从socket中读取数据至ByteBuffer
@Override
public int read(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);readLock.lock();try {boolean blocking = isBlocking();int n = 0;try {//检查channel是否开启并已经是connected的状态。如果blocking是true 即阻塞模式,则记录当前线程的ID,以便接收信号处理。beginRead(blocking);// check if input is shutdownif (isInputClosed)return IOStatus.EOF;//如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n = IOUtil.read(fd, buf, -1, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.read(fd, buf, -1, nd);}} finally {endRead(blocking, n > 0);if (n <= 0 && isInputClosed)return IOStatus.EOF;}return IOStatus.normalize(n);} finally {readLock.unlock();}
}//实现自ScatteringByteChannel接口的方法,从socket中依次读取数据至ByteBuffer数组
@Override
public long read(ByteBuffer[] dsts, int offset, int length)throws IOException
{Objects.checkFromIndexSize(offset, length, dsts.length);readLock.lock();try {boolean blocking = isBlocking();long n = 0;try {beginRead(blocking);// check if input is shutdownif (isInputClosed)return IOStatus.EOF;//如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n = IOUtil.read(fd, dsts, offset, length, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.read(fd, dsts, offset, length, nd);}} finally {endRead(blocking, n > 0);if (n <= 0 && isInputClosed)return IOStatus.EOF;}return IOStatus.normalize(n);} finally {readLock.unlock();}
}

write

//实现自ReadableByteChannel接口的方法,将ByteBuffer中的数据写入socket
@Override
public int write(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);writeLock.lock();try {boolean blocking = isBlocking();int n = 0;try {beginWrite(blocking);//如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n = IOUtil.write(fd, buf, -1, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.write(fd, buf, -1, nd);}} finally {endWrite(blocking, n > 0);if (n <= 0 && isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}@Override
public long write(ByteBuffer[] srcs, int offset, int length)throws IOException
{Objects.checkFromIndexSize(offset, length, srcs.length);writeLock.lock();try {boolean blocking = isBlocking();long n = 0;try {beginWrite(blocking);//如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n = IOUtil.write(fd, srcs, offset, length, nd);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = IOUtil.write(fd, srcs, offset, length, nd);}} finally {endWrite(blocking, n > 0);if (n <= 0 && isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}//实现自ReadableByteChannel接口的方法,将ByteBuffer数组中的数据依次写入socket
/*** Writes a byte of out of band data.*/
int sendOutOfBandData(byte b) throws IOException {writeLock.lock();try {boolean blocking = isBlocking();int n = 0;try {beginWrite(blocking);//如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n = sendOutOfBandData(fd, b);} while (n == IOStatus.INTERRUPTED && isOpen());} else {n = sendOutOfBandData(fd, b);}} finally {endWrite(blocking, n > 0);if (n <= 0 && isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}

ServerSocketChannel

socket

@Override
public ServerSocket socket() {synchronized (stateLock) {if (socket == null)socket = ServerSocketAdaptor.create(this);return socket;}
}

bind

@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {synchronized (stateLock) {ensureOpen();if (localAddress != null)throw new AlreadyBoundException();InetSocketAddress isa = (local == null)? new InetSocketAddress(0): Net.checkAddress(local);SecurityManager sm = System.getSecurityManager();if (sm != null)sm.checkListen(isa.getPort());//绑定前做一些前置处理,如将tcp socket文件描述符转换成SDPNetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());//绑定IP和地址Net.bind(fd, isa.getAddress(), isa.getPort());//开始监听,设置socket上最多可以挂起backlog个连接,若backlog小于1 则默认设置50个Net.listen(fd, backlog < 1 ? 50 : backlog);localAddress = Net.localAddress(fd);}return this;
}

accept

@Override
public SocketChannel accept() throws IOException {acceptLock.lock();try {int n = 0;FileDescriptor newfd = new FileDescriptor();InetSocketAddress[] isaa = new InetSocketAddress[1];boolean blocking = isBlocking();try {begin(blocking);do {//阻塞等待接收客户端链接n = accept(this.fd, newfd, isaa);} while (n == IOStatus.INTERRUPTED && isOpen());} finally {end(blocking, n > 0);assert IOStatus.check(n);}if (n < 1)return null;//新接收的socket初始设置为阻塞模式(因此非阻塞模式的每次需要显示设置)// newly accepted socket is initially in blocking modeIOUtil.configureBlocking(newfd, true);InetSocketAddress isa = isaa[0];//用新接收的socket创建SocketChannelSocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);// check permitted to accept connections from the remote addressSecurityManager sm = System.getSecurityManager();if (sm != null) {try {sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());} catch (SecurityException x) {sc.close();throw x;}}return sc;} finally {acceptLock.unlock();}
}

ServerSocketChannel并没有read和write方法,只是继承了AbstractSelectableChannel,以便在selector中使用

DatagramChannel

open

public DatagramChannelImpl(SelectorProvider sp)throws IOException
{super(sp);ResourceManager.beforeUdpCreate();try {//如果不支持IPv6则使用IPv4this.family = Net.isIPv6Available()? StandardProtocolFamily.INET6: StandardProtocolFamily.INET;//设置非流式的socket(tcp是流模式协议,udp是数据报模式协议)this.fd = Net.socket(family, false);this.fdVal = IOUtil.fdVal(fd);} catch (IOException ioe) {ResourceManager.afterUdpClose();throw ioe;}
}

receive

public SocketAddress receive(ByteBuffer dst) throws IOException {if (dst.isReadOnly())throw new IllegalArgumentException("Read-only buffer");readLock.lock();try {boolean blocking = isBlocking();int n = 0;ByteBuffer bb = null;try {SocketAddress remote = beginRead(blocking, false);boolean connected = (remote != null);SecurityManager sm = System.getSecurityManager();if (connected || (sm == null)) {// connected or no security managerdo {n = receive(fd, dst, connected);} while ((n == IOStatus.INTERRUPTED) && isOpen());if (n == IOStatus.UNAVAILABLE)return null;} else {// Cannot receive into user's buffer when running with a// security manager and not connectedbb = Util.getTemporaryDirectBuffer(dst.remaining());for (;;) {do {n = receive(fd, bb, connected);} while ((n == IOStatus.INTERRUPTED) && isOpen());if (n == IOStatus.UNAVAILABLE)return null;InetSocketAddress isa = (InetSocketAddress)sender;try {sm.checkAccept(isa.getAddress().getHostAddress(),isa.getPort());} catch (SecurityException se) {// Ignore packetbb.clear();n = 0;continue;}bb.flip();dst.put(bb);break;}}//sender:发送方地址, Set by receive0 (## ugh)assert sender != null;return sender;} finally {if (bb != null)Util.releaseTemporaryDirectBuffer(bb);endRead(blocking, n > 0);assert IOStatus.check(n);}} finally {readLock.unlock();}
}

send

public int send(ByteBuffer src, SocketAddress target)throws IOException
{Objects.requireNonNull(src);InetSocketAddress isa = Net.checkAddress(target, family);writeLock.lock();try {boolean blocking = isBlocking();int n = 0;try {//当connect后,remote会设置为连接的地址SocketAddress remote = beginWrite(blocking, false);if (remote != null) {// connectedif (!target.equals(remote)) {throw new AlreadyConnectedException();}do {n = IOUtil.write(fd, src, -1, nd);} while ((n == IOStatus.INTERRUPTED) && isOpen());} else {// not connectedSecurityManager sm = System.getSecurityManager();if (sm != null) {InetAddress ia = isa.getAddress();if (ia.isMulticastAddress()) {sm.checkMulticast(ia);} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());}}do {n = send(fd, src, isa);} while ((n == IOStatus.INTERRUPTED) && isOpen());}} finally {endWrite(blocking, n > 0);assert IOStatus.check(n);}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}

connect

@Override
public DatagramChannel connect(SocketAddress sa) throws IOException {InetSocketAddress isa = Net.checkAddress(sa, family);SecurityManager sm = System.getSecurityManager();if (sm != null) {InetAddress ia = isa.getAddress();if (ia.isMulticastAddress()) {sm.checkMulticast(ia);} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());sm.checkAccept(ia.getHostAddress(), isa.getPort());}}readLock.lock();try {writeLock.lock();try {synchronized (stateLock) {ensureOpen();if (state == ST_CONNECTED)throw new AlreadyConnectedException();int n = Net.connect(family,fd,isa.getAddress(),isa.getPort());if (n <= 0)throw new Error();      // Can't happen// connectedremoteAddress = isa;state = ST_CONNECTED;// refresh local addresslocalAddress = Net.localAddress(fd);// flush any packets already received.boolean blocking = isBlocking();if (blocking) {IOUtil.configureBlocking(fd, false);}try {ByteBuffer buf = ByteBuffer.allocate(100);while (receive(buf) != null) {buf.clear();}} finally {if (blocking) {IOUtil.configureBlocking(fd, true);}}}} finally {writeLock.unlock();}} finally {readLock.unlock();}return this;
}

udp是数据报模式的协议,是没有connect的。这里的connect实际上是在底层忽略了与其他地址的数据传输。
在connect后,就可以像socketChannel似得使用read和write了

总结

本文学习了各种channel的实现,主要是对底层native方法的一些封装,针对不同属性的实体(文件、socket),使用对应的channel与byteBuffer传输数据。再通过byteBuffer与byte数据进行转换。
channel的实现中,封装了大量的native方法,重要的底层实现全在native中,后续可以深入学习下。

本文中出现的byteBuffer和selector将在接下来的文章中,单独分析。


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518472.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【IPF2020】浪潮集团执行总裁、首席科学家王恩东:智慧计算、源动新基建

CSDN记者于前方报道 众所周知计算力就是生产力&#xff0c;智慧计算改造升级了生产力三要素并最终驱动了人类社会的转型升级。具体来说&#xff0c;智慧计算将劳动者由人变成了人与人工智能的结合体&#xff0c;以此可以顺利实现指数级增长&#xff0c;将数据变成一种创新生产…

使用Spark Streaming SQL基于时间窗口进行数据统计

1.背景介绍 流式计算一个很常见的场景是基于事件时间进行处理&#xff0c;常用于检测、监控、根据时间进行统计等系统中。比如埋点日志中每条日志记录了埋点处操作的时间&#xff0c;或者业务系统中记录了用户操作时间&#xff0c;用于统计各种操作处理的频率等&#xff0c;或…

html-网页基本标签

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>基本标签学习</title> </head> <body><!-- 标题标签 --> <h1>一级标签</h1> <h2>二级标签</h2> <…

阿里AI再出神器,“你是什么垃圾”一拍便知

“干垃圾&#xff0c;还是湿垃圾&#xff1f;你是什么垃圾&#xff1f;” 相信魔都的小伙伴已经要被垃圾分类逼疯了&#xff0c;还要面临垃圾桶前&#xff0c;志愿者们的灵魂一问&#xff1a;“你是什么垃圾&#xff1f;” 更糟糕的是&#xff0c;垃圾分类&#xff0c;还要“…

【IPF2020】浪潮集团高级副总裁彭震:智算中心 筑基智慧世界

【快讯】浪潮关注智算中心&#xff0c;据浪潮集团高级副总裁彭震来看主要归结为几个主要问题&#xff0c;分别是算力、数据以及互联。此外针对智算中心的分析往往不仅仅是一个中心的单一要素&#xff0c;更多是很多中心之间彼此互联的关系&#xff0c;如何解决多元融合的问题才…

html-图像标签

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>图像标签学习</title> </head> <body> <!-- img学习 src: 图片地址 必填相对地址&#xff08;推荐&#xff09; &#xff0c; …

工程师如何给女友买包?问问阿里“百事通”

阿里妹导读&#xff1a;工作那么忙&#xff0c;怎么给女朋友买包&#xff1f;是翻看包包的详情页&#xff0c;再从商品评论中去找信息吗&#xff1f;为了帮助类似的同学节省时间&#xff0c;阿里工程师们提出快速回答生成模型RAGE。你问它答&#xff0c;这个“百事通”能从整体…

如何成功构建大规模 Web 搜索引擎架构?

Web搜索引擎十分复杂&#xff0c;我们的产品是一个分布式系统&#xff0c;在性能和延迟方面有非常苛刻的要求。除此之外&#xff0c;这个系统的运营也非常昂贵&#xff0c;需要大量人力&#xff0c;当然也需要大量金钱。这篇文章将探讨我们使用的一些技术栈&#xff0c;以及我们…

html-超链接标签

一、 a标签 <!-- a标签 href: 必填&#xff0c; 表示要跳转到哪个页面 target: 表示窗口在哪里打开_blank 在新标签中打开_self 在当前网页打开 --><a href"1.我的第一个网页.html" target"_blank">点击跳转到第一个页面</a> <a …

运维编排场景系列----给实例加到SLS机器组

场景简介 我们经常会有这样的运维场景&#xff0c;扩容一批机器需要配置SLS日志&#xff0c;对于已经配置好的SLS Logstore后&#xff0c;我们只需要将机器加到机器组里。 解决方案 传统的解决方案是登录每台ecs实例并安装logtail&#xff0c;执行的命令为 wget http://log…

UI2CODE复杂背景无法识别?闲鱼工程师这样打造高准确率方案

引言: 复杂背景内容提取指的是从复杂的背景中提取出特定的内容&#xff0c;例如在图片中提取特定的文字&#xff0c;在图片中提取特定的叠加图层等等。 这是一个业界难题&#xff0c;基于传统的图像处理的方法存在准确率和召回率的问题&#xff0c;没法解决语义的问题。而主流…

万字干货:一步步教你如何在容器上构建持续部署!

作者| 倚天码农责编| 徐威龙封图| CSDN下载于视觉中国要想理解持续集成和持续部署&#xff0c;先要了解它的部分组成&#xff0c;以及各个组成部分之间的关系。下面这张图是我见过的最简洁、清晰的持续部署和集成的关系图。图源&#xff1a;sonatype持续部署如上图所示&#xf…

html-列表标签

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>列表学习</title> </head> <body><!-- 有序列表 应用范围&#xff1a; 试卷&#xff0c;问答。。。 --> <ol><li>…

阿里云环境中TLS/SSL握手失败的场景分析

TLS/SSL握手是一个相对复杂的过程&#xff0c;在阿里云环境中结合产品&#xff0c;安全等特性&#xff0c;可能会让TLS/SSL握手过程的不定性更多。本文来总结下各种握手失败的场景。 一次TLS/SSL握手的过程 本文不详细介绍TLS/SSL基础知识&#xff0c;相关介绍可以参考文章。…

千亿级的数据难题,优酷工程师怎么解决?

阿里妹导读&#xff1a;优酷一天的日志量会达到千亿级别&#xff0c;面对如此大的数据样本&#xff0c;2017年5月&#xff0c;优酷完成了从Hadoop迁移到阿里云MaxCompute&#xff0c;实现计算消耗和储存的消耗呈下降趋势&#xff0c;得到了非常大的收益。今天&#xff0c;阿里数…

热搜!华为:这类程序员领10亿,程序员:真香!你怎么看?

人工智能真的玩大了吗&#xff1f;人工智能行业的人才真的“爆发了&#xff1f;”华为&#xff1a;10亿培养AI人才程序员&#xff1a;真香&#xff01;你怎么看&#xff1f;最近&#xff0c;在AI圈里&#xff0c;发生了这样一件大事,华为宣布&#xff1a;计划投入10亿元人民币用…

html-表格标签

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>表格学习</title> </head> <body> <!-- 表格table 行 tr rows 列 td --> <table border"1px"><tr><…

玩转运维编排服务的权限:Assume Role+Pass Role

什么是运维编排服务&#xff1f; 阿里云运维编排服务&#xff08;Operation Orchestration Service&#xff0c;简称OOS&#xff09;是云上的自动化运维平台&#xff0c;提供运维任务的管理和执行。典型使用场景包括&#xff1a;事件驱动运维&#xff0c;批量操作运维&#xf…

机器学习在高德搜索建议中的应用优化实践

导读&#xff1a;高德的愿景是&#xff1a;连接真实世界&#xff0c;让出行更美好。为了实现愿景&#xff0c;我们要处理好LBS大数据和用户之间的智能链接。信息检索是其中的关键技术&#xff0c;而搜索建议又是检索服务不可或缺的组成部分。 本文将主要介绍机器学习在高德搜索…

IntelliJ IDEA 2020.x 入门到爱不释手

文章目录一、默认快捷键二、案例演示2.1. 查看最近浏览过的文件 | ctrle2.2. 根据行号定位代码 | ctrlg2.3. 导航栏快速切换2.4. 按照文本的内容替换-整个项目 |CtrlShiftr2.5. 按照文本的内容查找-整个项目 | CtrlShiftF2.6. 快速生成|ALTENTER2.7. 生成try..catch..等方法块 …