@joocer
Created October 24, 2020 09:46
"""
Threaded Google Cloud Storage Blob reader.
Reads a set of blobs parallel to speed up reading. Blobs are
read line by line, the usecase this was written for was jsonl
files, but any record-per-line format where you don't care
about the order of the lines should not have issues.
Limited performance testing reading a set of eight blobs,
4x 19Mb, 4x 5.4Mb in four threads ran in 20.1% of the time.
"""
import threading
import queue
import time
import sys
from google.cloud import storage

def generator_chunker(generator, chunk_size):
    """
    Group items from a generator into lists of up to chunk_size items.
    """
    chunk = []
    for item in generator:
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = [item]
        else:
            chunk.append(item)
    if chunk:
        yield chunk
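
# Illustrative example (not part of the original gist): chunking a small
# range into lists of two items shows the grouping and the final short chunk.
#
#   list(generator_chunker(range(5), 2))  ->  [[0, 1], [2, 3], [4]]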

def get_blob(project, bucket, blob_name):
    client = storage.Client(project=project)
    bucket = client.get_bucket(bucket)
    blob = bucket.get_blob(blob_name)
    return blob

def blob_reader(project, bucket, blob_name, chunk_size=1024*1024, delimiter='\n'):
    """
    Reads lines from an arbitrarily long blob, line by line.

    Parameters:
        project: GCP project
        bucket: GCS bucket
        blob_name: GCS blob name
        chunk_size: (optional) number of bytes to read at a time (default = 1MB)
        delimiter: (optional) the record separator in the blob (default = new line)

    Returns a generator of lines in the blob.
    """
    blob = get_blob(project, bucket, blob_name)
    blob_size = blob.size
    carry_forward = ''
    cursor = 0
    while cursor < blob_size:
        chunk = blob.download_as_string(start=cursor, end=min(blob_size, cursor + chunk_size - 1))
        cursor = cursor + len(chunk)
        chunk = chunk.decode('utf-8')
        # add the incomplete last line from the previous cycle
        chunk = carry_forward + chunk
        lines = chunk.split(delimiter)
        # the last line is likely to be incomplete, save it to carry forward
        carry_forward = lines.pop()
        yield from lines
    if len(carry_forward) > 0:
        yield carry_forward
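
# Example usage (a sketch, not part of the original gist): stream a single
# jsonl blob and parse each line. 'my-project', 'my-bucket' and
# 'logs/day.jsonl' are placeholder names.
#
#   import json
#   for line in blob_reader('my-project', 'my-bucket', 'logs/day.jsonl'):
#       record = json.loads(line)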

def read_file(filename, chunk_size=1024*1024, delimiter='\n'):
    """
    Local-file equivalent of blob_reader, useful for testing
    outside of a GCS environment.
    """
    with open(filename, 'r', encoding='utf8') as f:
        carry_forward = ''
        chunk = 'INITIALIZED'
        while len(chunk) > 0:
            chunk = f.read(chunk_size)
            augmented_chunk = carry_forward + chunk
            lines = augmented_chunk.split(delimiter)
            carry_forward = lines.pop()
            yield from lines
        if carry_forward:
            yield carry_forward
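
# Local sanity check (illustrative only, not part of the original gist):
# write a few jsonl lines to a temporary file and read them back.
#
#   import json, os, tempfile
#   with tempfile.NamedTemporaryFile('w', suffix='.jsonl', delete=False) as tmp:
#       for i in range(3):
#           tmp.write(json.dumps({'id': i}) + '\n')
#   print(list(read_file(tmp.name)))  # ['{"id": 0}', '{"id": 1}', '{"id": 2}']
#   os.remove(tmp.name)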

def get_project():
    """
    Get the current project
    """
    import subprocess
    result = subprocess.run(['gcloud', 'config', 'get-value', 'project'], stdout=subprocess.PIPE)
    return result.stdout.decode('utf8').rstrip('\n')

def threaded_reader(project=None, bucket=None, blobs=[], max_threads=4):
    """
    Speed up reading sets of files - such as multiple days worth
    of log-per-day files.

    If you care about the order of the records, don't use this.

    Each file is read in its own thread, so reading a single file
    wouldn't benefit from this approach.
    """
    thread_pool = []

    def thread_process():
        """
        The process inside the threads.
        1) Get any files off the file queue
        2) Read the file in chunks - helps with parallelizing
        3) Put the chunk onto a reply queue
        """
        file = file_queue.get()  # this will wait until there's a record
        while file:
            # file_reader = read_file(file)  # for testing on a non-GCS environment
            file_reader = blob_reader(project, bucket, file)
            for chunk in generator_chunker(file_reader, 100):
                reply_queue.put(chunk)
            if file_queue.empty():
                sys.exit()
            else:
                file = file_queue.get()

    # establish the queues
    file_queue = queue.SimpleQueue()
    reply_queue = queue.SimpleQueue()

    # scale the number of threads; if we had more threads than the number
    # of files we're reading, some threads would never complete
    t = len(blobs)
    if t > max_threads:
        t = max_threads
    # set a hard limit
    if t > 8:
        t = 8

    # put the files in the file queue before starting the threads, so a
    # fast thread can't see an empty queue and exit early
    for blob in blobs:
        file_queue.put_nowait(blob)

    # start the threads
    for _ in range(t):
        thread = threading.Thread(target=thread_process)
        thread.daemon = True
        thread.start()
        thread_pool.append(thread)

    # when the threads are all complete and all the records have been
    # read from the reply queue, we're done; the timeout stops us blocking
    # forever if the last chunk was consumed just before its producer exited
    while any([t.is_alive() for t in thread_pool]) or not reply_queue.empty():
        try:
            records = reply_queue.get(timeout=1)
            yield from records
        except queue.Empty:
            pass
"""
Example usage, define a set of blobs to read, then send them to
reader and read out the results, we're just counting them here
to prove we read every record in a fraction of the time it
would have been to do this serially.
"""
file_arr = ['nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-07-31].json',
            'nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-08-01].json',
            'nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-08-02].json',
            'nvdcve-1.1-2002/nvdcve-1.1-2002_[2020-09-11].json',
            'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-07-31].json',
            'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-08-01].json',
            'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-08-02].json',
            'nvdcve-1.1-2003/nvdcve-1.1-2003_[2020-09-11].json']

st = time.perf_counter()
records = 0
for r in threaded_reader(project=get_project(), bucket='01_landing', blobs=file_arr, max_threads=4):
    records += 1
print(f"I took {time.perf_counter() - st}, I read {records} records")