Last active: December 2, 2015 03:44
-
-
Save amosr/32abd98047c2dce53e71 to your computer and use it in GitHub Desktop.
merging EAVTs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <unistd.h>
/* Size in bytes of each per-file read buffer and of the output buffer.
   A single input line (including its '\n') must fit in this many bytes. */
#define MERGE_BUFFER_SIZE 16384

/* Force inlining of hot helpers. GCC/Clang only guarantee always_inline for
   functions that are also declared inline, so the keyword is included. */
#define INLINE inline __attribute__((always_inline))

/* One input file being merged, together with its read buffer and the
   parsed fields of its current line. */
typedef struct input_s
{
    int fd;
    char buf[MERGE_BUFFER_SIZE];
    /* Number of valid bytes in buf: 0 <= buf_end <= MERGE_BUFFER_SIZE.
       0 means the buffer is empty; after the first line has been requested
       it also marks the input as exhausted. */
    uint64_t buf_end;
    /* The remaining fields point into buf and describe the current line. */
    /* must be start of line */
    char* entity_start;
    /* the first '|' on the line (one past the entity) */
    char* entity_end;
    /* first byte after the last '|' on the line */
    char* date_start;
    /* must be end of line: points at the line's '\n' */
    char* date_end;
} input_t;
/*
 * Record the field boundaries of the line [start, newline] in `input`.
 *
 * Lines are '|'-separated. The entity is everything before the first '|',
 * and the date is everything after the last '|'. A line containing no '|'
 * is treated as one whole entity with an empty date.
 *
 * A NULL `newline` means no complete line is available. The input is then
 * marked exhausted by setting buf_end to 0.
 */
static void INLINE parse_input_line(input_t* input, char* start, char* newline)
{
    if (!newline) {
        /* done... */
        input->buf_end = 0;
        return;
    }

    input->entity_start = start;
    input->date_end = newline;

    /* Search only within this line; never run on into the next one. */
    char* entity_end = memchr(start, '|', (size_t)(newline - start));
    if (!entity_end) {
        /* Malformed line with no separator: whole line is the entity. */
        input->entity_end = newline;
        input->date_start = newline;
        return;
    }
    input->entity_end = entity_end;

    /* Scan backwards from the end of the line for the last '|'. If the
       entity separator is the only '|', the loop stops on it, so date_start
       is always set and never escapes the line. */
    char* date_sep = newline - 1;
    while (date_sep > entity_end && *date_sep != '|') {
        --date_sep;
    }
    input->date_start = date_sep + 1;
}
/*
 * Advance `input` to its next line, refilling its buffer from input->fd as
 * needed, and hand the line to parse_input_line.
 *
 * When buf_end is 0 the buffer is empty and scanning starts at its
 * beginning. Otherwise the previous line ended at date_end, and scanning
 * resumes just after it.
 *
 * If no '\n' is buffered, the partial line is moved to the front of the
 * buffer and read() is called repeatedly until one of these happens:
 *   - a newline arrives;
 *   - end of file is reached;
 *   - the buffer is full.
 * Repeated reads are needed because read() may return fewer bytes than
 * requested, e.g. on pipes.
 *
 * At end of file, a trailing partial line with no '\n' gets a synthesized
 * newline so it is not lost. A read error, or a line that cannot fit in
 * MERGE_BUFFER_SIZE bytes, prints a message to stderr and exits with
 * status 1 rather than silently dropping the rest of the input.
 */
static void INLINE read_input_line(input_t* input)
{
    char* start = (input->buf_end == 0) ? input->buf : input->date_end + 1;
    size_t remain = (size_t)((input->buf + input->buf_end) - start);
    char* newline = memchr(start, '\n', remain);

    if (!newline) {
        /* Keep the incomplete tail and make room after it. */
        memmove(input->buf, start, remain);
        input->buf_end = remain;
        start = input->buf;

        bool eof = false;
        while (!newline && !eof && input->buf_end < MERGE_BUFFER_SIZE) {
            char* read_into = input->buf + input->buf_end;
            ssize_t got = read(input->fd, read_into, MERGE_BUFFER_SIZE - input->buf_end);
            if (got < 0) {
                if (errno == EINTR) {
                    continue;
                }
                perror("read");
                exit(1);
            }
            if (got == 0) {
                eof = true;
            } else {
                /* Older bytes were already searched; only scan the new ones. */
                newline = memchr(read_into, '\n', (size_t)got);
                input->buf_end += (uint64_t)got;
            }
        }

        if (!newline && input->buf_end > 0) {
            if (eof) {
                /* eof implies buf_end < MERGE_BUFFER_SIZE, so there is room
                   to terminate the final unterminated line. */
                input->buf[input->buf_end] = '\n';
                newline = input->buf + input->buf_end;
                input->buf_end++;
            } else {
                fprintf(stderr, "merge: input line longer than %d bytes\n",
                        (int)MERGE_BUFFER_SIZE);
                exit(1);
            }
        }
    }

    /* A NULL newline here means the input is exhausted. */
    parse_input_line(input, start, newline);
}
typedef struct output_s | |
{ | |
int fd; | |
char buf[MERGE_BUFFER_SIZE]; | |
uint64_t buf_end; | |
} output_t; | |
static void INLINE write_output_line(output_t* out, char* start, char* end) | |
{ | |
uint64_t len = end - start; | |
uint64_t left = out->buf_end + len; | |
if (left < MERGE_BUFFER_SIZE) { | |
memcpy(out->buf + out->buf_end, start, len); | |
out->buf_end = left; | |
} else { | |
write(out->fd, out->buf, out->buf_end); | |
memcpy(out->buf, start, len); | |
out->buf_end = len; | |
} | |
} | |
static void flush_output(output_t* out) | |
{ | |
write(out->fd, out->buf, out->buf_end); | |
out->buf_end = 0; | |
} | |
/*
 * Copy the current line of `input`, including its trailing '\n', to `out`.
 * Then advance `input` to its following line.
 */
static void INLINE write_from_input(input_t* input, output_t* out)
{
    char* line_begin = input->entity_start;
    char* line_stop = input->date_end + 1; /* one past the '\n' */
    write_output_line(out, line_begin, line_stop);
    read_input_line(input);
}
typedef struct merge_s | |
{ | |
output_t output; | |
uint64_t num_inputs; | |
input_t* inputs; | |
} merge_t; | |
/* Prime each input, in order, by loading its first line. */
static void merge_init(merge_t* merge)
{
    size_t idx = 0;
    while (idx < merge->num_inputs) {
        read_input_line(merge->inputs + idx);
        idx++;
    }
}
/*
 * Compare the byte ranges [buf1, end1) and [buf2, end2) lexicographically.
 * memcmp compares bytes as unsigned char.
 *
 * Returns a negative value if the first range sorts first, zero if the
 * ranges are equal, and a positive value otherwise. When one range is a
 * prefix of the other, the shorter one sorts first.
 */
static int INLINE compare2(const char* buf1, const char* end1, const char* buf2, const char* end2)
{
    size_t len1 = (size_t)(end1 - buf1);
    size_t len2 = (size_t)(end2 - buf2);
    size_t common = (len1 < len2) ? len1 : len2;

    int cmp = memcmp(buf1, buf2, common);
    if (cmp != 0) {
        return cmp;
    }
    /* Equal common prefix: the shorter range is smaller. Compare instead of
       subtracting, so the unsigned difference can neither wrap nor be
       truncated when converted to int. */
    return (len1 > len2) - (len1 < len2);
}
/*
 * Write the smallest current line among all non-exhausted inputs, then
 * advance that input. Lines are ordered by entity and then by date. On a
 * full tie, the input with the lowest index wins.
 *
 * Returns how many inputs still had a line. A return of 0 means every input
 * is exhausted, and nothing was written.
 */
static uint64_t INLINE merge_line(merge_t* merge)
{
    input_t* best = NULL;
    uint64_t live = 0;

    for (uint64_t idx = 0; idx < merge->num_inputs; ++idx) {
        input_t* cand = &merge->inputs[idx];
        if (cand->buf_end == 0) {
            continue; /* exhausted */
        }
        ++live;
        if (best == NULL) {
            best = cand;
            continue;
        }
        int by_entity = compare2(cand->entity_start, cand->entity_end,
                                 best->entity_start, best->entity_end);
        bool sorts_first = by_entity < 0
            || (by_entity == 0
                && compare2(cand->date_start, cand->date_end,
                            best->date_start, best->date_end) < 0);
        if (sorts_first) {
            best = cand;
        }
    }

    if (best != NULL) {
        write_from_input(best, &merge->output);
    }
    return live;
}
int main(int argc, char* argv[]) | |
{ | |
merge_t merge; | |
merge.output.fd = 1; | |
merge.output.buf_end = 0; | |
merge.num_inputs = argc - 1; | |
merge.inputs = calloc(sizeof(input_t), merge.num_inputs); | |
for (int i = 0; i != argc - 1; ++i) { | |
int fd = open(argv[i+1], O_RDONLY); | |
if (fd == -1) { | |
fprintf(stderr, "Can't open file %s", argv[i+1]); | |
exit(1); | |
} | |
merge.inputs[i].fd = fd; | |
} | |
merge_init(&merge); | |
while (merge_line(&merge)); | |
flush_output(&merge.output); | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.