workflow源码解析:ThreadTask

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute()  //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/628271.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【.NET Core】C#预处理器指令

【.NET Core】C#预处理器指令 文章目录 【.NET Core】C#预处理器指令一、概述二、可为空上下文&#xff08;#nullable&#xff09;三、条件编译2.1 定义DEBUG是编译代码2.2 未定义MYTEST时&#xff0c;将编译以下代码 四、定义符号五、定义区域六、错误和警告信息 一、概述 预…

MySQL面试题 | 07.精选MySQL面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

跟着cherno手搓游戏引擎【6】ImGui和ImGui事件

导入ImGui&#xff1a; 下载链接&#xff1a; GitHub - TheCherno/imgui: Dear ImGui: Bloat-free Immediate Mode Graphical User interface for C with minimal dependencies 新建文件夹&#xff0c;把下载好的文件放入对应路径&#xff1a; SRC下的premake5.lua文件&#…

k8s 存储卷和pvc,pv

存储卷---数据卷 容器内的目录和宿主机的目录进行挂载。 容器在系统上的生命周期是短暂的&#xff0c;deletek8s用控制器创建的pod&#xff0c;delete相当于重启&#xff0c;容器的状态也会回复到初始状态。 一旦回到初始状态&#xff0c;所有的后天编辑的文件的都会消失。 …

SpringBoot教程(十六) | SpringBoot集成swagger(全网最全)

SpringBoot教程(十六) | SpringBoot集成swagger&#xff08;全网最全&#xff09; 一. 接口文档概述 swagger是当下比较流行的实时接口文文档生成工具。接口文档是当前前后端分离项目中必不可少的工具&#xff0c;在前后端开发之前&#xff0c;后端要先出接口文档&#xff0c…

Mysql-redoLog

Redo Log redo log进行刷盘的效率要远高于数据页刷盘,具体表现如下 redo log体积小,只记录了哪一页修改的内容,因此体积小,刷盘快 redo log是一直往末尾进行追加,属于顺序IO。效率显然比随机IO来的快Redo log 格式 在MySQL的InnoDB存储引擎中,redo log(重做日志)被用…

C++ 输入用户名和密码 防止注入攻击

1、问题解释&#xff1a;注入攻击 &#xff0c;无密码直接登录数据库 可视化展示 1.1、当你的数据库为&#xff1a;其中包含三个字段id user 以及md5密码 1.2、在使用C堆数据库信息进行访问的时候&#xff0c;使用多条语句进行查询 string sql "select id from t_user…

Spring自带分布式锁你用过吗?

环境&#xff1a;SpringBoot2.7.12 本篇文章将会为大家介绍有关spring integration提供的分布式锁功能。 1. 简介 Spring Integration 是一个框架&#xff0c;用于构建事件驱动的应用程序。在 Spring Integration 中&#xff0c;LockRegistry 是一个接口&#xff0c;用于管理…

使用Postman测试WebService接口

文章目录 使用Postman测试WebService接口1. 访问wsdl地址2. Postman配置1. URL及Headers设置2. Body设置3. 响应结果 使用Postman测试WebService接口 1. 访问wsdl地址 接口地址如&#xff1a;http://localhost:8101/ws/hello?wsdl 2. Postman配置 1. URL及Headers设置 2. B…

跟着小德学C++之数据库基础

嗨&#xff0c;大家好&#xff0c;我是出生在达纳苏斯的一名德鲁伊&#xff0c;我是要立志成为海贼王&#xff0c;啊不&#xff0c;是立志成为科学家的德鲁伊。最近&#xff0c;我发现我们所处的世界是一个虚拟的世界&#xff0c;并由此开始&#xff0c;我展开了对我们这个世界…

【揭秘】sleep()、wait()、park()三种休眠方式的终极对比

在Java中&#xff0c;线程休眠的三种方式包括Thread.sleep、Object.wait和LockSupport.park。Thread.sleep使线程在指定时间后进入休眠&#xff0c;状态为TIMEDWAITING&#xff0c;不会释放锁。Object.wait需在对象锁的保护下调用&#xff0c;会释放该对象的锁&#xff0c;使线…

