SaltStack源码分析之:master端执行salt模块大致流程

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

##JOB执行流程 先看下官网对于master端的工作流程的介绍:


The Salt master works by always publishing commands to all connected minions and the minions decide if the command is meant for them by checking themselves against the command target.The typical lifecycle of a salt job from the perspective of the master might be as follows:1) A command is issued on the CLI. For example, 'salt my_minion test.ping'.
使用命令行工具生成一个条命令,如:'salt my_minion test.ping'。2) The 'salt' command uses LocalClient to generate a request to the salt master by connecting to the ReqServer on TCP:4506 and issuing the job.
'salt' 命令使用LocalClient连接本地的4506端口来发送命令。3) The salt-master ReqServer sees the request and passes it to an available MWorker over workers.ipc.
salt-master ReqServer接收请求,然后把请求通过workers.ipc分发到一个可用的MWorker中去。4) A worker picks up the request and handles it. First, it checks to ensure that the requested user has permissions to issue the command. Then, it sends the publish command to all connected minions. For the curious, this happens in ClearFuncs.publish().
一个worker线程认领请求并且处理它。首先,它检查用户是否有权限发送命令。然后,它发送一个publish类型的命令到所有连接的minions。这一步发生在ClearFuncs.publish()中。5) The worker announces on the master event bus that it is about to publish a job to connected minions. This happens by placing the event on the master event bus (master_event_pull.ipc) where the EventPublisher picks it up and distributes it to all connected event listeners on master_event_pub.ipc.
worker线程生成一个事件,说它准备将命令发送给minons。步骤是(1)worker将事件发送到master的事件总线中去(master_event_pull.ipc)。(2)EventPublisher获取这个事件,并通过master_event_pub.ipc分发给所有的订阅者。6) The message to the minions is encrypted and sent to the Publisher via IPC on publish_pull.ipc.
发送个minions的消息加密后通过publish_pull.ipc发送给Publisher。7) Connected minions have a TCP session established with the Publisher on TCP port 4505 where they await commands. When the Publisher receives the job over publish_pull, it sends the jobs across the wire to the minions for processing.
在线的minions通过TCP会话连接到master端的4505端口来等待命令。当Publisher在publish_pull接收到命令后,便把命令通过4505端口发送给minions。8) After the minions receive the request, they decrypt it and perform any requested work, if they determine that they are targeted to do so.
minions接收到请求后,首先解密请求,如果确定命令是发送给自己的,便去执行命令。9) When the minion is ready to respond, it publishes the result of its job back to the master by sending the encrypted result back to the master on TCP 4506 where it is again picked up by the ReqServer and forwarded to an available MWorker for processing. (Again, this happens by passing this message across workers.ipc to an available worker.)
当minion处理完命令后,便通过master的4506端口返回执行结果。master端的ReqServer接收到结果,再次将结果发送给MWorker去处理。(ReqServer是通过workers.ipc将消息分发给一个可用的worker线程的。)10) When the MWorker receives the job it decrypts it and fires an event onto the master event bus (master_event_pull.ipc). (Again for the curious, this happens in AESFuncs._return().
MWorker接收这个job并解密它,然后它会在master的事件总线中发布一个事件(master_event_pull.ipc)(这一步发生在AESFuncs._return()中)。11) The EventPublisher sees this event and re-publishes it on the bus to all connected listeners of the master event bus (on master_event_pub.ipc). This is where the LocalClient has been waiting, listening to the event bus for minion replies. It gathers the job and stores the result.
EventPublisher接收到这个事件,再次把它分发给所有的订阅者(通过master_event_pub.ipc)。LocalClient就在这里监听事件,等待自己需要的结果。它搜集并存储命令执行结果。12) When all targeted minions have replied or the timeout has been exceeded, the salt client displays the results of the job to the user on the CLI.
当所有的minions返回结果或者执行超时,salt客户端在界面显示结果。

##源码分析

下面介绍master执行salt模块用到的几个类,参照上面的流程阅读源码。

###salt.master.Master

创建ReqServer的代码在run_reqserver()中:

def run_reqserver(self):reqserv = ReqServer(self.opts,self.key,self.master_key)reqserv.run()

###salt.master.ReqServer

打开salt.master.ReqServer:

class ReqServer(object):'''Starts up the master request server, minions send results to thisinterface.'''def __init__(self, opts, key, mkey):'''Create a request server:param dict opts: The salt options dictionary:key dict: The user starting the server and the AES key:mkey dict: The user starting the server and the RSA key:rtype: ReqServer:returns: Request server'''self.opts = optsself.master_key = mkey# Prepare the AES keyself.key = keydef __bind(self):'''Binds the reply server'''dfn = os.path.join(self.opts['cachedir'], '.dfn')if os.path.isfile(dfn):try:os.remove(dfn)except os.error:passself.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')req_channels = []for transport, opts in iter_transport_opts(self.opts):chan = salt.transport.server.ReqServerChannel.factory(opts)chan.pre_fork(self.process_manager)req_channels.append(chan)for ind in range(int(self.opts['worker_threads'])):self.process_manager.add_process(MWorker,args=(self.opts,self.master_key,self.key,req_channels,),)self.process_manager.run()def run(self):'''Start up the ReqServer'''try:self.__bind()except KeyboardInterrupt:log.warn('Stopping the Salt Master')raise SystemExit('\nExiting on Ctrl-c')def destroy(self):if hasattr(self, 'clients') and self.clients.closed is False:self.clients.setsockopt(zmq.LINGER, 1)self.clients.close()if hasattr(self, 'workers') and self.workers.closed is False:self.workers.setsockopt(zmq.LINGER, 1)self.workers.close()if hasattr(self, 'context') and self.context.closed is False:self.context.term()# Also stop the workersif hasattr(self, 'process_manager'):self.process_manager.kill_children()def __del__(self):self.destroy()

代码比较简单,主要的功能在_bind()方法中,它根据配置文件的中worker_threads生成数个worker线程。

###salt.master.MWorker

salt.master.MWorker类中,也是通过_bind()方法来接收请求的:

def __bind(self):'''Bind to the local port'''# using ZMQIOLoop since we *might* need zmq in therezmq.eventloop.ioloop.install()self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()for req_channel in self.req_channels:req_channel.post_fork(self._handle_payload, io_loop=self.io_loop)  # TODO: cleaner? Maybe lazily?self.io_loop.start()

核心语句在req_channel.post_fork(self._handle_payload, io_loop=self.io_loop),它将接收到的请求交给self._handle_payload处理,我们看下_handle_payload方法:

@tornado.gen.coroutine
def _handle_payload(self, payload):'''The _handle_payload method is the key method used to figure out whatneeds to be done with communication to the serverExample cleartext payload generated for 'salt myminion test.ping':{'enc': 'clear','load': {'arg': [],'cmd': 'publish','fun': 'test.ping','jid': '','key': 'alsdkjfa.,maljf-==adflkjadflkjalkjadfadflkajdflkj','kwargs': {'show_jid': False, 'show_timeout': False},'ret': '','tgt': 'myminion','tgt_type': 'glob','user': 'root'}}:param dict payload: The payload route to the appropriate handler'''key = payload['enc']load = payload['load']ret = {'aes': self._handle_aes,'clear': self._handle_clear}[key](load)raise tornado.gen.Return(ret)

在代码的最后一行可以看到,如果key是'aes'的话就调用self._handle_aes方法,它是用来处理minion返回的结果的;如果key是'clear'的话就调用self._handle_clear方法,它是用来处理master发送的命令的。

看下self. _handle_clear方法:

def _handle_clear(self, load):'''Process a cleartext command:param dict load: Cleartext payload:return: The result of passing the load to a function in ClearFuncs corresponding tothe command specified in the load's 'cmd' key.'''log.trace('Clear payload received with command {cmd}'.format(**load))if load['cmd'].startswith('__'):return Falsereturn getattr(self.clear_funcs, load['cmd'])(load), {'fun': 'send_clear'}

重点是最后一句,它根据load['cmd']的值来调用self.clear_funcs中的对应方法,执行salt模块时,load['cmd']的值是publishself.clear_funcssalt.master.ClearFuncs的实例化对象,salt.master.ClearFuncs介绍见下文。

self. _handle_aes方法跟self. _handle_clear方法类似:

def _handle_aes(self, data):'''Process a command sent via an AES key:param str load: Encrypted payload:return: The result of passing the load to a function in AESFuncs corresponding tothe command specified in the load's 'cmd' key.'''if 'cmd' not in data:log.error('Received malformed command {0}'.format(data))return {}log.trace('AES payload received with command {0}'.format(data['cmd']))if data['cmd'].startswith('__'):return Falsereturn self.aes_funcs.run_func(data['cmd'], data)

