Last active
August 29, 2015 14:10
-
-
Save codeslinger/19429aa39aa4ed677cf3 to your computer and use it in GitHub Desktop.
Multiple consumers each consuming all of the queue items
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Compile with: gcc -g -std=c99 -Wall -Werror test-vrt-mc.c -o test-vrt-mc -lvrt -lcork -lpthread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
> ./test-vrt-mc
Results: 499999500000 499999500000
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <inttypes.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#include <libcork/core.h>
#include <libcork/helpers/errors.h>

#include <vrt.h>
struct vrt_integer_value { | |
struct vrt_value parent; | |
int32_t value; | |
}; | |
/*
 * Argument bundle for generate_integers().
 *
 *   p      producer the integers are published through
 *   count  number of integers to publish (0 .. count-1)
 */
struct integer_generator {
    struct vrt_producer *p;
    int64_t count;
};
/*
 * Argument bundle for sum_integers().
 *
 *   c    consumer the integers are read from
 *   sum  location that receives the final total
 */
struct integer_summer {
    struct vrt_consumer *c;
    int64_t *sum;
};
/*
 * One thread entry point plus the argument handed to it.
 *
 *   run  thread start routine; a NULL `run` marks the end of a client array
 *   u    opaque pointer passed to `run`
 */
struct vrt_queue_client {
    void *(*run)(void *);
    void *u;
};
static struct vrt_value *vrt_integer_value_new(struct vrt_value_type *type) | |
{ | |
struct vrt_integer_value *self; | |
self = cork_new(struct vrt_integer_value); | |
return &self->parent; | |
} | |
static void vrt_integer_value_free(struct vrt_value_type *type, | |
struct vrt_value *value) | |
{ | |
struct vrt_integer_value *iself; | |
iself = cork_container_of(value, struct vrt_integer_value, parent); | |
cork_free(iself, sizeof(iself)); | |
} | |
static struct vrt_value_type _vrt_integer_value_type = { | |
vrt_integer_value_new, | |
vrt_integer_value_free | |
}; | |
static struct vrt_value_type *vrt_integer_value_type(void) | |
{ | |
return &_vrt_integer_value_type; | |
} | |
static int vrt_queue_threaded(struct vrt_queue *q, | |
struct vrt_queue_client *clients) | |
{ | |
size_t i, | |
client_count = 0; | |
pthread_t *tids; | |
struct vrt_producer *p; | |
struct vrt_consumer *c; | |
struct vrt_queue_client *client; | |
for (client = clients; client->run; client++) { | |
client_count++; | |
} | |
tids = cork_calloc(client_count, sizeof(*tids)); | |
for (i = 0; i < cork_array_size(&q->producers); i++) { | |
p = (struct vrt_producer *) cork_array_at(&q->producers, i); | |
p->yield = vrt_yield_strategy_threaded(); | |
} | |
for (i = 0; i < cork_array_size(&q->consumers); i++) { | |
c = (struct vrt_consumer *) cork_array_at(&q->consumers, i); | |
c->yield = vrt_yield_strategy_threaded(); | |
} | |
for (i = 0; i < client_count; i++) { | |
pthread_create(&tids[i], NULL, clients[i].run, clients[i].u); | |
} | |
for (i = 0; i < client_count; i++) { | |
pthread_join(tids[i], NULL); | |
} | |
cork_free(tids, client_count * sizeof(*tids)); | |
return 0; | |
} | |
void *generate_integers(void *u) | |
{ | |
int32_t i; | |
struct vrt_value *vv; | |
struct vrt_integer_value *iv; | |
struct integer_generator *c = (struct integer_generator *) u; | |
for (i = 0; i < c->count; i++) { | |
rpi_check(vrt_producer_claim(c->p, &vv)); | |
iv = cork_container_of(vv, struct vrt_integer_value, parent); | |
iv->value = i; | |
rpi_check(vrt_producer_publish(c->p)); | |
} | |
rpi_check(vrt_producer_eof(c->p)); | |
return NULL; | |
} | |
void *sum_integers(void *u) | |
{ | |
int rv; | |
int64_t sum = 0; | |
struct vrt_value *vv; | |
struct integer_summer *c = (struct integer_summer *) u; | |
struct vrt_integer_value *iv; | |
while ((rv = vrt_consumer_next(c->c, &vv)) != VRT_QUEUE_EOF) { | |
if (rv) continue; | |
iv = cork_container_of(vv, struct vrt_integer_value, parent); | |
sum += iv->value; | |
} | |
if (rv == VRT_QUEUE_EOF) { | |
*c->sum = sum; | |
} | |
return NULL; | |
} | |
int main(int argc, char **argv) | |
{ | |
int64_t r1, | |
r2; | |
struct vrt_queue *q; | |
struct vrt_producer *p; | |
struct vrt_consumer *c1, | |
*c2; | |
rip_check(q = vrt_queue_new("qsum", vrt_integer_value_type(), 64)); | |
rip_check(p = vrt_producer_new("generator", 1, q)); | |
rip_check(c1 = vrt_consumer_new("summer1", q)); | |
rip_check(c2 = vrt_consumer_new("summer2", q)); | |
struct integer_generator igen = {p, 1000000}; | |
struct integer_summer isum1 = {c1, &r1}; | |
struct integer_summer isum2 = {c2, &r2}; | |
struct vrt_queue_client clients[] = { | |
{sum_integers, &isum1}, | |
{sum_integers, &isum2}, | |
{generate_integers, &igen}, | |
{NULL, NULL} | |
}; | |
rii_check(vrt_queue_threaded(q, clients)); | |
fprintf(stdout, "Results: %lu %lu\n", r1, r2); | |
vrt_queue_free(q); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment