Last active
December 4, 2020 09:19
-
-
Save Pangoraw/aa95181d532c1e86a33a7fd13f776408 to your computer and use it in GitHub Desktop.
A thread safe go channels implementation using pthreads.h
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "channel.h" | |
queue_t *queue_new() { | |
queue_t *queue = malloc(sizeof(queue_t)); | |
queue_init(queue); | |
return queue; | |
} | |
void queue_init(queue_t *queue) { | |
queue->end = 0; | |
queue->start = 0; | |
queue->length = QUEUE_SIZE; | |
} | |
int queue_is_empty(queue_t *queue) { | |
return queue->end == queue->start; | |
} | |
void queue_push(queue_t *queue, int64_t value) { | |
queue->values[queue->end] = value; | |
queue->end = (queue->end + 1) % QUEUE_SIZE; | |
} | |
int64_t queue_pop(queue_t *queue) { | |
int64_t value = queue->values[queue->start]; | |
queue->start = (queue->start + 1) % QUEUE_SIZE; | |
return value; | |
} | |
channel_t *channel_new() { | |
channel_t *channel = malloc(sizeof(channel_t)); | |
channel_init(channel); | |
return channel; | |
} | |
void channel_init(channel_t *channel) { | |
pthread_cond_init(&channel->condvar, NULL); | |
pthread_mutex_init(&channel->mutex, NULL); | |
queue_init(&channel->queue); | |
} | |
void channel_free(channel_t *channel) { | |
if (channel == NULL) | |
return; | |
pthread_cond_destroy(&channel->condvar); | |
pthread_mutex_destroy(&channel->mutex); | |
free(channel); | |
} | |
void channel_send(channel_t *channel, int64_t value) { | |
if (pthread_mutex_lock(&channel->mutex) != 0) { | |
printf("error: locking mutex\n"); | |
} | |
queue_push(&channel->queue, value); | |
if (pthread_mutex_unlock(&channel->mutex) != 0) { | |
printf("error: unlocking mutex\n"); | |
} | |
pthread_cond_signal(&channel->condvar); | |
} | |
int64_t channel_receive(channel_t *channel) { | |
char had_to_wait = 0; | |
if (queue_is_empty(&channel->queue)) { | |
if (pthread_cond_wait(&channel->condvar, &channel->mutex) != 0) { | |
printf("error: while waiting for cond var\n"); | |
} | |
had_to_wait = 1; | |
} | |
if (had_to_wait != 1 && pthread_mutex_trylock(&channel->mutex) != 0) { | |
printf("error: locking mutex\n"); | |
} | |
int64_t value = queue_pop(&channel->queue); | |
if (pthread_mutex_unlock(&channel->mutex) != 0) { | |
printf("error: while releasing mutex\n"); | |
} | |
return value; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef _CHANNEL_H_ | |
#define _CHANNEL_H_ | |
#include <pthread.h> | |
#include <stdint.h> | |
#include <stdlib.h> | |
#include <stdio.h> | |
#define QUEUE_SIZE 10 | |
typedef struct queue { | |
size_t start; | |
size_t end; | |
uint32_t length; | |
int64_t values[QUEUE_SIZE]; | |
} queue_t; | |
queue_t *queue_new(); | |
void queue_init(queue_t *queue); | |
void queue_push(queue_t *queue, int64_t value); | |
int64_t queue_pop(queue_t *queue); | |
typedef struct channel { | |
pthread_cond_t condvar; | |
pthread_mutex_t mutex; | |
queue_t queue; | |
} channel_t; | |
channel_t *channel_new(); | |
void channel_init(channel_t *channel); | |
void channel_send(channel_t *channel, int64_t value); | |
int64_t channel_receive(channel_t *channel); | |
#endif // _CHANNEL_H_ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdint.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <time.h> | |
#include "channel.h" | |
#define BENCH_SIZE 100000 | |
#define N_THREADS 4 | |
typedef struct instruction { | |
size_t start, end; | |
channel_t *channel; | |
int64_t *values; | |
} instruction_t; | |
extern void *count_routine(void *arg) { | |
instruction_t *insts = (instruction_t *)arg; | |
int64_t s = 0; | |
for (int i = insts->start; i < insts->end; ++i) { | |
s += insts->values[i]; | |
} | |
channel_send(insts->channel, s); | |
return NULL; | |
} | |
void multi_threaded() { | |
int64_t values[BENCH_SIZE]; | |
for (int i = 0; i < BENCH_SIZE; ++i) { | |
values[i] = i + 1; | |
} | |
channel_t channel; | |
channel_init(&channel); | |
size_t offset = BENCH_SIZE / N_THREADS; | |
instruction_t instructions[N_THREADS]; | |
pthread_t tid[N_THREADS]; | |
for (int i = 0; i < N_THREADS; ++i) { | |
instructions[i].channel = &channel; | |
instructions[i].start = i * offset; | |
instructions[i].end = (i + 1) * offset; | |
instructions[i].values = values; | |
pthread_create(tid + i, NULL, count_routine, instructions + i); | |
} | |
int64_t s = 0; | |
for (int i = 0; i < N_THREADS; ++i) { | |
s += channel_receive(&channel); | |
} | |
for (int i = 0; i < N_THREADS; ++i) { | |
pthread_join(tid[i], NULL); | |
} | |
printf("sum is %ld\n", s); | |
} | |
void single_threaded() { | |
int64_t values[BENCH_SIZE]; | |
for (int i = 0; i < BENCH_SIZE; ++i) { | |
values[i] = i + 1; | |
} | |
int64_t s = 0; | |
for (int i = 0; i < BENCH_SIZE; ++i) { | |
s += values[i]; | |
} | |
printf("sum is %ld\n", s); | |
} | |
void benchmark(void (*f)()) { | |
clock_t start, end; | |
double cpu_time; | |
start = clock(); | |
f(); | |
end = clock(); | |
cpu_time = ((double) (end - start)) / CLOCKS_PER_SEC; | |
printf("f took %f\n", cpu_time); | |
} | |
int main() { | |
benchmark(multi_threaded); | |
benchmark(single_threaded); | |
exit(EXIT_SUCCESS); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment