skynet源码阅读5--协程调度模型

注:为方便理解,本文贴出的代码部分经过了缩减或展开,与实际skynet代码可能会有所出入。
    作为一个skynet actor,在启动脚本被加载的过程中,总是要调用skynet.start和skynet.dispatch的,前者在skynet-os中做一些初始化工作,设置消息的Lua回调,后者则注册针对某协议的解析回调。举个例子:

 1 local skynet = require "skynet"
 2 
 3 local function hello()
 4     skynet.ret(skynet.pack("Hello, World!"))
 5     print("hello OK")
 6 end
 7 
 8 skynet.start(function()
 9     skynet.dispatch("lua", function(session, address, cmd, ...)
10         assert(cmd == "hello")
11         hello()
12     end)
13 end)

    先是调用skynet.start注册初始化回调,在其中调用skynet.dispatch注册针对"lua"协议的解析回调。skynet的基本使用这里我们就不多说了,具体见官方文档。下面我们就从skynet.start(见skynet.lua)开始,逐一分析流程。

1 function skynet.start(start_func)
2     c.callback(skynet.dispatch_message)
3     skynet.timeout(0, function()
4         skynet.init_service(start_func)
5     end)
6 end

这里的c来自于require "skynet.core",它是在lua-skynet.c中注册的,如下: 

 1 int luaopen_skynet_core(lua_State *L) {
 2     luaL_checkversion(L);
 3 
 4     luaL_Reg l[] = {
 5         ...
 6         { "callback", _callback },
 7         { NULL, NULL },
 8     };
 9 
10     luaL_newlibtable(L, l);
11 
12     lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
13     struct skynet_context *ctx = lua_touserdata(L,-1);
14     if (ctx == NULL) {
15         return luaL_error(L, "Init skynet context first");
16     }
17 
18     luaL_setfuncs(L,l,1);
19 
20     return 1;
21 }

     可以看到,它注册了几个函数,并将skynet_context实例作为各函数的upvalue,方便调用时获取。skynet.start中调用c.callback,对应的就是lua-skynet.c中的_callback函数,skynet.dispatch_message回调就是它的参数:

 1 static int _callback(lua_State *L) {
 2     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
 3     int forward = lua_toboolean(L, 2);
 4     luaL_checktype(L,1,LUA_TFUNCTION);
 5     lua_settop(L,1);
 6     lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
 7 
 8     lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
 9     lua_State *gL = lua_tothread(L,-1);
10 
11     if (forward) {
12         skynet_callback(context, gL, forward_cb);
13     } else {
14         skynet_callback(context, gL, _cb);
15     }
16 
17     return 0;
18 }

    可以看到,其以函数_cb为key,LUA回调(skynet.dispatch_message)作为value被注册到全局注册表中。skynet_callback(在skynet_server.c中)则设置函数指针_cb为C层面的消息处理函数:

1 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
2     context->cb = cb;
3     context->cb_ud = ud;
4 }

    先不关注skynet-os内部的线程调度细节,只需要知道,skynet-context接收到消息后会转发给context->cb处理,也就是_cb函数。在_cb中,从全局表中取到关联的LUA回调,将type, msg, sz, session, source压栈调用:

 1 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
 2     lua_State *L = ud;
 3     int trace = 1;
 4     int r;
 5     int top = lua_gettop(L);
 6     if (top == 0) {
 7         lua_pushcfunction(L, traceback);
 8         lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
 9     } else {
10         assert(top == 2);
11     }
12     lua_pushvalue(L,2);
13 
14     lua_pushinteger(L, type);
15     lua_pushlightuserdata(L, (void *)msg);
16     lua_pushinteger(L,sz);
17     lua_pushinteger(L, session);
18     lua_pushinteger(L, source);
19 
20     r = lua_pcall(L, 5, 0 , trace);
21 
22     if (r == LUA_OK) {
23         return 0;
24     }
25 }

    此时调用流程正式转到skynet.lua中的skynet.dispatch_message: 

1 function skynet.dispatch_message(...)
2     local succ, err = pcall(raw_dispatch_message,...)
3     while true do
4         local key,co = next(fork_queue)
5         fork_queue[key] = nil
6         pcall(suspend,co,coroutine_resume(co))
7     end
8 end

    首先是将msg交由raw_dispatch_message作分发,然后开始处理fork_queue中缓存的fork协程:

        pcall(suspend, co, coroutine_resume(co))

    这行代码是我们今天关注的重点。在继续之前,我假设你对lua的协程有一定的了解,了解coroutine.resume,coroutine.yield的基本用法。coroutine就是lua里的线程,它拥有自己的函数栈,但与我们平常接触的大多数操作系统里的线程不同,是非抢占式的。skynet对lua的coroutine作了封装(详见lua-profile.c),主要是增加了starttime和totaltime的监测,最终还是交由lua的coroutine库来处理的。既然这里分析到了fork_queue,那我们就先以skynet.fork为例,看看它作了什么:

1 function skynet.fork(func,...)
2     local args = table.pack(...)
3     local co = co_create(function()
4         func(table.unpack(args,1,args.n))
5     end)
6     table.insert(fork_queue, co)
7     return co
8 end

    skynet.fork做的事情很简单,通过co_create创建一个coroutine并将其入队fork_queue。看看co_create是如何创建协程的:

 1 local function co_create(f)
 2     local co = table.remove(coroutine_pool)
 3     if co == nil then
 4         co = coroutine.create(function(...)
 5             f(...)
 6             while true do
 7                 f = nil
 8                 coroutine_pool[#coroutine_pool+1] = co
 9                 f = coroutine_yield "EXIT"
10                 f(coroutine_yield())
11             end
12         end)
13     else
14         coroutine_resume(co, f)
15     end
16     return co
17 end

    调用co_create时,如果coroutine_pool为空,它会创建一个新的co。co在第一次被resume时,会执行f,接着便进入一个使用和回收的无限循环。在这个循环中,先是收回co到coroutine_pool中,接着便yield "EXIT"到上一次的resume点A。当下一次被resume在点B唤醒时,会先将函数f传递过来,接着再次yield到点B,等待下一次在点D被resume唤醒时,传递需要的参数过来加以执行,完毕后回收,如此反复。这样看来,co的执行似乎相当简单。但是实际上要复杂一些,因为在执行f的过程中,可以再反复地yield和resume。下面我们举个简单的例子: 

1     skynet.fork(function()
2         print ("skynet fork: <1>")
3         skynet.sleep(10)
4         print ("skynet fork: <2>")
5     end)

     我们把skynet.sleep展开: 

1     skynet.fork(function()
2         print ("skynet fork: <1>")
3         coroutine_yield("SLEEP", COMMAND_TIMEOUT(10))
4         print ("skynet fork: <2>")
5     end)

     下面开始分析调用流程。fork-co入队,主co在skynet.dispatch_message中分发消息后取出fork-co,调用resume开始进入fork-co的函数f执行,如下图所示,如果fork-co是第一次执行,是走圈1,如果是复用,则走圈1*(如果是复用的话,调用co_create时,会先coroutine_resume(co,f)一次进入fork-co,将用户函数传递给while循环中的coroutine_yield "EXIT"点之后,接着fork-co再次yield让出,等待实际传参的调用)。接着进入用户函数,COMMAND_TIMEOUT会先向skynet-kernal发送TIMEOUT命令,如圈2所示。然后yield "SLEEP"到主co的resume点1之后继续执行,如圈3所示,按圈4的指向,调用suspend进入"SLEEP"分支,记录下TIMEOUT-session与fork-co的映射关系。此时主co回到skynet.dispatch_message中继续下一个fork-co的处理。当TIMEOUT消息回来时,会由主co再次进入skynet.dispatch_message并调用raw_dispatch_message分发,这时通过session拿到之前映射的fork-co,再次resume,按照圈5的指向,会跳转到fork-co的yield "SLEEP"点之后继续向下处理。用户函数处理完毕后,回到上层调用,即圈6所指,回收fork-co,接着yield "EXIT"到主co所在raw_dispatch_message中的resume点之后,如圈7所示。进入suspend后,无额外命令,raw_dispatch_message处理结束,继续主co的消息处理流程。

    由以上分析可以看到,实际的协程跳转过程是比较复杂的,也更显得小小的LUA在skynet中的精巧运用。为方便理解,顺便贴出suspend的代码(只列出了我们关注的几个命令,并做了删减): 

 1 function suspend(co, result, command, param, size)
 2     if command == "CALL" then
 3         session_id_coroutine[param] = co
 4     elseif command == "SLEEP" then
 5         session_id_coroutine[param] = co
 6         sleep_session[co] = param
 7     elseif command == "RETURN" then
 8         local co_session = session_coroutine_id[co]
 9         local co_address = session_coroutine_address[co]
10         session_response[co] = true
11         c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
12         return suspend(co, coroutine_resume(co, ret))
13     elseif command == "EXIT" then
14         -- coroutine exit
15         local address = session_coroutine_address[co]
16         session_coroutine_id[co] = nil
17         session_coroutine_address[co] = nil
18         session_response[co] = nil
19     elseif command == nil then
20         -- debug trace
21         return
22     end
23 end

    看完skynet.fork,我们再回过头来,看一看skynet.dispatch_message中消息分发raw_dispatch_message的具体细节:

 1 local function raw_dispatch_message(prototype, msg, sz, session, source)
 2     -- skynet.PTYPE_RESPONSE = 1, read skynet.h
 3     if prototype == 1 then
 4         local co = session_id_coroutine[session]
 5         session_id_coroutine[session] = nil
 6         suspend(co, coroutine_resume(co, true, msg, sz))
 7     else
 8         local p = proto[prototype]
 9         local f = p.dispatch
10         local co = co_create(f)
11         session_coroutine_id[co] = session
12         session_coroutine_address[co] = source
13         suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
14     end
15 end

    RESPONSE的处理先就不说了,在叙述skynet.sleep时已经有所讨论。消息会根据它的prototype查找proto,接着调用co_create取得协程user-co,将message解包后resume到user-co进入用户函数f。而后续的流程则与上文我们讨论的fork是一样的了。比如,在f中调用skynet.call时,会向目标发送消息,接着会yield到主co,回到这里的resume点,接着进入suspend的"CALL"分支,记录session与user-co的映射关系。下一次response消息回来时,会查找到user-co并resume唤醒,在skynet.call后继续执行,用户函数f结束后进入上层调用,回收user-co,等待新的调用。

    因为本篇我们关注的是协程调度模型,而非具体的处理细节,因此有空再对skynet.call,skynet.ret等作详细的细节分析。

转载于:https://www.cnblogs.com/Jackie-Snow/p/6687651.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/282390.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ASP.NET Core GRPC 和 Dubbo 互通

一.前言Dubbo 是比较流行的服务治理框架&#xff0c;国内不少大厂都在使用。以前的 Dubbo 使用的是私有协议&#xff0c;采集用的 hessian 序列化&#xff0c;对于多语言生态来说是极度的不友好。现在 Dubbo 发布了新版本 v3&#xff0c;推出了基于 gRPC 的新协议 Triple&#…

详解C# 迭代器

[引用&#xff1a;https://www.cnblogs.com/yangecnu/archive/2012/03/17/2402432.html] 迭代器模式是设计模式中行为模式(behavioral pattern)的一个例子&#xff0c;他是一种简化对象间通讯的模式&#xff0c;也是一种非常容易理解和使用的模式。简单来说&#xff0c;迭代器模…

利用redis List队列简单实现秒杀 PHP代码实现

一 生产者producer部分 --------------------------------producer 部分注释------------------------------------------------------------ 用户在页面请求之后, 获取到用户uid , 跳转到这个加入队列的方法 (这里直接在producer中模拟了多个uid) 在方法内部判断redis队列长…

使用Filezilla 与 linux远程服务器传输文件时,设置默认打开编辑器

1. 点击编辑 2. 选择设置&#xff0c;点击文本编辑 3. 设置编辑器目录 4. 确定作用&#xff1a; 这样设置之后&#xff0c;可以实现在远程站点栏直接下载并使用phpstorm编辑的作用 正常需要下载之后&#xff0c;再去本地相应下载目录打开&#xff0c;然后再进行上传文件&#x…

在 .NET 中使用 FluentValidation 进行参数验证

不用说&#xff0c;参数验证很重要&#xff0c;无效的参数&#xff0c;可能会导致程序的异常。如果使用Web API或MVC页面&#xff0c;那么可能习惯了自带的规则验证&#xff0c;我们的控制器很干净&#xff1a;public class User {[Required]public string FirstName { get; se…

在win10系统下怎样快速切换任务视图

2019独角兽企业重金招聘Python工程师标准>>> 切换窗口&#xff1a;Alt Tab 任务视图&#xff1a;Win Tab (松开键盘界面不会消失) 切换任务视图&#xff1a;Win Ctrl 左/右 创建新的虚拟桌面&#xff1a;Win Ctrl D 关闭当前虚拟桌面&#xff1a;Win Ctrl F4…

Linux上搭建Samba,实现windows与Linux文件数据同步

一 环境介绍 1. 本地win10 2. Linux (centos7.4) 注&#xff1a;因为运营商方面禁止smb协议&#xff0c;导致无法在云服务器上使用smb&#xff0c;如果不是在虚拟机上操作&#xff0c;而是在云服务器上操作&#xff0c;建议还是使用 filezillaxshell组合 或者 使用finalshell等…

A5-1和DES两个加密算法的学习

A5-1加密算法 1、基本原理 A5-1加密算法是一种流password&#xff0c;通过密钥流对明文进行加密。然后用密钥流进行对密文的解密操作。 这样的算法主要用于GSM加密。也就是我们平时打电话的时候。通信数据发送到基站&#xff0c;基站发送到还有一个基站&#xff0c;基站发送到接…

从0到1简易区块链开发手册V0.3-数据持久化与创世区块

Author: brucefeng Email: brucefengbrucefeng.com 编程语言:Golang 1.BoltDB简介 Bolt是一个纯粹Key/Value模型的程序。该项目的目标是为不需要完整数据库服务器&#xff08;如Postgres或MySQL&#xff09;的项目提供一个简单&#xff0c;快速&#xff0c;可靠的数据库。 Bolt…

ELK之elasticsearch5.6的安装和head插件的安装

这里选择的elasticsearch为5.6的新版本&#xff0c;根据官方文档有几种暗装方式&#xff1a; https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html 这里选择rpm包安装https://www.elastic.co/guide/en/elasticsearch/reference/curre…

Nginx 基础(一)

一 、Nginx简述 Nginx是一个开源、高性能、可靠的HTTP中间件、代理服务。二 、常见的HTTP服务 1. HTTPD-Apache基金会 2. IIS-微软 3. GWS-Google 4. Nginx三、为什么选择Nginx 原因一&#xff1a;IO多路复用epoll &#xff08;主要解决了并发性的问题&#xff09; 注1&#xf…

ASP.NET Core高性能服务器HTTP.SYS

如果我们只需要将ASP.NET CORE应用部署到Windows环境下&#xff0c;并且希望获得更好的性能&#xff0c;那么我们选择的服务器类型应该是HTTP.SYS。Windows环境下任何针对HTTP的网络监听器/服务器在性能上都无法与HTTP.SYS比肩。[本文节选《ASP.NET Core 6框架揭秘》第18章]一、…

Nginx 基础 ( 二)

一、HTTP请求 http请求包括客户端请求服务端 以及 服务端响应数据回客户端&#xff0c;如下 请求&#xff1a;包括请求行、请求头部、请求数据 响应&#xff1a;包括状态行、消息报头、响应正文 比如在Linux中curl请求网站获取请求信息和响应信息 curl -v http://www.kugou.com…

《金融行业应用解决方案白皮书》发布,金融自主创新未来可期!

日前&#xff0c;以“聚势赋能 行业共创”为主题的金融行业解决方案发布会在线上举行。麒麟软件发布《金融行业应用解决方案白皮书》&#xff0c;并发起成立“金融机具生态圈俱乐部”&#xff0c;助力金融行业用户高质量发展。金融信息系统曾经被国外厂商垄断金融信息系统作为国…

leetcode53 Maximum Subarray 最大连续子数组

题目要求 Find the contiguous subarray within an array (containing at least one number) which has the largest sum.For example, given the array [-2,1,-3,4,-1,2,1,-5,4], the contiguous subarray [4,-1,2,1] has the largest sum 6.即&#xff1a;寻找数列中的一个子…

详解go语言的array和slice 【二】

上一篇 详解go语言的array和slice 【一】已经讲解过,array和slice的一些基本用法&#xff0c;使用array和slice时需要注意的地方&#xff0c;特别是slice需要注意的地方比较多。上一篇的最后讲解到创建新的slice时使用第三个索引来限制slice的容量&#xff0c;在操作新slice时…

详解Objective-C的meta-class

2019独角兽企业重金招聘Python工程师标准>>> 比较简单的一篇英文&#xff0c;重点是讲解meta-class。翻译下&#xff0c;加深理解。 原文标题&#xff1a;What is a meta-class in Objective-C? 原文地址&#xff1a;http://www.cocoawithlove.com/2010/01/what-is…

十倍程序员 | 使用 Source Generator 将 JSON 转换成 C# 类

前言有时候&#xff0c;我们需要将通过 WebAPI 接收 JSON 字符串转换成 C# 代码。Visual Studio 提供了一个功能菜单可以轻松实现&#xff1a;执行完成后&#xff0c;它会将生成的代码放在打开的的代码窗口中。但是&#xff0c;如果有多个 JSON 字符串需要转换&#xff0c;这个…

微软Microsoft Azure 机器学习工作室的案例之Image Classification using DenseNet

点击上方蓝字关注我们&#xff08;本文阅读时间&#xff1a;10分钟)Microsoft Azure Machine Learning Studio是微软强大的机器学习平台&#xff0c;在设计器中&#xff0c;微软内置了15个场景案例&#xff0c;但网上似乎没有对这15个案例深度刨析的分析资料&#xff0c;所以我…

音乐分类

代码&#xff1a; 1 import numpy as np2 from scipy import fft3 from scipy.io import wavfile4 from sklearn.linear_model import LogisticRegression5 import random6 """7 使用logistic regression处理音乐数据&#xff0c;音乐数据训练样本的获得是使…