Skip to content

Instantly share code, notes, and snippets.

@amosr
Last active December 2, 2015 03:44
Show Gist options
  • Save amosr/32abd98047c2dce53e71 to your computer and use it in GitHub Desktop.
Save amosr/32abd98047c2dce53e71 to your computer and use it in GitHub Desktop.
merging EAVTs
#include <fcntl.h>
#include <math.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <unistd.h>
#define MERGE_BUFFER_SIZE 16384
/* Force inlining of the per-line helpers. GCC only reliably honours
   always_inline on functions that are also declared inline (otherwise it
   warns "always_inline function might not be inlinable"), so the macro
   carries the keyword as well. */
#define INLINE inline __attribute__((always_inline))
/*
 * One input file taking part in the merge.
 *
 * Bytes read from `fd` are kept in `buf`; the pointers below delimit the
 * fields of the current line inside `buf`.
 */
typedef struct input_s
{
    int fd;                       /* descriptor the lines are read from */
    char buf[MERGE_BUFFER_SIZE];  /* bytes read from fd, not yet consumed */
    /* Number of valid bytes in buf, 0 <= buf_end <= MERGE_BUFFER_SIZE.
       0 also doubles as a sentinel: nothing read yet, or input exhausted. */
    uint64_t buf_end;
    char* entity_start;           /* start of the current line */
    char* entity_end;             /* the '|' that ends the entity field */
    char* date_start;             /* first byte of the date field */
    char* date_end;               /* the '\n' that ends the current line */
} input_t;
/*
 * Record the field boundaries of the line [start, newline] in `input`.
 *
 * A NULL `newline` means no further line is available: buf_end is reset to
 * 0, which marks the input as exhausted.
 *
 * Fields are separated by '|'. The entity is the first field and the date is
 * the last one:
 *   entity_start = start             entity_end = first '|' on the line
 *   date_start   = after last '|'    date_end   = newline
 * A line without any '|' is treated as a bare entity with an empty date.
 */
static void INLINE parse_input_line(input_t* input, char* start, char* newline)
{
    if (!newline) {
        /* done... */
        input->buf_end = 0;
        return;
    }
    input->entity_start = start;
    input->date_end = newline;

    /* Search only this line, so a separator on a later line (or none at all)
       can never be mistaken for this line's entity terminator. */
    char* entity_end = memchr(start, '|', (size_t)(newline - start));
    if (!entity_end) {
        input->entity_end = newline;
        input->date_start = newline;
        return;
    }
    input->entity_end = entity_end;

    /* Walk back from the end of the line to the last '|'. entity_end is
       itself a '|', so the scan stops there at the latest and never leaves
       the line, whatever the length of the date field. */
    char* sep = newline - 1;
    while (*sep != '|')
        --sep;
    input->date_start = sep + 1;
}
/*
 * Advance `input` to its next line and record that line's field boundaries
 * (via parse_input_line).
 *
 * buf_end == 0 means nothing has been read yet, so scanning starts at the
 * front of buf; otherwise the previous line ended at date_end ('\n') and the
 * next one starts right after it. If the unconsumed tail holds no complete
 * line, the tail is moved to the front of buf and more bytes are read until
 * a '\n' appears, the buffer is full, or read() reports end of file.
 *
 * A final line without a trailing '\n' is given one so it is not dropped.
 * A read() error, or a line that does not fit in the buffer, is fatal.
 */
static void INLINE read_input_line(input_t* input)
{
    char* start = (input->buf_end == 0) ? input->buf : input->date_end + 1;
    char* end = input->buf + input->buf_end;
    char* newline = memchr(start, '\n', (size_t)(end - start));

    if (!newline) {
        uint64_t remain = (uint64_t)(end - start);
        memmove(input->buf, start, remain);
        input->buf_end = remain;
        start = input->buf;

        /* read() may return fewer bytes than requested (pipes, terminals),
           so only a 0 return means end of file: keep reading. */
        while (!newline && input->buf_end < MERGE_BUFFER_SIZE) {
            char* read_into = input->buf + input->buf_end;
            ssize_t got = read(input->fd, read_into, MERGE_BUFFER_SIZE - input->buf_end);
            if (got < 0) {
                perror("read");
                exit(1);
            }
            if (got == 0)
                break; /* end of file */
            input->buf_end += (uint64_t)got;
            newline = memchr(read_into, '\n', (size_t)got);
        }

        if (!newline && input->buf_end == MERGE_BUFFER_SIZE) {
            /* The whole buffer holds part of one line: it cannot be merged. */
            fprintf(stderr, "Line longer than %d bytes\n", MERGE_BUFFER_SIZE);
            exit(1);
        }
        if (!newline && input->buf_end > 0) {
            /* End of file inside an unterminated last line. There is room
               for one more byte (buf_end < MERGE_BUFFER_SIZE here), so
               terminate the line in place instead of dropping it. */
            newline = input->buf + input->buf_end;
            *newline = '\n';
            input->buf_end += 1;
        }
    }
    parse_input_line(input, start, newline);
}
/* Buffered writer: pending bytes live in buf[0, buf_end) until they are
   written to fd. */
struct output_s
{
    int fd;                       /* destination descriptor */
    char buf[MERGE_BUFFER_SIZE];  /* pending output bytes */
    uint64_t buf_end;             /* count of pending bytes in buf */
};
typedef struct output_s output_t;
/*
 * Write all `len` bytes starting at `data` to `fd`.
 * write() may accept fewer bytes than requested, so keep going until every
 * byte is written; a failed write is fatal rather than silently losing data.
 */
static void write_all(int fd, const char* data, uint64_t len)
{
    while (len > 0) {
        ssize_t n = write(fd, data, len);
        if (n <= 0) {
            perror("write");
            exit(1);
        }
        data += n;
        len -= (uint64_t)n;
    }
}

/*
 * Append the bytes [start, end) to the output buffer. Pending bytes are
 * written out first when the new range would not fit; a range larger than
 * the whole buffer bypasses it and is written straight through.
 */
static void INLINE write_output_line(output_t* out, char* start, char* end)
{
    uint64_t len = (uint64_t)(end - start);
    if (out->buf_end + len > MERGE_BUFFER_SIZE) {
        write_all(out->fd, out->buf, out->buf_end);
        out->buf_end = 0;
    }
    if (len > MERGE_BUFFER_SIZE) {
        write_all(out->fd, start, len);
        return;
    }
    memcpy(out->buf + out->buf_end, start, len);
    out->buf_end += len;
}

/* Write every pending byte of `out` and leave its buffer empty. */
static void flush_output(output_t* out)
{
    write_all(out->fd, out->buf, out->buf_end);
    out->buf_end = 0;
}
/*
 * Emit the current line of `input` (from the entity start through its
 * terminating '\n') to `out`, then move `input` on to its next line.
 */
static void INLINE write_from_input(input_t* input, output_t* out)
{
    char* line_begin = input->entity_start;
    char* line_stop = input->date_end + 1; /* one past the '\n' */
    write_output_line(out, line_begin, line_stop);
    read_input_line(input);
}
/* State of the merge: one shared buffered output plus num_inputs readers. */
struct merge_s
{
    output_t output;      /* where merged lines are written */
    uint64_t num_inputs;  /* number of elements in inputs */
    input_t* inputs;      /* array of num_inputs readers */
};
typedef struct merge_s merge_t;
/* Prime every input, in order, by reading and parsing its first line. */
static void merge_init(merge_t* merge)
{
    input_t* cur = merge->inputs;
    uint64_t left = merge->num_inputs;
    while (left-- > 0)
        read_input_line(cur++);
}
/*
 * Lexicographically compare the byte ranges [buf1, end1) and [buf2, end2).
 *
 * Bytes are compared with memcmp (as unsigned char). When one range is a
 * prefix of the other, the shorter range orders first.
 * Returns a negative value, 0, or a positive value as the first range is
 * less than, equal to, or greater than the second.
 */
static int INLINE compare2(const char* buf1, const char* end1, const char* buf2, const char* end2)
{
    size_t len1 = (size_t)(end1 - buf1);
    size_t len2 = (size_t)(end2 - buf2);
    size_t min = (len1 < len2) ? len1 : len2;
    int cmp = memcmp(buf1, buf2, min);
    if (cmp != 0)
        return cmp;
    /* Equal prefix: order by length. Returning len1 - len2 would convert an
       unsigned (possibly wrapped) difference to int; compare explicitly. */
    return (len1 > len2) - (len1 < len2);
}
/*
 * Perform one merge step: among the inputs that still hold a line, pick the
 * one whose current line is smallest by entity, then by date (ties keep the
 * lowest-indexed input), write that line out and advance that input.
 *
 * Returns how many inputs were still live before the step; 0 means every
 * input is exhausted and nothing was written.
 */
static uint64_t INLINE merge_line(merge_t* merge)
{
    uint64_t live = 0;    /* inputs that still have a current line */
    input_t* best = NULL; /* holder of the smallest line seen so far */

    for (uint64_t k = 0; k < merge->num_inputs; ++k) {
        input_t* cand = &merge->inputs[k];
        if (cand->buf_end == 0)
            continue; /* exhausted */
        ++live;
        if (best == NULL) {
            best = cand;
            continue;
        }
        int by_entity = compare2(cand->entity_start, cand->entity_end,
                                 best->entity_start, best->entity_end);
        if (by_entity < 0
            || (by_entity == 0
                && compare2(cand->date_start, cand->date_end,
                            best->date_start, best->date_end) < 0))
            best = cand;
    }

    if (best != NULL)
        write_from_input(best, &merge->output);
    return live;
}
/*
 * Usage: <prog> FILE...
 *
 * Merge the lines of the given files to stdout. Each step emits the smallest
 * current line by (entity, date) across all inputs (see merge_line), so the
 * output is sorted only if every input already is. NOTE(review): inputs are
 * presumably pre-sorted by that key — confirm with callers.
 */
int main(int argc, char* argv[])
{
    merge_t merge;
    merge.output.fd = STDOUT_FILENO;
    merge.output.buf_end = 0;
    /* argc may be 0 when exec'd with an empty argv; don't let argc - 1 wrap. */
    merge.num_inputs = (argc > 1) ? (uint64_t)(argc - 1) : 0;
    merge.inputs = calloc(merge.num_inputs, sizeof *merge.inputs);
    /* calloc(0, ...) may legitimately return NULL, so only fail when inputs were requested. */
    if (merge.inputs == NULL && merge.num_inputs != 0) {
        fprintf(stderr, "Out of memory\n");
        exit(1);
    }
    for (uint64_t i = 0; i != merge.num_inputs; ++i) {
        int fd = open(argv[i + 1], O_RDONLY);
        if (fd == -1) {
            fprintf(stderr, "Can't open file %s\n", argv[i + 1]);
            exit(1);
        }
        merge.inputs[i].fd = fd;
    }

    merge_init(&merge);
    while (merge_line(&merge))
        ;
    flush_output(&merge.output);

    for (uint64_t i = 0; i != merge.num_inputs; ++i)
        close(merge.inputs[i].fd);
    free(merge.inputs);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment