Last active
April 26, 2020 20:58
-
-
Save saethlin/953a43d8dc6dda365e39cfd88e530f5c to your computer and use it in GitHub Desktop.
Bypassing the library to read HDF5 datasets in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <hdf5.h>
// Read exactly `count` bytes from `stream` into `dest`, retrying short reads.
// The underlying read syscall may return fewer bytes than requested (in
// practice it caps out around 2^31 bytes per call), so the retry loop is
// required for large datasets. Aborts on I/O error or premature EOF rather
// than silently returning partial data.
void retrying_read(char* dest, size_t count, FILE* stream) {
    size_t bytes_read = 0;
    while (bytes_read < count) {
        // fread returns size_t (number of items read), never negative
        size_t bytes_read_this_time = fread(dest + bytes_read, 1, count - bytes_read, stream);
        if (ferror(stream)) {
            perror("Unable to read from file");
            abort(); // The nuclear option. Reliably produces a core dump so we can fix the problem.
        }
        if (bytes_read_this_time == 0 && feof(stream)) {
            // EOF before `count` bytes arrived: without this check the loop
            // would spin forever (fread keeps returning 0 with no error set).
            fprintf(stderr, "Unexpected EOF: got %zu of %zu bytes\n", bytes_read, count);
            abort();
        }
        bytes_read += bytes_read_this_time;
    }
}
long current_time_ms() { | |
struct timeval timecheck; | |
gettimeofday(&timecheck, NULL); | |
return (long)timecheck.tv_sec * 1000 + (long)timecheck.tv_usec / 1000; | |
} | |
int main(int argc, char** argv) { | |
if (argc != 3) { | |
printf("Usage: ./read_hdf5_parallel file_path dataset_path\n"); | |
return -1; | |
} | |
const char* file_path = argv[1]; | |
const char* dset_path = argv[2]; | |
hid_t file = H5Fopen(file_path, H5F_ACC_RDONLY, H5P_DEFAULT); | |
hid_t dset = H5Dopen(file, dset_path, H5P_DEFAULT); | |
hid_t plist = H5Dget_create_plist(dset); | |
// We can only do this shortcut if there are no filters | |
// It's totally possible to do this on chunked datasets too but it's a bit harder | |
if ((H5Pget_nfilters(plist) == 0) && (H5Pget_layout(plist) == H5D_CONTIGUOUS)) { | |
hsize_t dset_size = H5Dget_storage_size(dset); | |
long start = current_time_ms(); | |
char* output = malloc(dset_size); | |
hsize_t dset_offset = H5Dget_offset(dset); | |
// You could set a chunk size or a number of chunks | |
// Probably best to profile on your setup and figure out what is best | |
size_t chunk_size = 2LL * 1024LL * 1024LL; | |
size_t chunks = dset_size / chunk_size; | |
size_t remainder = dset_size % chunk_size; | |
printf("Reading %.2f GB in %lu chunks\n", (double)dset_size/1024/1024/1024, chunks); | |
#pragma omp parallel // Must be in a parallel region to launch tasks | |
#pragma omp single nowait // But only one thread launches all the tasks | |
for (size_t chunk = 0; chunk < chunks; ++chunk) { | |
#pragma omp task | |
{ | |
FILE* stream = fopen(file_path, "r"); | |
if (stream == NULL) { | |
perror("Unable to open file in thread"); | |
abort(); | |
} | |
if(fseek(stream, dset_offset + (chunk*chunk_size), SEEK_SET) != 0) { | |
perror("Seek failed"); | |
abort(); | |
} | |
if (chunk == chunks-1) { // On the last chunk, also read the remainder | |
retrying_read(output + (chunk*chunk_size), chunk_size + remainder, stream); | |
} else { | |
retrying_read(output + (chunk*chunk_size), chunk_size, stream); | |
} | |
fclose(stream); | |
} | |
} | |
#pragma omp taskwait | |
long end = current_time_ms(); | |
long parallel_ms = end - start; | |
printf("%.3f seconds parallel\n", (double)(parallel_ms)/1000); | |
start = end; | |
// Equivalent HDF5 library call | |
char* h5_output = malloc(dset_size); | |
hid_t type = H5Dget_type(dset); | |
H5Dread(dset, type, H5S_ALL, H5S_ALL, H5P_DEFAULT, h5_output); | |
H5Tclose(type); | |
long serial_ms = current_time_ms() - start; | |
printf("%.3f seconds serial\n", (double)(serial_ms)/1000); | |
printf("Parallelism: %.1f\n", (double)serial_ms/parallel_ms); | |
// Compare and print if there's a difference | |
if (strncmp(output, h5_output, dset_size) != 0) { | |
printf("Mismatch!\n"); | |
} | |
free(h5_output); | |
free(output); | |
} | |
H5Dclose(dset); | |
H5Fclose(file); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment