Skip to content

Instantly share code, notes, and snippets.

@jlaxson
Last active August 20, 2020 00:38
Show Gist options
  • Save jlaxson/914ec2751a18947a8ddae22db1106135 to your computer and use it in GitHub Desktop.
Save jlaxson/914ec2751a18947a8ddae22db1106135 to your computer and use it in GitHub Desktop.
Taskflow Syntax Ideas
// A functor-like object that can be called to signify that the task is actually complete.
// It is saved by StartExpensiveTaskWithToken so that it can be invoked later.
tf::Continuation token_;
// Remembers the continuation handed to the task. Nothing in this function invokes it;
// the sketch expects somebody to call `token_()` once the expensive work has finished.
void StartExpensiveTaskWithToken(tf::Continuation token) {
token_ = token;
}
tf::Taskflow flow;
auto t1 = flow.emplace([](tf::Continuation token)) {
std::cout << "T1 running...";
StartExpensiveTaskWithToken(token);
}
auto t2 = flow.placeholder();
t2.succeed(t1);
// Output
T1 running...
// now t2 is still blocked, but t1 is not scheduled on an executor. All threads in the executor are free.
// task finishes and somebody calls `token_()`
T2 runs
// Sketch: explicitly marking a taskflow as one-shot.
tf::Taskflow flow;
// This is a very specific solution to the problem of reusing resources, but
// as an example, the flow could be explicitly marked as oneshot, so Taskflow
// knows it is safe to free nodes and handlers as soon as possible.
flow.oneshot();
// NOTE(review): `executor` is not declared in this snippet -- presumably a tf::Executor; confirm.
executor.run(flow); // ok
executor.run(flow); // exception! this flow is oneshot and has already been run
// ignoring the above for now
// Stand-in for a costly resource. Its destructor prints a marker to std::cout so that
// the moment of destruction is visible in the program output.
struct ExpensiveResource {
    ~ExpensiveResource() { std::cout << "Destructing!"; }
};
// Builds a three-task flow. The first two tasks each capture a copy of a shared_ptr
// to one ExpensiveResource; the third task captures nothing. The second and third
// tasks are both made to succeed the first.
tf::Taskflow MakeMyFlow() {
    tf::Taskflow flow;
    auto data = std::make_shared<ExpensiveResource>();

    auto first = flow.emplace([data]() {
        DoSomethingWith(data);
        std::cout << "T1";
    });
    auto second = flow.emplace([data]() {
        DoSomethingElseWith(data);
        std::cout << "T2";
    });
    auto third = flow.emplace([]() { std::cout << "T3"; });

    second.succeed(first);
    third.succeed(first);
    return flow;
}
void RunMe() {
tf::Taskflow flow = MakeMyFlow();
tf::Executor executor;
executor.run(flow);
}
// Outputs:
T1
T2
Destructing!
T3
with tf::Taskflow::sample_task(std::string name, int delay) = emplace([=]() { std::cout << "Starting " << name; sleep(delay); });
// Sketch: limiting how many t1 -> t2 -> t3 chains are in flight at once with a semaphore.
tf::Taskflow flow;
auto semaphore = flow.semaphore(5); // 5 is initial capacity
// Incremented by every t1 and decremented by every t3; starts at zero.
std::atomic<int> concurrency{0};
for (int i = 0; i < 100; i++) {
    // std::atomic cannot be copied, so every task captures the one shared counter by reference.
    auto t1 = flow.emplace([&concurrency]() { concurrency++; sleep(1); });
    auto t2 = flow.emplace([&concurrency]() { assert(concurrency.load() <= 5); sleep(10); });
    auto t3 = flow.emplace([&concurrency]() { concurrency--; sleep(1); });
    t2.succeed(t1);
    t3.succeed(t2); // t3 follows t2, completing the chain t1 -> t2 -> t3
    t1.acquire(semaphore); // t1 cannot run until the semaphore can be downed
    t3.release(semaphore); // when t3 completes it will "up" the semaphore
}
============
// Sketch: using a capacity-1 semaphore to serialize otherwise independent tasks.
tf::Taskflow flow;
auto semaphore = flow.semaphore(1);
for (int i = 99; i >= 0; i--) {
    // each of these tasks will be serialized to one at a time, although not necessarily
    // in the proper order; the output should appear without any garbling
    auto t1 = flow.emplace([=]() { std::cout << std::to_string(i) << " bottles of beer on the wall\n"; });
    t1.acquire(semaphore).release(semaphore);
}
============
// Sketch: `synchronize` as shorthand for acquiring and releasing around a single task.
tf::Taskflow flow;
auto semaphore = flow.semaphore(1);
auto t1 = flow.emplace([=]() { sleep(1); });
auto t2 = flow.emplace([=]() { sleep(10); });
t2.succeed(t1);
t2.synchronize(semaphore); // syntax sugar for .acquire(semaphore).release(semaphore)
// The semaphore should not be acquired until t1 is complete and t2 is otherwise runnable. The goal is to not block the semaphore unnecessarily. If a task depended on two or more semaphores, we might have to just pick an order to lock them in (or does CS give us a good solution for this?).
============
// Sketch: acquiring and releasing a semaphore by an explicit count.
tf::Taskflow flow;
auto semaphore = flow.semaphore(10);
auto t1 = flow.emplace([=]() { sleep(1); });
auto t2 = flow.emplace([=]() { sleep(1); });
t1.acquire(semaphore, 6).release(semaphore, 6);
t2.synchronize(semaphore, 5);
// Not sure how practical this one is - the amount by which the semaphore is decremented could be specified. This could allow subflows to more finely control resource access.
// t1 and t2 won't run concurrently (they need 6 + 5 = 11 units, but the capacity is only 10).
// Sketch: dependencies that cross subflow and taskflow boundaries.
tf::Taskflow flow;
auto t1 = flow.placeholder();
auto t2 = flow.placeholder();
// The lambda captures by reference so that it can name t2 from the enclosing scope.
auto t3 = flow.emplace([&](tf::Subflow& subflow) {
// tasks in the subflow can depend on tasks outside it
auto t3sub1 = subflow.placeholder();
t3sub1.succeed(t2);
auto t3sub2 = subflow.placeholder();
});
tf::Executor executor;
executor.run(flow);
// A second, separate graph whose task depends on a task of the first graph.
tf::Taskflow flow2;
auto t4 = flow2.placeholder();
t4.succeed(t2); // tasks in flow2 can depend on tasks in other graphs already submitted to executor
executor.run(flow2);
=====
// This seems super complicated to address; it probably should not be allowed.
// Submits `flow` to the executor again.
executor.run(flow);
---> undefined behavior
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment