-
-
Save vwood/702933 to your computer and use it in GitHub Desktop.
Channels with asynchronous support.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "channel.h"
/*
 * Bounded channel backed by a fixed-size ring buffer.
 *
 * Two counting semaphores track how many slots are free and how many are
 * filled; the mutex guards the buffer contents and the two cursors.
 */
struct channel_s {
    sem_t empty_semaphore;      /* initialised to buffer_length: free slots   */
    sem_t fill_semaphore;       /* initialised to 0: slots holding an item    */
    pthread_mutex_t mutex;      /* held while copying and moving the cursors  */

    char *buffer;               /* heap storage for the queued items          */
    int buffer_length;          /* capacity of the channel, in items          */
    size_t object_size;         /* number of bytes copied per item            */

    int insert_point;           /* index of the next slot to write            */
    int remove_point;           /* index of the next slot to read             */
};
/*
 * Create a channel able to hold `buffer_length` items of `object_size`
 * bytes each.
 *
 * On success *c points to the new channel; release it with
 * channel_destroy().  On any failure (non-positive buffer_length,
 * allocation failure, mutex or semaphore initialisation failure) a
 * message is written to stderr, everything acquired so far is released
 * and *c is set to NULL, so callers can test *c to detect the error.
 */
void channel_init(channel_t **c, int buffer_length, size_t object_size) {
    channel_t *ch;
    size_t slot_size;
    int error;

    *c = NULL;

    /* A channel with no slots could never accept an item. */
    if (buffer_length <= 0) {
        fprintf(stderr, "channel allocation error\n");
        return;
    }

    ch = malloc(sizeof *ch);
    if (ch == NULL) {
        fprintf(stderr, "channel allocation error\n");
        return;
    }

    /*
     * calloc() rejects a count * size product that overflows.  A zero-byte
     * request may legitimately return NULL, so always ask for at least one
     * byte per slot to keep NULL an unambiguous failure indicator.
     */
    slot_size = object_size > 0 ? object_size : 1;
    ch->buffer = calloc((size_t) buffer_length, slot_size);
    if (ch->buffer == NULL) {
        fprintf(stderr, "channel allocation error\n");
        free(ch);
        return;
    }

    ch->buffer_length = buffer_length;
    ch->object_size = object_size;
    ch->insert_point = 0;
    ch->remove_point = 0;

    error = pthread_mutex_init(&ch->mutex, NULL);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
        goto fail_buffer;
    }

    // Semaphores ensure there are items/space to read/write
    if (sem_init(&ch->empty_semaphore, 0, (unsigned int) buffer_length) != 0) {
        fprintf(stderr, "channel error:%d\n", errno);
        goto fail_mutex;
    }
    if (sem_init(&ch->fill_semaphore, 0, 0) != 0) {
        fprintf(stderr, "channel error:%d\n", errno);
        goto fail_empty;
    }

    *c = ch;
    return;

fail_empty:
    sem_destroy(&ch->empty_semaphore);
fail_mutex:
    pthread_mutex_destroy(&ch->mutex);
fail_buffer:
    free(ch->buffer);
    free(ch);
}
/*
 * Address of ring-buffer slot `index`.
 * Every slot is object_size bytes wide, so the byte offset into the
 * buffer is index * object_size, not index itself.
 */
static char *channel_slot(channel_t *c, int index) {
    return c->buffer + (size_t) index * c->object_size;
}

/*
 * sem_wait() that restarts when a signal handler interrupts it.
 * Returns 0 once the semaphore has been decremented, or -1 (after
 * logging errno) on any other failure.
 */
static int channel_wait(sem_t *sem) {
    while (sem_wait(sem) != 0) {
        if (errno == EINTR) {
            continue;
        }
        fprintf(stderr, "channel error:%d\n", errno);
        return -1;
    }
    return 0;
}

/* Lock the channel mutex, logging any failure.  Returns 0 when locked. */
static int channel_lock(channel_t *c) {
    int error = pthread_mutex_lock(&c->mutex);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
    }
    return error;
}

/* Unlock the channel mutex, logging any failure. */
static void channel_unlock(channel_t *c) {
    int error = pthread_mutex_unlock(&c->mutex);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
    }
}

/*
 * Copy `item` into the slot at insert_point and advance the cursor.
 * The caller must already have reserved a free slot via empty_semaphore.
 */
static void channel_store(channel_t *c, const void *item) {
    int locked = (channel_lock(c) == 0);

    memcpy(channel_slot(c, c->insert_point), item, c->object_size);
    c->insert_point = (c->insert_point + 1) % c->buffer_length;

    /* Only release the mutex if it was actually acquired. */
    if (locked) {
        channel_unlock(c);
    }
    sem_post(&c->fill_semaphore); // Increase number of items
}

/*
 * Copy the item at remove_point into `space` and advance the cursor.
 * The caller must already have claimed a filled slot via fill_semaphore.
 */
static void channel_load(channel_t *c, void *space) {
    int locked = (channel_lock(c) == 0);

    memcpy(space, channel_slot(c, c->remove_point), c->object_size);
    c->remove_point = (c->remove_point + 1) % c->buffer_length;

    /* Only release the mutex if it was actually acquired. */
    if (locked) {
        channel_unlock(c);
    }
    sem_post(&c->empty_semaphore); // Free the space
}

/*
 * Append a copy of the object_size bytes at `item` to the channel,
 * blocking while the channel is full.  If waiting for a free slot fails
 * for a reason other than signal interruption, the error is logged and
 * nothing is added.
 */
void channel_add(channel_t *c, void *item) {
    if (channel_wait(&c->empty_semaphore) != 0) { // Reserve an empty space
        return;
    }
    channel_store(c, item);
}

/* Nonblocking, returns 1 on Success, 0 on Failure*/
int channel_tryadd(channel_t *c, void *item) {
    if (sem_trywait(&c->empty_semaphore) != 0) { // Attempt to reserve an empty space
        return 0;
    }
    channel_store(c, item);
    return 1;
}

/*
 * Remove the oldest item from the channel and copy its object_size bytes
 * into `space`, blocking while the channel is empty.  If waiting for an
 * item fails for a reason other than signal interruption, the error is
 * logged and `space` is left untouched.
 */
void channel_get(channel_t *c, void *space) {
    if (channel_wait(&c->fill_semaphore) != 0) { // Wait for an item
        return;
    }
    channel_load(c, space);
}

/* Nonblocking, Returns 1 on Success, 0 on Failure */
int channel_tryget(channel_t *c, void *space) {
    if (sem_trywait(&c->fill_semaphore) != 0) { // Check for an item
        return 0;
    }
    channel_load(c, space);
    return 1;
}
/* | |
Items still in buffer are forfeit | |
May cause memory to leak if pointers are involved. | |
*/ | |
void channel_destroy(channel_t *c) { | |
int error; | |
free(c->buffer); | |
sem_destroy(&c->empty_semaphore); | |
sem_destroy(&c->fill_semaphore); | |
error = pthread_mutex_destroy(&c->mutex); | |
if (error != 0) | |
fprintf(stderr, "channel error:%d\n", error); | |
free(c); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef CHANNEL_H
#define CHANNEL_H

#include <stddef.h>

/* Opaque handle; the layout is private to the implementation file. */
typedef struct channel_s channel_t;

/* Create a channel holding up to buffer_length items of object_size bytes. */
void channel_init(channel_t **c, int buffer_length, size_t object_size);

/* Copy one item into the channel. */
void channel_add(channel_t *c, void *item);

/* Nonblocking variant of channel_add: returns 1 on success, 0 on failure. */
int channel_tryadd(channel_t *c, void *item);

/* Copy one item out of the channel into space. */
void channel_get(channel_t *c, void *space);

/* Nonblocking variant of channel_get: returns 1 on success, 0 on failure. */
int channel_tryget(channel_t *c, void *space);

/* Release the channel; items still queued are discarded. */
void channel_destroy(channel_t *c);

#endif /* CHANNEL_H */
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Asynchronous Channels