Netty writeAndFlush() 流程与异步

Netty writeAndFlush()方法分为两步, 先 write 再 flush

    @Overridepublic ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {DefaultChannelHandlerContext next;next = findContextOutbound(MASK_WRITE);ReferenceCountUtil.touch(msg, next);next.invoker.invokeWrite(next, msg, promise);next = findContextOutbound(MASK_FLUSH);next.invoker.invokeFlush(next);return promise;}

以上是DefaultChannelHandlerContext中的writeAndFlush方法, 可见实际上是先调用了write, 然后调用flush

1. write

write方法从TailHandler开始, 穿过中间自定义的各种handler以后到达HeadHandler, 然后调用了HeadHandler的成员变量Unsafe的write

如下

        @Overridepublic void write(Object msg, ChannelPromise promise) {ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {// If the outboundBuffer is null we know the channel was closed and so// need to fail the future right away. If it is not null the handling of the rest// will be done in flush0()// See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);// release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);return;}outboundBuffer.addMessage(msg, promise);}

最终会把需要write的msg和promise(也就是一个future, 我们拿到手的future, 添加Listener的也是这个)放入到outboundBuffer中, msg和promise在outboundBuffer中的存在形式是一个自定义的结构体Entry.

也就是说调用write方法实际上并不是真的将消息写出去, 而是将消息和此次操作的promise放入到了一个队列中

2. flush

flush也是从Tail开始, 最后到Head, 最终调用的也是Head里的unsafe的flush0()方法, 然后flush0()里再调用doWrite()方法, 如下:

 @Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {int writeSpinCount = -1;for (;;) {Object msg = in.current();if (msg == null) {// Wrote all messages.
                clearOpWrite();break;}if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;int readableBytes = buf.readableBytes();if (readableBytes == 0) {in.remove();continue;}boolean setOpWrite = false;boolean done = false;long flushedAmount = 0;if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i --) {int localFlushedAmount = doWriteBytes(buf); // 这里才是实际将数据写出去的地方if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (!buf.isReadable()) {done = true;break;}}in.progress(flushedAmount);if (done) {in.remove();} else {incompleteWrite(setOpWrite);break;}} else if (msg instanceof FileRegion) {FileRegion region = (FileRegion) msg;boolean setOpWrite = false;boolean done = false;long flushedAmount = 0;if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i --) {long localFlushedAmount = doWriteFileRegion(region);if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (region.transfered() >= region.count()) {done = true;break;}}in.progress(flushedAmount);if (done) {in.remove(); // 根据写出的数据的数量情况, 来判断操作是否完成, 如果完成则调用 in.remove()} else {incompleteWrite(setOpWrite);break;}} else {throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));}}}

红字部分就是最后将数据写出去的地方, 这里写数据最终调用的是 GatheringByteChannel 的 write() 方法, 这是个原生Java接口, 具体实现依赖于实现这个接口的Java类, 例如会调用 NIO 的 SocketChannel 的write()方法, 至此, 实际写数据的过程出现了, SocketChannel可以运行在non-blocking模式, 也就是非阻塞异步模式, write数据会马上返回写入的数据数量 (并不一定是所有数据都写入成功, 对于是否写入了所有数据, Netty有自己的处理逻辑, 也就是上面代码中的红字的那段for循环, 具体参看下SocketChannel的javadoc和netty源码).

当所有数据写入SocketChannel成功, 开始调用in.remove(), 这个 in 就是第一步 1. write 里的那个 outboundBuffer, 他的类型是 ChannelOutboundBuffer, 代码如下:

    public final boolean remove() {if (isEmpty()) {return false;}Entry e = buffer[flushed];Object msg = e.msg;if (msg == null) {return false;}ChannelPromise promise = e.promise;int size = e.pendingSize;e.clear();flushed = flushed + 1 & buffer.length - 1;if (!e.cancelled) {// only release message, notify and decrement if it was not canceled before.
            safeRelease(msg);safeSuccess(promise); // 这里, 调用了promise的trySuccess()方法, 触发ListenerdecrementPendingOutboundBytes(size);}return true;}

最后会调用Promise的notifyListeners()操作, 触发Listener完成整个异步流程

---------

最后, 回到我们应用netty的时候的代码

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.writeAndFlush(new Object()).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {// do sth} else {// do sth
                }}});}

这就是整个流程

 

最后提一下, Netty的AbstractNioChannel里封装了selectionKey, 在accept socket的时候, socket会被注册到eventLoop()的Selector, 这个selectionKey就会被赋值,  如下

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

在以后Selector的select()的时候,  则会通过这个key来获取到channel, 然后调用 AbstractChannel 里的 DefaultChannelPipeline 来触发 Handler 的 connect, read, write 等等事件...

 

转载于:https://www.cnblogs.com/zemliu/p/3667332.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/359933.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript学习总结(六)——JavaScript判断数据类型总结

最近做项目中遇到了一些关于javascript数据类型的判断处理,上网找了一下资料,并且亲自验证了各种数据类型的判断,在此做一个总结吧! 一、JS中的数据类型 1.数值型(Number):包括整数、浮点数。 2…

material 项目_Web开发必备的 10 个开源项目,不用自己亲自造轮子!

来自:Java and Python君Web 开发中几乎的平台都需要一个后台管理,但是从零开发一套后台控制面板并不容易,幸运的是有很多开源免费的后台控制面板可以给开发者使用,那么有哪些优秀的开源免费的控制面板呢?我在 Github 上…

香辣弹簧:自动接线的不同方法

我想展示使用Spring的Autowired批注的不同方式: Constructor , Method和Field自动装配。 我展示的示例都是byType自动装配模式的一种形式( constructor自动装配模式类似于byType )。 请参阅Spring Reference指南 ,以获…

SVN部署(本地)

1.安装TortoiseSVN 2.建立Repository。在F:/下建立文件夹SVN_NATIVE_REPOSITORY, —— 》 3.在其他任意路径建立任意名称的路径,这里为D:\MSVC Project\SVN_WORK,确保该文件夹为空 右键单击,点击SVN Checkout, 第一行 …

InetAddressImpl#lookupAllHostAddr慢/挂起

自从我升级到优胜美地以来,我已经注意到尝试解析我的家庭网络上的本地主机已经花费了很多时间(有时超过一分钟),所以我想我会尝试找出原因。 这是我的初始/ etc / hosts文件的外观,它基于我的机器的主机名是teetotal的…

mysql sqlserver schema_MySQL数据库数据迁移到SQLserver

最近因工作需要,需要将mysql数据库迁移到sqlserver,仅仅是数据迁移,因此相对比较简单。对于mysql迁移到sqlserver,我们需要使用到mysql odbc驱动,然后透过sqlserver链接服务器的方式来访问mysql。具体见下文描述。一、…

mysql客户端安装错误_windows下mysql 5.7以上版本安装及遇到的问题

(原)早些前用window安装mysql挺简单的,一个安装程序,一路下一步。2006的5.0版本,确实太早了点。于是官网上又下了一个版本,windows也是提供了二个版本Installer(安装)版和Archive(文档)版。Installer版本的后缀是.msi,…

SP2010开发和VS2010专家食谱--第二章节--工作流

本章内容: 1. 创建顺序工作流。 2. 创建带有初始表单的网站工作流。 3. 从工作流创建任务。 4. 创建自定义任务表单。 Introduction 根据国际工作流联盟(http://www.WFMC.org)标准组织,完全致力于流程,这样定义工作流&…

sklearn中eof报错_sklearn中的数据预处理和特征工程

小伙伴们大家好~o( ̄▽ ̄)ブ,今天我们看一下Sklearn中的数据预处理和特征工程,老规矩还是先强调一下,我的开发环境是Jupyter lab,所用的库和版本大家参考:Python 3.7.1(你的版本至少要…

sql显示前10行数据_SPL 简化 SQL 案例详解:计算各组前 N 行

取出各组的前N行数据是较常见的运算,比如:每个月每种产品销量最高的五天是哪五天,每位员工涨薪最多的一次是哪次,高尔夫会员成绩最差的三次是哪三次,等等。在SQL中,这类运算要用窗口函数以及keep/top/rownu…

jquery 与其他库冲突解决方案

var j jQuery.noConflict();j("div p").hide(); // 基于 jQuery 的代码$("content").style.display "none"; // 基于其他库的 $() 代码转载于:https://www.cnblogs.com/timelesszhuang/p/3677845.html

11. mysql锁机制_深入探讨MySQL锁机制

MySQL锁机制究竟是怎样的呢?这是很多人都提到过的问题,下面就为您详细介绍MySQL锁机制方面的知识,希望可以让您MySQL锁机制有更多的了解。当前MySQL已经支持 ISAM, MyISAM, MEMORY (HEAP) 类型表的表级锁了,BDB 表支持页级锁&…

4月21日会议总结(整理—祁子梁)

会议成果: 1.今天我们确定了软件版本的时间alphe版在12周做出来,在我们内部测试基本通过。 bate版在13周发布和其他组作交换测试,在14周release版发布并给其他人使用体验准备15周的演讲。 2.同时确定了部分功能实现顺序,”谁是卧底…

通达信金融终端_尘缘整合_V7.12

http://pan.baidu.com/s/1gvtPO http://pan.baidu.com/s/1xqrk6 通达信金融终端_尘缘整合_V7.12转载于:https://www.cnblogs.com/mier001/p/3679701.html

5天玩转mysql视频教程_六天带你玩转MySQL

教程列表:01数据库课程介绍02数据库(基础知识)03数据库(关系型数据库)04数据库(关系型数据库关键字说明)05数据库(SQL)06数据库(mysql数据库)07数据库(mysql服务器数据对象)08SQL基本操作(新增数据库)09SQL基本操作(查看数据库)10SQL基本操作(更新数据库)12SQL基本操…

winxp精简版没有IIS的解决办法

首先在“开始”菜单的“运行”中输入“c:\Windows\inf\sysoc.inf”,系统会自动使用记事本打开sysoc.inf这个文件。在sysoc.inf中找到“[Components]”这一段,因为是XP简化版,所以里面东西很少,在里面加上这段:“iisiis…

ant vue 兼容性问题_ant design for vue 关于table的一些问题

1、为table添加分页: :pagination"pagination"pagination: {defaultPageSize: 10,showTotal: (total) > 共${total} 条数据,total: 0,showSizeChanger: true,pageSizeOptions: [10, 20, 50],onShowSizeChange: (current, pageSize) > {this.pageSiz…

Coder-Strike 2014 - Finals (online edition, Div. 2) A. Pasha and Hamsters

水题 #include <iostream> #include <vector> #include <algorithm>using namespace std;int main(){int n,a,b;cin >> n >>a >> b;vector<int> apple(n1,0);int k;for(int i 0 ; i < a; i) {cin>>k;apple[k] 1;}for(…

如何查看mysql的gtid_汇总丨MySQL GTID技术点,看这一篇就够了!

mysql> SELECT * FROM mysql.gtid_executed;mysql.gtid_executed表是由MySQL服务器提供给内部使用的。它允许副本在副本上禁用二进制日志记录时使用GTIDs&#xff0c;并允许在二进制日志丢失时保留GTID状态。RESET MASTER命令&#xff0c;gtid_executed表将被清除。服务意外…

为JPA的本机查询API键入安全查询

当您使用JPA时-有时-JPQL无法解决问题&#xff0c;您将不得不使用本机SQL。 从一开始&#xff0c;像Hibernate这样的ORM就为这些情况保留了一个开放的“后门”&#xff0c;并为Spring的JdbcTemplate &#xff0c; Apache DbUtils或jOOQ提供了类似的API&#xff0c;用于纯SQL 。…