Created
May 2, 2011 08:16
-
-
Save karthick18/951299 to your computer and use it in GitHub Desktop.
A parallel sort that currently uses just one intermediate input file and one output file for merging sort results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build, run and clean rules for the prsort parallel sort demo.
#
#   make             build $(TARGET)
#   make run         build, (re)create $(INPUT_FILE) from /dev/urandom and sort it
#   make clean       remove objects, the binary and editor backups

CC := gcc
CFLAGS := -Wall -g -std=c99
LD_LIBS := -lpthread
ARCH := $(shell uname)
# dd block size: spelled 1M on Linux and 1m on every other platform.
BLOCK_SIZE := 1m
ifeq ("$(strip $(ARCH))", "Linux")
LD_LIBS += -lrt
BLOCK_SIZE := 1M
endif
SRCS := prsort.c
OBJS := $(SRCS:%.c=%.o)
TARGET := prsort
# Number of BLOCK_SIZE blocks written to the input file.
SIZE := 4
INPUT_FILE := random.dat
# Per-thread sort buffer size in KB passed to the program.
BUFFER_SIZE := 1024

all: $(TARGET)

# $(INPUT_FILE) is phony on purpose so that create_file always regenerates it.
.PHONY: all clean $(INPUT_FILE) create_file run

# The binary must exist before it can be run, so build it first.
run: $(TARGET) create_file
	./$(TARGET) $(INPUT_FILE) $(BUFFER_SIZE)

create_file: $(INPUT_FILE)
	dd if=/dev/urandom of=$< bs=$(BLOCK_SIZE) count=$(SIZE)

$(TARGET): $(OBJS)
	$(CC) -o $@ $^ $(LD_LIBS)

%.o: %.c
	$(CC) -c $(CFLAGS) -o $@ $<

clean:
	rm -f $(OBJS) $(TARGET) *~
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
 * A parallel sort that currently uses only one intermediate input file and one output file for merging the sort results
 * from multiple threads in each pass.
 * The memory used by the sort is the input buffer size in KB multiplied by the number of CPUs, plus a running buffer of buf_size KB.
* | |
* To compile: | |
* gcc -o prsort prsort.c -Wall -g -std=c99 -pedantic -lpthread -lrt | |
* | |
* To run or test, create a 16 MB input file: | |
* dd if=/dev/urandom of=random.dat bs=1M count=16 | |
* | |
* ./prsort random.dat 2048 to run with 2 MB buffer size for each thread | |
*/ | |
#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#include <pthread.h>
#include <sched.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
/* File offset type: loff_t on Linux, off_t on every other platform. */
#ifdef __linux__
#define offset_t loff_t
#else
#define offset_t off_t
#endif

/*
 * State of one sort run: an in-memory buffer of fixed-size items plus the
 * optional files that back it while runs are being merged.
 */
struct sort_run
{
    void *items;                            /* buffer holding the run's items */
    int length;                             /* bytes of valid data in items */
    int size;                               /* size of each item in bytes */
    int buf_size;                           /* I/O buffer size in bytes */
    int (*cmp)(const void *, const void *); /* ordering callback for two items */
    int input_fd;                           /* descriptor to refill items from, -1 if none */
    int output_fd;                          /* descriptor merged output is written to, -1 if none */
    offset_t offset;                        /* not referenced elsewhere in this file */
    char input_file[40];                    /* name of the file behind input_fd */
    char output_file[40];                   /* name of the file behind output_fd */
};
/*
 * Store the current time in microseconds into *start.
 * Uses clock_gettime(CLOCK_MONOTONIC) when that clock is available at
 * compile time and gettimeofday() otherwise.
 */
static __inline__ void __time(unsigned long long *start)
{
#ifdef CLOCK_MONOTONIC
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    const unsigned long long secs = (unsigned long long)now.tv_sec;
    *start = secs * 1000000LL + now.tv_nsec/1000;
#else
    struct timeval now;
    gettimeofday(&now, NULL);
    const unsigned long long secs = (unsigned long long)now.tv_sec;
    *start = secs * 1000000LL + now.tv_usec;
#endif
}
/*
 * Exchange the s bytes at a with the s bytes at b, one byte at a time.
 * Nothing is done when both pointers are the same or s is not positive.
 */
static __inline__ void __swap(char *a, char *b, int s)
{
    int remaining = s;

    if(a == b)
        return;
    while(remaining-- > 0)
    {
        const char held = *a;
        *a++ = *b;
        *b++ = held;
    }
}
/*
 * qsort-style comparator for unsigned int items.
 * Returns -1, 0 or 1 when *a is less than, equal to or greater than *b.
 * The result comes from two comparisons rather than a subtraction: an
 * unsigned difference converted to int has the wrong sign whenever the two
 * values are more than INT_MAX apart.
 */
static int int_cmp(const void *a, const void *b)
{
    const unsigned int lhs = *(const unsigned int *)a;
    const unsigned int rhs = *(const unsigned int *)b;
    return (lhs > rhs) - (lhs < rhs);
}
/*
 * Partition the items base[left..right] (inclusive, each `size` bytes wide)
 * around the item currently stored at index `pivot`.
 * On return every item at or below the pivot according to cmp precedes the
 * returned index, the pivot itself sits at the returned index, and the
 * remaining items follow it.
 */
static int __partition(void *base, int size, int left, int right, int pivot,
                       int (*cmp)(const void *, const void *))
{
    char *const bytes = (char *)base;
    char *const last = bytes + right*size;
    char *store = bytes + left*size;
    int boundary = left;

    /* Park the pivot in the last slot while the rest of the range is scanned. */
    __swap(bytes + pivot*size, last, size);

    for(int idx = left; idx < right; ++idx)
    {
        char *candidate = bytes + idx*size;
        if(cmp(candidate, last) > 0)
            continue;
        __swap(store, candidate, size);
        store += size;
        ++boundary;
    }

    /* Move the pivot from the last slot into its final position. */
    __swap(store, last, size);
    return boundary;
}
/*
 * Bubble-sort the items base[left..right] (inclusive, each `size` bytes wide)
 * into ascending order according to cmp. Always returns 0.
 */
static int __bubble_sort(void *base, int size, int left, int right, int (*cmp)(const void *, const void *))
{
    char *const bytes = (char *)base;

    /* `last` is the highest index compared with its successor in a pass;
     * it shrinks by one after every pass. */
    for(int last = right - 1; last >= left - 1; --last)
    {
        for(int idx = left; idx <= last; ++idx)
        {
            char *lower = bytes + size*idx;
            char *upper = bytes + size*(idx+1);
            if(cmp(lower, upper) > 0)
                __swap(lower, upper, size);
        }
    }
    return 0;
}
/*
 * Quicksort the items base[left..right] (inclusive, each `size` bytes wide)
 * according to cmp. Ranges of five items or fewer are handed to
 * __bubble_sort(). Always returns 0.
 *
 * Only the smaller side of each partition is sorted recursively; the larger
 * side is handled by the enclosing loop. That keeps the recursion depth
 * logarithmic even when a partition is maximally lopsided (for example when
 * every item compares equal), where recursing into both sides would nest once
 * per item. The pivot returned by __partition() is already in its final slot,
 * so it is left out of both sides.
 */
static int do_qsort(void *base, int size, int left, int right, int (*cmp)(const void *, const void *))
{
    while(right > left)
    {
        if(right - left < 5)
            return __bubble_sort(base, size, left, right, cmp);

        /* Midpoint computed without the overflow-prone left+right sum. */
        int pivot = left + ((right - left) >> 1);
        int new_pivot = __partition(base, size, left, right, pivot, cmp);

        if(new_pivot - left < right - new_pivot)
        {
            do_qsort(base, size, left, new_pivot - 1, cmp);
            left = new_pivot + 1;
        }
        else
        {
            do_qsort(base, size, new_pivot + 1, right, cmp);
            right = new_pivot - 1;
        }
    }
    return 0;
}
/*
 * Sort the nelem items of `size` bytes each starting at base, ordered by cmp.
 * Returns whatever do_qsort() returns for the index range [0, nelem-1].
 */
static int __qsort(void *base, int nelem, int size, int (*cmp)(const void *, const void *))
{
    return do_qsort(base, size, 0, nelem - 1, cmp);
}
static struct sort_run *g_runs; | |
static void *parallel_sort(void *arg) | |
{ | |
struct sort_run *run = arg; | |
int num_input = run->length/run->size; | |
int err = __qsort(run->items, num_input, run->size, run->cmp); | |
printf("Sorting done for run [%d] with [%d] items [%s]\n", | |
(int)(run - g_runs), (int) run->length/run->size, err ? "Unsuccessfully" : "Successfully"); | |
return (void *)(unsigned long)err; | |
} | |
static int verify_sort(struct sort_run *run) | |
{ | |
int err = 0; | |
int i; | |
void *items = run->items; | |
char *prev_ele = (char*)items; | |
int num_items = run->length/run->size; | |
for(i = 1; i < num_items; ++i) | |
{ | |
char *cur_ele = (char*)items + run->size*i; | |
if(run->cmp(cur_ele, prev_ele) < 0) | |
{ | |
err = 1; | |
fprintf(stderr, "Sort error for item [%p] at index [%d]\n", (void*)cur_ele, i); | |
assert(0); | |
} | |
prev_ele = cur_ele; | |
} | |
return err; | |
} | |
static int verify_sort_file(struct sort_run *run) | |
{ | |
char *buf ; | |
int bytes; | |
struct sort_run output_run = {.cmp = run->cmp, .size = run->size, .buf_size = run->buf_size }; | |
if(!run->output_file[0]) | |
return -1; | |
if(run->output_fd < 0) | |
{ | |
run->output_fd = open(run->output_file, O_RDONLY); | |
assert(run->output_fd >= 0); | |
} | |
else lseek(run->output_fd, 0, SEEK_SET); | |
buf = calloc(1, run->buf_size); | |
assert(buf != NULL); | |
output_run.items = buf; | |
int cur_run = 0; | |
while( cur_run++, (bytes = read(run->output_fd, buf, run->buf_size)) > 0 ) | |
{ | |
output_run.length = bytes; | |
printf("Verifying sort run [%d] for file [%s]...\n", cur_run, run->output_file); | |
verify_sort(&output_run); | |
} | |
free(buf); | |
return 0; | |
} | |
static struct sort_run *merge_runs(struct sort_run *runs, int num_runs, | |
struct sort_run **ret_output_run, int *total_runs) | |
{ | |
struct sort_run *output_run = *ret_output_run; | |
int run_item_size,buf_size; | |
int (*run_cmp)(const void *,const void *) = NULL; | |
int i; | |
int cur_runs = num_runs + (output_run ? 1 : 0); | |
int run_index_map[cur_runs]; | |
struct sort_run *run_array[cur_runs]; | |
int cur_output_index = 0, cur_run_index = 0; | |
static char input_file[] = "foo"; | |
static char output_file[] = "bar"; | |
char *output_buffer = NULL; | |
if(!output_run && num_runs <= 1) return runs; | |
if(!runs) return output_run; | |
run_cmp= runs->cmp; | |
run_item_size = runs->size; | |
buf_size = runs->buf_size; | |
if(output_run) | |
{ | |
run_index_map[cur_run_index] = 0; | |
run_array[cur_run_index++] = output_run; | |
} | |
for(i = cur_run_index; i < num_runs + cur_run_index; ++i) | |
{ | |
run_array[i] = runs + (i - cur_run_index); | |
run_index_map[i] = 0; | |
} | |
assert(i == cur_runs); | |
num_runs = cur_runs; | |
if(!output_run) | |
{ | |
*ret_output_run = output_run = calloc(1, sizeof(*output_run)); | |
assert(output_run != NULL); | |
output_run->size = run_item_size; | |
output_run->cmp = run_cmp; | |
output_run->buf_size = buf_size; | |
output_run->input_fd = -1; | |
output_run->output_file[0] = 0; | |
strncat(output_run->output_file, output_file, sizeof(output_run->output_file)-1); | |
output_run->input_file[0] = 0; | |
strncat(output_run->input_file, input_file, sizeof(output_run->input_file)-1); | |
} | |
else | |
{ | |
/* | |
* Swap input with output | |
*/ | |
char tmpfile[sizeof(output_run->input_file)]; | |
close(output_run->output_fd); | |
output_run->output_fd = -1; | |
tmpfile[0] = 0; | |
strncat(tmpfile, output_run->input_file, sizeof(tmpfile)-1); | |
output_run->input_file[0] = 0; | |
strncat(output_run->input_file, output_run->output_file, sizeof(output_run->input_file)-1); | |
output_run->output_file[0] = 0; | |
strncat(output_run->output_file, tmpfile, sizeof(output_run->output_file)-1); | |
output_run->input_fd = open(output_run->input_file, O_RDONLY); | |
assert(output_run->input_fd >= 0); | |
output_run->items = calloc(1, buf_size); | |
assert(output_run->items != NULL); | |
output_run->length = read(output_run->input_fd, output_run->items, buf_size); | |
if(output_run->length < 0) | |
output_run->length = 0; | |
} | |
output_run->output_fd = open(output_run->output_file, O_RDWR | O_CREAT | O_TRUNC, 0777); | |
assert(output_run->output_fd >= 0); | |
output_buffer = calloc(1, buf_size); | |
assert(output_buffer != NULL); | |
while(cur_runs > 0) | |
{ | |
int min_run = 0; | |
void *min_item = NULL; | |
for(i = 0; i < num_runs; ++i) | |
{ | |
if(run_index_map[i] >= run_array[i]->length/run_item_size) | |
continue; | |
char *cur_item = (char*)run_array[i]->items + run_item_size*run_index_map[i]; | |
if(!min_item || run_cmp(cur_item, min_item) < 0) | |
{ | |
min_item = cur_item; | |
min_run = i; | |
} | |
} | |
run_index_map[min_run]++; | |
char *tgt_item = output_buffer + cur_output_index*run_item_size; | |
memcpy(tgt_item, min_item, run_item_size); | |
++cur_output_index; | |
/* | |
* Flush output buffer if at the limit. | |
*/ | |
if(cur_output_index >= buf_size/run_item_size) | |
{ | |
write(output_run->output_fd, output_buffer, buf_size); | |
cur_output_index = 0; | |
} | |
/* | |
* Check for the limits. and re-read if required for the relevant run from the input file | |
*/ | |
if(run_index_map[min_run] >= run_array[min_run]->length/run_item_size) | |
{ | |
if(run_array[min_run]->input_fd < 0) | |
{ | |
--cur_runs; | |
} | |
else | |
{ | |
int length = read(run_array[min_run]->input_fd, run_array[min_run]->items, buf_size); | |
if(length <= 0) | |
{ | |
--cur_runs; | |
} | |
else | |
{ | |
run_array[min_run]->length = length; | |
run_index_map[min_run] = 0; | |
} | |
} | |
} | |
} | |
/* | |
* Final flush. | |
*/ | |
if(cur_output_index > 0) | |
{ | |
write(output_run->output_fd, output_buffer, cur_output_index*run_item_size); | |
fsync(output_run->output_fd); | |
} | |
if(output_run->input_fd >= 0) | |
{ | |
free(output_run->items); | |
output_run->items = NULL; | |
close(output_run->input_fd); | |
output_run->input_fd = -1; | |
} | |
free(output_buffer); | |
if(total_runs) *total_runs += num_runs; | |
return output_run; | |
} | |
/*
 * Debug aid: with DEBUG defined, print every item of the run to stdout as an
 * unsigned int together with its index; otherwise do nothing.
 */
#ifdef DEBUG
static void dump_result(struct sort_run *run)
{
    const int count = run->length/run->size;
    const char *cursor = run->items;

    for(int idx = 0; idx < count; ++idx, cursor += sizeof(int))
    {
        printf("[%d] = [%u]\n", idx, *(const unsigned int *)cursor);
    }
}
#else
static __inline__ void dump_result(struct sort_run *run)
{
    (void)run;
}
#endif
static int check_libc_qsort(int fd, size_t size) | |
{ | |
struct sort_run output_run = {0}; | |
char *buf = calloc(1, size); | |
offset_t offset = 0; | |
assert(buf); | |
int bytes = 0; | |
lseek(fd, 0, SEEK_SET); | |
while( (bytes = read(fd, buf + offset, size - offset) ) > 0 ) | |
offset += bytes; | |
output_run.length = size; | |
output_run.items = buf; | |
output_run.size = sizeof(int); | |
output_run.cmp = int_cmp; | |
unsigned long long start = 0, end = 0; | |
__time(&start); | |
qsort(buf, size/sizeof(int), sizeof(int), int_cmp); | |
__time(&end); | |
verify_sort(&output_run); | |
printf("LIBC qsort time for [%ld] items = [%lld] usecs\n", size/sizeof(int), end - start); | |
free(buf); | |
return 0; | |
} | |
/*
 * Number of worker threads to use.
 *
 * On Linux (when the CPU_* affinity macros are available) this is the number
 * of CPUs in the calling thread's affinity mask. If that query is unavailable
 * or fails, the number of online processors reported by sysconf() is used.
 * When neither source gives a positive value the result is 4.
 */
static int get_cpu_count(void)
{
    int count = 0;
#if defined(__linux__) && defined(CPU_COUNT)
    cpu_set_t set;
    CPU_ZERO(&set);
    /* Only trust the mask when the call succeeded. */
    if(sched_getaffinity(0, sizeof(set), &set) == 0)
        count = CPU_COUNT(&set);
#endif
#ifdef _SC_NPROCESSORS_ONLN
    if(count < 1)
    {
        long online = sysconf(_SC_NPROCESSORS_ONLN);
        if(online >= 1)
            count = (int)online;
    }
#endif
    if(count < 1)
        count = 4;
    return count;
}
int main(int argc, char **argv) | |
{ | |
int num_threads; | |
pthread_t *tids; | |
char filename[0xff+1]; | |
int buf_size; | |
char *endp = NULL; | |
int fd; | |
struct stat statbuf; | |
size_t size; | |
int i; | |
if(argc != 3) | |
{ | |
fprintf(stderr, "Insufficient args...\n"); | |
fprintf(stderr, "%s filename buf_size_in_kb\n", argv[0]); | |
exit(127); | |
} | |
filename[0] = 0; | |
strncat(filename, argv[1], sizeof(filename)-1); | |
buf_size = strtol(argv[2], &endp, 10); | |
if(*endp) | |
{ | |
fprintf(stderr, "Invalid format for buf size [%s]\n", argv[2]); | |
exit(127); | |
} | |
buf_size <<= 10;/*in kb*/ | |
fd = open(filename, O_RDONLY); | |
if(fd < 0) | |
{ | |
perror("open:"); | |
exit(127); | |
} | |
if(fstat(fd, &statbuf)) | |
{ | |
perror("fstat:"); | |
exit(127); | |
} | |
size = statbuf.st_size; | |
if(buf_size > size) | |
buf_size = size; | |
/* | |
* Allocate threads based on the current cpu count for real parallelization. | |
*/ | |
num_threads = get_cpu_count(); | |
assert(num_threads >= 1); | |
struct sort_run *runs = calloc(num_threads, sizeof(*runs)); | |
assert(runs != NULL); | |
g_runs = runs; | |
tids = calloc(num_threads, sizeof(*tids)); | |
assert(tids != NULL); | |
unsigned long long start = 0, end = 0; | |
struct sort_run *output_run = NULL; | |
__time(&start); | |
while(size > 0) | |
{ | |
for(i = 0; i < num_threads && size > 0; ++i) | |
{ | |
int length = buf_size; | |
if(size < buf_size) length = size; | |
size -= length; | |
if(length != runs[i].length) | |
{ | |
if(runs[i].items) free(runs[i].items); | |
runs[i].length = length; | |
runs[i].items = calloc(1, length); | |
} | |
assert(runs[i].items != NULL); | |
runs[i].input_fd = runs[i].output_fd = -1; | |
runs[i].size = sizeof(int); | |
runs[i].buf_size = buf_size; | |
runs[i].cmp = int_cmp; | |
if(read(fd, runs[i].items, length) != length) | |
{ | |
perror("read:"); | |
goto out; | |
} | |
} | |
/* | |
* Start all the threads at once. with the sort load. | |
*/ | |
if(i > 1) | |
{ | |
for(int j = 0; j < i; ++j) | |
{ | |
int err = pthread_create(tids + j, NULL, parallel_sort, runs+j); | |
assert(err == 0); | |
} | |
for(int j = 0; j < i; ++j) | |
pthread_join(tids[j], NULL); | |
} | |
else | |
{ | |
parallel_sort(runs); | |
} | |
output_run = merge_runs(runs, i, &output_run, NULL); | |
} | |
__time(&end); | |
printf("Time taken to sort [%ld] items = [%lld] usecs\n", (long int) (statbuf.st_size/sizeof(int)), end-start); | |
if(output_run == runs) runs = NULL; | |
out: | |
if(runs) | |
{ | |
for(i = 0; i < num_threads; ++i) | |
{ | |
if(runs[i].items) | |
{ | |
free(runs[i].items); | |
runs[i].items = NULL; | |
} | |
} | |
free(runs); | |
} | |
if(output_run) | |
{ | |
if(output_run->items) | |
{ | |
free(output_run->items); | |
output_run->items = NULL; | |
} | |
verify_sort_file(output_run); | |
if(output_run->output_fd >= 0) | |
close(output_run->output_fd); | |
free(output_run); | |
} | |
if(tids) free(tids); | |
//check_libc_qsort(fd, statbuf.st_size); | |
close(fd); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment