Last active
June 1, 2016 20:33
-
-
Save saolsen/d28eafaf6b8e5f9fd6ff to your computer and use it in GitHub Desktop.
sort and print a giant list
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <stdlib.h> | |
| #include <stdio.h> | |
| #include <time.h> | |
| #include <pthread.h> | |
| #define TEST_CASES 1000000 | |
| #define TEST_RUNS 30 | |
/*
 * Chan: a fixed-size ring of ints passed from one producer thread to one
 * consumer thread. Data is handed over a whole cache line of ints at a time,
 * which is intended to keep the two threads off each other's lines (avoiding
 * false sharing). The two cursor fields are updated while holding `mx`.
 */
typedef struct Chan {
    int can_read_to;               /* producer-published limit for the consumer */
    int can_write_to;              /* consumer-published limit for the producer */
    int length;                    /* number of int slots in `data` */
    void *memptr;                  /* raw allocation that `data` points into */
    int *data;                     /* cache-line-aligned view of memptr */
    pthread_mutex_t mx;            /* guards can_read_to / can_write_to */
    pthread_cond_t read_to_moved;  /* signalled when can_read_to changes */
    pthread_cond_t write_to_moved; /* signalled when can_write_to changes */
} Chan;
/* Role a worker thread plays in the merge tree. */
typedef enum Job {
    JOB_BUFFER = 0,    /* 4 of these: sort a shard and stream it out */
    JOB_MIDDLEMAN = 1, /* 3 of these: merge two input channels into one */
    JOB_PRINTER = 2    /* 1 of these: print the final stream */
} Job;
| typedef struct { | |
| Job job; | |
| int id; | |
| union { | |
| struct {int* buffer_start; int buffer_count; Chan* out;} buffer; | |
| struct {Chan* left; Chan* right; Chan* out;} middleman; | |
| struct {Chan* in;} printer; | |
| }; | |
| pthread_mutex_t wait; | |
| pthread_cond_t go; | |
| } Task; | |
| void* | |
| task_worker(void* task_data) | |
| { | |
| Task* task = (Task*)task_data; | |
| pthread_mutex_lock(&task->wait); | |
| pthread_cond_wait(&task->go, &task->wait); | |
| pthread_mutex_unlock(&task->wait); | |
| char* task_name; | |
| switch(task->job) { | |
| case(JOB_BUFFER): { | |
| /* | |
| * Search buffer for the smallest element. | |
| * Put smallest element on q. | |
| * Continue until out of elements. | |
| */ | |
| task_name = "BUFFER"; | |
| fprintf(stderr, "Hello, I'm a buffer worker %i\n", task->id); | |
| Chan* out = task->buffer.out; | |
| int out_index = 0; | |
| int done = 0; | |
| for(;;) { | |
| // Find the smallest element. | |
| int smallest_index = -1; | |
| int smallest = RAND_MAX; | |
| for (int i=0; i<task->buffer.buffer_count; i++) { | |
| if (task->buffer.buffer_start[i] >= 0 && | |
| task->buffer.buffer_start[i] <= smallest) { | |
| smallest_index = i; | |
| smallest = task->buffer.buffer_start[i]; | |
| } | |
| } | |
| if (smallest_index == -1) { | |
| // pad -1's till next cache line so write marker moves. | |
| done = 1; | |
| do { | |
| out->data[out_index++] = -1; | |
| } while ((out_index % 16) != 0); | |
| } else { | |
| task->buffer.buffer_start[smallest_index] = -1; | |
| out->data[out_index++] = smallest; | |
| } | |
| if (out_index > out->length) { | |
| out_index = 0; | |
| } | |
| int next_line = out_index % 16 == 0; | |
| if (next_line) { | |
| pthread_mutex_lock(&out->mx); | |
| out->can_read_to = out_index; | |
| pthread_cond_signal(&out->read_to_moved); | |
| while ((out_index < out->can_write_to && out->can_write_to - out_index < 16) || | |
| (out->length - out_index + out->can_write_to < 16)) { | |
| // Wait until can_write_to gets updated. | |
| pthread_cond_wait(&out->write_to_moved, &out->mx); | |
| } | |
| pthread_mutex_unlock(&out->mx); | |
| } | |
| if (done) { | |
| break; | |
| } | |
| } | |
| } break; | |
| case(JOB_MIDDLEMAN): { | |
| /* | |
| * Pull 1 element from each input queue. | |
| * When you have two elements, put smallest value on output queue and pull new value. | |
| * Continue until both input queue's are empty. | |
| */ | |
| task_name = "MIDDLEMAN"; | |
| fprintf(stderr, "Hello, I'm a middleman worker %i\n", task->id); | |
| Chan* left = task->middleman.left; | |
| Chan* right = task->middleman.right; | |
| Chan* out = task->middleman.out; | |
| int left_drained = 0; | |
| int right_drained = 0; | |
| int left_element = -1; | |
| int right_element = -1; | |
| int left_index = 0; | |
| int right_index = 0; | |
| int out_index = 0; | |
| int need_new_left = 1; | |
| int need_new_right = 1; | |
| for (;;) { | |
| int out_element = -1; | |
| // pull needed elements | |
| if (need_new_left) { | |
| // pull new left element | |
| int new_line = left_index % 16 == 0; | |
| if (new_line) { | |
| pthread_mutex_lock(&left->mx); | |
| left->can_write_to = left_index; | |
| pthread_cond_signal(&left->write_to_moved); | |
| // wait until it's safe to read further | |
| while ((left_index <= left->can_read_to && | |
| left->can_read_to - left_index < 16) || | |
| (left->length - left_index + left->can_read_to < 16)) { | |
| // wait until can_read_to gets updated. | |
| fprintf(stderr, "middleman %i: block on left read\n", task->id); | |
| pthread_cond_wait(&left->read_to_moved, &left->mx); | |
| fprintf(stderr, "middleman %i: unblock on left read\n", task->id); | |
| } | |
| pthread_mutex_unlock(&left->mx); | |
| } | |
| left_element = left->data[left_index++]; | |
| if (left_index > left->length) { | |
| left_index = 0; | |
| } | |
| if (left_element == -1) { | |
| left_drained = 1; | |
| } | |
| } | |
| if (need_new_right) { | |
| // pull new right element | |
| int new_line = right_index % 16 == 0; | |
| if (new_line) { | |
| pthread_mutex_lock(&right->mx); | |
| right->can_write_to = right_index; | |
| pthread_cond_signal(&right->write_to_moved); | |
| // wait until it's safe to read further | |
| while ((right_index <= right->can_read_to && | |
| right->can_read_to - right_index < 16) || | |
| (right->length - right_index + right->can_read_to < 16)) { | |
| // wait until can_read_to gets updated. | |
| fprintf(stderr, "middleman %i: block on right read\n", task->id); | |
| pthread_cond_wait(&right->read_to_moved, &right->mx); | |
| fprintf(stderr, "middleman %i: unblock on right read\n", task->id); | |
| } | |
| pthread_mutex_unlock(&right->mx); | |
| } | |
| right_element = right->data[right_index++]; | |
| if (right_index > right->length) { | |
| right_index = 0; | |
| } | |
| if (right_element == -1) { | |
| right_drained = 1; | |
| } | |
| } | |
| // pick smallest element | |
| need_new_right = 0; | |
| need_new_left = 0; | |
| if (left_drained && right_drained) { | |
| out_element = -1; | |
| } else if (left_drained) { | |
| out_element = right_element; | |
| need_new_right = 1; | |
| } else if (right_drained) { | |
| out_element = left_element; | |
| need_new_left = 1; | |
| } else if (left_element < right_element) { | |
| out_element = left_element; | |
| need_new_left = 1; | |
| } else { | |
| out_element = right_element; | |
| need_new_right = 1; | |
| } | |
| if (out_element == -1) { | |
| do { | |
| out->data[out_index++] = -1; | |
| } while ((out_index % 16) != 0); | |
| } else { | |
| out->data[out_index++] = out_element; | |
| } | |
| if (out_index > out->length) { | |
| out_index = 0; | |
| } | |
| // if needed, set last cache line to be readable and wait for next line to be writable | |
| int next_line = out_index % 16 == 0; | |
| if (next_line) { | |
| pthread_mutex_lock(&out->mx); | |
| out->can_read_to = out_index; | |
| pthread_cond_signal(&out->read_to_moved); | |
| while ((out_index <= out->can_write_to && out->can_write_to - out_index < 16) || | |
| (out->length - out_index + out->can_write_to < 16)) { | |
| // Wait until can_write_to gets updated. | |
| fprintf(stderr, "middleman %i: block write\n", task->id); | |
| pthread_cond_wait(&out->write_to_moved, &out->mx); | |
| fprintf(stderr, "middleman %i: unblock write\n", task->id); | |
| } | |
| pthread_mutex_unlock(&out->mx); | |
| } | |
| if (out_element == -1) { | |
| fprintf(stderr, "middleman %i: DONE\n", task->id); | |
| break; | |
| } | |
| } | |
| } break; | |
| case(JOB_PRINTER): { | |
| /* | |
| * Pull elements from input queue and print them. | |
| */ | |
| task_name = "PRINTER"; | |
| fprintf(stderr, "Hello, I'm a printer worker %i\n", task->id); | |
| Chan* in = task->printer.in; | |
| int index = 0; | |
| for (;;) { | |
| int new_line = index % 16 == 0; | |
| if (new_line) { | |
| // fetch next line | |
| pthread_mutex_lock(&in->mx); | |
| in->can_write_to = index; | |
| pthread_cond_signal(&in->write_to_moved); | |
| while((index <= in->can_read_to && | |
| in->can_read_to - index < 16) || | |
| (in->length - index + in->can_read_to < 16)) { | |
| fprintf(stderr, "printer %i: block read\n", task->id); | |
| pthread_cond_wait(&in->read_to_moved, &in->mx); | |
| fprintf(stderr, "printer %i: unblock read\n", task->id); | |
| } | |
| pthread_mutex_unlock(&in->mx); | |
| } | |
| // print value | |
| int val = in->data[index++]; | |
| if (index > in->length) { | |
| index = 0; | |
| } | |
| if (val == -1) { | |
| break; | |
| } else { | |
| printf("%i\n", val); | |
| } | |
| } | |
| } break; | |
| }; | |
| fprintf(stderr, "%s %i: has finished\n", task_name, task->id); | |
| return NULL; | |
| } | |
| int | |
| main(void) | |
| { | |
| fprintf(stderr, "Starting Test\n"); | |
| srand(time(NULL)); | |
| int* num_buffer = malloc(sizeof(int) * TEST_CASES); | |
| int num_count = 0; | |
| // Set up threads | |
| Task tasks[8]; | |
| pthread_t threads[8]; | |
| Chan chans[7]; | |
| int i; | |
| for (i=0; i<8; i++) { | |
| Chan* c = chans + i; | |
| c->can_read_to = 0; | |
| c->can_write_to = 0; | |
| c->length = 16*3; | |
| // Align data to 64 bytes so the data buffer is exactly 3 cache lines. | |
| c->memptr = malloc(16*3*sizeof(int)+63); | |
| c->data = (int*)((long)c->memptr+63 & ~63); | |
| pthread_mutex_init(&c->mx, NULL); | |
| pthread_cond_init (&c->read_to_moved, NULL); | |
| pthread_cond_init (&c->write_to_moved, NULL); | |
| } | |
| Task* task; | |
| pthread_t* thread; | |
| // Buffer threads | |
| for (i=0;i<4;i++) { | |
| task = tasks + i; | |
| thread = threads + i; | |
| task->id = i; | |
| task->job = JOB_BUFFER; | |
| task->buffer.buffer_start = num_buffer + (i * TEST_CASES / 4); | |
| task->buffer.buffer_count = TEST_CASES / 4; | |
| task->buffer.out = chans + i; | |
| pthread_mutex_init(&task->wait, NULL); | |
| pthread_cond_init(&task->go, NULL); | |
| pthread_create(thread, NULL, task_worker, (void*)task); | |
| } | |
| // Middlemen | |
| task = tasks + i; | |
| thread = threads + i; | |
| task->id = i; | |
| task->job = JOB_MIDDLEMAN; | |
| task->middleman.left = &chans[0]; | |
| task->middleman.right = &chans[1]; | |
| task->middleman.out = &chans[4]; | |
| pthread_mutex_init(&task->wait, NULL); | |
| pthread_cond_init(&task->go, NULL); | |
| pthread_create(thread, NULL, task_worker, (void*)task); | |
| i++; | |
| task = tasks + i; | |
| thread = threads + i; | |
| task->job = JOB_MIDDLEMAN; | |
| task->id = i; | |
| task->middleman.left = &chans[2]; | |
| task->middleman.right = &chans[3]; | |
| task->middleman.out = &chans[5]; | |
| pthread_mutex_init(&task->wait, NULL); | |
| pthread_cond_init(&task->go, NULL); | |
| pthread_create(thread, NULL, task_worker, (void*)task); | |
| i++; | |
| task = tasks + i; | |
| thread = threads + i; | |
| task->job = JOB_MIDDLEMAN; | |
| task->id = i; | |
| task->middleman.left = &chans[4]; | |
| task->middleman.right = &chans[5]; | |
| task->middleman.out = &chans[6]; | |
| pthread_mutex_init(&task->wait, NULL); | |
| pthread_cond_init(&task->go, NULL); | |
| pthread_create(thread, NULL, task_worker, (void*)task); | |
| i++; | |
| // Printer | |
| task = tasks + i; | |
| thread = threads + i; | |
| task->id = i; | |
| task->job = JOB_PRINTER; | |
| task->printer.in = &chans[6]; | |
| pthread_create(thread, NULL, task_worker, (void*)task); | |
| // @TODO: I need a way to signal to all the threads that it's time to begin. | |
| // for (int run=0; run<TEST_RUNS; run++) { | |
| for (int run=0; run<1; run++) { | |
| // Random numbers in (0 to RAND_MAX) | |
| for (int i=0; i<TEST_CASES; i++) { | |
| num_buffer[num_count++] = rand(); | |
| } | |
| fprintf(stderr, "Running test case: %i\n", run); | |
| // Start threads | |
| for (int i=0; i<8; i++) { | |
| pthread_mutex_lock(&tasks[i].wait); | |
| pthread_cond_signal(&tasks[i].go); | |
| pthread_mutex_unlock(&tasks[i].wait); | |
| } | |
| // Wait for threads to complete. | |
| for (int i=0; i<8; i++) { | |
| pthread_join(threads[i], NULL); | |
| } | |
| num_count = 0; | |
| } | |
| fprintf(stderr, "Done\n"); | |
| } | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "fmt" | |
| "math" | |
| "math/rand" | |
| ) | |
// Buffer is one batch of values handed between goroutines over a channel.
// A batch containing -1 marks the end of a stream.
type Buffer []int32

// Run parameters.
const (
	test_cases = 100000 // total number of values to sort
	test_runs  = 30     // not referenced by this program
	buf_length = 16     // values per Buffer
)
| func buffer_worker(thread_id int, buffer []int32, out chan Buffer) { | |
| fmt.Println("buffer thread: %", thread_id) | |
| output_buffer := make([]int32, 0, buf_length) | |
| done := false | |
| for !done { | |
| smallest_element := int32(math.MaxInt32) | |
| smallest_element_index := -1 | |
| for index, element := range buffer { | |
| if element >= 0 && element <= smallest_element { | |
| smallest_element_index = index | |
| smallest_element = element | |
| } | |
| } | |
| done = false | |
| if smallest_element_index == -1 { | |
| // we are done, pad rest of output buffer with -1 and pass through channel | |
| for len(output_buffer) != buf_length { | |
| output_buffer = append(output_buffer, -1) | |
| } | |
| done = true | |
| } else { | |
| buffer[smallest_element_index] = -1 | |
| output_buffer = append(output_buffer, smallest_element) | |
| } | |
| if len(output_buffer) == buf_length { | |
| out <- output_buffer | |
| output_buffer = make([]int32, 0, buf_length) | |
| } | |
| } | |
| } | |
| func middleman_worker(thread_id int, left chan Buffer, right chan Buffer, out chan Buffer) { | |
| fmt.Println("middleman thread: %", thread_id) | |
| output_buffer := make([]int32, 0, buf_length) | |
| left_buffer := <-left | |
| left_index := 0 | |
| left_empty := false | |
| right_buffer := <-right | |
| right_index := 0 | |
| right_empty := false | |
| left_val := int32(-1) | |
| right_val := int32(-1) | |
| need_left := true | |
| need_right := true | |
| done := false | |
| for !done { | |
| if need_left { | |
| left_val = left_buffer[left_index] | |
| if left_val == -1 { | |
| left_empty = true | |
| } else if left_index+1 == len(left_buffer) { | |
| left_buffer = <-left | |
| left_index = 0 | |
| } else { | |
| left_index += 1 | |
| } | |
| } | |
| if need_right { | |
| right_val = right_buffer[right_index] | |
| if right_val == -1 { | |
| right_empty = true | |
| } else if right_index+1 == len(right_buffer) { | |
| right_buffer = <-right | |
| right_index = 0 | |
| } else { | |
| right_index += 1 | |
| } | |
| } | |
| need_left = false | |
| need_right = false | |
| var out_val int32 | |
| if left_empty && right_empty { | |
| out_val = -1 | |
| } else if left_empty { | |
| out_val = right_val | |
| need_right = true | |
| } else if right_empty { | |
| out_val = left_val | |
| need_left = true | |
| } else if left_val < right_val { | |
| out_val = left_val | |
| need_left = true | |
| } else { | |
| out_val = right_val | |
| need_right = true | |
| } | |
| if out_val == -1 { | |
| for len(output_buffer) != buf_length { | |
| output_buffer = append(output_buffer, -1) | |
| } | |
| done = true | |
| } else { | |
| output_buffer = append(output_buffer, out_val) | |
| } | |
| if len(output_buffer) == buf_length { | |
| out <- output_buffer | |
| output_buffer = make([]int32, 0, buf_length) | |
| } | |
| } | |
| } | |
| func main() { | |
| r := rand.New(rand.NewSource(1234)) | |
| // Generate random data. | |
| test_data := make([]int32, 0, test_cases) | |
| for i := 0; i < test_cases; i++ { | |
| test_data = append(test_data, int32(r.Int())) | |
| } | |
| c1 := make(chan Buffer, 3) | |
| c2 := make(chan Buffer, 3) | |
| c3 := make(chan Buffer, 3) | |
| c4 := make(chan Buffer, 3) | |
| c5 := make(chan Buffer, 3) | |
| c6 := make(chan Buffer, 3) | |
| c7 := make(chan Buffer, 3) | |
| i := 0 | |
| buffer_chans := []chan Buffer{c1, c2, c3, c4} | |
| for ; i < 4; i++ { | |
| start := i * test_cases / 4 | |
| end := start + test_cases/4 | |
| go buffer_worker(i, test_data[start:end], buffer_chans[i]) | |
| } | |
| i += 1 | |
| go middleman_worker(i, c1, c2, c5) | |
| i += 1 | |
| go middleman_worker(i, c3, c4, c6) | |
| i += 1 | |
| go middleman_worker(i, c5, c6, c7) | |
| done := false | |
| for !done { | |
| buf := <-c7 | |
| for _, e := range buf { | |
| if e == -1 { | |
| done = true | |
| break | |
| } | |
| fmt.Println(e) | |
| } | |
| } | |
| fmt.Println("Done") | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define TEST_CASES 1000000
#define TEST_RUNS 30
/*
 * qsort comparator for ints.
 * Returns a negative, zero or positive value when *a is less than, equal to
 * or greater than *b. It takes const void* to match the comparator type qsort
 * expects, and it compares rather than subtracting so that extreme values
 * cannot overflow.
 */
int
compare(const void* a, const void* b)
{
    int lhs = *(const int*)a;
    int rhs = *(const int*)b;
    return (lhs > rhs) - (lhs < rhs);
}
| int | |
| main(void) | |
| { | |
| fprintf(stderr, "Starting Test\n"); | |
| srand(time(NULL)); | |
| int* num_buffer = malloc(sizeof(int) * TEST_CASES); | |
| int num_count = 0; | |
| for (int run=0; run<TEST_RUNS; run++) { | |
| // Random numbers in (0 to RAND_MAX) | |
| for (int i=0; i<TEST_CASES; i++) { | |
| num_buffer[num_count++] = rand(); | |
| } | |
| fprintf(stderr, "Running test case: %i\n", run); | |
| // Serial Version | |
| // Sort them | |
| qsort(num_buffer, num_count, sizeof(int), compare); | |
| // Print them | |
| for (int i=0; i<num_count; i++) { | |
| printf("%i,", num_buffer[i]); | |
| } | |
| num_count = 0; | |
| } | |
| fprintf(stderr, "Done\n"); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment