Skip to content

Instantly share code, notes, and snippets.

@reyoung
Last active February 3, 2020 02:05
Show Gist options
  • Save reyoung/a144bd75a9e509291782f4e8b120cd8a to your computer and use it in GitHub Desktop.
Save reyoung/a144bd75a9e509291782f4e8b120cd8a to your computer and use it in GitHub Desktop.
simple_thread_worker
#include "async_thread_pool.h"
#include <iostream>
// Demo: schedule a few delayed print jobs on a TimedThreadWorker and wait for
// selected ones through their futures.
int main() {
  using std::chrono::milliseconds;

  // The constructor argument is the interval the worker uses when waiting on
  // its queue.
  TimedThreadWorker worker(milliseconds(1));
  worker.start();
  std::this_thread::sleep_for(std::chrono::seconds(1));

  // Enqueued out of order on purpose: the delays decide the run order.
  worker.enqueue(milliseconds(100),
                 [] { std::cout << "hello" << std::endl; });
  auto pending = worker.enqueue(milliseconds(350),
                                [] { std::cout << "!" << std::endl; });
  worker.enqueue(milliseconds(200),
                 [] { std::cout << "world" << std::endl; });

  // Block until the job with the longest delay has finished.
  pending.get();

  // A zero delay asks for the job to run as soon as the worker sees it.
  pending = worker.enqueue(milliseconds(0),
                           [] { std::cout << "run now" << std::endl; });
  pending.get();
}
#pragma once
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>
/// Runs callables on a single background thread once a requested delay has
/// elapsed (measured with std::chrono::steady_clock).
///
/// Usage: construct, call start(), then enqueue() jobs. The destructor sets
/// the stop flag, wakes the worker thread and joins it.
class TimedThreadWorker {
public:
  using clock_t = typename std::chrono::steady_clock;
  using rep = typename clock_t::rep;
  using duration_t = typename clock_t::duration;

  /// @param d Timeout the worker thread uses for each wait on the queue; it
  ///          bounds how late a due job is noticed.
  explicit TimedThreadWorker(duration_t d);
  ~TimedThreadWorker();

  // The object owns a thread and a mutex; copying it is not meaningful.
  TimedThreadWorker(const TimedThreadWorker &) = delete;
  TimedThreadWorker &operator=(const TimedThreadWorker &) = delete;

  /// Launches the background thread that executes due jobs.
  void start();

  /// Schedules `f(args...)` to run once `execAt` has elapsed from now.
  ///
  /// @param execAt Delay relative to the time of the call; values <= 0 mean
  ///               "as soon as possible".
  /// @return A future for the callable's result.
  /// @throws std::runtime_error if the worker has already been stopped.
  template <class F, class... Args>
  auto enqueue(duration_t execAt, F &&f, Args &&... args)
      -> std::future<typename std::result_of<F(Args...)>::type>;

private:
  /// One scheduled job: its due time (steady_clock ticks since the clock's
  /// epoch) and the type-erased callable to run.
  struct Job {
    rep execAt_;
    std::function<void()> func_;
  };

  /// Heap comparator: true when `a` is due later than `b`, so the standard
  /// heap algorithms keep the earliest job on top.
  static bool JobCmp(const std::unique_ptr<Job> &a,
                     const std::unique_ptr<Job> &b);

  /// Wait predicate: true when stopping or when the earliest job is due.
  bool predicate() const;

  /// True when tasks_.back() is due. Requires tasks_ to be non-empty.
  bool lastTaskCanExec() const;

  std::unique_ptr<std::thread> thread_;
  // Layout invariant: back() is the earliest job, and the elements before it
  // form a heap ordered by JobCmp.
  std::vector<std::unique_ptr<Job>> tasks_;
  std::condition_variable condition_;
  std::mutex queue_mutex_;  // guards tasks_ and stop_
  // Declared in the same order as the constructor's initializer list so that
  // declaration order and initialization order agree (-Wreorder).
  duration_t timePiece_;
  bool stop_ = false;
};
// Ordering used with the std heap algorithms: reports true when `lhs` is due
// later than `rhs`, which places the earliest job at the top of the heap.
bool TimedThreadWorker::JobCmp(
    const std::unique_ptr<TimedThreadWorker::Job> &lhs,
    const std::unique_ptr<TimedThreadWorker::Job> &rhs) {
  const bool rhsIsEarlier = rhs->execAt_ < lhs->execAt_;
  return rhsIsEarlier;
}
TimedThreadWorker::TimedThreadWorker(TimedThreadWorker::duration_t d)
: timePiece_(d), stop_(false) {}
void TimedThreadWorker::start() {
thread_.reset(new std::thread([this] {
duration_t d = timePiece_;
while (!stop_) {
std::unique_lock<std::mutex> lock(queue_mutex_);
bool ok =
condition_.wait_for(lock, d, [this, &d] { return predicate(); });
if (!ok) {
continue;
}
while (!tasks_.empty() && lastTaskCanExec()) {
auto &task = tasks_.back();
task->func_();
tasks_.pop_back();
std::pop_heap(tasks_.begin(), tasks_.end(), JobCmp);
}
}
}));
}
// Wake-up condition for the worker's wait: true when the worker is stopping,
// or when the queue is non-empty and its earliest job is due.
bool TimedThreadWorker::predicate() const {
  return stop_ || (!tasks_.empty() && lastTaskCanExec());
}
// Adds a new work item to the worker.
//
// `execAt` is a delay relative to the moment of the call; zero or negative
// values schedule the job for "now". The callable and its arguments are bound
// into a std::packaged_task whose future is returned to the caller.
//
// Throws std::runtime_error if the worker has already been stopped.
template <class F, class... Args>
auto TimedThreadWorker::enqueue(duration_t execAt, F &&f, Args &&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
  using return_type = typename std::result_of<F(Args...)>::type;
  // shared_ptr because std::function requires a copyable callable and
  // packaged_task is move-only.
  auto task = std::make_shared<std::packaged_task<return_type()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  std::future<return_type> res = task->get_future();
  {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    // don't allow enqueueing after stopping the pool
    if (stop_)
      throw std::runtime_error("enqueue on stopped ThreadPool");
    std::unique_ptr<Job> job(new Job());
    job->execAt_ = clock_t::now().time_since_epoch().count();
    if (execAt.count() > 0) {
      job->execAt_ += execAt.count();
    }
    job->func_ = [task]() { (*task)(); };
    // Queue layout: back() is the earliest job and everything before it is a
    // heap. Fold that earliest job back into the heap first. push_heap
    // requires a non-empty range, so skip it when the queue is empty.
    if (!tasks_.empty()) {
      std::push_heap(tasks_.begin(), tasks_.end(), JobCmp);
    }
    tasks_.emplace_back(std::move(job));
    std::push_heap(tasks_.begin(), tasks_.end(), JobCmp);
    // Move the (possibly new) earliest job back to the end of the vector.
    std::pop_heap(tasks_.begin(), tasks_.end(), JobCmp);
  }
  condition_.notify_one();
  return res;
}
// Stops the worker: sets the stop flag under the queue mutex, wakes the
// worker thread, and joins it. If start() was never called there is no thread
// to join, so the join is skipped instead of dereferencing a null pointer.
TimedThreadWorker::~TimedThreadWorker() {
  {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    stop_ = true;
  }
  condition_.notify_all();
  if (thread_ && thread_->joinable()) {
    thread_->join();
  }
}
bool TimedThreadWorker::lastTaskCanExec() const {
return clock_t::now().time_since_epoch().count() >= tasks_.back()->execAt_;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment