Skip to content

Instantly share code, notes, and snippets.

@saethlin
Last active April 26, 2020 20:58
Show Gist options
  • Save saethlin/953a43d8dc6dda365e39cfd88e530f5c to your computer and use it in GitHub Desktop.
Bypassing the library to read HDF5 datasets in parallel
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <hdf5.h>
// Read exactly `count` bytes from `stream` into `dest`.
//
// fread (like the underlying read syscall) is allowed to return fewer bytes
// than requested -- in practice only ~2^31 bytes at a time -- so we NEED the
// retry loop. On a stream error or premature EOF we abort(): that reliably
// produces a core dump so we can fix the problem.
void retrying_read(char* dest, size_t count, FILE* stream) {
    size_t bytes_read = 0;
    while (bytes_read < count) {
        // fread returns size_t (never negative), not ssize_t like read(2).
        size_t bytes_read_this_time = fread(dest + bytes_read, 1, count - bytes_read, stream);
        if (ferror(stream)) {
            perror("Unable to read from file");
            abort(); // The nuclear option. Reliably produces a core dump so we can fix the problem.
        }
        // Without this check, hitting EOF before `count` bytes arrive would
        // make fread return 0 (with no error flag) forever and spin this
        // loop indefinitely.
        if (bytes_read_this_time == 0 && feof(stream)) {
            fprintf(stderr, "Unexpected EOF: wanted %zu bytes, got %zu\n", count, bytes_read);
            abort();
        }
        bytes_read += bytes_read_this_time;
    }
}
// Wall-clock time in milliseconds since the Unix epoch.
long current_time_ms() {
    struct timeval now;
    gettimeofday(&now, NULL);
    long ms = (long)now.tv_sec * 1000;
    ms += (long)now.tv_usec / 1000;
    return ms;
}
int main(int argc, char** argv) {
if (argc != 3) {
printf("Usage: ./read_hdf5_parallel file_path dataset_path\n");
return -1;
}
const char* file_path = argv[1];
const char* dset_path = argv[2];
hid_t file = H5Fopen(file_path, H5F_ACC_RDONLY, H5P_DEFAULT);
hid_t dset = H5Dopen(file, dset_path, H5P_DEFAULT);
hid_t plist = H5Dget_create_plist(dset);
// We can only do this shortcut if there are no filters
// It's totally possible to do this on chunked datasets too but it's a bit harder
if ((H5Pget_nfilters(plist) == 0) && (H5Pget_layout(plist) == H5D_CONTIGUOUS)) {
hsize_t dset_size = H5Dget_storage_size(dset);
long start = current_time_ms();
char* output = malloc(dset_size);
hsize_t dset_offset = H5Dget_offset(dset);
// You could set a chunk size or a number of chunks
// Probably best to profile on your setup and figure out what is best
size_t chunk_size = 2LL * 1024LL * 1024LL;
size_t chunks = dset_size / chunk_size;
size_t remainder = dset_size % chunk_size;
printf("Reading %.2f GB in %lu chunks\n", (double)dset_size/1024/1024/1024, chunks);
#pragma omp parallel // Must be in a parallel region to launch tasks
#pragma omp single nowait // But only one thread launches all the tasks
for (size_t chunk = 0; chunk < chunks; ++chunk) {
#pragma omp task
{
FILE* stream = fopen(file_path, "r");
if (stream == NULL) {
perror("Unable to open file in thread");
abort();
}
if(fseek(stream, dset_offset + (chunk*chunk_size), SEEK_SET) != 0) {
perror("Seek failed");
abort();
}
if (chunk == chunks-1) { // On the last chunk, also read the remainder
retrying_read(output + (chunk*chunk_size), chunk_size + remainder, stream);
} else {
retrying_read(output + (chunk*chunk_size), chunk_size, stream);
}
fclose(stream);
}
}
#pragma omp taskwait
long end = current_time_ms();
long parallel_ms = end - start;
printf("%.3f seconds parallel\n", (double)(parallel_ms)/1000);
start = end;
// Equivalent HDF5 library call
char* h5_output = malloc(dset_size);
hid_t type = H5Dget_type(dset);
H5Dread(dset, type, H5S_ALL, H5S_ALL, H5P_DEFAULT, h5_output);
H5Tclose(type);
long serial_ms = current_time_ms() - start;
printf("%.3f seconds serial\n", (double)(serial_ms)/1000);
printf("Parallelism: %.1f\n", (double)serial_ms/parallel_ms);
// Compare and print if there's a difference
if (strncmp(output, h5_output, dset_size) != 0) {
printf("Mismatch!\n");
}
free(h5_output);
free(output);
}
H5Dclose(dset);
H5Fclose(file);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment