Skip to content

Instantly share code, notes, and snippets.

@ryanorz
Last active February 21, 2017 09:01
Show Gist options
  • Save ryanorz/44070418a55218f18103a11564f15d67 to your computer and use it in GitHub Desktop.
Save ryanorz/44070418a55218f18103a11564f15d67 to your computer and use it in GitHub Desktop.
C++ thread pool implementation
#ifndef _UITILS_EXCEPTIONS_H
#define _UITILS_EXCEPTIONS_H
#include <stdexcept>
#include <string>
#include <cstring>
#include <errno.h>
#define NOEXCEPT noexcept
using std::string;
using std::runtime_error;
/**
 * Throw a runtime_error carrying `msg` when `expression` is true.
 *
 * @param expression  failure condition; nothing happens when it is false
 * @param msg         text used as the exception's what() string
 * @throws runtime_error if `expression` is true
 */
inline void ThrowRuntimeIf(bool expression, const string& msg)
{
    // Guard clause: the common (non-failing) path leaves immediately.
    if (!expression) {
        return;
    }
    throw runtime_error(msg);
}
/**
 * Throw a runtime_error of the form "<msg> : <strerror(errno)>" when
 * `expression` is true.
 *
 * The description is taken from errno, so the result is only meaningful
 * right after a call that reports its failure through errno.
 *
 * @param expression  failure condition; nothing happens when it is false
 * @param msg         prefix naming the operation that failed
 * @throws runtime_error if `expression` is true
 */
inline void ThrowCAPIIf(bool expression, const string& msg)
{
    // Snapshot errno before any string is built: the concatenation below may
    // allocate, and its operands are not sequenced before the errno read.
    const int saved_errno = errno;
    if (expression)
        throw runtime_error(msg + " : " + std::strerror(saved_errno));
}
#endif
#include "thread_pool.hpp"
#include <stdlib.h>
#include <stdio.h>
static void* routine(void* arg)
{
tpool_t *tpool = (tpool_t *)arg;
tpool_work_t* work;
while (true) {
pthread_mutex_lock(&tpool->q_lock);
while (tpool->q_work == nullptr && !tpool->shutdown)
pthread_cond_wait(&tpool->q_ready, &tpool->q_lock);
if (tpool->shutdown) {
pthread_mutex_unlock(&tpool->q_lock);
pthread_exit(nullptr);
}
work = tpool->q_work;
tpool->q_work = tpool->q_work->next;
tpool->running_num++;
pthread_mutex_unlock(&tpool->q_lock);
work->routine(work->arg);
free(work);
pthread_mutex_lock(&tpool->q_lock);
tpool->running_num--;
if (tpool->running_num == 0 && tpool->q_work == nullptr)
pthread_cond_signal(&tpool->no_running);
pthread_mutex_unlock(&tpool->q_lock);
}
}
ThreadPool::ThreadPool(int max_thread_num)
{
int err;
tpool = (tpool_t*)calloc(1, sizeof(tpool_t));
ThrowCAPIIf(tpool == nullptr, "tpool_create::calloc");
err = pthread_mutex_init(&tpool->q_lock, nullptr);
ThrowCAPIIf(err, "tpool_create::pthread_mutex_init");
err = pthread_cond_init(&tpool->q_ready, nullptr);
ThrowCAPIIf(err, "tpool_create::pthread_cond_init");
err = pthread_cond_init(&tpool->no_running, nullptr);
ThrowCAPIIf(err, "tpool_create::pthread_cond_init");
tpool->tids = (pthread_t *)calloc(max_thread_num, sizeof(pthread_t));
ThrowCAPIIf(tpool->tids == nullptr, "tpool_create::calloc");
tpool->shutdown = false;
tpool->max_thread_num = max_thread_num;
tpool->q_work = nullptr;
tpool->running_num = 0;
for (int i = 0; i < max_thread_num; ++i) {
err = pthread_create(&tpool->tids[i], nullptr, routine, (void *)tpool);
ThrowCAPIIf(err, "tpool_create::pthread_create");
}
}
/**
 * Stop the pool immediately and release everything it owns.
 *
 * Workers finish the task they are currently running and then exit; work
 * still sitting in the queue is discarded without being run. Calling this
 * again after it has completed is a no-op. The pool must not be used for
 * anything else afterwards.
 */
