Skip to content

Instantly share code, notes, and snippets.

@saolsen
Last active June 1, 2016 20:33
Show Gist options
  • Save saolsen/d28eafaf6b8e5f9fd6ff to your computer and use it in GitHub Desktop.
Save saolsen/d28eafaf6b8e5f9fd6ff to your computer and use it in GitHub Desktop.
sort and print a giant list
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <pthread.h>
/* Size of the generated input and the intended number of benchmark runs. */
enum {
    TEST_CASES = 1000000,
    TEST_RUNS = 30
};
/*
 * Channel carrying ints from one producer thread to one consumer thread.
 * The producer fills a whole 16-int line (one 64-byte cache line with
 * 4-byte ints) before handing it over, so both sides do not keep writing
 * into the same cache line.
 */
typedef struct Chan {
    int can_read_to;               /* producer's published position in data */
    int can_write_to;              /* consumer's published position in data */
    int length;                    /* number of int slots in data */
    void *memptr;                  /* raw allocation that data points into */
    int *data;                     /* ring of `length` ints; indices wrap to 0 */
    pthread_mutex_t mx;            /* held while reading/updating the positions */
    pthread_cond_t read_to_moved;  /* signalled after can_read_to is updated */
    pthread_cond_t write_to_moved; /* signalled after can_write_to is updated */
} Chan;
/* Role each worker thread plays in the merge tree. */
typedef enum Job {
    JOB_BUFFER = 0,    /* leaf: orders one slice of the input (4 threads) */
    JOB_MIDDLEMAN = 1, /* inner node: merges two input channels (3 threads) */
    JOB_PRINTER = 2    /* root: prints the merged stream (1 thread) */
} Job;
/* Work description handed to one worker thread (see task_worker). */
typedef struct Task {
    Job job; /* selects which member of the union below is in use */
    int id;  /* thread index; the worker only prints it in log messages */
    union {
        struct {
            int *buffer_start; /* first element of this worker's input slice */
            int buffer_count;  /* number of elements in the slice */
            Chan *out;         /* channel receiving the ordered values */
        } buffer;
        struct {
            Chan *left;  /* first input channel */
            Chan *right; /* second input channel */
            Chan *out;   /* channel receiving the merged values */
        } middleman;
        struct {
            Chan *in; /* channel whose values are printed */
        } printer;
    };
    pthread_mutex_t wait; /* mutex paired with `go` */
    pthread_cond_t go;    /* main() signals this to start the worker */
} Task;
void*
task_worker(void* task_data)
{
Task* task = (Task*)task_data;
pthread_mutex_lock(&task->wait);
pthread_cond_wait(&task->go, &task->wait);
pthread_mutex_unlock(&task->wait);
char* task_name;
switch(task->job) {
case(JOB_BUFFER): {
/*
* Search buffer for the smallest element.
* Put smallest element on q.
* Continue until out of elements.
*/
task_name = "BUFFER";
fprintf(stderr, "Hello, I'm a buffer worker %i\n", task->id);
Chan* out = task->buffer.out;
int out_index = 0;
int done = 0;
for(;;) {
// Find the smallest element.
int smallest_index = -1;
int smallest = RAND_MAX;
for (int i=0; i<task->buffer.buffer_count; i++) {
if (task->buffer.buffer_start[i] >= 0 &&
task->buffer.buffer_start[i] <= smallest) {
smallest_index = i;
smallest = task->buffer.buffer_start[i];
}
}
if (smallest_index == -1) {
// pad -1's till next cache line so write marker moves.
done = 1;
do {
out->data[out_index++] = -1;
} while ((out_index % 16) != 0);
} else {
task->buffer.buffer_start[smallest_index] = -1;
out->data[out_index++] = smallest;
}
if (out_index > out->length) {
out_index = 0;
}
int next_line = out_index % 16 == 0;
if (next_line) {
pthread_mutex_lock(&out->mx);
out->can_read_to = out_index;
pthread_cond_signal(&out->read_to_moved);
while ((out_index < out->can_write_to && out->can_write_to - out_index < 16) ||
(out->length - out_index + out->can_write_to < 16)) {
// Wait until can_write_to gets updated.
pthread_cond_wait(&out->write_to_moved, &out->mx);
}
pthread_mutex_unlock(&out->mx);
}
if (done) {
break;
}
}
} break;
case(JOB_MIDDLEMAN): {
/*
* Pull 1 element from each input queue.
* When you have two elements, put smallest value on output queue and pull new value.
* Continue until both input queue's are empty.
*/
task_name = "MIDDLEMAN";
fprintf(stderr, "Hello, I'm a middleman worker %i\n", task->id);
Chan* left = task->middleman.left;
Chan* right = task->middleman.right;
Chan* out = task->middleman.out;
int left_drained = 0;
int right_drained = 0;
int left_element = -1;
int right_element = -1;
int left_index = 0;
int right_index = 0;
int out_index = 0;
int need_new_left = 1;
int need_new_right = 1;
for (;;) {
int out_element = -1;
// pull needed elements
if (need_new_left) {
// pull new left element
int new_line = left_index % 16 == 0;
if (new_line) {
pthread_mutex_lock(&left->mx);
left->can_write_to = left_index;
pthread_cond_signal(&left->write_to_moved);
// wait until it's safe to read further
while ((left_index <= left->can_read_to &&
left->can_read_to - left_index < 16) ||
(left->length - left_index + left->can_read_to < 16)) {
// wait until can_read_to gets updated.
fprintf(stderr, "middleman %i: block on left read\n", task->id);
pthread_cond_wait(&left->read_to_moved, &left->mx);
fprintf(stderr, "middleman %i: unblock on left read\n", task->id);
}
pthread_mutex_unlock(&left->mx);
}
left_element = left->data[left_index++];
if (left_index > left->length) {
left_index = 0;
}
if (left_element == -1) {
left_drained = 1;
}
}
if (need_new_right) {
// pull new right element
int new_line = right_index % 16 == 0;
if (new_line) {
pthread_mutex_lock(&right->mx);
right->can_write_to = right_index;
pthread_cond_signal(&right->write_to_moved);
// wait until it's safe to read further
while ((right_index <= right->can_read_to &&
right->can_read_to - right_index < 16) ||
(right->length - right_index + right->can_read_to < 16)) {
// wait until can_read_to gets updated.
fprintf(stderr, "middleman %i: block on right read\n", task->id);
pthread_cond_wait(&right->read_to_moved, &right->mx);
fprintf(stderr, "middleman %i: unblock on right read\n", task->id);
}
pthread_mutex_unlock(&right->mx);
}
right_element = right->data[right_index++];
if (right_index > right->length) {
right_index = 0;
}
if (right_element == -1) {
right_drained = 1;
}
}
// pick smallest element
need_new_right = 0;
need_new_left = 0;
if (left_drained && right_drained) {
out_element = -1;
} else if (left_drained) {
out_element = right_element;
need_new_right = 1;
} else if (right_drained) {
out_element = left_element;
need_new_left = 1;
} else if (left_element < right_element) {
out_element = left_element;
need_new_left = 1;
} else {
out_element = right_element;
need_new_right = 1;
}
if (out_element == -1) {
do {
out->data[out_index++] = -1;
} while ((out_index % 16) != 0);
} else {
out->data[out_index++] = out_element;
}
if (out_index > out->length) {
out_index = 0;
}
// if needed, set last cache line to be readable and wait for next line to be writable
int next_line = out_index % 16 == 0;
if (next_line) {
pthread_mutex_lock(&out->mx);
out->can_read_to = out_index;
pthread_cond_signal(&out->read_to_moved);
while ((out_index <= out->can_write_to && out->can_write_to - out_index < 16) ||
(out->length - out_index + out->can_write_to < 16)) {
// Wait until can_write_to gets updated.
fprintf(stderr, "middleman %i: block write\n", task->id);
pthread_cond_wait(&out->write_to_moved, &out->mx);
fprintf(stderr, "middleman %i: unblock write\n", task->id);
}
pthread_mutex_unlock(&out->mx);
}
if (out_element == -1) {
fprintf(stderr, "middleman %i: DONE\n", task->id);
break;
}
}
} break;
case(JOB_PRINTER): {
/*
* Pull elements from input queue and print them.
*/
task_name = "PRINTER";
fprintf(stderr, "Hello, I'm a printer worker %i\n", task->id);
Chan* in = task->printer.in;
int index = 0;
for (;;) {
int new_line = index % 16 == 0;
if (new_line) {
// fetch next line
pthread_mutex_lock(&in->mx);
in->can_write_to = index;
pthread_cond_signal(&in->write_to_moved);
while((index <= in->can_read_to &&
in->can_read_to - index < 16) ||
(in->length - index + in->can_read_to < 16)) {
fprintf(stderr, "printer %i: block read\n", task->id);
pthread_cond_wait(&in->read_to_moved, &in->mx);
fprintf(stderr, "printer %i: unblock read\n", task->id);
}
pthread_mutex_unlock(&in->mx);
}
// print value
int val = in->data[index++];
if (index > in->length) {
index = 0;
}
if (val == -1) {
break;
} else {
printf("%i\n", val);
}
}
} break;
};
fprintf(stderr, "%s %i: has finished\n", task_name, task->id);
return NULL;
}
int
main(void)
{
fprintf(stderr, "Starting Test\n");
srand(time(NULL));
int* num_buffer = malloc(sizeof(int) * TEST_CASES);
int num_count = 0;
// Set up threads
Task tasks[8];
pthread_t threads[8];
Chan chans[7];
int i;
for (i=0; i<8; i++) {
Chan* c = chans + i;
c->can_read_to = 0;
c->can_write_to = 0;
c->length = 16*3;
// Align data to 64 bytes so the data buffer is exactly 3 cache lines.
c->memptr = malloc(16*3*sizeof(int)+63);
c->data = (int*)((long)c->memptr+63 & ~63);
pthread_mutex_init(&c->mx, NULL);
pthread_cond_init (&c->read_to_moved, NULL);
pthread_cond_init (&c->write_to_moved, NULL);
}
Task* task;
pthread_t* thread;
// Buffer threads
for (i=0;i<4;i++) {
task = tasks + i;
thread = threads + i;
task->id = i;
task->job = JOB_BUFFER;
task->buffer.buffer_start = num_buffer + (i * TEST_CASES / 4);
task->buffer.buffer_count = TEST_CASES / 4;
task->buffer.out = chans + i;
pthread_mutex_init(&task->wait, NULL);
pthread_cond_init(&task->go, NULL);
pthread_create(thread, NULL, task_worker, (void*)task);
}
// Middlemen
task = tasks + i;
thread = threads + i;
task->id = i;
task->job = JOB_MIDDLEMAN;
task->middleman.left = &chans[0];
task->middleman.right = &chans[1];
task->middleman.out = &chans[4];
pthread_mutex_init(&task->wait, NULL);
pthread_cond_init(&task->go, NULL);
pthread_create(thread, NULL, task_worker, (void*)task);
i++;
task = tasks + i;
thread = threads + i;
task->job = JOB_MIDDLEMAN;
task->id = i;
task->middleman.left = &chans[2];
task->middleman.right = &chans[3];
task->middleman.out = &chans[5];
pthread_mutex_init(&task->wait, NULL);
pthread_cond_init(&task->go, NULL);
pthread_create(thread, NULL, task_worker, (void*)task);
i++;
task = tasks + i;
thread = threads + i;
task->job = JOB_MIDDLEMAN;
task->id = i;
task->middleman.left = &chans[4];
task->middleman.right = &chans[5];
task->middleman.out = &chans[6];
pthread_mutex_init(&task->wait, NULL);
pthread_cond_init(&task->go, NULL);
pthread_create(thread, NULL, task_worker, (void*)task);
i++;
// Printer
task = tasks + i;
thread = threads + i;
task->id = i;
task->job = JOB_PRINTER;
task->printer.in = &chans[6];
pthread_create(thread, NULL, task_worker, (void*)task);
// @TODO: I need a way to signal to all the threads that it's time to begin.
// for (int run=0; run<TEST_RUNS; run++) {
for (int run=0; run<1; run++) {
// Random numbers in (0 to RAND_MAX)
for (int i=0; i<TEST_CASES; i++) {
num_buffer[num_count++] = rand();
}
fprintf(stderr, "Running test case: %i\n", run);
// Start threads
for (int i=0; i<8; i++) {
pthread_mutex_lock(&tasks[i].wait);
pthread_cond_signal(&tasks[i].go);
pthread_mutex_unlock(&tasks[i].wait);
}
// Wait for threads to complete.
for (int i=0; i<8; i++) {
pthread_join(threads[i], NULL);
}
num_count = 0;
}
fprintf(stderr, "Done\n");
}
package main
import (
"fmt"
"math"
"math/rand"
)
// Buffer is one batch of values handed between goroutines; each batch holds
// buf_length int32s (64 bytes, one cache line on common hardware).
type Buffer []int32

const test_cases = 100000 // number of random values sorted
const test_runs = 30      // intended number of runs (currently unused)
const buf_length = 16     // values per Buffer sent over a channel
// buffer_worker sends the non-negative values of buffer to out in ascending
// order, buf_length values per Buffer. After the last value it sends -1
// (the end-of-stream marker), padded to a full Buffer. Emitted entries are
// overwritten with -1 in place, so buffer must not be shared with another
// goroutine.
func buffer_worker(thread_id int, buffer []int32, out chan Buffer) {
	// Println does not interpret format verbs; Printf is needed for %d.
	fmt.Printf("buffer thread: %d\n", thread_id)
	output_buffer := make([]int32, 0, buf_length)
	for done := false; !done; {
		// Find the smallest value that has not been emitted yet.
		smallest_element := int32(math.MaxInt32)
		smallest_element_index := -1
		for index, element := range buffer {
			if element >= 0 && element <= smallest_element {
				smallest_element_index = index
				smallest_element = element
			}
		}
		if smallest_element_index == -1 {
			// Nothing left: pad the output with the end marker.
			for len(output_buffer) != buf_length {
				output_buffer = append(output_buffer, -1)
			}
			done = true
		} else {
			buffer[smallest_element_index] = -1
			output_buffer = append(output_buffer, smallest_element)
		}
		if len(output_buffer) == buf_length {
			out <- output_buffer
			output_buffer = make([]int32, 0, buf_length)
		}
	}
}
// middleman_worker merges two ascending Buffer streams (left and right) into
// one ascending stream on out, buf_length values per Buffer. A -1 value marks
// the end of an input; once both inputs have ended, the final output Buffer is
// padded with -1 and the worker returns.
func middleman_worker(thread_id int, left chan Buffer, right chan Buffer, out chan Buffer) {
	// Println does not interpret format verbs; Printf is needed for %d.
	fmt.Printf("middleman thread: %d\n", thread_id)
	output_buffer := make([]int32, 0, buf_length)
	left_buffer := <-left
	left_index := 0
	left_empty := false
	right_buffer := <-right
	right_index := 0
	right_empty := false
	left_val := int32(-1)
	right_val := int32(-1)
	need_left := true
	need_right := true
	done := false
	for !done {
		if need_left {
			left_val = left_buffer[left_index]
			if left_val == -1 {
				// End of the left stream; it is not read again.
				left_empty = true
			} else if left_index+1 == len(left_buffer) {
				// Fetch the next Buffer. Producers send the end marker in a
				// later Buffer when this one ends with a real value.
				left_buffer = <-left
				left_index = 0
			} else {
				left_index += 1
			}
		}
		if need_right {
			right_val = right_buffer[right_index]
			if right_val == -1 {
				// End of the right stream; it is not read again.
				right_empty = true
			} else if right_index+1 == len(right_buffer) {
				right_buffer = <-right
				right_index = 0
			} else {
				right_index += 1
			}
		}
		need_left = false
		need_right = false
		var out_val int32
		if left_empty && right_empty {
			out_val = -1
		} else if left_empty {
			out_val = right_val
			need_right = true
		} else if right_empty {
			out_val = left_val
			need_left = true
		} else if left_val < right_val {
			out_val = left_val
			need_left = true
		} else {
			out_val = right_val
			need_right = true
		}
		if out_val == -1 {
			for len(output_buffer) != buf_length {
				output_buffer = append(output_buffer, -1)
			}
			done = true
		} else {
			output_buffer = append(output_buffer, out_val)
		}
		if len(output_buffer) == buf_length {
			out <- output_buffer
			output_buffer = make([]int32, 0, buf_length)
		}
	}
}
// main generates test_cases pseudo-random values with a fixed seed. Four
// buffer_workers each handle a quarter of the data, three middleman_workers
// merge the streams, and main prints the merged values, followed by "Done".
func main() {
	r := rand.New(rand.NewSource(1234))

	// Int31 returns values in [0, 2^31). The old int32(r.Int()) truncated
	// a 63-bit value and produced negatives, which the workers treat as
	// already consumed and silently drop.
	test_data := make([]int32, 0, test_cases)
	for i := 0; i < test_cases; i++ {
		test_data = append(test_data, r.Int31())
	}

	c1 := make(chan Buffer, 3)
	c2 := make(chan Buffer, 3)
	c3 := make(chan Buffer, 3)
	c4 := make(chan Buffer, 3)
	c5 := make(chan Buffer, 3)
	c6 := make(chan Buffer, 3)
	c7 := make(chan Buffer, 3)

	// Workers 0-3 each order one quarter of the data.
	buffer_chans := []chan Buffer{c1, c2, c3, c4}
	for i, c := range buffer_chans {
		start := i * test_cases / 4
		end := start + test_cases/4
		go buffer_worker(i, test_data[start:end], c)
	}
	// Middlemen 4 and 5 merge pairs of leaves; 6 merges their outputs
	// (same numbering as the C version).
	go middleman_worker(4, c1, c2, c5)
	go middleman_worker(5, c3, c4, c6)
	go middleman_worker(6, c5, c6, c7)

	for done := false; !done; {
		for _, e := range <-c7 {
			if e == -1 {
				done = true
				break
			}
			fmt.Println(e)
		}
	}
	fmt.Println("Done")
}
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
/* Values sorted per run, and how many runs the benchmark performs. */
enum {
    TEST_CASES = 1000000,
    TEST_RUNS = 30
};
/*
 * qsort comparator for ints in ascending order.
 * It takes the const void* parameters qsort expects, because calling qsort
 * through a function pointer of another type is undefined behaviour. It
 * returns (x > y) - (x < y) rather than x - y, which overflows for operands
 * of opposite sign and large magnitude.
 * Direct callers passing int* still compile (implicit conversion).
 */
int
compare(const void* a, const void* b)
{
    int x = *(const int*)a;
    int y = *(const int*)b;
    return (x > y) - (x < y);
}
/*
 * Serial baseline: for each of TEST_RUNS runs, fill a buffer with TEST_CASES
 * values from rand() and sort it with qsort(). The values go to stdout,
 * each followed by a comma. Progress messages go to stderr.
 */
int
main(void)
{
    fprintf(stderr, "Starting Test\n");
    srand((unsigned)time(NULL));
    int* num_buffer = malloc(sizeof *num_buffer * TEST_CASES);
    if (num_buffer == NULL) {
        fprintf(stderr, "Out of memory\n");
        return EXIT_FAILURE;
    }
    for (int run = 0; run < TEST_RUNS; run++) {
        /* Random numbers in [0, RAND_MAX]. */
        int num_count = 0;
        for (int i = 0; i < TEST_CASES; i++) {
            num_buffer[num_count++] = rand();
        }
        fprintf(stderr, "Running test case: %i\n", run);
        /* Serial version: sort, then print. */
        qsort(num_buffer, (size_t)num_count, sizeof *num_buffer, compare);
        for (int i = 0; i < num_count; i++) {
            printf("%i,", num_buffer[i]);
        }
    }
    fprintf(stderr, "Done\n");
    free(num_buffer);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment