Taskflow 概览

主要功能

  • 实现了一种基于任务拓扑图的调度算法
  • 支持对cpu + gpu的混合调度
  • 拓扑图支持条件语句、循环语句、switch case 等简易逻辑
  • 支持模块级别的子流程
  • 支持动态任务(任务执行运行时添加任务)
  • 支持设置任务的优先级
  • 支持任务的profile查看

缺点

  • 数据传递没有实现传递途径,需要自行实现

如何配置化

实现方式

在启动执行器时,会直接启动固定大小的线程,用于做任务执行。在执行器上增加工作流后,开始进行计算。

主要有几个名词

  • executor 执行器,一个executor 上会有多个worker线程
  • worker 工作线程
  • topology 拓扑图,拓扑图上会有一个或者多个node节点
  • task 是提交的任务
  • taskflow 实现的目标是,将task 在worker上并行执行,提升执行效率。

taskflow 的任务调度图

如调度图所示,每个worker 占用一个real core,每个core 会有两个 task 队列(gpu & cpu)。core 只和自己的task 队列交互。

steal 的task 应该是 wsq 的最外面的task

如何使用

cookbook

静态任务

case

代码示意

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <taskflow/taskflow.hpp>  // Taskflow is header-only

int main(){
  
  // 启动执行器
  tf::Executor executor;

  // 定义任务流
  tf::Taskflow taskflow;
  auto [A, B, C, D] = taskflow.emplace(  // create four tasks
    [] () { std::cout << "TaskA\n"; },
    [] () { std::cout << "TaskB\n"; },
    [] () { std::cout << "TaskC\n"; },
    [] () { std::cout << "TaskD\n"; } 
  );                                  

  A.precede(B, C);  // A runs before B and C
  D.succeed(B, C);  // D runs after  B and C
                                      
  // 任务执行器启动,并执行任务流
  executor.run(taskflow).wait(); 

  return 0;
}

工作流

Executor

case

动态任务

case

动态任务支持在运行时,增加子任务

代码示意

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
 tf::Taskflow taskflow;
 tf::Executor executor;

 tf::Task A = taskflow.emplace([] () {}).name("A");  // static task A
 tf::Task C = taskflow.emplace([] () {}).name("C");  // static task C
 tf::Task D = taskflow.emplace([] () {}).name("D");  // static task D

 tf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { 
   tf::Task B1 = subflow.emplace([] () {}).name("B1");  // dynamic task B1
   tf::Task B2 = subflow.emplace([] () {}).name("B2");  // dynamic task B2
   tf::Task B3 = subflow.emplace([] () {}).name("B3");  // dynamic task B3
   B1.precede(B3);  // B1 runs bofore B3
   B2.precede(B3);  // B2 runs before B3
 }).name("B");

 A.precede(B);  // B runs after A
 A.precede(C);  // C runs after A
 B.precede(D);  // D runs after B
 C.precede(D);  // D runs after C

 executor.run(taskflow).get();  // execute the graph to spawn the subflow
 taskflow.dump(std::cout);      // dump the taskflow to a DOT format

工作流

条件任务

case

支持多种条件的判断,因此可以支持if/while等的语意描述。

代码示意

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
tf::Executor executor;
tf::Taskflow taskflow;

auto A = taskflow.emplace([&]() -> tf::SmallVector<int> { 
  std::cout << "A\n"; 
  return {0, 2};
}).name("A");
auto B = taskflow.emplace([&](){ std::cout << "B\n"; }).name("B");
auto C = taskflow.emplace([&](){ std::cout << "C\n"; }).name("C");
auto D = taskflow.emplace([&](){ std::cout << "D\n"; }).name("D");

A.precede(B, C, D);

executor.run(taskflow).wait();

流程图

任务组合

case

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// f1 has three independent tasks
tf::Taskflow f1;
f1.name("F1");
tf::Task f1A = f1.emplace([&](){ std::cout << "F1 TaskA\n"; });
tf::Task f1B = f1.emplace([&](){ std::cout << "F1 TaskB\n"; });
tf::Task f1C = f1.emplace([&](){ std::cout << "F1 TaskC\n"; });

f1A.name("f1A");
f1B.name("f1B");
f1C.name("f1C");
f1A.precede(f1C);
f1B.precede(f1C);

// f2A ---
//        |----> f2C ----> f1_module_task ----> f2D
// f2B --- 
tf::Taskflow f2;
f2.name("F2");
tf::Task f2A = f2.emplace([&](){ std::cout << "  F2 TaskA\n"; });
tf::Task f2B = f2.emplace([&](){ std::cout << "  F2 TaskB\n"; });
tf::Task f2C = f2.emplace([&](){ std::cout << "  F2 TaskC\n"; });
tf::Task f2D = f2.emplace([&](){ std::cout << "  F2 TaskD\n"; });

f2A.name("f2A");
f2B.name("f2B");
f2C.name("f2C");
f2D.name("f2D");

f2A.precede(f2C);
f2B.precede(f2C);

tf::Task f1_module_task = f2.composed_of(f1).name("module");
f2C.precede(f1_module_task);
f1_module_task.precede(f2D);

f2.dump(std::cout);

流程图

警告
不能并行执行同一个任务组合

异步任务

  • 支持在subflow,executor,runtime 等级别的等待join
  • 支持有依赖的异步任务,并支持在多线程中创建异步任务
  • case | 有依赖的异步任务

简单的异步任务

1
2
std::future<int> future = executor.async([](){ return 1; });
assert(future.get() == 1);

多线程中的任务依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
tf::Executor executor;

// main thread creates a dependent async task A
tf::AsyncTask A = executor.silent_dependent_async([](){});

// spawn a new thread to create an async task B that runs after A
std::thread t1([&](){
  tf::AsyncTask B = executor.silent_dependent_async([](){}, A);
});

// spawn a new thread to create an async task C that runs after A
std::thread t2([&](){
  tf::AsyncTask C = executor.silent_dependent_async([](){}, A);
});

executor.wait_for_all();
t1.join();
t2.join();

与runtime的交互

case 支持task传入runtime参数,用runtime 参数调度手动执行调度

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
tf::Task A, B, C, D;
std::tie(A, B, C, D) = taskflow.emplace(
  [] () { return 0; },
  [&C] (tf::Runtime& rt) {  // C must be captured by reference
    std::cout << "B\n"; 
    rt.schedule(C); // b 唤起C
  },
  [] () { std::cout << "C\n"; },
  [] () { std::cout << "D\n"; }
);
A.precede(B, C, D);
executor.run(taskflow).wait();

优先级任务

case

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
tf::Executor executor(1);
tf::Taskflow taskflow;

int counter = 0;

auto [A, B, C, D, E] = taskflow.emplace(
  [] () { },
  [&] () { 
    std::cout << "Task B: " << counter++ << '\n';  // 0
  },
  [&] () { 
    std::cout << "Task C: " << counter++ << '\n';  // 2
  },
  [&] () { 
    std::cout << "Task D: " << counter++ << '\n';  // 1
  },
  [] () { }
);

A.precede(B, C, D); 
E.succeed(B, C, D);

B.priority(tf::TaskPriority::HIGH);
C.priority(tf::TaskPriority::LOW);
D.priority(tf::TaskPriority::NORMAL);

executor.run(taskflow).wait();

gpu 任务

case

设置最大并发

case

取消请求

case

Profile

case

代码学习

node 描述

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

// 定义不同的执行类型
struct Static {
  template <typename C>
  Static(C&&);
  std::variant<
    std::function<void()>, std::function<void(Runtime&)>
  > work;
};

struct Dynamic {
  template <typename C>
  Dynamic(C&&);

  std::function<void(Subflow&)> work;
  Graph subgraph;
};
// ...


using handle_t = std::variant<
  Placeholder,      // placeholder
  Static,           // static tasking
  Dynamic,          // dynamic tasking
  Condition,        // conditional tasking
  MultiCondition,   // multi-conditional tasking
  Module,           // composable tasking
  Async,            // async tasking
  DependentAsync    // dependent async tasking (no future)
>;


class Node {
  SmallVector<Node*> _successors; // node 执行成功后,需要转移的node 列表(map)
  SmallVector<Node*> _dependents; // node 依赖的其他node   
  handle_t _handle;
  unsigned _priority {0};
  Topology* _topology {nullptr};
  Node* _parent {nullptr};
}

初始化执行器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// N 是需要启动的执行线程的数量
inline Executor::Executor(size_t N, std::shared_ptr<WorkerInterface> wix) :
    _MAX_STEALS {((N+1) << 1)},
    _threads    {N},
    _workers    {N},
    _notifier   {N},
    _worker_interface {std::move(wix)} {
 
    if(N == 0) {
      TF_THROW("no cpu workers to execute taskflows");
    }
 
    _spawn(N);
 
    // instantite the default observer if requested
    if(has_env(TF_ENABLE_PROFILER)) {
      TFProfManager::get()._manage(make_observer<TFProfObserver>());
    }
  }

启动线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
inline void Executor::_spawn(size_t N) {
 
  std::mutex mutex;
  std::condition_variable cond;
  size_t n=0;
  // workers 保存的就是线程,创建N个
  for(size_t id=0; id<N; ++id) {
 
    _workers[id]._id = id;
    _workers[id]._vtm = id;
    _workers[id]._executor = this;
    _workers[id]._waiter = &_notifier._waiters[id];
     
    _threads[id] = std::thread([this] (
      Worker& w, std::mutex& mutex, std::condition_variable& cond, size_t& n
    ) -> void {
 
      w._thread = &_threads[w._id];
 
      {
        std::scoped_lock lock(mutex);
        _wids[std::this_thread::get_id()] = w._id;
        if(n++; n == num_workers()) {
          // 确保至少有一个worker在启动中
          cond.notify_one();
        }
      }
      Node* t = nullptr;
 
      // worker 启动前的钩子
      if(_worker_interface) {
        _worker_interface->scheduler_prologue(w);
      }
 
      std::exception_ptr ptr{nullptr};    
      try {
        while(1) {
          循环获取task和执行task
          // execute the tasks.
          _exploit_task(w, t);
 
          // wait for tasks
          if(_wait_for_task(w, t) == false) {
            break;
          }
        }
      }
      catch(...) {
        ptr = std::current_exception();
      }
 
      // worker 结束前的钩子
      if(_worker_interface) {
        _worker_interface->scheduler_epilogue(w, ptr);
      }
 
    }, std::ref(_workers[id]), std::ref(mutex), std::ref(cond), std::ref(n));
  }
 
  std::unique_lock<std::mutex> lock(mutex);
  cond.wait(lock, [&](){ return n==N; });
}

任务执行

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// 循环获取task
  inline void Executor::_exploit_task(Worker& w, Node*& t) {
    while(t) {
      _invoke(w, t);
      // 每个worker 中有一个 _wsq 保存task列表
      t = w._wsq.pop();
    }
  }
 
 
// 处理task的过程
inline void Executor::_invoke(Worker& worker, Node* node) {
 
  // synchronize all outstanding memory operations caused by reordering
  while(!(node->_state.load(std::memory_order_acquire) & Node::READY));
 
  begin_invoke:
   
  SmallVector<int> conds;
 
  // 取消直接返回
  if(node->_is_cancelled()) {
    if(node = _tear_down_invoke(worker, node); node) {
      goto invoke_successors;
    }
    return;
  }
 
  // if acquiring semaphore(s) exists, acquire them first
  if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
    SmallVector<Node*> nodes;
    if(!node->_acquire_all(nodes)) {
      _schedule(worker, nodes);
      return;
    }
    node->_state.fetch_or(Node::ACQUIRED, std::memory_order_release);
  }
 
  // 基于不同任务类型,执行node上的任务, conds是返回值
  switch(node->_handle.index()) {
    // static task
    case Node::STATIC:{
      _invoke_static_task(worker, node);
    }
    break;
 
    // dynamic task
    case Node::DYNAMIC: {
      _invoke_dynamic_task(worker, node);
    }
    break;
    ......
  }
 
  invoke_successors:
 
  // if releasing semaphores exist, release them
  if(node->_semaphores && !node->_semaphores->to_release.empty()) {
    _schedule(worker, node->_release_all());
  }
   
  // Reset the join counter to support the cyclic control flow.
  // + We must do this before scheduling the successors to avoid race
  //   condition on _dependents.
  // + We must use fetch_add instead of direct assigning
  //   because the user-space call on "invoke" may explicitly schedule
  //   this task again (e.g., pipeline) which can access the join_counter.
  if((node->_state.load(std::memory_order_relaxed) & Node::CONDITIONED)) {
    node->_join_counter.fetch_add(node->num_strong_dependents(), std::memory_order_relaxed);
  }
  else {
    node->_join_counter.fetch_add(node->num_dependents(), std::memory_order_relaxed);
  }
 
  // acquire the parent flow counter
  auto& j = (node->_parent) ? node->_parent->_join_counter :
                              node->_topology->_join_counter;
 
  // Here, we want to cache the latest successor with the highest priority
  worker._cache = nullptr;
  auto max_p = static_cast<unsigned>(TaskPriority::MAX);
 
  // Invoke the task based on the corresponding type
  switch(node->_handle.index()) {
 
    // condition and multi-condition tasks
    case Node::CONDITION:
    case Node::MULTI_CONDITION: {
      for(auto cond : conds) {
        if(cond >= 0 && static_cast<size_t>(cond) < node->_successors.size()) {
          auto s = node->_successors[cond];/           }
            worker._cache = s;
            max_p = s->_priority;
          }
          else {
            _schedule(worker, s);
          }
        }
      }
    }
    break;
 
    // 非条件的任务,即全部是强依赖,则
    default: {
      for(size_t i=0; i<node->_successors.size(); ++i) {
        if(auto s = node->_successors[i];
          s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
          j.fetch_add(1, std::memory_order_relaxed);
          // 优先级最高的自己的本worker直接执行,否则会进入 _wsq 队列等待执行
          if(s->_priority <= max_p) {
            if(worker._cache) {
              _schedule(worker, worker._cache);
            }
            worker._cache = s;
            max_p = s->_priority;
          }
          else {
            _schedule(worker, s);
          }
        }
      }
    }
    break;
  }
 
  // 通知睡觉的worker,可以进行窃取task了
  _tear_down_invoke(worker, node);
  // 指定当前执行node,开始执行新的worker
  if(worker._cache) {
    node = worker._cache;
    goto begin_invoke;
  }
}

任务调度

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Procedure: _schedule
inline void Executor::_schedule(Worker& worker, Node* node) {
   
  // We need to fetch p before the release such that the read
  // operation is synchronized properly with other thread to
  // void data race.
  auto p = node->_priority;
 
  node->_state.fetch_or(Node::READY, std::memory_order_release);
 
  // 本地worker队列
  if(worker._executor == this) {
    // 推入worker 的wsq
    worker._wsq.push(node, p);
  // 通知等待的worker
    _notifier.notify(false);
    return;
  }
 
  // 全局队列
  {
    std::lock_guard<std::mutex> lock(_wsq_mutex);
    _wsq.push(node, p);
  }
  // 通知等待的worker
  _notifier.notify(false);
}

其他链接

  1. github 地址
  2. 2022年论文
  3. 作者主页
0%