void ThreadPool::destroy() NOEXCEPT
{
    // Already torn down: the pointer is cleared at the end of this function.
    if (tpool == nullptr)
        return;

    // The flag has to change under q_lock. Otherwise a worker that has just
    // evaluated its wait predicate, but not yet blocked in pthread_cond_wait,
    // misses the broadcast and the join below never returns.
    pthread_mutex_lock(&tpool->q_lock);
    if (tpool->shutdown) {
        pthread_mutex_unlock(&tpool->q_lock);
        return;
    }
    tpool->shutdown = true;
    pthread_cond_broadcast(&tpool->q_ready);
    pthread_mutex_unlock(&tpool->q_lock);

    for (int i = 0; i < tpool->max_thread_num; ++i) {
        pthread_join(tpool->tids[i], nullptr);
    }

    // No worker is left, so the queue can be drained without the lock.
    while (tpool->q_work) {
        tpool_work_t *work = tpool->q_work;
        tpool->q_work = tpool->q_work->next;
        free(work);
    }

    free(tpool->tids);
    pthread_mutex_destroy(&tpool->q_lock);
    pthread_cond_destroy(&tpool->q_ready);
    pthread_cond_destroy(&tpool->no_running);
    free(tpool);
    // Do not leave a dangling pointer behind for a later destroy() call.
    tpool = nullptr;
}
/**
 * Wait until the queue is empty and no task is running, then destroy the pool.
 *
 * Blocks the caller on the no_running condition. Returns immediately when
 * there is no pool state or shutdown has already been requested.
 */
void ThreadPool::destroy_wait() NOEXCEPT
{
    // Defensive: nothing to wait for when there is no pool state.
    if (tpool == nullptr)
        return;

    pthread_mutex_lock(&tpool->q_lock);
    // shutdown is shared with the workers, so it is read under q_lock.
    if (tpool->shutdown) {
        pthread_mutex_unlock(&tpool->q_lock);
        return;
    }
    while (tpool->q_work != nullptr || tpool->running_num != 0)
        pthread_cond_wait(&tpool->no_running, &tpool->q_lock);
    pthread_mutex_unlock(&tpool->q_lock);

    destroy();
}
/**
 * Append a task to the end of the pool's work queue and wake one worker.
 *
 * @param workroutine  function to run on a worker thread; must not be null
 * @param arg          opaque pointer handed to `workroutine`
 *
 * NOTE(review): this function is declared NOEXCEPT, yet a null routine or a
 * failed allocation goes through ThrowRuntimeIf / ThrowCAPIIf. An exception
 * leaving a noexcept function ends the program via std::terminate.
 */
void ThreadPool::add_work(Routine workroutine, void *arg) NOEXCEPT
{
    ThrowRuntimeIf(!workroutine, "tpool_add_work::routine null");

    tpool_work_t *item = static_cast<tpool_work_t *>(calloc(1, sizeof(tpool_work_t)));
    ThrowCAPIIf(item == nullptr, "tpool_add_work::calloc");
    item->routine = workroutine;
    item->arg = arg;
    item->next = nullptr;

    pthread_mutex_lock(&tpool->q_lock);
    // Follow the chain of links to the terminating null one and hook the new
    // item there; this covers the empty and non-empty queue alike.
    tpool_work_t **link = &tpool->q_work;
    while (*link != nullptr)
        link = &(*link)->next;
    *link = item;
    pthread_mutex_unlock(&tpool->q_lock);

    pthread_cond_signal(&tpool->q_ready);
}
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <pthread.h>
#include "my_exceptions.h"
// Signature of a task: receives one opaque pointer and returns a void*.
// The worker loop in this file ignores the returned value.
typedef void* (*Routine)(void *);
/**
* One queued task; nodes are chained into a singly linked list.
*/
struct tpool_work_t
{
Routine routine; // task function called by a worker thread
void* arg; // opaque argument passed to routine
tpool_work_t *next; // next queued task, nullptr for the last node
};
/**
* State shared between a ThreadPool object and its worker threads.
*/
struct tpool_t
{
bool shutdown; // true once destruction has started; workers exit when they see it
int max_thread_num; // number of worker threads and of entries in tids
pthread_t* tids; // array holding the worker thread ids
tpool_work_t* q_work; // head of the pending-work list, nullptr when empty
pthread_mutex_t q_lock; // held by workers and add_work while touching q_work / running_num
pthread_cond_t q_ready; // signalled when work is queued, broadcast on shutdown
int running_num; // number of tasks currently being executed by workers
pthread_cond_t no_running; // signalled when running_num reaches 0 and the queue is empty
};
/**
* Fixed-size pool of pthreads that run queued Routine tasks.
* Tasks are appended at the tail of a list and taken from its head.
*/
class ThreadPool
{
public:
// Allocates the pool state and starts max_thread_num worker threads.
// Failures are reported by throwing runtime_error.
ThreadPool(int max_thread_num);
// Intentionally empty: resources are released only by destroy() or
// destroy_wait(), which the owner has to call explicitly.
~ThreadPool() NOEXCEPT {};
// Stops the workers, discards tasks still queued and frees the pool state.
void destroy() NOEXCEPT;
// Blocks until the queue is empty and no task is running, then calls destroy().
void destroy_wait() NOEXCEPT;
// Queues workroutine(arg) for execution on a worker thread.
// NOTE(review): declared NOEXCEPT although the definition calls
// ThrowRuntimeIf / ThrowCAPIIf; a throw there would terminate the program.
void add_work(Routine workroutine, void *arg) NOEXCEPT;
private:
tpool_t *tpool; // heap-allocated state shared with the worker threads
};
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment