Created October 24, 2020 09:46
Reads a set of blobs in parallel to speed up reading. Blobs are read line by line; the use case this was written for was jsonl files, but any record-per-line format where you don't care about the order of the lines should work.
""" | |
Threaded Google Cloud Storage Blob reader. | |
Reads a set of blobs parallel to speed up reading. Blobs are | |
read line by line, the usecase this was written for was jsonl | |
files, but any record-per-line format where you don't care | |
about the order of the lines should not have issues. | |
Limited performance testing reading a set of eight blobs, | |
4x 19Mb, 4x 5.4Mb in four threads ran in 20.1% of the time. | |
""" | |

import threading
import queue
import time

from google.cloud import storage


def generator_chunker(generator, chunk_size):
    """
    Group the items from a generator into lists of up to chunk_size items.
    """
    chunk = []
    for item in generator:
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = [item]
        else:
            chunk.append(item)
    if chunk:
        yield chunk
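
# For example, generator_chunker(range(5), 2) yields [0, 1], then [2, 3],
# then the remainder [4].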


def get_blob(project, bucket, blob_name):
    """
    Fetch a blob handle from the named bucket in the given project.
    """
    client = storage.Client(project=project)
    bucket = client.get_bucket(bucket)
    blob = bucket.get_blob(blob_name)
    return blob


def blob_reader(project, bucket, blob_name, chunk_size=1024*1024, delimiter='\n'):
    """
    Reads lines from an arbitrarily long blob, line by line.

    Parameters:
        project: GCP project
        bucket: GCS bucket
        blob_name: GCS blob
        chunk_size: (optional) number of bytes to read at a time (default = 1MB)
        delimiter: (optional) the record separator in the blob (default = new line)

    Returns a generator of the lines in the blob.
    """
    blob = get_blob(project, bucket, blob_name)
    blob_size = blob.size

    # split on the encoded delimiter and only decode complete lines, so a
    # multi-byte UTF-8 character split across a chunk boundary isn't broken
    delimiter = delimiter.encode('utf-8')
    carry_forward = b''
    cursor = 0
    while cursor < blob_size:
        chunk = blob.download_as_string(start=cursor, end=min(blob_size, cursor + chunk_size - 1))
        cursor = cursor + len(chunk)
        # add the last line from the previous cycle
        chunk = carry_forward + chunk
        lines = chunk.split(delimiter)
        # the last line is likely to be incomplete, save it to carry forward
        carry_forward = lines.pop()
        yield from (line.decode('utf-8') for line in lines)
    if len(carry_forward) > 0:
        yield carry_forward.decode('utf-8')
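
# A minimal usage sketch for reading a single blob directly; the project,
# bucket and blob names below are placeholders, not resources from this gist:
#
#   for line in blob_reader('my-project', 'my-bucket', 'logs/2020-10-24.jsonl'):
#       print(line)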


def read_file(filename, chunk_size=1024*1024, delimiter='\n'):
    """
    Local-file equivalent of blob_reader, useful for testing
    outside of a GCS environment.
    """
    with open(filename, 'r', encoding='utf8') as f:
        carry_forward = ''
        chunk = 'INITIALIZED'
        while len(chunk) > 0:
            chunk = f.read(chunk_size)
            augmented_chunk = carry_forward + chunk
            lines = augmented_chunk.split(delimiter)
            # the last line is likely to be incomplete, save it to carry forward
            carry_forward = lines.pop()
            yield from lines
        if carry_forward:
            yield carry_forward


def get_project():
    """
    Get the current GCP project by shelling out to the gcloud CLI
    (equivalent to running `gcloud config get-value project`).
    """
    import subprocess
    result = subprocess.run(['gcloud', 'config', 'get-value', 'project'], stdout=subprocess.PIPE)
    return result.stdout.decode('utf8').rstrip('\n')


def threaded_reader(project=None, bucket=None, blobs=[], max_threads=4):
    """
    Speed up reading sets of files - such as multiple days' worth
    of log-per-day files.

    If you care about the order of the records, don't use this.

    Each file is read in its own thread, so reading a single file
    wouldn't benefit from this approach.
    """
    thread_pool = []

    def thread_process():
        """
        The process inside the threads.

        1) Get any files off the file queue
        2) Read the file in chunks - helps with parallelizing
        3) Put the chunks onto a reply queue
        """
        while True:
            try:
                # get_nowait avoids the race where another thread takes the
                # last file between an empty() check and a blocking get()
                file = file_queue.get_nowait()
            except queue.Empty:
                return
            # file_reader = read_file(file)  # testing on a non-GCS environment
            file_reader = blob_reader(project, bucket, file)
            for chunk in generator_chunker(file_reader, 100):
                reply_queue.put(chunk)

    # establish the queues
    file_queue = queue.SimpleQueue()
    reply_queue = queue.SimpleQueue()

    # put the files in the file queue before the threads start, so a
    # worker never finds it temporarily empty
    for blob in blobs:
        file_queue.put_nowait(blob)

    # scale the number of threads; there is no point starting more
    # threads than there are files to read
    t = len(blobs)
    if t > max_threads:
        t = max_threads
    # set a hard limit
    if t > 8:
        t = 8

    # start the threads
    for _ in range(t):
        thread = threading.Thread(target=thread_process)
        thread.daemon = True
        thread.start()
        thread_pool.append(thread)

    # when the threads are all complete and all the records
    # have been read from the reply queue, we're done
    while any(t.is_alive() for t in thread_pool) or not reply_queue.empty():
        try:
            # use a timeout so we don't block forever if a worker exits
            # between the liveness check and this get()
            records = reply_queue.get(timeout=1)
        except queue.Empty:
            continue
        yield from records
""" | |
Example usage, define a set of blobs to read, then send them to | |
reader and read out the results, we're just counting them here | |
to prove we read every record in a fraction of the time it | |
would have been to do this serially. | |
""" | |
file_arr = ['nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-07-31].json', | |
'nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-08-01].json', | |
'nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-08-02].json', | |
'nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-09-11].json', | |
'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-07-31].json', | |
'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-08-01].json', | |
'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-08-02].json', | |
'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-09-11].json'] | |
st = time.perf_counter() | |
records = 0 | |
for r in threaded_reader(project=get_project(), bucket='01_landing', blobs=file_arr, max_threads=4): | |
records += 1 | |
print(f"I took {time.perf_counter() - st}, I read {records} records") |
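
# If the blobs really are jsonl (one JSON document per line), each yielded
# line can be parsed on its own - an illustrative sketch, not part of the
# original gist, assuming every line is valid JSON:
#
#   import json
#   for line in threaded_reader(project=get_project(), bucket='01_landing', blobs=file_arr):
#       record = json.loads(line)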