Created
March 21, 2016 14:55
-
-
Save dannvix/f194d9927f69e5572654 to your computer and use it in GitHub Desktop.
Thread pool using std::thread and boost::lockfree::queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic>
#include <condition_variable>
#include <cwchar>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

#include <boost/lockfree/queue.hpp>
template<typename ItemType, unsigned long ulQueueCapacity> | |
class ThreadPool { | |
public: | |
ThreadPool( | |
unsigned long const ulThreadCount, | |
std::function<void(ItemType item)> pfItemCallbackRoutine) | |
: m_state(State::Genesis) | |
, m_ulThreadCount(ulThreadCount) | |
, m_pfItemCallbackRoutine(pfItemCallbackRoutine) | |
{ | |
} | |
~ThreadPool() | |
{ | |
{ | |
State const nState = m_state.load(std::memory_order_relaxed); | |
bool bNeedToStop = (nState == State::Running); | |
if (bNeedToStop) { | |
stop(); | |
} | |
} | |
} | |
// return TRUE if success | |
bool start() | |
{ | |
{ | |
State const nState = m_state.load(std::memory_order_relaxed); | |
bool bCanStart = (nState == State::Genesis); | |
if (!bCanStart) { | |
wprintf(L"ThreadPool start fail\n"); | |
return false; | |
} | |
} | |
m_state.store(State::Starting, std::memory_order_relaxed); | |
try { | |
wprintf(L"!!! ulThreadCount=%lu\n", m_ulThreadCount); | |
for (size_t i = 0; i < m_dwThreadCount; ++i) { | |
std::thread thread = std::thread(std::bind(&ThreadPool::threadRoutine, this)); | |
m_vThreads.push_back(std::move(thread)); | |
} | |
} | |
catch(std::system_error const &e) { | |
wprintf(L"ThreadPool start failed, code=[%lu], what=[%s]\n", e.code(), e.what()); | |
return false; | |
} | |
m_state.store(State::Running, std::memory_order_relaxed); | |
return true; | |
} | |
// return TRUE if success | |
bool stop() | |
{ | |
{ | |
State const nState = m_state.load(std::memory_order_relaxed); | |
bool bCanStop = (nState == State::Running); | |
if (!bCanStop) { | |
wprintf(L"ThreadPool stop failed"); | |
return false; | |
} | |
} | |
m_state.store(State::Stopping, std::memory_order_relaxed); | |
m_condVarQueue.notify_all(); // wake all sleeping threads | |
for (std::thread &thread : m_vThreads) { | |
thread.join(); | |
} | |
m_vThreads.clear(); | |
m_queue.consume_all([](ItemType){}); | |
m_state.store(State::Genesis, std::memory_order_relaxed); | |
return true; | |
} | |
// return true if success | |
bool enqueue(ItemType const &item) | |
{ | |
if (m_queue.push(item)) { | |
m_condVarQueue.notify_one(); | |
return true; | |
} | |
return false; | |
} | |
private: // disable copy and move | |
ThreadPool(ThreadPool&); | |
ThreadPool(ThreadPool&&); | |
ThreadPool& operator=(ThreadPool&); | |
ThreadPool& operator=(ThreadPool&&); | |
private: // methods | |
void threadRoutine() | |
{ | |
while (1) { | |
std::unique_lock<std::mutex> lock(m_mutexQueueCondVar); | |
m_condVarQueue.wait(lock); | |
State const nState = m_state.load(std::memory_order_relaxed); | |
bool bContinueRunning = (nState == State::Running); | |
if (!bContinueRunning) { | |
return; | |
} | |
ItemType item; | |
if (m_queue.pop(/*out*/ item)) { | |
if (m_pfItemCallbackRoutine) { | |
m_pfItemCallbackRoutine(item); | |
} | |
} | |
} | |
} | |
private: // attributes | |
enum class State { Genesis, Starting, Running, Stopping }; | |
std::atomic<State> m_state; | |
unsigned long m_ulThreadCount; | |
std::vector<std::thread> m_vThreads; | |
std::function<void(ItemType item)> m_pfItemCallbackRoutine; | |
std::condition_variable m_condVarQueue; | |
std::mutex m_mutexQueueCondVar; | |
boost::lockfree::queue<ItemType, | |
boost::lockfree::fixed_sized<true>, | |
boost::lockfree::capacity<ulQueueCapacity>> m_queue; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment