// Last active
// August 29, 2015 13:57
// -
// -
// Save code-of-kpp/9449980 to your computer and use it in GitHub Desktop.
// This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
// Learn more about bidirectional Unicode characters
#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include <atomic>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>

using std::cout;
using std::endl;
using std::shared_ptr;
using std::vector;
using std::queue;
using std::chrono::seconds;
using std::thread;
using std::mutex;
using std::unique_lock;
using std::condition_variable;
using std::atomic_uchar;
using std::atomic_bool;
/// Abstract unit of work that can be queued on a pool and executed by one of
/// its worker threads.
class Task
{
public:
    /// Performs the work. Implementations are called through a Task pointer.
    virtual void run() = 0;

    /// Virtual so that implementations can be destroyed through a Task pointer.
    virtual ~Task() = default;
};
class Pool;

/// Owns a single thread whose body is Worker::thread_func().
///
/// The thread is started by the constructor and joined by the destructor, so
/// destroying a Worker blocks until thread_func() has returned.
class Worker
{
public:
    /// Binds the worker to `pool` and immediately starts its thread.
    /// Only a reference to `pool` is stored, so `pool` must outlive the worker.
    explicit Worker(Pool& pool): pool(pool),
                                 m_thread(&Worker::thread_func, this)
    {}

    // The running thread holds `this`, so the object must never be copied.
    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;

    /// Waits for the worker thread to finish.
    ~Worker()
    {
        if (m_thread.joinable()) m_thread.join();
    }

protected:
    /// Body of the worker thread.
    void thread_func();

private:
    // Declaration order matters: `pool` must be bound before `m_thread`
    // starts, because the new thread uses it right away.
    Pool& pool;
    thread m_thread;
};
/// Selects which of the pool's queues a task is placed on when it is enqueued.
enum Priority
{
    Low,
    Normal,
    High
};
class Pool | |
{ | |
public: | |
Pool(unsigned int workers): m_workers(workers, nullptr), | |
m_processed_high(0), | |
m_stopped(false) | |
{ | |
for(auto& worker: m_workers) | |
{ | |
worker.reset(new Worker(*this)); | |
} | |
} | |
~Pool() | |
{ | |
stop(); | |
} | |
queue<Task*>::size_type queued(bool nolock=false) | |
{ | |
unique_lock<mutex> lock; | |
if (!nolock) lock = unique_lock<mutex>(m_mutex_tasks); | |
return ( | |
m_high_queue.size() + | |
m_normal_queue.size() + | |
m_low_queue.size() | |
); | |
} | |
bool enqueue(Task& task, Priority priority) | |
{ | |
unique_lock<mutex> lock(m_mutex_tasks); | |
if (m_stopped) return false; | |
switch(priority) | |
{ | |
case Low: | |
m_low_queue.push(&task); | |
break; | |
case Normal: | |
m_normal_queue.push(&task); | |
break; | |
case High: | |
m_high_queue.push(&task); | |
break; | |
} | |
m_cv_tasks.notify_one(); | |
return true; | |
} | |
void stop() | |
{ | |
m_stopped = true; | |
m_cv_tasks.notify_all(); | |
m_workers.clear(); | |
} | |
protected: | |
Task* get_task() | |
{ | |
auto check_continue = [this] | |
{ | |
return this->m_stopped || this->queued(true); | |
}; | |
unique_lock<mutex> lock(m_mutex_tasks); | |
if (!queued(true) && !m_stopped) | |
{ | |
m_cv_tasks.wait(lock, check_continue); | |
} | |
if (m_stopped) return nullptr; | |
if (m_high_queue.empty() && m_normal_queue.empty()) | |
{ | |
if (!m_low_queue.empty()) | |
{ | |
auto task = m_low_queue.back(); | |
m_low_queue.pop(); | |
return task; | |
} | |
} | |
if (m_high_queue.empty() || m_processed_high >= 3) | |
{ | |
if (!m_normal_queue.empty()) | |
{ | |
auto task = m_normal_queue.back(); | |
m_normal_queue.pop(); | |
m_processed_high = 0; | |
return task; | |
} | |
} | |
if (!m_high_queue.empty()) | |
{ | |
auto task = m_high_queue.back(); | |
m_high_queue.pop(); | |
m_processed_high++; | |
return task; | |
} | |
return nullptr; | |
} | |
private: | |
vector<shared_ptr<Worker>> m_workers; | |
queue<Task*> m_low_queue; | |
queue<Task*> m_normal_queue; | |
queue<Task*> m_high_queue; | |
atomic_uchar m_processed_high; | |
mutex m_mutex_tasks; | |
condition_variable m_cv_tasks; | |
atomic_bool m_stopped; | |
friend class Worker; | |
}; | |
void Worker::thread_func() | |
{ | |
Task* task = nullptr; | |
for(;;) | |
{ | |
task = pool.get_task(); | |
if (pool.m_stopped) | |
{ | |
break; | |
} | |
task->run(); | |
task = nullptr; | |
} | |
} | |
template<Priority p> | |
class ConcreteTask: public Task | |
{ | |
public: | |
ConcreteTask(): counter(0) {} | |
virtual void run() | |
{ | |
std::this_thread::sleep_for(seconds(2)); | |
counter++; | |
} | |
atomic_uchar counter; | |
}; | |
int main() | |
{ | |
Pool pool(10); | |
ConcreteTask<Low> low_task; | |
ConcreteTask<Normal> normal_task; | |
ConcreteTask<High> high_task; | |
for(int i=0; i<30; i++) pool.enqueue(low_task, Low); | |
thread t([&pool, &normal_task] | |
{ | |
for(int i=0; i<90; i++) | |
{ | |
pool.enqueue(normal_task, Normal); | |
} | |
}); | |
for(int i=0; i<30; i++) pool.enqueue(high_task, High); | |
t.join(); | |
std::this_thread::sleep_for(seconds(10)); | |
pool.stop(); | |
cout << "low " << int(low_task.counter) << endl; | |
cout << "normal " << int(normal_task.counter) << endl; | |
cout << "high " << int(high_task.counter) << endl; | |
cout << "left " << pool.queued() << endl; | |
Pool pool1(350); | |
cout << endl; | |
for(int i=0; i<30; i++) pool1.enqueue(high_task, High); | |
thread t1([&pool1, &normal_task] | |
{ | |
for(int i=0; i<90; i++) | |
{ | |
pool1.enqueue(normal_task, Normal); | |
} | |
}); | |
for(int i=0; i<30; i++) pool1.enqueue(low_task, Low); | |
t1.join(); | |
std::this_thread::sleep_for(seconds(5)); | |
cout << "low " << int(low_task.counter) << endl; | |
cout << "normal " << int(normal_task.counter) << endl; | |
cout << "high " << int(high_task.counter) << endl; | |
cout << "left " << pool1.queued() << endl; | |
return 0; | |
} |
// Sign up for free
// to join this conversation on GitHub.
// Already have an account?
// Sign in to comment