@daskol
Last active April 19, 2022 16:00
Benchmark parallel reading of large files in JAX/FLAX
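Every benchmark below reads a file named data.blob from the working directory. The C++ variant assumes the file is exactly 1 GiB, since it checks that a single read() returns buffer_size bytes. A minimal sketch for generating such a file; the exact size and the use of random bytes are assumptions, not part of the original gist:

import os

CHUNK = 128 << 20   # 128 MiB per write, matching the benchmark chunk size.
TOTAL = 1 << 30     # 1 GiB, matching the C++ buffer_size.

with open('data.blob', 'wb') as fout:
    for _ in range(TOTAL // CHUNK):
        fout.write(os.urandom(CHUNK))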
#!/usr/bin/env python
# sync-multiple-g.py: parallel chunked read of data.blob via tensorflow.io.gfile.
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from hashlib import sha256
from os import system
from time import monotonic
from tqdm import tqdm

from tensorflow.io import gfile


def read_parallel():
    with gfile.GFile('data.blob', 'rb') as fin:
        buf_size = 128 << 20  # 128M buffer.
        num_bufs = fin.size() / buf_size
        result = bytearray(fin.size())

        def read_chunk(i):
            # Each worker opens its own handle and fills its slice of `result`.
            with gfile.GFile('data.blob', 'rb') as fin:
                fin.seek(i * buf_size)
                if buf := fin.read(buf_size):
                    result[i * buf_size:i * buf_size + len(buf)] = buf

        pool_size = 32
        pool = ThreadPoolExecutor(pool_size)
        pool.map(read_chunk, range(int(num_bufs) + 1))
        # Wait for all chunks to finish; with wait=False the timing below
        # would measure only task submission.
        pool.shutdown(wait=True)
        return result


elapsed = np.empty(10)
for i in tqdm(range(elapsed.size)):
    # Drop the page cache so every iteration reads from disk.
    system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
    elapsed[i] = -monotonic()
    read_parallel()
    elapsed[i] += monotonic()
print(f'Py,sync-multiple-g.py,{elapsed.mean()},{elapsed.std()}')
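Both parallel scripts import sha256 but never call it; presumably it was meant for validating the reassembled buffer. A minimal sketch of such a check (check_integrity is a hypothetical helper, not part of the gist), comparing the parallel result against a plain sequential read:

from hashlib import sha256

def check_integrity():
    parallel = read_parallel()
    with open('data.blob', 'rb') as fin:
        sequential = fin.read()
    # The digests match only if every chunk landed at the right offset.
    assert sha256(parallel).hexdigest() == sha256(sequential).hexdigest()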
#!/usr/bin/env python
# sync-multiple.py: the same chunked benchmark using plain built-in file I/O.
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from hashlib import sha256
from os import SEEK_END, system
from time import monotonic
from tqdm import tqdm


def read_parallel():
    with open('data.blob', 'rb') as fin:
        buf_size = 128 << 20  # 128M buffer.
        fin.seek(0, SEEK_END)  # Determine the file size.
        num_bufs = fin.tell() / buf_size
        result = bytearray(fin.tell())

        def read_chunk(i):
            # Each worker opens its own handle and fills its slice of `result`.
            with open('data.blob', 'rb') as fin:
                fin.seek(i * buf_size)
                if buf := fin.read(buf_size):
                    result[i * buf_size:i * buf_size + len(buf)] = buf

        pool_size = 32
        pool = ThreadPoolExecutor(pool_size)
        pool.map(read_chunk, range(int(num_bufs) + 1))
        pool.shutdown(wait=True)
        return result


elapsed = np.empty(10)
for i in tqdm(range(elapsed.size)):
    # Drop the page cache so every iteration reads from disk.
    system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
    elapsed[i] = -monotonic()
    read_parallel()
    elapsed[i] += monotonic()
print(f'Py,sync-multiple.py,{elapsed.mean()},{elapsed.std()}')
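The gist targets JAX/FLAX, where bytes read this way would typically end up on an accelerator. A hedged sketch of handing the reassembled buffer to JAX; treating the blob as a flat float32 array (whose byte length divides evenly by the item size) is an assumption about how the file was produced:

import numpy as np
import jax.numpy as jnp

def to_device_array(raw, dtype=np.float32):
    # Reinterpret the host buffer without copying, then transfer it to the
    # default JAX device.
    host = np.frombuffer(raw, dtype=dtype)
    return jnp.asarray(host)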
// C++ baseline: read the whole data.blob with a single read(2) call.
// Requires C++20 (abbreviated function templates below).
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <tuple>
#include <vector>

#include <fcntl.h>
#include <unistd.h>

constexpr auto buffer_size = 1024 * 1024 * 1024;

std::shared_ptr<char[]> buffer;

bool read_whole(void) {
    bool status = true;
    int fd = open("data.blob", O_RDONLY);
    if (auto count = read(fd, buffer.get(), buffer_size); count == -1) {
        std::cerr << "failed to read data\n";
        status = false;
    } else if (count != buffer_size) {
        // Assumes data.blob is exactly buffer_size bytes long.
        std::cerr << "failed to read whole data: " << count << '\n';
        status = false;
    }
    close(fd);
    return status;
}

auto mean(auto const &xs) {
    double acc = 0;
    int count = 0;
    for (auto const x : xs) {
        acc += x;
        ++count;
    }
    return acc / count;
}

// Despite the name, this returns the standard deviation, matching the
// numpy std() reported by the Python benchmarks.
auto var(auto const &xs) {
    auto mid = mean(xs);
    double acc = 0;
    int count = 0;
    for (auto const x : xs) {
        acc += (x - mid) * (x - mid);
        ++count;
    }
    return std::sqrt(acc / count);
}

std::tuple<double, double> bench_read(void) {
    std::vector<double> elapsed;
    for (auto it = 0; it != 20; ++it) {
        // Drop the page cache so every iteration reads from disk.
        system("sudo sh -c 'sync; echo 3 > /proc/sys/vm/drop_caches'");
        auto begin = std::chrono::steady_clock::now();
        read_whole();
        auto end = std::chrono::steady_clock::now();
        std::chrono::duration<double> diff = end - begin;
        elapsed.push_back(diff.count());
    }
    return {mean(elapsed), var(elapsed)};
}

int main() {
    buffer.reset(new char[buffer_size]);
    auto [mean, var] = bench_read();
    std::cout << mean << "+" << var << '\n';
    return 0;
}
#!/usr/bin/env python
# sync-single.py: baseline that reads the whole file with a single call.
import numpy as np
from os import system
from time import monotonic
from tqdm import tqdm


def read_single():
    with open('data.blob', 'rb') as fin:
        return fin.read()


elapsed = np.empty(20)
for i in tqdm(range(elapsed.size)):
    # Drop the page cache so every iteration reads from disk.
    system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
    elapsed[i] = -monotonic()
    read_single()
    elapsed[i] += monotonic()
print(f'Py,sync-single.py,{elapsed.mean()},{elapsed.std()}')
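Each script prints a CSV line of the form language,script,mean_seconds,std_seconds. To turn the mean into a bandwidth figure the file size has to be known; a small sketch assuming the 1 GiB blob described above:

def throughput_gib_per_s(mean_seconds, size_bytes=1 << 30):
    # Effective read bandwidth for a single pass over the file.
    return size_bytes / mean_seconds / (1 << 30)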