消息队列NetMQ 原理分析2-IO线程和完成端口

目录

  • 前言
    • 介绍
    • 目的
  • IO线程
  • 初始化IO线程
    • Proactor
    • 启动Procator线程轮询
    • 处理socket
  • IOObject
  • 总结

前言

介绍

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.0-rc5。本文档是对4.0.0-rc5分支代码进行分析。

zeromq的英文文档
NetMQ的英文文档

目的

对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:

  1. 消息队列NetMQ 原理分析1-Context和ZObject
  2. 消息队列NetMQ 原理分析2-IO线程和完成端口
  3. 消息队列NetMQ 原理分析3-命令产生/处理和回收线程
  4. 消息队列NetMQ 原理分析4-Session和Pipe
  5. 消息队列NetMQ 原理分析5-Engine
  6. 消息队列NetMQ 原理分析6-TCP和Inpoc实现
  7. 消息队列NetMQ 原理分析7-Device
  8. 消息队列NetMQ 原理分析8-不同类型的Socket
  9. 消息队列NetMQ 原理分析9-实战

友情提示: 看本系列文章时最好获取源码,更有助于理解。


IO线程

NetMQ 4.0.0底层使用的是IOCP(即完成端口)模式进行通信的(3.3.4使用的是select模型),通过异步IO绑定到完成端口,来最大限度的提高性能。这里不对同步/异步socket进行详细介绍。稍微解释下完成端口,为了解决每个socket客户端使用一个线程进行通信的性能问题,完成端口它充分利用内核对象的调度,只使用少量的几个线程来处理和客户端的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能。
想详细了解完成端口的请看完成端口(Completion Port)详解,讲解的比较详细,同时对各种网络编程模型做了简单的介绍。
因此NetMQ通过几个(默认1个)IO线程处理通信,上一片文章介绍了ZObejct对象,在该对象中存在许多命令的处理,实际对命令的发送,分配都是IO线程的工作。

初始化IO线程

IO线程初始化时会初始化ProactorIOThreadMailbox

var name = "iothread-" + threadId;
m_proactor = new Proactor(name);
m_mailbox = new IOThreadMailbox(name, m_proactor, this);

Proactor对象就是用来绑定或处理完成端口用的,后面再做作详细介绍。
IOThreadMailbox是IO线程处理的信箱,每当有命令需要处理时,都会向当前Socket对象所在的IO线程信箱发送命令。
让我们看一眼IOThread对象和IOThreadMailbox的定义

internal sealed class IOThread : ZObject, IMailboxEvent
{
}

IOThread对象继承自ZObject对象,记得上一节想到ZObject对象知道如何处理各种命令吗?因此IOThread对象也继承了他父亲的技能。同时IOThread对象实现了IMailboxEvent接口,这个接口之定义了一个方法。

internal interface IMailboxEvent
{void Ready();
}

当IO信箱接受到命令时表示当前有命令准备好了,可以进行 处理,IO信箱则会调用IO线程的Ready方法处理命令,那么IO信息如何调用IO线程的Ready方法呢,来看下IOThreadMailbox的构造函数。

internal class IOThreadMailbox : IMailbox
{...public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent){m_proactor = proactor;m_mailboxEvent = mailboxEvent;Command cmd;bool ok = m_commandPipe.TryRead(out cmd);}...
}

在IOThreadMailbox初始化时,传入了IMailboxEvent。

m_commandPipe是NetMQ的管道(Pipe),后面我们会对其做介绍,这里只要知道该管道用于存放命令即可,可以__暂时__理解为管道队列。

Proactor

每个IOThread会有一个Proactor,Proactor的工作就是将Socket对象绑定到完成端口,然后定时去扫描完成端口是否有需要处理的Socket对象。

internal class Proactor : PollerBase
{...public Proactor([NotNull] string name){m_name = name;m_stopping = false;m_stopped = false;m_completionPort = CompletionPort.Create();m_sockets = new Dictionary<AsyncSocket, Item>();}...
}

Proactor对象继承自PollerBase,那么PollerBase又是什么呢?从命名可以看这是一个轮询基类,即该对象需要长时间不断循环处理某件事情。
PollerBase对象是一个抽象类,它有2个功能:

  1. 负载均衡

    还记的Context中选择IO线程时有这个一段代码吗?
    580757-20181020185114964-1158578268.png
    IO线程的负载均衡功能就是PollBase对象提供的

    每次选择IO线程时会将m_load字段值+1
    protected void AdjustLoad(int amount) { Interlocked.Add(ref m_load, amount); }
    public int Load { get { #if NETSTANDARD1_3 return Volatile.Read(ref m_load); #else Thread.MemoryBarrier(); return m_load; #endif } }
    IOThread取PollBase对象(Proactor)的Load属性时候会特殊处理,保证拿到的是最新的值。
  2. 定时任务
    PollBase第二个功能就是支持定时任务,即定时触发某事件。

    private readonly SortedList<long, List<TimerInfo>> m_timers;

    PollBase内部有一个SortedList,key为任务执行的时间,value为TimeInfo
    TimeInfo对象包含2个信息,idITimerEvent接口,id用来辨别当前任务的类型,ITimerEvent接口就包含了TimerEvent方法,即如何执行。
    TcpConnection连接失败会重新连接时会重连,下面时TcpConnection开始连接方法

    private void StartConnecting()
    {Debug.Assert(m_s == null);// Create the socket.try{m_s = AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);}catch (SocketException){AddReconnectTimer();return;}...
    }
    private void AddReconnectTimer()
    {//获取重连时间间隔int rcIvl = GetNewReconnectIvl();//IO线程的Proactor中,TcpConnection的ReconnectTimerId = 1 m_ioObject.AddTimer(rcIvl, ReconnectTimerId);...
    }

    IO线程会被封装到IOObject中,调用IOObjectAddTimer方法实际就是调用IO线程中Proactor对象的AddTimer方法,其方法定义如下

    public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id)
    {long expiration = Clock.NowMs() + timeout;var info = new TimerInfo(sink, id);if (!m_timers.ContainsKey(expiration))m_timers.Add(expiration, new List<TimerInfo>());m_timers[expiration].Add(info);
    }

    第一行会获取当前的毫秒时间加上时间间隔。然后加入到m_timers中。

m_completionPort = CompletionPort.Create();
m_sockets = new Dictionary<AsyncSocket, Item>();

初始化时会创建完成端口,当有socket需要处理时会和完成端口绑定。
初始化时还会初始化一个存放异步AsyncSocketitem的字典。
有关于AsyncSocketCompletionPort可以去Git上看AsyncIO的源码,这里不做分析。
Item结构如下

private class Item
{public Item([NotNull] IProactorEvents proactorEvents){ProactorEvents = proactorEvents;Cancelled = false;}[NotNull] public IProactorEvents ProactorEvents { get; }public bool Cancelled { get; set; }
}

它包含了IProactorEvents接口的信息和当前Socket操作是否被取消标志。

internal interface IProactorEvents : ITimerEvent
{void InCompleted(SocketError socketError, int bytesTransferred);void OutCompleted(SocketError socketError, int bytesTransferred);
}

IProactorEvents继承自ITimerEvent。同时它还声明了InCompletedOutCompleted方法,即发送或接收完成时如何处理,因此当需要处理Socket时,会将当前Socket处理方式保存到这个字典中。当当前对象发送消息完成,则会调用OutCompleted方法,接收完成时则会调用InCompleted方法。
当有Socket需要绑定时会调用ProactorAddSocket方法

public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents)
{var item = new Item(proactorEvents);m_sockets.Add(socket, item);m_completionPort.AssociateSocket(socket, item);AdjustLoad(1);
}

它包含2个参数,一个时异步Socket对象和IProactorEvents。然后加把他们加入到字段中并将他们绑定到完成端口上。第四段AdjustLoad方法即把当前IO线程处理数量+1,用于负载均衡用。

Socket操作完成时会调用ProactorRemoveSocket移除绑定

public void RemoveSocket(AsyncSocket socket)
{AdjustLoad(-1);var item = m_sockets[socket];m_sockets.Remove(socket);item.Cancelled = true;
}

移除时会将itemCancelled字段设置为true。所以当Proactor轮询处理Socket时发现该Socket操作被取消(移除),就会跳过处理。

启动Procator线程轮询

在IO线程启动时实际就是启动Procator的work线程

public void Start()
{m_proactor.Start();
}
public void Start()
{m_worker = new Thread(Loop) { IsBackground = true, Name = m_name };m_worker.Start();
}

处理socket

完整的Loop方法如下

private void Loop()
{var completionStatuses = new CompletionStatus[CompletionStatusArraySize];while (!m_stopping){// Execute any due timers.int timeout = ExecuteTimers();int removed;if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))continue;for (int i = 0; i < removed; i++){try{if (completionStatuses[i].OperationType == OperationType.Signal){var mailbox = (IOThreadMailbox)completionStatuses[i].State;mailbox.RaiseEvent();}// if the state is null we just ignore the completion statuselse if (completionStatuses[i].State != null){var item = (Item)completionStatuses[i].State;if (!item.Cancelled){switch (completionStatuses[i].OperationType){case OperationType.Accept:case OperationType.Receive:item.ProactorEvents.InCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;case OperationType.Connect:case OperationType.Disconnect:case OperationType.Send:item.ProactorEvents.OutCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;default:throw new ArgumentOutOfRangeException();}}}}catch (TerminatingException){ }}}
}
 var completionStatuses = new CompletionStatus[CompletionStatusArraySize];

第一行初始化了CompletionStatus数组,CompletionStatusArraySize值为100。
CompletionStatus作用是用来保存socket的信息或状态。

获取超时时间

int timeout = ExecuteTimers();
 protected int ExecuteTimers()
{if (m_timers.Count == 0)return 0;long current = Clock.NowMs();var keys = m_timers.Keys;for (int i = 0; i < keys.Count; i++){var key = keys[i];if (key > current){return (int)(key - current);}var timers = m_timers[key];foreach (var timer in timers){timer.Sink.TimerEvent(timer.Id);}timers.Clear();m_timers.Remove(key);i--;}return 0;
}

ExecuteTimers会计算之前加入到m_timers需要等待的超时时间,若没有对象则直接返回0,否则获取若获取到key时间在当前时间之前,则需要调用TimerEvent方法,调用完成后移除。
若获取到的key时间比当前时间大,则返回他们的差即为需要等待的超时时间。

从完成端口获取处理完的状态

int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))continue;

GetMultipleQueuedCompletionStatus方法传入一个超时时间,若前面获取的超时时间为0,则这边会设置为-1,表示阻断直到有要处理的才返回。
CompletionPort内部维护了一个状态队列,removed即为处理完成返回的状态个数。
若获取成功则会返回true,后面就开始遍历completionStatuses数组处理完成Socket

开始处理待处理的状态

public struct CompletionStatus
{internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : this(){AsyncSocket = asyncSocket;State = state;OperationType = operationType;SocketError = socketError;BytesTransferred = bytesTransferred;}public AsyncSocket AsyncSocket { get; private set; }public object State { get; internal set; }public OperationType OperationType { get; internal set; }public SocketError SocketError { get; internal set; }public int BytesTransferred { get; internal set; }        
}

CompletionStatus是个结构体,它包含的信息如上。其中OperationType是当前Socket的处理方式。

public enum OperationType
{Send, Receive, Accept, Connect, Disconnect, Signal
} 

for循环的一开始先会判断当前状态的OperationType,若是Signal,则说明当前是个信号状态,说明有命令需要处理,则会调用IO信箱的RaiseEvent方法,实际为IO线程的Ready方法。

public void Ready()
{Command command;while (m_mailbox.TryRecv(out command))command.Destination.ProcessCommand(command);
}

IOThread会将当前信箱的所有命令进行处理。
若不是Signal则会将CompletionStatus保存的状态信息转换为Item对象,并判断当前Socket是否移除(取消)。若没有则对其进行处理。判断OperationType,若为AcceptReceive则表示需要接收,则调用InCompleted方法。若为Connect,DisconnectSend则表示有消息向外发送,则调用OutCompleted方法。

至此IOThread代码分析完毕。

IOObject

internal class IOObject : IProactorEvents
{public IOObject([CanBeNull] IOThread ioThread){if (ioThread != null)Plug(ioThread);}public void Plug([NotNull] IOThread ioThread){Debug.Assert(ioThread != null);m_ioThread = ioThread;}
}

IOObject实际就是保存了IOThread的信息和Socket处理完成时如何执行,以及向外暴露了一些接口。

再次说明,如果向简单了解完成端口如何使用,则看《完成端口使用》,如果想详细了解完成端口则看下《完成端口详细介绍》,如果想直到NetMQ的AsyncIO和完成端口的源码请看AsyncIO。

总结

该篇介绍了IO线程和完成端口的处理方式,若哪里分析的不到位或有误希望支出。


本文地址:https://www.cnblogs.com/Jack-Blog/p/6347163.html
作者博客:杰哥很忙
欢迎转载,请在明显位置给出出处及链接)

转载于:https://www.cnblogs.com/Jack-Blog/p/6347163.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/396846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VC连接mysql数据库错误:libmysql.lib : fatal error LNK1113: invalid machine 解决方法

VC连接MySQL的配置过程在上一篇博文中&#xff0c;不过当你设置好&#xff0c;以为万事大吉的时候&#xff0c;运行却出现这个错误&#xff1a;libmysql.lib : fatal error LNK1113: invalid machine type。 无效的机器类型&#xff0c;真的是很让人捉急。 发生这个错误的原因是…

linux 内存泄漏 定位,一种内存泄露检查和定位的方法

一个系统后台服务进程&#xff0c;可能包括多个线程&#xff0c;在生成环境下要求系统程序能够稳定长时间稳定运行而不宕机。其中一个基本的前提就是需要保证系统程序不存在内存泄露。那么&#xff0c;该如何判读系统程序是否存在内存泄露呢&#xff1f;如果存在&#xff0c;又…

ifconfig命令找不到_02. Linux命令之查看网络连接

1. 查看网络连接数和端口使用 netstat 命令查看网络连接情况netstat -anp参数&#xff1a;-a 显示所有选项-t (tcp)仅显示tcp相关选项-u (udp)仅显示udp相关选项-n 拒绝显示别名&#xff0c;能显示数字的全部转化成数字。-p 显示建立相关链接的程序名关键列解释:Proto 表示协议…

python学习之模块(pip),列表生成式,模块操作mysql,excel

python基础 生成式 列表生成式  格式 [表达式 for 表达式 in 迭代对象 (可加判断)] 原&#xff1a; 1 res1 [] 2 for i in range(1,5): 3   res1.append(i) 4 print(res1) 改&#xff1a; 1 res2 [i for i in range(1,5)] 2 print(res2) 字典生成式  格式 {key:value f…

linux驱动read函数 copytouser,Linux驱动编程 step-by-step (五)主要的文件操作方法实现...

主要的文件操作方法实现文件操作函数有很多的操作接口&#xff0c;驱动编程需要实现这些接口&#xff0c;在用户编程时候系统调用时候会调用到这些操作structfile_operations {...loff_t (*llseek) (structfile *, loff_t,int);ssize_t (*read) (structfile *,char__user *,siz…

基于光线追踪的渲染中景深(Depth of field)效果的实现

图形学离线渲染中常用的透视摄像机模型时根据小孔成像的原理建立的&#xff0c;其实现通常是从向成像平面上发射ray&#xff0c;并把trace这条ray的结果作为成像平面上对应交点的采样结果。即&#xff1a; 图片来自《Fundamentals of Computer Graphics》 现实中的镜头拍摄的图…

带你制作百词斩单词表读写插件

上篇博文简单的介绍了一下Chrome插件&#xff0c;今天就与大家分享一下我做的这款有实际意义的插件吧。 做这款插件主要是用百词斩站点进行单词学习时&#xff0c;遇到的一点点闹心事儿。在单词表中不能听发音。也不能练习拼写。所以才忍无可忍的做了这么一款插件。自我感觉还是…

iphone各机型参数对比_带你了解新款iPhone 12系列四款机型

2020年10月14日凌晨1&#xff1a;00&#xff0c;苹果召开新品发布会&#xff0c;发布了新款iPhone 12系列手机&#xff0c;“果粉”们期待已久的iPhone 12终于来了。iPhone 12系列手机共有四款机型&#xff0c;分别是iPhone 12 mini、iPhone 12、iPhone 12 Pro、iPhone 12 Pro …

高并发第一弹:准备阶段 了解高并发

高并发第一弹:准备阶段 了解高并发 首先需要知道什么并发, 什么是高并发. 并发: 关于并发的学习&#xff0c;可以从JDK提供的并发包为核心开始&#xff0c;许多其他的类和封装都是对其进行扩展或者补充&#xff0c;我们来看一下Java并发包(java.util.concurrent包&#xff0c;简…

matlab立体坐标定位_【半导光电】基于光电探测器的激光章动定位算法(二)

今日光电有人说&#xff0c;20世纪是电的世纪&#xff0c;21世纪是光的世纪&#xff1b;知光解电&#xff0c;再小的个体都可以被赋能。欢迎来到今日光电&#xff01;----与智者为伍 为创新赋能----1. 章动定位算法实验前&#xff0c;首先需要对光路进行调节&#xff0c;保证经…

Android:支持多选的本地相册

前段时间在做一个动态发布功能&#xff0c;需要用到图片上传。一开始直接调用的系统相册和相机&#xff0c;由于系统相机不支持多选&#xff0c;就花点时间做了个本地相册&#xff0c;在此开源下。 先上截图&#xff0c;依次为选择相册界面、相册详情界面、查看图片大图界面 相…

心灵与大脑

2019独角兽企业重金招聘Python工程师标准>>> http://blog.sina.com.cn/s/blog_6f034fc30102f2tg.html 转载于:https://my.oschina.net/chirnson/blog/832011

python入门心得_记初学python的一些心得

人生苦短&#xff0c;我用python&#xff01; 其实我自学python也很长一段时间了&#xff0c;但总是去更换学习资料&#xff0c;搞的现在学的不是很好&#xff0c;因为没更换次资料都要从头开始学起&#xff0c;那么分享下我的学习战况吧&#xff0c;不是很好&#xff0c;还将就…

16.U-boot的工作流程分析-2440

16.U-boot的工作流程分析-2440 分析的流程&#xff1a; 程序入口 第一阶段程序分析 第二阶段程序分析 2440开发板&#xff1a; 1.uboot的入口&#xff1a; 要看uboot工程的入口&#xff0c;首先打开顶层目录的Makefile&#xff1a; Uboot所支持的开发板&#xff0c;在顶层的Ma…

如何使用Redis做MySQL的缓存

应用Redis实现数据的读写&#xff0c;同时利用队列处理器定时将数据写入mysql。 同时要注意避免冲突&#xff0c;在redis启动时去mysql读取所有表键值存入redis中&#xff0c;往redis写数据时&#xff0c;对redis主键自增并进行读取&#xff0c;若mysql更新失败&#xff0c;则需…

psychopy 与脑电打码 eeg

2019独角兽企业重金招聘Python工程师标准>>> 实验程序就不放了&#xff0c;这里主要放如何向串口发送打码的代码 实际上&#xff0c;给脑电打码的本质就是向串口发送一个字符&#xff0c;脑电的程序会自动在收到该字符的同时在脑电数据上进行标记。以下代码打开了一…

mysql -- 索引的使用

普通索引&#xff1a;用于提升查询速度唯一索引&#xff1a;用于提升查询速度&#xff0c;还要求字段值不得重复主键索引&#xff1a;唯一性且不为空的索引全文索引&#xff1a;用于大量文本搜索中建立的索引虽然索引有好处&#xff0c;但是凡是都有俩面性&#xff0c;提高效率…

surface pro 7 linux,微软 Surface Pro、Studio、Laptop 全线更新

今晨&#xff0c;微软在纽约的秋季新品发布会上&#xff0c;发布了 Surface Pro 6、Laptop 2 以及最顶级的 Studio 2 三款备受期待的 Surface 产品。至此&#xff0c;包括年初的 Surface Book 2 在内&#xff0c;完成了2018 年 Surface 产品线所有升级计划。当然这场规模不算大…

dremwere怎样让多个图片并列排放_PPT图片布局不好看?六步教你,看完就会。

我们平时很多场合需要演示讲解时&#xff0c;少不了使用PPT。尤其目前白领层人士&#xff0c;项目总结、产品介绍、调研报告…无时无刻不被PPT包围&#xff0c;又无时无刻不被PPT设计所困恼。诚然设计好看实用的PPT需要高质量素材资源和坚实的文字功底。但在处理美化PPT时&…

ThreadLocal 你到底是个什么鬼

2019独角兽企业重金招聘Python工程师标准>>> 很多文章都拿它跟同步机制作比较&#xff0c;我觉得这个思路对于理解这个东西完全没有作用。 ThreadLocal跟synchronize这类东西作比较&#xff0c;是很多文章的套路&#xff0c;我感觉这么比较&#xff0c;就跟比较重载…