当salt-minion返回命令的结果时data['cmd']的值是_return,看下run_func的源码可知其调用的是salt.master.AESFuncs_return方法,salt.master.AESFuncs介绍见下文。

###salt.master.ClearFuncs

ClearFuncs.publish方法开始的部分是进行身份认证,认证通过后会生成一条事件来说明即将发送消息:

payload = self._prep_pub(minions, jid, clear_load, extra)

self._prep_pub中核心代码是这一行:

self.event.fire_event(new_job_load, tagify([clear_load['jid'], 'new'], 'job'))

最后发送消息给minions:

self._send_pub(payload)

self._send_pub方法很简单,调用底层的消息队列发送消息:

def _send_pub(self, load):'''Take a load and send it across the network to connected minions'''for transport, opts in iter_transport_opts(self.opts):chan = salt.transport.server.PubServerChannel.factory(opts)chan.publish(load)

###salt.master.AESFuncs

看下_return方法源码:

def _return(self, load):'''Handle the return data sent from the minions.Takes the return, verifies it and fires it on the master event bus.Typically, this event is consumed by the Salt CLI waiting on the otherend of the event bus but could be heard by any listener on the bus.:param dict load: The minion payload'''try:salt.utils.job.store_job(self.opts, load, event=self.event, mminion=self.mminion)except salt.exception.SaltCacheError:log.error('Could not store job information for load: {0}'.format(load))

可以看到,主要代码在salt.utils.job.store_job中,核心代码在这里:

if event:# If the return data is invalid, just ignore itlog.info('Got return from {id} for job {jid}'.format(**load))event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job'))event.fire_ret_load(load)

往事件总线里面发送消息。

##总结 这里只是大致介绍了大致的流程,其中关于数据如何在消息队列间流转的,没有细写,以后有机会再单独写篇博客介绍下。

转载于:https://my.oschina.net/fmnisme/blog/553004

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/399115.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

myecplise新建Maven项目Filter选什么,使用myeclipse建立maven项目

myecplise新建Maven项目Filter选什么 使用myeclipse建立maven项目 1234567分步阅读maven是管理项目的,myeclipse是编写代码的。第一次写项目都要配置好多东西,很麻烦,now 来看看怎样新建一个maven项目。 工具/原料 myeclipsemaven方法/步骤 1…

关于TCP/IP与数据传输

一、TCP/IP的具体含义: 从字面意思来讲,很多人会认为TCP/IP是指TCP与IP这两种协议。有时确实也可以说是这两种协议,但是大部分情况下所说的是利用IP进行通信时所必须用到的协议群的统称。具体来说IP,ICMP,TCP,UDP,FTP以及HTTP等都属于TCP/IP协…

geohash php_空间索引-geohash算法实现

算法简介geohash是实现空间索引的一种算法,其他实现空间索引的算法有:R树和其变种GIST树、四叉树、网格索引等算法基本原理geohash算法将地球理解为一个二维平面,将平面递归分解成更小的子块,每个子块在一定经纬度范围内拥有相同的编码,这种方…

ActiveReports 报表控件V12新特性 -- 新增JSON和CSV导出

ActiveReports 报表控件V12新特性 -- 新增JSON和CSV导出 ActiveReports 是一款专注于 .NET 平台的报表控件,全面满足 HTML5 / WinForms / ASP.NET / ASP.NET MVC / WPF 等平台下报表设计和开发工作需求,作为专业的报表工具为全球超过 300,000 开发人员提…

php imap配置,怎么为PHP编译imap扩展?

为PHP编译imap扩展的方法:首先安装“imap-open2007e”;然后下载源代码;接着准备好系统的“imap-open”环境;最后进入“./ext/extension/imap/”文件夹下执行“make”命令即可。怎么为PHP编译imap扩展?最近为项目增加了…

vmware安装minimal centos报错/etc/rc5.d/s99local : line

2019独角兽企业重金招聘Python工程师标准>>> 有人用vmware安装minimal centos报错/etc/rc5.d/s99local : line:25 : eject : command not found 。我们看下完整报错内容: Installing VMware Tools, please wait...mount: special device /dev/hda does n…

后缀树(Suffix Trie)子串匹配结构

Suffix Trie 又称后缀Trie或后缀树。它与Trie树的最大不同在于,后缀Trie的字符串集合是由指定字符串的后缀子串构成的。比如、完整字符串"minimize"的后缀子串组成的集合S分…

java中的线程和进程,Java | 线程和进程,创建线程

一、线程与进程线程定义进程中执行的一个代码段,来完成不同的任务组成:线程ID,当前指令指针(PC),寄存器集合(存储一部分正在执行线程的处理器状态的值)和堆栈进程定义执行的一段程序,一旦程序被载入到内存中准备执行就…

Maven的pom报错的解决方法

如果在MyEclipse里面导入项目,导入不了,如下图 接下来可以点击Import Maven Projects里的Action那一行Resolve Later. 点击Do Not Execute(add to pom)就可以正常导入了转载于:https://www.cnblogs.com/JimmySeraph/p/8068299.html

django零开始

2019独角兽企业重金招聘Python工程师标准>>> 安装。。。后查看 import django django.VERSION #输出版本号,目前自己是py2.7.9和django1.8 1,新建一个django-project django-admin.py startproject project-name 一个project一般为一个项目 …

关于Python3.6下登陆接口的尝试

编者按:README:此代码为用户登陆界面,添加了寻求帮助选项。1.学习了基本数据类型,string, int,以及while循环,continue, break, if, elif, else条件语句,“x".format(x)变量替代…

php 命令安装tp5,tp5.1框架的下载与安装方法步骤(图文)

大家可以都知道啊,tp框架5.1之前的版本都是可以在thinkphp的官网进行下载压缩包来安装框架的,那么在从tp5.1开始啊,就取消了下载压缩包安装的方法,那么我们如何进行下载呢?tp5.1的手册中开始就有提到tp5.1框架有两种安…

[连载]《C#通讯(串口和网络)框架的设计与实现》- 13.中英文版本切换设计

目 录 第十三章 中英文版本切换设计... 2 13.1 不用自带的资源文件的理由... 2 13.2 配置文件... 2 13.3 语言管理类... 3 13.4 应用管理类... 12 13.5 小结... 12 第十三章 中英文版本切换设计 13.1 不用自带的资源文件…

Mybaitis JdbcType 和javaType

2019独角兽企业重金招聘Python工程师标准>>> MyBatis 通过包含的jdbcType类型 BIT FLOAT CHAR TIMESTAMP OTHER UNDEFINEDTINYINT REAL VARCHAR BINARY BLOB NVARCHARSMALLINT DOUBLE …

php数据趋势曲线,数据曲线图怎么做

数据曲线图怎么做?1、在电脑桌面上,新建一个excel文件(操作过程即为点击右键,在选项中选择“新建”选项,然后再选择“excel文件”,即可成功新建excel文件了)2、双击将新建的excel文件打开,输入你需要统计制…

ceph 分布式存储安装

[rootlocalhost ~]# rm -rf /etc/yum.repos.d/*.repo 下载阿里云的base源 [rootlocalhost ~]# wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo [rootlocalhost ~]# wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/r…

STM32学习之路-SysTick的应用(时间延迟)

开发板:奋斗V5 好~ 菜B要来搞实验了.. 前面已经说了SysTick的工作原理什么的了,这里就不说了.. 先来做第一个实验: 盗自奋斗的样例,嘿嘿, 用SysTick产生1ms的基准时间,产生中断,每秒闪烁一次(LED1 V6) (1)外围时钟初始化&#xf…

凡事预则立(Beta)

听说——凡事预则立 吸取之前alpha冲刺的经验教训,也为了这次的beta冲刺可以更好更顺利地进行,更是为了迎接我们的新成员玮诗。我们开了一次组内会议,进行beta冲刺的规划。 上一张我们的合照: 具体会议议程如下: 1、讨…

用Vue.js开发一个电影App的前端界面

我们要构建一个什么样的App? 我们大多数人使用在线流媒体服务(如Netflix)观看我们最喜欢的电影或者节目。这篇文章将重点介绍如何通过使用vue.js 2 建立一个类似风格的电影流媒体WEB交互界面(见上图)。 最终的产品可以…

eclipse 函数折叠展开

为什么80%的码农都做不了架构师?>>> 一、eclipse 代码块折叠显示 核查是否开启折叠功能全局folding(window->preference->Gerneral->Editor-Structured Text)右侧Appearance 勾选Enable folding检测对应源编辑(java/javaScript&…