Demo of using the Azure Blob Storage API for a simple, resumable multipart upload flow
"""Test Azure Multipart Uploads | |
""" | |
import base64 | |
import logging | |
import os | |
from itertools import count | |
from typing import BinaryIO, Generator, Tuple | |
import click | |
from azure.core.exceptions import ResourceNotFoundError | |
from azure.storage.blob import BlobServiceClient, BlobBlock, BlockState | |
_log = logging.getLogger(__name__) | |
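

# Azure requires block IDs to be Base64-encoded and of equal length within a
# single blob, so the counter is encoded as a fixed-width big-endian integer.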
def encode_id(block_id: int, id_length=16) -> str:
    return base64.b64encode(block_id.to_bytes(id_length, 'big')).decode('ascii')


def get_file_size(open_file: BinaryIO) -> int:
    """Get the size of an open file
    """
    pos = open_file.tell()
    open_file.seek(0, os.SEEK_END)
    size = open_file.tell()
    open_file.seek(pos)
    return size


def get_file_blocks(file_size: int, block_size: int) -> Generator[Tuple[int, int, int], None, None]:
    """Generate a list of blocks for an input file based on size

    For each block, this will yield a tuple of the enumerated block ID, the start position in the file
    and the block size
    """
    current_pos = 0
    for block_id in count():
        remaining = file_size - current_pos
        if remaining < block_size:
            current_block_size = remaining
        else:
            current_block_size = block_size
        yield block_id, current_pos, current_block_size
        current_pos += block_size
        if current_pos >= file_size:
            break


def upload_to_azure(open_file: BinaryIO, container_name: str, file_name: str, block_size: int = 1024 * 1000,
                    noop: bool = False):
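    # The storage account connection string comes from the environment; this
    # will fail up front if AZURE_CONNECTION_STRING is unset or empty.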
    blob_svc = BlobServiceClient.from_connection_string(os.getenv('AZURE_CONNECTION_STRING'))
    blob = blob_svc.get_blob_client(container=container_name, blob=file_name)
    _log.info(f"Uploading {file_name} to {container_name} ...")
    file_size = get_file_size(open_file)

    # Check if the file already exists in storage
    try:
        size = blob.get_blob_properties().size
        if file_size == size:
            _log.info("File has already been uploaded to storage")
            return
        else:
            _log.warning(f"File exists but has wrong size ({size} instead of {file_size} bytes), re-uploading all parts")
    except ResourceNotFoundError:
        _log.info("File does not exist or has not been committed, checking for uploaded blocks")
    try:
        committed_blocks, uncommitted_blocks = blob.get_block_list(block_list_type='all')
    except ResourceNotFoundError:
        _log.debug("Blob was not initialized and no blocks have been uploaded")
        committed_blocks, uncommitted_blocks = None, []

    if committed_blocks:
        raise RuntimeError("File exists and has some committed blocks; this is not expected")

    # Let's calculate what blocks we *should* have
    _log.debug("Current uncommitted uploaded blocks: %s", uncommitted_blocks)
    skip_blocks = {b.id: b.size for b in uncommitted_blocks}
    _log.debug("Blocks to be skipped: %s", list(skip_blocks.keys()))

    if noop:
        _log.info("NOOP flag was passed, aborting here")
        return

    commit_blocks = []
    for b_id, b_start, b_size in get_file_blocks(file_size, block_size):
        b_id = encode_id(b_id)
        if b_id in skip_blocks:
            if skip_blocks[b_id] == b_size:
                _log.info("Block %s was already uploaded, skipping", b_id)
                commit_blocks.append(BlobBlock(b_id, BlockState.Uncommitted))
                continue
            else:
                _log.warning("Block %s has wrong size, will re-upload", b_id)
        _log.info("Uploading block %s of %d bytes", b_id, b_size)
        open_file.seek(b_start)
        block_data = open_file.read(b_size)
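        # stage_block uploads the data as an uncommitted block; nothing is
        # visible in the blob itself until commit_block_list is called.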
        blob.stage_block(b_id, block_data, length=b_size)
        commit_blocks.append(BlobBlock(b_id, BlockState.Uncommitted))
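
    # Committing the block list atomically assembles the blob from the listed
    # blocks; staged blocks left out of the list are eventually discarded.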
    blob.commit_block_list(commit_blocks)


@click.command('upload_to_azure')
# Note: reading from stdin ('-') is not supported, as the input must be seekable
@click.argument('source', type=click.Path(readable=True, dir_okay=False), required=True)
@click.argument('target', type=str, required=True)
@click.option('--block-size', '-s', type=int, default=1024 * 1000)
@click.option('--dont-upload', '-N', is_flag=True, help='Do not perform any uploads, just checks')
def main(source, target, block_size, dont_upload):
    # Set up logging
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)-15s %(name)-15s %(levelname)s %(message)s')
    logging.getLogger('azure.core').setLevel(logging.WARNING)

    try:
        container_name, file_name = target.split('/', maxsplit=1)
    except ValueError:
        raise click.UsageError('target must be of the form <container_name>/<blob_name>')

    with open(source, 'rb') as f:
        return upload_to_azure(f, container_name, file_name, block_size, noop=dont_upload)


if __name__ == '__main__':
    main()
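
A minimal usage sketch, assuming the azure-storage-blob v12 SDK and click are installed and AZURE_CONNECTION_STRING is set in the environment; the container and file names here are placeholders:

# Hypothetical direct call to upload_to_azure, bypassing the CLI wrapper
with open('backup.bin', 'rb') as f:
    upload_to_azure(f, 'my-container', 'backup.bin', block_size=4 * 1024 * 1024)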