Created
October 9, 2012 02:20
-
-
Save shigeki/3856204 to your computer and use it in GitHub Desktop.
Solving Consumer-Producer Problem with libuv condvar
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <string.h> | |
#include <stdlib.h> | |
#include <stddef.h> | |
#include <assert.h> | |
#include "uv.h" | |
#define MAX_CONSUMERS 16 | |
#define MAX_LOOPS 100 | |
struct buffer_s { | |
ngx_queue_t queue; | |
int data; | |
}; | |
typedef struct buffer_s buffer_t; | |
static ngx_queue_t queue; | |
static uv_mutex_t mutex; | |
static uv_cond_t empty; | |
static uv_cond_t full; | |
static volatile int finished_consumers = 0; | |
void produce(int value) { | |
buffer_t* buf; | |
buf = malloc(sizeof(buf)); | |
ngx_queue_init(&buf->queue); | |
buf->data = value; | |
ngx_queue_insert_tail(&queue, &buf->queue); | |
} | |
int consume(void) { | |
ngx_queue_t *q; | |
buffer_t *buf; | |
int data; | |
assert(!ngx_queue_empty(&queue)); | |
q = ngx_queue_last(&queue); | |
ngx_queue_remove(q); | |
buf = ngx_queue_data(q, buffer_t, queue); | |
data = buf->data; | |
free(buf); | |
return data; | |
} | |
static void producer(void* arg) { | |
int i; | |
for(i = 0; i < MAX_LOOPS*MAX_CONSUMERS; i++) { | |
uv_mutex_lock(&mutex); | |
while(!ngx_queue_empty(&queue)) { | |
uv_cond_wait(&empty, &mutex); | |
} | |
produce(i); | |
fprintf(stdout, "producer(%lu) put value:%d\n", uv_thread_self(), i); | |
uv_cond_signal(&full); | |
uv_mutex_unlock(&mutex); | |
} | |
fprintf(stdout, "*** finsihed_consumers=%d ***\n", finished_consumers); | |
assert(finished_consumers == MAX_CONSUMERS); | |
} | |
static void consumer(void* arg) { | |
int i; | |
int value; | |
(void) arg; | |
for(i = 0; i < MAX_LOOPS; i++) { | |
uv_mutex_lock(&mutex); | |
while(ngx_queue_empty(&queue)) { | |
uv_cond_wait(&full, &mutex); | |
} | |
value = consume(); | |
assert(value < MAX_LOOPS*MAX_CONSUMERS); | |
fprintf(stdout, "consumer(%lu) gets value:%d\n", uv_thread_self(), value); | |
uv_cond_signal(&empty); | |
uv_mutex_unlock(&mutex); | |
} | |
finished_consumers++; | |
} | |
int main(int argc, char** argv) { | |
int i; | |
uv_thread_t cthreads[MAX_CONSUMERS]; | |
uv_thread_t pthread; | |
fprintf(stdout, "=== start consumer-producer test ===\n"); | |
ngx_queue_init(&queue); | |
assert(0 == uv_mutex_init(&mutex)); | |
assert(0 == uv_cond_init(&empty)); | |
assert(0 == uv_cond_init(&full)); | |
for(i = 0; i < MAX_CONSUMERS; i++) { | |
assert(0 == uv_thread_create(&cthreads[i], consumer, NULL)); | |
} | |
assert(0 == uv_thread_create(&pthread, producer, NULL)); | |
for(i = 0; i < MAX_CONSUMERS; i++) { | |
assert(0 == uv_thread_join(&cthreads[i])); | |
} | |
assert(0 == uv_thread_join(&pthread)); | |
uv_cond_destroy(&empty); | |
uv_cond_destroy(&full); | |
uv_mutex_destroy(&mutex); | |
fprintf(stdout, "=== end consumer-producer test ===\n"); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment