workflow源码解析:ThreadTask

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute()  //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/628271.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 循环结构之for循环结合range()函数使用技巧

在Python中for循环经常结合range()函数一起使用。 range()函数是一个内置函数&#xff0c;用于生成一个整数序列&#xff0c;通常与for循环结合使用。它的常见用法是生成一系列连续的整数&#xff0c;可以指定起始值、结束值和步长。 range()函数的语法如下&#xff1a; ran…

【.NET Core】C#预处理器指令

【.NET Core】C#预处理器指令 文章目录 【.NET Core】C#预处理器指令一、概述二、可为空上下文&#xff08;#nullable&#xff09;三、条件编译2.1 定义DEBUG是编译代码2.2 未定义MYTEST时&#xff0c;将编译以下代码 四、定义符号五、定义区域六、错误和警告信息 一、概述 预…

【深度学习】动手学深度学习(PyTorch版)李沐 2.4.3 梯度【公式推导】

2.4.3. 梯度 我们可以连接一个多元函数对其所有变量的偏导数&#xff0c;以得到该函数的梯度&#xff08;gradient&#xff09;向量。 具体而言&#xff0c;设函数 f : R n → R f:\mathbb{R}^{n}\to\mathbb{R} f:Rn→R的输入是一个 n n n维向量 x ⃗ [ x 1 x 2 ⋅ ⋅ ⋅ x n …

MySQL面试题 | 07.精选MySQL面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

MyBatis-Plus代码生成器使用

这里写目录标题 第一章、添加依赖第二章、准备CodeGenerator类第三章、右键运行main方法 第一章、添加依赖 mybatis-plus-generator依赖和velocity-engine-core依赖和commons-lang3依赖加到pom文件里面&#xff0c;代码生成器会用到 <dependency><groupId>com.bao…

跟着cherno手搓游戏引擎【6】ImGui和ImGui事件

导入ImGui&#xff1a; 下载链接&#xff1a; GitHub - TheCherno/imgui: Dear ImGui: Bloat-free Immediate Mode Graphical User interface for C with minimal dependencies 新建文件夹&#xff0c;把下载好的文件放入对应路径&#xff1a; SRC下的premake5.lua文件&#…

基于STM32F103C8T6单片机的1秒定时器设计与应用

标题&#xff1a;基于STM32F103C8T6单片机的1秒定时器设计与应用 摘要&#xff1a; 本文主要探讨了如何在STM32F103C8T6微控制器上利用内部定时器实现精确的1秒钟定时功能&#xff0c;并通过实际项目实施&#xff0c;验证其稳定性和可靠性。首先介绍了STM32F103C8T6单片机的特…

k8s 存储卷和pvc,pv

存储卷---数据卷 容器内的目录和宿主机的目录进行挂载。 容器在系统上的生命周期是短暂的&#xff0c;deletek8s用控制器创建的pod&#xff0c;delete相当于重启&#xff0c;容器的状态也会回复到初始状态。 一旦回到初始状态&#xff0c;所有的后天编辑的文件的都会消失。 …

Nacos:发现微服务的未来

一、为什么要使用Nacos 在今天的数字化世界中&#xff0c;微服务架构已经成为软件开发的主流。这种架构风格将大型复杂软件拆分为一系列小型的、松耦合的服务&#xff0c;每个服务都可以独立地开发、测试、部署和扩展。然而&#xff0c;随着微服务数量的增长&#xff0c;管理…

SpringBoot教程(十六) | SpringBoot集成swagger(全网最全)

SpringBoot教程(十六) | SpringBoot集成swagger&#xff08;全网最全&#xff09; 一. 接口文档概述 swagger是当下比较流行的实时接口文文档生成工具。接口文档是当前前后端分离项目中必不可少的工具&#xff0c;在前后端开发之前&#xff0c;后端要先出接口文档&#xff0c…

202312 青少年软件编程(C/C++)等级考试试卷(三级)电子学会真题

202312 青少年软件编程&#xff08;C/C&#xff09;等级考试试卷&#xff08;三级&#xff09;电子学会真题 1.因子问题 题目描述 任给两个正整数N、M&#xff0c;求一个最小的正整数a&#xff0c;使得a和(M-a)都是N的因子。 输入 包括两个整数N、M。N不超过1,000,000。 …

Mysql-redoLog

Redo Log redo log进行刷盘的效率要远高于数据页刷盘,具体表现如下 redo log体积小,只记录了哪一页修改的内容,因此体积小,刷盘快 redo log是一直往末尾进行追加,属于顺序IO。效率显然比随机IO来的快Redo log 格式 在MySQL的InnoDB存储引擎中,redo log(重做日志)被用…

C++ 输入用户名和密码 防止注入攻击

1、问题解释&#xff1a;注入攻击 &#xff0c;无密码直接登录数据库 可视化展示 1.1、当你的数据库为&#xff1a;其中包含三个字段id user 以及md5密码 1.2、在使用C堆数据库信息进行访问的时候&#xff0c;使用多条语句进行查询 string sql "select id from t_user…

蓝桥杯基础知识5 unique()

蓝桥杯基础知识5 unique&#xff08;&#xff09; #include <bits/stdc.h>int main(){std::vector<int> vec {1,1,2,2,3,3,3,4,4,5};auto it std::unique(vec.begin(), vec.end());vec.erase(it, vec.end());//vec.erase(unique(vec.begin(),vec.end()),vec.end(…

力扣82-删除排序链表中的重复元素

删除排序链表中的重复元素 题目链接 解题思路 1.遇见相同的元素直接删除即可 2.链表的头部也可能是重复元素&#xff0c;所以需要一个哑节点res来指向链表的头节点 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* List…

机器学习之集成学习概念介绍

概念 机器学习中的集成学习(Ensemble Learning)是一种通过组合多个模型来提高整体性能的技术。它的基本思想是将多个学习器(弱学习器)组合成一个更强大的学习器,以提高整体性能和泛化能力。集成学习可以在各种机器学习任务中使用,包括分类、回归和聚类。 核心 弱学习器…

Spring自带分布式锁你用过吗?

环境&#xff1a;SpringBoot2.7.12 本篇文章将会为大家介绍有关spring integration提供的分布式锁功能。 1. 简介 Spring Integration 是一个框架&#xff0c;用于构建事件驱动的应用程序。在 Spring Integration 中&#xff0c;LockRegistry 是一个接口&#xff0c;用于管理…

使用Postman测试WebService接口

文章目录 使用Postman测试WebService接口1. 访问wsdl地址2. Postman配置1. URL及Headers设置2. Body设置3. 响应结果 使用Postman测试WebService接口 1. 访问wsdl地址 接口地址如&#xff1a;http://localhost:8101/ws/hello?wsdl 2. Postman配置 1. URL及Headers设置 2. B…

跟着小德学C++之数据库基础

嗨&#xff0c;大家好&#xff0c;我是出生在达纳苏斯的一名德鲁伊&#xff0c;我是要立志成为海贼王&#xff0c;啊不&#xff0c;是立志成为科学家的德鲁伊。最近&#xff0c;我发现我们所处的世界是一个虚拟的世界&#xff0c;并由此开始&#xff0c;我展开了对我们这个世界…

【揭秘】sleep()、wait()、park()三种休眠方式的终极对比

在Java中&#xff0c;线程休眠的三种方式包括Thread.sleep、Object.wait和LockSupport.park。Thread.sleep使线程在指定时间后进入休眠&#xff0c;状态为TIMEDWAITING&#xff0c;不会释放锁。Object.wait需在对象锁的保护下调用&#xff0c;会释放该对象的锁&#xff0c;使线…