Last active
April 9, 2024 12:58
-
-
Save ranchodeluxe/42b9cbf9a241b528c9f2104490edbc31 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pytest | |
import s3fs | |
import fsspec | |
import pickle | |
from multiprocessing import Process, Queue | |
fs = fsspec.filesystem('local') | |
@pytest.fixture | |
def byte_range_filepath(tmp_path): | |
output_filepath = os.path.join(str(tmp_path), 'test.txt') | |
with open(output_filepath, mode="w") as f: | |
for indx, i in enumerate(range(3), start=1): | |
# write exactly 10 bytes | |
f.write(f"{indx:02d}________") | |
return output_filepath | |
def process_task(serialized_fsock: fsspec.spec.AbstractBufferedFile, byte_range, queue): | |
open_fsock = pickle.loads(serialized_fsock) | |
open_fsock.seek(byte_range[0]) | |
data = open_fsock.read(byte_range[1] - byte_range[0]) | |
queue.put(data) | |
def test_open_file_handlers(byte_range_filepath): | |
byte_ranges = [(0, 10), (10, 20), (20, 30)] | |
open_fsock = fs.open(byte_range_filepath, mode='rb') | |
open_fsock.seek(30) | |
serialized_fsock = pickle.dumps(open_fsock) | |
queue = Queue() | |
jobs = [] | |
for byte_range in byte_ranges: | |
job = Process(target=process_task, args=(serialized_fsock, byte_range, queue)) | |
job.start() | |
jobs.append(job) | |
for job in jobs: | |
job.join() | |
while not queue.empty(): | |
assert queue.get() in [b'01________', b'02________', b'03________'] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
threaded version