leetcode 动态规划(单词拆分)

139.单词拆分 力扣题目链接(opens new window) 给定一个非空字符串 s 和一个包含非空单词的列表 wordDict&#xff0c;判定 s 是否可以被空格拆分为一个或多个在字典中出现的单词。 说明&#xff1a; 拆分时可以重复使用字典中的单词。 你可以假设字典中没有重复的单词。 …

图解智慧:数据可视化如何助你高效洞悉信息?

在信息爆炸的时代&#xff0c;数据扮演着越来越重要的角色&#xff0c;而数据可视化则成为解读和理解海量数据的得力工具。那么&#xff0c;数据可视化是如何帮助我们高效了解数据的呢&#xff1f;下面我就以可视化从业者的角度来简单聊聊这个话题。 无需深奥的专业知识&#x…

第1章 数据结构与算法介绍

文章目录 1.1 数据结构和算法内容介绍1.1.1 先看几个经典的算法面试题1.1.2 数据结构和算法的重要性1.1.3 本套数据结构和算法内容介绍1.1.4 课程亮点和授课方式 1.1 数据结构和算法内容介绍 1.1.1 先看几个经典的算法面试题  字符串匹配问题&#xff1a;&#xff1a; 有一…

wpf使用Popup封装数据筛选框

(关注博主后,在“粉丝专栏”,可免费阅读此文) 类似于DevExpress控件的功能 这是DevExpress的winform筛选样式,如下: 这是DevExpress的wpf筛选样式,如下: 这是Excel的筛选样式,如下: 先看效果 本案例使用wpf原生控件封装,功能基本上都满足,只是颜色样式没有写…

为何我选择山海鲸可视化:五大优势解析

在众多的可视化产品中&#xff0c;我选择了山海鲸可视化&#xff0c;这并非偶然。在对比了其他同类产品后&#xff0c;我发现山海鲸可视化具有许多独特的优势和特点&#xff0c;使得它成为了我心目中的理想选择。下面我简单说一下我选择这款产品的几大原因&#xff0c;希望对在…

最新国内可用GPT4、Midjourney绘画、DALL-E3文生图模型教程

一、前言 ChatGPT3.5、GPT4.0、GPT语音对话、Midjourney绘画&#xff0c;文档对话总结DALL-E3文生图&#xff0c;相信对大家应该不感到陌生吧&#xff1f;简单来说&#xff0c;GPT-4技术比之前的GPT-3.5相对来说更加智能&#xff0c;会根据用户的要求生成多种内容甚至也可以和…

【python 的各种模块】(9) 在python使用PIL( 即pillow模块 ) 修改图片

目录 1 导入PIL模块&#xff08;pillow&#xff09; 1.1 PIL的全称&#xff1a;Python Imaging Library 1.2 导入PIL模块 1.2.1 可用的导入形式 1.2.2 常用的导入形式 1.2.3 PIL下面的常用子模块 2 PIL.Image的方法 (读入&#xff0c;生成和显示图片) 2.1 用 PIL.Image…

蓝桥杯AcWing学习笔记 8-2数论的学习(下)

蓝桥杯 我的AcWing 题目及图片来自蓝桥杯C AB组辅导课 数论&#xff08;下&#xff09; 蓝桥杯省赛中考的数论不是很多&#xff0c;这里讲几个蓝桥杯常考的知识点。 约数个数定理 我们如何去求一个数的约数个数呢&#xff1f; N N N分解质因数的结果&#xff1a; N P 1 α…

HTML+CSS-02

阿里巴巴矢量图标库的使用 阿里巴巴网址矢量图标库网址 https://www.iconfont.cn/ 如何使用 选择需要的icon图标加入购物车下载代码 在将解压后的文件夹复制到项目中进入demo_index.html中打开就可以看到示例的三种用法 三种引入方法 Unicode 引用 Unicode 是字体在网页端…