Created
March 5, 2016 23:50
-
-
Save voidexp/5240f64edabe9dfb40ce to your computer and use it in GitHub Desktop.
TaskQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "error.h" | |
#include "mem.h" | |
#include "queue.h" | |
#include <assert.h> | |
#include <pthread.h> | |
#include <stdio.h> | |
#include <string.h> | |
#include <time.h> | |
#include <sys/time.h> | |
struct GK_Queue { | |
void *data; | |
size_t len; | |
size_t size; | |
size_t elem_size; | |
pthread_mutex_t mutex; | |
pthread_cond_t cond_has_data; | |
pthread_cond_t cond_full; | |
}; | |
struct GK_Queue* | |
gk_queue_new(size_t elem_size, size_t size) | |
{ | |
assert(elem_size > 0); | |
assert(size > 0); | |
struct GK_Queue *q = gk_new(struct GK_Queue); | |
if (!q) | |
return NULL; | |
if (!(q->data = gk_alloc0(elem_size * size))) { | |
gk_free(q); | |
return NULL; | |
} | |
q->size = size; | |
q->elem_size = elem_size; | |
if (pthread_mutex_init(&q->mutex, NULL) != 0) | |
goto mutex_error; | |
if (pthread_cond_init(&q->cond_has_data, NULL) != 0) | |
goto cond_error; | |
return q; | |
cond_error: | |
pthread_mutex_destroy(&q->mutex); | |
mutex_error: | |
gk_free(q->data); | |
gk_free(q); | |
return NULL; | |
} | |
void | |
gk_queue_free(struct GK_Queue *q) | |
{ | |
if (q) { | |
pthread_cond_destroy(&q->cond_has_data); | |
pthread_mutex_destroy(&q->mutex); | |
gk_free(q->data); | |
gk_free(q); | |
} | |
} | |
void* | |
gk_queue_get(struct GK_Queue *q) | |
{ | |
pthread_mutex_lock(&q->mutex); | |
void *elem = NULL; | |
while (q->len == 0) { | |
// block until the queue has some data in | |
struct timespec ts; | |
struct timeval tv; | |
gettimeofday(&tv, NULL); | |
ts.tv_sec = tv.tv_sec; | |
ts.tv_nsec = tv.tv_usec * 1000 + 1000; | |
pthread_cond_timedwait(&q->cond_has_data, &q->mutex, &ts); | |
} | |
// pick the first element | |
elem = q->data; | |
q->len--; | |
// shift elements in the queue by one position | |
// TODO: implement this better | |
for (size_t i = 0, j = 1; j < q->len; i++, j++) { | |
memcpy( | |
q->data + q->elem_size * i, | |
q->data + q->elem_size * j, | |
q->elem_size | |
); | |
} | |
pthread_cond_broadcast(&q->cond_full); | |
pthread_mutex_unlock(&q->mutex); | |
return elem; | |
} | |
void | |
gk_queue_add(struct GK_Queue *q, void *elem) | |
{ | |
pthread_mutex_lock(&q->mutex); | |
while (q->len == q->size) { | |
// block while the queue is full | |
pthread_cond_wait(&q->cond_full, &q->mutex); | |
} | |
// append the element to the end of the queue and notify | |
memcpy(q->data + q->elem_size * q->len++, elem, q->elem_size); | |
pthread_cond_broadcast(&q->cond_has_data); | |
pthread_mutex_unlock(&q->mutex); | |
} | |
size_t | |
gk_queue_len(struct GK_Queue *q) | |
{ | |
size_t len; | |
pthread_mutex_lock(&q->mutex); | |
len = q->len; | |
pthread_mutex_unlock(&q->mutex); | |
return len; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "mem.h" | |
#include "queue.h" | |
#include "task_queue.h" | |
#include <assert.h> | |
#include <pthread.h> | |
#include <stdbool.h> | |
#include <stdio.h> | |
struct Worker { | |
pthread_t thread; | |
bool initialized; | |
struct GK_TaskQueue *queue; | |
}; | |
struct Task { | |
GK_TaskFunc func; | |
void *data; | |
}; | |
struct GK_TaskQueue { | |
struct GK_Queue *tasks; | |
struct Worker *workers; | |
unsigned num_workers; | |
size_t num_running; | |
pthread_mutex_t mutex; | |
pthread_cond_t cond_running; | |
}; | |
static struct Task* | |
get_task(struct GK_TaskQueue *tq) | |
{ | |
struct Task *tsk = gk_queue_get(tq->tasks); | |
pthread_mutex_lock(&tq->mutex); | |
tq->num_running++; | |
pthread_mutex_unlock(&tq->mutex); | |
return tsk; | |
} | |
static void | |
task_done(struct GK_TaskQueue *tq) | |
{ | |
pthread_mutex_lock(&tq->mutex); | |
tq->num_running--; | |
pthread_cond_signal(&tq->cond_running); | |
pthread_mutex_unlock(&tq->mutex); | |
} | |
static void* | |
worker(void *arg) | |
{ | |
struct Worker *wrk = arg; | |
bool run = true; | |
while (run) { | |
printf("worker %p waits for task\n", wrk->thread); | |
struct Task *tsk = get_task(wrk->queue); | |
printf("worker %p started task\n", wrk->thread); | |
if (tsk && tsk->func) { | |
// TODO: handle return code | |
tsk->func(tsk->data); | |
} else { | |
run = false; | |
} | |
task_done(wrk->queue); | |
printf("worker %p finished task\n", wrk->thread); | |
} | |
return NULL; | |
} | |
struct GK_TaskQueue* | |
gk_task_queue_new(size_t size, unsigned num_workers) | |
{ | |
assert(size > 0); | |
assert(num_workers > 0); | |
struct GK_TaskQueue *tq = gk_new(struct GK_TaskQueue); | |
if (!tq) | |
return NULL; | |
if (pthread_mutex_init(&tq->mutex, NULL) != 0) | |
goto mutex_error; | |
if (pthread_cond_init(&tq->cond_running, NULL) != 0) | |
goto cond_error; | |
tq->tasks = gk_queue_new(sizeof(struct Task), size); | |
if (!tq->tasks) | |
goto error; | |
tq->num_workers = num_workers; | |
tq->workers = gk_alloc0(sizeof(struct Worker) * num_workers); | |
if (!tq->workers) | |
goto error; | |
// spawn worker threads | |
for (unsigned i = 0; i < num_workers; i++) { | |
struct Worker *wrk = &tq->workers[i]; | |
wrk->queue = tq; | |
if (pthread_create(&wrk->thread, NULL, worker, wrk) != 0) | |
goto error; | |
wrk->initialized = true; | |
} | |
return tq; | |
error: | |
gk_task_queue_free(tq); | |
return NULL; | |
cond_error: | |
pthread_mutex_destroy(&tq->mutex); | |
mutex_error: | |
gk_free(tq); | |
return NULL; | |
} | |
void | |
gk_task_queue_free(struct GK_TaskQueue *tq) | |
{ | |
if (tq) { | |
if (tq->workers) { | |
gk_task_queue_wait(tq); | |
for (unsigned i = 0; i < tq->num_workers; i++) { | |
gk_task_queue_add(tq, NULL, NULL); | |
} | |
for (unsigned i = 0; i < tq->num_workers; i++) { | |
struct Worker *wrk = &tq->workers[i]; | |
if (wrk->initialized) { | |
pthread_join(wrk->thread, NULL); | |
} | |
} | |
gk_free(tq->workers); | |
} | |
pthread_mutex_destroy(&tq->mutex); | |
pthread_cond_destroy(&tq->cond_running); | |
gk_queue_free(tq->tasks); | |
gk_free(tq); | |
} | |
} | |
int | |
gk_task_queue_add(struct GK_TaskQueue *tq, GK_TaskFunc f, void *data) | |
{ | |
struct Task task = { | |
.func = f, | |
.data = data | |
}; | |
gk_queue_add(tq->tasks, &task); | |
return 1; | |
} | |
int | |
gk_task_queue_wait(struct GK_TaskQueue *tq) | |
{ | |
pthread_mutex_lock(&tq->mutex); | |
while (tq->num_running > 0 || gk_queue_len(tq->tasks) > 0) | |
pthread_cond_wait(&tq->cond_running, &tq->mutex); | |
pthread_mutex_unlock(&tq->mutex); | |
return 1; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment