创建Subflow
DAG任务中,有一种常见的场景,一个任务可能在执行期间产生新的任务,然后紧接着执行新任务。 之前提到的静态图就没有办法实现这样一个功能了,所以Taskflow提供了另一种流的节点:Subflow,Subflow的API与Taskflow无异,但又可以作为Taskflow的一个节点。
比如描述如下依赖图:
#include <memory>
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A"); // static task Atf::Task C = taskflow.emplace([] () {}).name("C"); // static task Ctf::Task D = taskflow.emplace([] () {}).name("D"); // static task D// 通过lambda创建subflow// 开始执行的时候,会创建一个subflow,然后通过引用传给lambda// 只有当本subflow执行完成之后,才会执行taskflowtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1"); // subflow task B1tf::Task B2 = subflow.emplace([] () {}).name("B2"); // subflow task B2tf::Task B3 = subflow.emplace([] () {}).name("B3"); // subflow task B3B1.precede(B3); // B1 runs bofore B3B2.precede(B3); // B2 runs before B3}).name("B");A.precede(B); // B runs after AA.precede(C); // C runs after AB.precede(D); // D runs after BC.precede(D); // D runs after Ctaskflow.dump(std::cout); // 在执行前,subflow无法展开,subflow只会显示节点Bexecutor.run(taskflow).get(); // execute the graph to spawn the subflowtaskflow.dump(std::cout); // 执行完毕后,才可以完全展开return 0;
}
在run之前dump,subflow只会被当作普通节点:
在run之后调用,subflow被展开,得到真正的依赖图:
Join a Subflow
Subflow 在离开其上下文时默认调用join,表示需要把subflow中的task执行完,才完成subflow的执行。同时,还可以在上下文中显式调用join,来完成递归模式:
#include <memory>
#include <taskflow/taskflow.hpp>// 递归计算斐波那契数列
int spswm(int n, tf::Subflow& sbf) {if(n < 2) return n;int res1 = 0, res2 = 0;// 生成两个递归子任务.sbf.emplace([&res1, n](tf::Subflow& sbf_inner){res1 = spswm(n-1, sbf_inner);}).name("sub Task:_"+std::to_string(n-1));sbf.emplace([&res2, n](tf::Subflow& sbf_inner){res2 = spswm(n-2, sbf_inner);}).name("sub Task:_"+std::to_string(n-2));// 显式调用join,得到两个子任务的返回值sbf.join();return res1 + res2;
}
int main() {tf::Executor executor; tf::Taskflow taskflow;int res = 0; // 用于存放最后的结果taskflow.emplace([&res](tf::Subflow& sbf){res = spswm(5, sbf); // 计算5的斐波那契数}).name("main Task");executor.run(taskflow).wait();std::cout << "5的斐波那契数:" << res << std::endl;taskflow.dump(std::cout); return 0;
}
调用图如下:
Detach a Subflow
和线程一样,Subflow 可以Detach出去,单独执行(并最后被主Taskflow Join)
#include <taskflow/taskflow.hpp>int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A"); // static task Atf::Task C = taskflow.emplace([] () {}).name("C"); // static task Ctf::Task D = taskflow.emplace([] () {}).name("D"); // static task Dtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1"); // static task B1tf::Task B2 = subflow.emplace([] () {}).name("B2"); // static task B2tf::Task B3 = subflow.emplace([] () {}).name("B3"); // static task B3B1.precede(B3); // B1 runs bofore B3B2.precede(B3); // B2 runs before B3subflow.detach(); // 分离出Taskflow,单独执行}).name("B");A.precede(B); // B runs after AA.precede(C); // C runs after AB.precede(D); // D runs after BC.precede(D); // D runs after Cexecutor.run(taskflow).wait();taskflow.dump(std::cout); return 0;
}
最终结构如下:
detach出去的Subflow是临时的,所以,如果执行的是run_n, ABCD四个节点只会构造一次,但是subflow会被构造多次:
#include <taskflow/taskflow.hpp>int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A"); // static task Atf::Task C = taskflow.emplace([] () {}).name("C"); // static task Ctf::Task D = taskflow.emplace([] () {}).name("D"); // static task Dtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1"); // static task B1tf::Task B2 = subflow.emplace([] () {}).name("B2"); // static task B2tf::Task B3 = subflow.emplace([] () {}).name("B3"); // static task B3B1.precede(B3); // B1 runs bofore B3B2.precede(B3); // B2 runs before B3subflow.detach(); // 分离出Taskflow,单独执行}).name("B");A.precede(B); // B runs after AA.precede(C); // C runs after AB.precede(D); // D runs after BC.precede(D); // D runs after Cexecutor.run_n(taskflow, 5).wait();assert(taskflow.num_tasks() == 19);taskflow.dump(std::cout);return 0;
}
嵌套子图
Subflow 支持递归,也支持嵌套:
#include <taskflow/taskflow.hpp>int main() {tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] (tf::Subflow& sbf){std::cout << "A spawns A1 & subflow A2\n";tf::Task A1 = sbf.emplace([] () {std::cout << "subtask A1\n";}).name("A1");tf::Task A2 = sbf.emplace([] (tf::Subflow& sbf2){std::cout << "A2 spawns A2_1 & A2_2\n";tf::Task A2_1 = sbf2.emplace([] () {std::cout << "subtask A2_1\n";}).name("A2_1");tf::Task A2_2 = sbf2.emplace([] () {std::cout << "subtask A2_2\n";}).name("A2_2");A2_1.precede(A2_2);}).name("A2");A1.precede(A2);}).name("A");// execute the graph to spawn the subflowtf::Executor().run(taskflow).get();taskflow.dump(std::cout);
}
同样,也可以detach 子图的子图,独立执行,最终都会被master Taskflow 统一Join(类似进程与子进程的关系)