Last active
February 3, 2020 02:05
-
-
Save reyoung/a144bd75a9e509291782f4e8b120cd8a to your computer and use it in GitHub Desktop.
simple_thread_worker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "async_thread_pool.h" | |
#include <iostream> | |
int main() { | |
TimedThreadWorker worker(std::chrono::milliseconds(1)); | |
worker.start(); | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
worker.enqueue(std::chrono::milliseconds(100), | |
[] { std::cout << "hello" << std::endl; }); | |
auto fut = worker.enqueue(std::chrono::milliseconds(350), | |
[] { std::cout << "!" << std::endl; }); | |
worker.enqueue(std::chrono::milliseconds(200), | |
[] { std::cout << "world" << std::endl; }); | |
// wait some task to finish | |
fut.get(); | |
fut = worker.enqueue(std::chrono::milliseconds(0), | |
[] { std::cout << "run now" << std::endl; }); | |
fut.get(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <condition_variable> | |
#include <future> | |
#include <mutex> | |
#include <thread> | |
#include <vector> | |
class TimedThreadWorker { | |
public: | |
using clock_t = typename std::chrono::steady_clock; | |
using rep = typename clock_t::rep; | |
using duration_t = typename clock_t::duration; | |
explicit TimedThreadWorker(duration_t d); | |
~TimedThreadWorker(); | |
void start(); | |
template <class F, class... Args> | |
auto enqueue(duration_t execAt, F &&f, Args &&... args) | |
-> std::future<typename std::result_of<F(Args...)>::type>; | |
private: | |
struct Job { | |
rep execAt_; | |
std::function<void()> func_; | |
}; | |
static bool JobCmp(const std::unique_ptr<Job> &a, | |
const std::unique_ptr<Job> &b); | |
bool predicate() const; | |
bool lastTaskCanExec() const; | |
std::unique_ptr<std::thread> thread_; | |
std::vector<std::unique_ptr<Job>> tasks_; | |
std::condition_variable condition_; | |
std::mutex queue_mutex_; | |
bool stop_; | |
duration_t timePiece_; | |
}; | |
bool TimedThreadWorker::JobCmp( | |
const std::unique_ptr<TimedThreadWorker::Job> &a, | |
const std::unique_ptr<TimedThreadWorker::Job> &b) { | |
return a->execAt_ > b->execAt_; | |
} | |
TimedThreadWorker::TimedThreadWorker(TimedThreadWorker::duration_t d) | |
: timePiece_(d), stop_(false) {} | |
void TimedThreadWorker::start() { | |
thread_.reset(new std::thread([this] { | |
duration_t d = timePiece_; | |
while (!stop_) { | |
std::unique_lock<std::mutex> lock(queue_mutex_); | |
bool ok = | |
condition_.wait_for(lock, d, [this, &d] { return predicate(); }); | |
if (!ok) { | |
continue; | |
} | |
while (!tasks_.empty() && lastTaskCanExec()) { | |
auto &task = tasks_.back(); | |
task->func_(); | |
tasks_.pop_back(); | |
std::pop_heap(tasks_.begin(), tasks_.end(), JobCmp); | |
} | |
} | |
})); | |
} | |
bool TimedThreadWorker::predicate() const { | |
if (stop_) { | |
return true; | |
} | |
if (tasks_.empty()) { | |
return false; | |
} | |
return lastTaskCanExec(); | |
} | |
// add new work item to the pool | |
template <class F, class... Args> | |
auto TimedThreadWorker::enqueue(duration_t execAt, F &&f, Args &&... args) | |
-> std::future<typename std::result_of<F(Args...)>::type> { | |
using return_type = typename std::result_of<F(Args...)>::type; | |
auto task = std::make_shared<std::packaged_task<return_type()>>( | |
std::bind(std::forward<F>(f), std::forward<Args>(args)...)); | |
std::future<return_type> res = task->get_future(); | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex_); | |
// don't allow enqueueing after stopping the pool | |
if (stop_) | |
throw std::runtime_error("enqueue on stopped ThreadPool"); | |
std::unique_ptr<Job> job(new Job()); | |
job->execAt_ = clock_t::now().time_since_epoch().count(); | |
if (execAt.count() > 0) { | |
job->execAt_ += execAt.count(); | |
} | |
job->func_ = [task]() { (*task)(); }; | |
std::push_heap(tasks_.begin(), tasks_.end(), JobCmp); | |
tasks_.emplace_back(std::move(job)); | |
std::push_heap(tasks_.begin(), tasks_.end(), JobCmp); | |
std::pop_heap(tasks_.begin(), tasks_.end(), JobCmp); | |
} | |
condition_.notify_one(); | |
return res; | |
} | |
TimedThreadWorker::~TimedThreadWorker() { | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex_); | |
stop_ = true; | |
} | |
condition_.notify_all(); | |
thread_->join(); | |
} | |
bool TimedThreadWorker::lastTaskCanExec() const { | |
return clock_t::now().time_since_epoch().count() >= tasks_.back()->execAt_; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment