""" | |
goal: try to stream the http response from a Spectra Logic BlackPearl through python's tarfile. | |
TODO: | |
- compute md5 checksum while extracting each member of the tarball, compare against existing checksums and retry a download if they don't match. | |
""" | |
import concurrent.futures
import logging
import os
import pprint
import tarfile
import time
from typing import Dict, Set

from ds3 import ds3
from ds3.ds3Helpers import Blob
# bucket = "scenes-anteas-06" | |
# object_keys = [ | |
# "WV02/2013/12/103001002A29AD00/WV02_20131230225228_103001002A29AD00_13DEC30225228-M1BS-500106813060_01_P001.tar", | |
# "WV02/2013/12/103001002A6D9A00/WV02_20131221031756_103001002A6D9A00_13DEC21031756-M1BS-500106808150_01_P001.tar", | |
# "WV02/2013/12/103001002A6D9A00/WV02_20131221031756_103001002A6D9A00_13DEC21031756-M1BS-500106808150_01_P002.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012331_10300100296E9900_13DEC16012331-P1BS-500216153070_01_P010.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012330_10300100296E9900_13DEC16012330-P1BS-500216153070_01_P009.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012329_10300100296E9900_13DEC16012329-P1BS-500216153070_01_P008.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012328_10300100296E9900_13DEC16012328-P1BS-500216153070_01_P007.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012327_10300100296E9900_13DEC16012327-P1BS-500216153070_01_P006.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012326_10300100296E9900_13DEC16012326-P1BS-500216153070_01_P005.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012330_10300100296E9900_13DEC16012330-M1BS-500216153070_01_P009.tar", | |
# "WV02/2013/12/10300100296E9900/WV02_20131216012331_10300100296E9900_13DEC16012331-M1BS-500216153070_01_P010.tar", | |
# ] | |

# this object is > 64GB and will be split into multiple parts
bucket = "dems"
object_keys = [
    "setsm/scene/WV03/2017/01/WV03_20170120_1040010028905E00_1040010027193900_50cm_v040000.tar"
]

class SpectraObject(object):
    """Tracks the blobs that make up one BlackPearl object and owns the os.pipe used to
    stream the downloaded bytes from stream_download() into extract()."""

    def __init__(self, bucket: str, name: str, initial_blob):
        self.bucket = bucket
        self.name = name
        self.blobs = [initial_blob]
        # offset of the next blob we expect to download for this object
        self.current_offset = 0
        # stream_download() writes into the pipe, extract() reads from it
        r, w = os.pipe()
        self.pipe_reader = r
        self.pipe_writer = w

    def __eq__(self, other):
        return self.bucket == other.bucket and self.name == other.name

    def __hash__(self):
        return hash((self.name, self.bucket))

    def get_blobs(self):
        return self.blobs

    def add_blob(self, blob):
        self.blobs.append(blob)

    def set_offset(self, offset):
        self.current_offset = offset

    def get_pipe_reader(self):
        return self.pipe_reader

    def get_pipe_writer(self):
        return self.pipe_writer

def main():
    retry_delay = 60
    max_threads = 4
    output_dir = "output"
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("app")

    client = ds3.createClientFromEnv()
    objects = [ds3.Ds3GetObject(name=key) for key in object_keys]
    # Keeping chunks in order means each chunk spends less time sitting in the BlackPearl cache.
    # Not ideal performance-wise, but the blobs must be processed sequentially so the data can
    # be streamed into tarfile.
    req = ds3.GetBulkJobSpectraS3Request(bucket, objects, chunk_client_processing_order_guarantee="IN_ORDER", priority="URGENT")
    bulkGetResult = client.get_bulk_job_spectra_s3(req)
    result = bulkGetResult.result
    # pprint.pprint(result)
    # import sys
    # sys.exit(0)

    # Each item in the result is a chunk, and each chunk has a list of blobs (with offsets).
    # Use these to track the state of the downloads.
    job_id = result['JobId']
    chunk_list = result['ObjectsList']

    s3object_map: Dict[str, SpectraObject] = dict()
    # used to keep track of the remaining blobs
    blob_set: Set[Blob] = set()
    for chunk in chunk_list:
        blob_list = chunk['ObjectList']
        for blob in blob_list:
            name = blob['Name']
            length: int = int(blob['Length'])
            offset: int = int(blob['Offset'])
            cur_blob = Blob(name=name, length=length, offset=offset)
            if name in s3object_map:
                existing_object = s3object_map[name]
                existing_object.add_blob(cur_blob)
            else:
                s3object_map[name] = SpectraObject(bucket, name, cur_blob)
            blob_set.add(cur_blob)
    while len(blob_set) > 0:
        # check the job status: get the list of chunks that are ready for client processing
        req = ds3.GetJobChunksReadyForClientProcessingSpectraS3Request(job_id)
        available_chunks = client.get_job_chunks_ready_for_client_processing_spectra_s3(req)
        chunks = available_chunks.result['ObjectsList']
        if len(chunks) <= 0:
            logger.debug(f"No chunks available, the BP has not read anything from tape into cache yet, waiting {retry_delay} seconds before retrying")
            time.sleep(retry_delay)
            continue

        # Retrieve all available blobs concurrently. For objects split into multiple blobs,
        # the blobs of one object must not be downloaded concurrently: streaming the data
        # into tarfile requires that they be downloaded sequentially, in offset order.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            for chunk in chunks:
                for blob in chunk['ObjectList']:
                    name: str = blob['Name']
                    length: int = int(blob['Length'])
                    offset: int = int(blob['Offset'])
                    cur_blob = Blob(name=name, length=length, offset=offset)
                    if cur_blob in blob_set:
                        # If the blob is part of a larger object, hold off on downloading it
                        # until all preceding blobs have finished downloading.
                        s3object = s3object_map[cur_blob.name]
                        if offset == s3object.current_offset:
                            logger.debug(f"offset matches the current offset of {s3object.current_offset}, proceeding to download")
                            # okay to download
                            executor.submit(stream_download, client, bucket, s3object, offset, length, job_id)
                            if offset == 0:
                                # start the extracting consumer once per object, on its first blob
                                executor.submit(extract, s3object, output_dir)
                            blob_set.remove(cur_blob)
                        else:
                            logger.debug(f"blob for object {name} at offset {offset} is ready for download, but we must wait for the preceding blobs of the object to finish downloading first")
                            # should we wait longer here?
                            time.sleep(retry_delay)
                            continue
            executor.shutdown(wait=True)

def stream_download(client, bucket, s3object, offset, length, job_id):
    # Wrap the write end of the object's pipe. closefd=False keeps the fd open between blobs,
    # so the tarfile reader only sees EOF once the whole object has been written.
    w = s3object.get_pipe_writer()
    stream = open(w, 'wb', closefd=False)
    print(f"downloading {s3object.name} at offset {offset} with length {length} bytes")
    print(f"job id: {job_id}")
    client.get_object(ds3.GetObjectRequest(bucket_name=bucket,
                                           object_name=s3object.name,
                                           stream=stream,
                                           offset=offset,
                                           job=job_id,
                                           # PGC does not use versioning, we can ignore
                                           version_id=None))
    # the next blob of this object starts where this one ended
    print(f"setting offset to {offset + length}. Previous offset: {offset}")
    s3object.set_offset(offset + length)
    stream.close()
    if offset + length >= sum(blob.length for blob in s3object.get_blobs()):
        # last blob of the object: close the pipe so the tarfile reader sees EOF
        os.close(w)

def extract(s3object, output_dir):
    print(f"extracting members from tarball to {output_dir}")
    r = s3object.get_pipe_reader()
    # mode "r|" reads the archive as a non-seekable stream, which lets us extract members
    # while the rest of the object is still downloading
    tarball = tarfile.open(fileobj=open(r, 'rb'), mode="r|")
    for member in tarball:
        print(f"extracting {member}")
        tarball.extract(member, output_dir)
    print("done extracting")

if __name__ == "__main__":
    main()