Skip to content

Instantly share code, notes, and snippets.

@code-of-kpp
Last active August 29, 2015 13:57
Show Gist options
  • Save code-of-kpp/9449980 to your computer and use it in GitHub Desktop.
Save code-of-kpp/9449980 to your computer and use it in GitHub Desktop.
#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include <atomic>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
using std::cout;
using std::endl;
using std::shared_ptr;
using std::vector;
using std::queue;
using std::chrono::seconds;
using std::thread;
using std::mutex;
using std::unique_lock;
using std::condition_variable;
using std::atomic_uchar;
using std::atomic_bool;
class Task
{
public:
virtual void run() = 0;
virtual ~Task() {}
};
class Pool;
class Worker
{
public:
Worker(Pool& pool): pool(pool),
m_thread(&Worker::thread_func, this)
{}
~Worker()
{
if (m_thread.joinable()) m_thread.join();
}
protected:
void thread_func();
private:
Pool& pool;
thread m_thread;
};
enum Priority
{
Low, Normal, High
};
class Pool
{
public:
Pool(unsigned int workers): m_workers(workers, nullptr),
m_processed_high(0),
m_stopped(false)
{
for(auto& worker: m_workers)
{
worker.reset(new Worker(*this));
}
}
~Pool()
{
stop();
}
queue<Task*>::size_type queued(bool nolock=false)
{
unique_lock<mutex> lock;
if (!nolock) lock = unique_lock<mutex>(m_mutex_tasks);
return (
m_high_queue.size() +
m_normal_queue.size() +
m_low_queue.size()
);
}
bool enqueue(Task& task, Priority priority)
{
unique_lock<mutex> lock(m_mutex_tasks);
if (m_stopped) return false;
switch(priority)
{
case Low:
m_low_queue.push(&task);
break;
case Normal:
m_normal_queue.push(&task);
break;
case High:
m_high_queue.push(&task);
break;
}
m_cv_tasks.notify_one();
return true;
}
void stop()
{
m_stopped = true;
m_cv_tasks.notify_all();
m_workers.clear();
}
protected:
Task* get_task()
{
auto check_continue = [this]
{
return this->m_stopped || this->queued(true);
};
unique_lock<mutex> lock(m_mutex_tasks);
if (!queued(true) && !m_stopped)
{
m_cv_tasks.wait(lock, check_continue);
}
if (m_stopped) return nullptr;
if (m_high_queue.empty() && m_normal_queue.empty())
{
if (!m_low_queue.empty())
{
auto task = m_low_queue.back();
m_low_queue.pop();
return task;
}
}
if (m_high_queue.empty() || m_processed_high >= 3)
{
if (!m_normal_queue.empty())
{
auto task = m_normal_queue.back();
m_normal_queue.pop();
m_processed_high = 0;
return task;
}
}
if (!m_high_queue.empty())
{
auto task = m_high_queue.back();
m_high_queue.pop();
m_processed_high++;
return task;
}
return nullptr;
}
private:
vector<shared_ptr<Worker>> m_workers;
queue<Task*> m_low_queue;
queue<Task*> m_normal_queue;
queue<Task*> m_high_queue;
atomic_uchar m_processed_high;
mutex m_mutex_tasks;
condition_variable m_cv_tasks;
atomic_bool m_stopped;
friend class Worker;
};
void Worker::thread_func()
{
Task* task = nullptr;
for(;;)
{
task = pool.get_task();
if (pool.m_stopped)
{
break;
}
task->run();
task = nullptr;
}
}
template<Priority p>
class ConcreteTask: public Task
{
public:
ConcreteTask(): counter(0) {}
virtual void run()
{
std::this_thread::sleep_for(seconds(2));
counter++;
}
atomic_uchar counter;
};
int main()
{
Pool pool(10);
ConcreteTask<Low> low_task;
ConcreteTask<Normal> normal_task;
ConcreteTask<High> high_task;
for(int i=0; i<30; i++) pool.enqueue(low_task, Low);
thread t([&pool, &normal_task]
{
for(int i=0; i<90; i++)
{
pool.enqueue(normal_task, Normal);
}
});
for(int i=0; i<30; i++) pool.enqueue(high_task, High);
t.join();
std::this_thread::sleep_for(seconds(10));
pool.stop();
cout << "low " << int(low_task.counter) << endl;
cout << "normal " << int(normal_task.counter) << endl;
cout << "high " << int(high_task.counter) << endl;
cout << "left " << pool.queued() << endl;
Pool pool1(350);
cout << endl;
for(int i=0; i<30; i++) pool1.enqueue(high_task, High);
thread t1([&pool1, &normal_task]
{
for(int i=0; i<90; i++)
{
pool1.enqueue(normal_task, Normal);
}
});
for(int i=0; i<30; i++) pool1.enqueue(low_task, Low);
t1.join();
std::this_thread::sleep_for(seconds(5));
cout << "low " << int(low_task.counter) << endl;
cout << "normal " << int(normal_task.counter) << endl;
cout << "high " << int(high_task.counter) << endl;
cout << "left " << pool1.queued() << endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment