Created
January 25, 2014 04:10
-
-
Save zxytim/8611754 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* $File: class_uniq_thread_job_pool.hh | |
* $Date: Sat Jan 18 14:33:53 2014 +0800 | |
* $Author: Xinyu Zhou <zxytim[at]gmail[dot]com> | |
*/ | |
#pragma once | |
#include <vector> | |
#include <queue> | |
#include <memory> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
#include <future> | |
#include <functional> | |
#include <stdexcept> | |
#include <unordered_map> | |
#include <map> | |
#include <set> | |
#include "multi_put_single_get_job_queue.hh" | |
/*! | |
* A thread pool which support tasks with classes(a concept, not c++ class), | |
* which contrained to: | |
* A single task of a class can be excuted at a time, | |
* that is, a task belongs to a class is exclusive to others | |
* tasks belong to the same class. | |
* | |
* In order not to introduce global lock when adding jobs, job assignment is | |
* done by using hash value of class_types to assign a job. Therefore, it is | |
* required that the class_type must be hashable, | |
* | |
* Caveat: | |
* 1.while this thread pool can handle enormous number of class_types, | |
* its performance is not guaranteed when number of class_types is | |
* small: load of workers may be unbalanced. | |
* | |
*/ | |
template<class class_type, class hash_func = std::hash<class_type>> | |
class class_uniq_thread_job_pool { | |
public: | |
class_uniq_thread_job_pool(size_t nr_thread, const hash_func &hasher = hash_func()); | |
template<class F, class... Args> | |
auto put(const class_type &ctype, F&& f, Args&&...args) | |
->std::future<typename std::result_of<F(Args...)>::type>; | |
~class_uniq_thread_job_pool(); | |
protected: | |
class pool_worker { | |
public: | |
pool_worker() : | |
stop(false) { | |
} | |
void set_stop(); | |
void run(); | |
void operator () () { run(); } | |
//! this queue is thread safe while phutting in things | |
multi_put_single_get_job_queue<std::function<void()>> job_queue; | |
std::condition_variable condition; | |
std::mutex notification_mutex; | |
bool stop; | |
}; | |
std::vector<std::thread> worker_threads; | |
std::vector<std::shared_ptr<pool_worker>> workers; | |
hash_func hasher; | |
void set_stop(); | |
}; | |
template<class class_type, class hash_func> | |
class_uniq_thread_job_pool<class_type, hash_func>::class_uniq_thread_job_pool( | |
size_t nr_thread, const hash_func &hasher) : hasher(hasher) { | |
for (size_t i = 0; i < nr_thread; i ++) | |
{ | |
workers.emplace_back(); | |
auto &worker = workers.back(); | |
worker = std::make_shared<pool_worker>(); | |
worker_threads.emplace_back( | |
std::bind(&pool_worker::run, worker.get())); | |
} | |
} | |
template<class class_type, class hash_func> | |
template<class F, class... Args> | |
auto class_uniq_thread_job_pool<class_type, hash_func>::put(const class_type &class_id, F&& f, Args&&...args) | |
->std::future<typename std::result_of<F(Args...)>::type> | |
{ | |
typedef typename std::result_of<F(Args...)>::type return_type; | |
// if (stop) | |
// throw std::runtime_error("put on stopped class_uniq_thread_job_pool"); | |
auto task = std::make_shared<std::packaged_task<return_type()>>( | |
std::bind(std::forward<F>(f), std::forward<Args>(args)...) | |
); | |
std::future<return_type> ret = task->get_future(); | |
int assigned_worker_id = hasher(class_id) % workers.size(); | |
auto &assigned_worker = *workers[assigned_worker_id]; | |
assigned_worker.job_queue.put( | |
[task](){ (*task)(); }); // thread safe when putting in | |
assigned_worker.condition.notify_one(); | |
return ret; | |
} | |
template<class class_type, class hash_func> | |
void class_uniq_thread_job_pool<class_type, hash_func>::set_stop() { | |
for (auto &worker: workers) | |
worker->set_stop(); | |
} | |
template<class class_type, class hash_func> | |
class_uniq_thread_job_pool<class_type, hash_func>::~class_uniq_thread_job_pool() { | |
set_stop(); | |
for (auto &th: worker_threads) | |
th.join(); | |
} | |
template<class class_type, class hash_func> | |
void class_uniq_thread_job_pool<class_type, hash_func>::pool_worker::run() { | |
try { | |
for (; ;) { | |
std::function<void()> job(job_queue.get()); // get is lock free, | |
job_queue.pop(); | |
job(); | |
} | |
} catch (JobFinished) { | |
} | |
} | |
template<class class_type, class hash_func> | |
void class_uniq_thread_job_pool<class_type, hash_func>::pool_worker::set_stop() { | |
job_queue.stop(); | |
} | |
// vim: syntax=cpp11.doxygen foldmethod=marker |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* $File: multi_put_single_get_job_queue.hh | |
* $Date: Sat Jan 18 14:22:22 2014 +0800 | |
* $Author: Xinyu Zhou <zxytim[at]gmail[dot]com> | |
*/ | |
#pragma once | |
#include <cstddef> | |
#include <vector> | |
#include <memory> | |
#include <queue> | |
#include <mutex> | |
#include <condition_variable> | |
#include <iostream> | |
class JobFinished{}; | |
/*! | |
* A multi-put-single-get model job queue. | |
* | |
* This queue is thread safe when multiple threads tries to put things in, | |
* however, it is not thread safe when multiple threads tries to get things. | |
* This compromise could leads to better performance which introduces | |
* fewer locks while a single thread get things. | |
*/ | |
template<class value_type> | |
class multi_put_single_get_job_queue { | |
public: | |
multi_put_single_get_job_queue() : m_stop(false) { | |
get_queue = new std::queue<value_type>(); | |
buf_queue = new std::queue<value_type>(); | |
} | |
~multi_put_single_get_job_queue() { | |
delete get_queue; | |
delete buf_queue; | |
} | |
void put(const value_type &val) { | |
{ | |
// if (m_stop) | |
// throw JobFinished(); | |
std::unique_lock<std::mutex> | |
lock(buf_queue_mutex); | |
buf_queue->push(val); | |
} | |
condition.notify_one(); | |
} | |
// block wait | |
value_type &get() throw (JobFinished) { | |
try_enrich_get_queue(); | |
return get_queue->front(); | |
} | |
void pop() { | |
try_enrich_get_queue(); | |
get_queue->pop(); | |
} | |
void stop() { | |
{ | |
std::unique_lock<std::mutex> | |
lock(buf_queue_mutex); | |
m_stop = true; | |
} | |
condition.notify_one(); | |
} | |
bool is_stopped() { | |
return m_stop; | |
} | |
protected: | |
std::queue<value_type> *get_queue, *buf_queue; | |
std::mutex buf_queue_mutex; | |
std::condition_variable condition; | |
bool m_stop; | |
void try_enrich_get_queue() throw(JobFinished){ | |
if (!get_queue->empty()) | |
return; | |
{ | |
std::unique_lock<std::mutex> lock(buf_queue_mutex); | |
while (!m_stop && buf_queue->empty()) { | |
condition.wait(lock); | |
} | |
if (m_stop && buf_queue->empty()) | |
throw JobFinished(); | |
std::swap(get_queue, buf_queue); | |
} | |
} | |
}; | |
// vim: syntax=cpp11.doxygen foldmethod=marker |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment