Last active
April 19, 2022 16:00
-
-
Save daskol/a4ff011150acd4a0ba0d8d29717dfb3f to your computer and use it in GitHub Desktop.
Benchmark parallel reading of large files in JAX/FLAX
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import numpy as np | |
| from concurrent.futures import ThreadPoolExecutor | |
| from hashlib import sha256 | |
| from os import system | |
| from time import monotonic | |
| from tqdm import tqdm | |
| from tensorflow.io import gfile | |
def read_parallel():
    """Read ``data.blob`` via TensorFlow ``gfile`` using concurrent chunked reads.

    The file is split into 128 MiB chunks. Each chunk is read by a worker
    thread through its own ``GFile`` handle and copied into a preallocated
    ``bytearray`` at the chunk's offset.

    Returns:
        bytearray: the full contents of ``data.blob``. Returned only after
        every chunk has been read.

    Raises:
        Any exception raised by ``gfile`` while opening or reading a chunk is
        propagated to the caller rather than silently dropped.
    """
    buf_size = 128 << 20  # 128M buffer.
    with gfile.GFile('data.blob', 'rb') as fin:
        file_size = fin.size()
    result = bytearray(file_size)
    # Ceiling division: a size that is an exact multiple of buf_size needs no
    # extra (empty) chunk.
    num_bufs = (file_size + buf_size - 1) // buf_size

    def read_chunk(i):
        offset = i * buf_size
        with gfile.GFile('data.blob', 'rb') as fin:
            fin.seek(offset)
            if buf := fin.read(buf_size):
                result[offset:offset + len(buf)] = buf

    pool_size = 32
    # Leaving the ``with`` block waits for every worker (the original used
    # shutdown(wait=False) and returned before the reads finished, so the
    # benchmark timed only task submission). Draining the map iterator
    # re-raises any worker exception.
    with ThreadPoolExecutor(pool_size) as pool:
        for _ in pool.map(read_chunk, range(num_bufs)):
            pass
    return result
# Time 10 cold-cache runs of read_parallel() and report "mean,std" in seconds.
elapsed = np.empty(10)
for trial in tqdm(range(elapsed.size)):
    # Flush dirty pages and drop the OS page cache so each run hits storage.
    system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
    started = monotonic()
    read_parallel()
    elapsed[trial] = monotonic() - started
print(f'Py,sync-multiple-g.py,{elapsed.mean()},{elapsed.std()}')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import numpy as np | |
| from concurrent.futures import ThreadPoolExecutor | |
| from hashlib import sha256 | |
| from os import SEEK_END, system | |
| from time import monotonic | |
| from tqdm import tqdm | |
def read_parallel():
    """Read ``data.blob`` into memory using concurrent chunked reads.

    The file is split into 128 MiB chunks. Each chunk is read by a worker
    thread through its own file handle and copied into a preallocated
    ``bytearray`` at the chunk's offset.

    Returns:
        bytearray: the full contents of ``data.blob``.

    Raises:
        OSError: if the file cannot be opened, or if any chunk read fails.
            Worker errors are propagated instead of being silently dropped.
    """
    buf_size = 128 << 20  # 128M buffer.
    with open('data.blob', 'rb') as fin:
        fin.seek(0, SEEK_END)
        file_size = fin.tell()
    result = bytearray(file_size)
    # Ceiling division: a size that is an exact multiple of buf_size needs no
    # extra (empty) chunk.
    num_bufs = (file_size + buf_size - 1) // buf_size

    def read_chunk(i):
        offset = i * buf_size
        with open('data.blob', 'rb') as fin:
            fin.seek(offset)
            if buf := fin.read(buf_size):
                result[offset:offset + len(buf)] = buf

    pool_size = 32
    # Leaving the ``with`` block waits for every worker. Draining the map
    # iterator re-raises any worker exception; an unconsumed map() would hide
    # it and yield a partially zero buffer.
    with ThreadPoolExecutor(pool_size) as pool:
        for _ in pool.map(read_chunk, range(num_bufs)):
            pass
    return result
# Time 10 cold-cache runs of read_parallel() and report "mean,std" in seconds.
elapsed = np.empty(10)
for trial in tqdm(range(elapsed.size)):
    # Flush dirty pages and drop the OS page cache so each run hits storage.
    system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
    started = monotonic()
    read_parallel()
    elapsed[trial] = monotonic() - started
print(f'Py,sync-multiple.py,{elapsed.mean()},{elapsed.std()}')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cerrno>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <tuple>
#include <vector>

#include <fcntl.h>
#include <unistd.h>
// Number of bytes each benchmark iteration reads: 1024^3 = 1 GiB (still fits
// in an int). read_whole() reports failure unless it obtains this many bytes.
constexpr auto buffer_size = 1024 * 1024 * 1024;

// Destination buffer for read_whole(), allocated once in main() and reused by
// every iteration. Nothing shares it, so sole ownership (unique_ptr) suffices;
// shared_ptr would only add an unused reference count.
std::unique_ptr<char[]> buffer;
| bool read_whole(void) { | |
| bool status = true; | |
| int fd = open("data.blob", O_RDONLY); | |
| if (auto count = read(fd, buffer.get(), buffer_size); count == -1) { | |
| std::cerr << "failed to read data\n"; | |
| status = false; | |
| } else if (count != buffer_size) { | |
| std::cerr << "failed to read whole data: " << count << '\n'; | |
| status = false; | |
| } | |
| close(fd); | |
| return status; | |
| } | |
// Arithmetic mean of every element of an iterable container, computed in
// double precision. An empty container yields 0.0 / 0, i.e. NaN.
template <typename Samples>
double mean(Samples const &xs) {
    double total = 0.0;
    int n = 0;
    for (const auto &sample : xs) {
        total += sample;
        n += 1;
    }
    return total / n;
}
// NOTE: despite its name, this returns the *population standard deviation*
// sqrt(sum((x - mean)^2) / N), not the variance. The name is kept for callers.
// It makes two passes: first mean(xs), then the squared deviations from it.
// An empty container yields NaN.
template <typename Samples>
double var(Samples const &xs) {
    const auto centre = mean(xs);
    double sum_sq = 0.0;
    int n = 0;
    for (const auto &sample : xs) {
        const auto dev = sample - centre;
        sum_sq += dev * dev;
        ++n;
    }
    return std::sqrt(sum_sq / n);
}
| std::tuple<double, double> bench_read(void) { | |
| std::vector<double> elapsed; | |
| for (auto it = 0; it != 20; ++it) { | |
| system("sudo sh -c 'sync; echo 3 > /proc/sys/vm/drop_caches'"); | |
| auto begin = std::chrono::steady_clock::now(); | |
| read_whole(); | |
| auto end = std::chrono::steady_clock::now(); | |
| std::chrono::duration<double> diff = end - begin; | |
| elapsed.push_back(diff.count()); | |
| } | |
| return {mean(elapsed), var(elapsed)}; | |
| } | |
| int main() { | |
| buffer.reset(new char[buffer_size]); | |
| auto [mean, var] = bench_read(); | |
| std::cout << mean << "+" << var << '\n'; | |
| return 0; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import numpy as np | |
| from os import system | |
| from time import monotonic | |
| from tqdm import tqdm | |
def read_single():
    """Return the entire contents of ``data.blob`` as bytes, read in one call."""
    handle = open('data.blob', 'rb')
    with handle:
        contents = handle.read()
    return contents
# Time 20 cold-cache runs of read_single() and report "mean,std" in seconds.
elapsed = np.empty(20)
for trial in tqdm(range(elapsed.size)):
    # Flush dirty pages and drop the OS page cache so each run hits storage.
    system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
    started = monotonic()
    read_single()
    elapsed[trial] = monotonic() - started
print(f'Py,sync-single.py,{elapsed.mean()},{elapsed.std()}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment