# Gist chishaku/bbadf169a32586d16104d6ef9eb9feb4
# Created June 8, 2016 16:55
#
# Note: GitHub flagged this file as containing hidden or bidirectional
# Unicode text that may be interpreted or compiled differently than it
# appears. Review it in an editor that reveals hidden Unicode characters.
import hashlib
import os

import requests
from b2 import b2
from bunch import bunchify

from logger import get_logger
class B2ApiModified(b2.B2Api):
    """An extension of the base B2Api maintained by backblaze."""

    def open_file_from_url(self, url, authorization=True,
                           headers_received_cb=None):
        """Fetch a file from a url and return its verified content.

        The body is read in fixed-size blocks while a SHA-1 digest is
        computed, then checked against the size and checksum announced in
        the response headers before anything is returned.

        Args:
            url (str): The url to fetch the file from.
            authorization (bool): When true, send the account auth token in
                the ``Authorization`` request header.
            headers_received_cb (callable): Optional callback invoked with
                the response headers before the body is read. It may raise
                an exception to abort.

        Returns:
            The complete file content as a single byte string.

        Raises:
            b2.TruncatedOutput: The number of bytes read differs from the
                ``content-length`` header.
            b2.ChecksumMismatch: The SHA-1 of the bytes read differs from
                the ``x-bz-content-sha1`` header.
        """
        request_headers = {}
        if authorization:
            request_headers['Authorization'] = self.account_info.get_account_auth_token()

        block_size = 4096
        digest = hashlib.sha1()
        chunks = []
        bytes_read = 0

        with b2.OpenUrl(url, None, request_headers) as response:
            info = response.info()
            if headers_received_cb is not None:
                headers_received_cb(info)  # may raise an exception to abort
            file_size = int(info['content-length'])
            file_sha1 = info['x-bz-content-sha1']

            while True:
                data = response.read(block_size)
                if not data:
                    break
                digest.update(data)
                bytes_read += len(data)
                # Keep every block: returning from inside the loop would
                # hand back a single block and skip the checks below.
                chunks.append(data)

        if bytes_read != file_size:
            raise b2.TruncatedOutput(bytes_read, file_size)

        actual_sha1 = digest.hexdigest()
        if actual_sha1 != file_sha1:
            raise b2.ChecksumMismatch(
                checksum_type='sha1',
                expected=file_sha1,
                actual=actual_sha1
            )

        return b''.join(chunks)
class B2Client(object):
    """A B2 client built on top of the actively developed Backblaze client."""

    def __init__(self, account_id='', application_key='', log_path='.'):
        """Authorize the B2 account and keep the auth header for raw requests.

        Args:
            account_id (str): The B2 account id.
            application_key (str): The B2 application key.
            log_path (str): Passed to ``get_logger`` as ``log_path``.
        """
        self.log = get_logger('b2', log_path=log_path, console_level=2)
        self.log.debug('Authorizing B2 account.')
        self.client = B2ApiModified()
        self.client.authorize_account(realm_url='https://api.backblaze.com',
                                      account_id=account_id,
                                      application_key=application_key)
        auth_token = self.client.account_info.get_account_auth_token()
        # Reused by get_file_info() for its direct HEAD request.
        self.headers = {'Authorization': auth_token}

    def _get_bucket(self, bucket_name):
        """Build a ``b2.Bucket`` for the named bucket (looks up its id)."""
        bucket_id = self.get_bucket_id(bucket_name)
        return b2.Bucket(self.client, bucket_id, bucket_name)

    def create_bucket(self, bucket_name, bucket_type='allPrivate'):
        """Create a B2 bucket.

        A duplicate bucket name is logged as an error, not raised.

        Args:
            bucket_name (str): The name of the B2 bucket.
            bucket_type (str): allPrivate or allPublic. Default to 'allPrivate'.
        """
        try:
            self.client.create_bucket(bucket_name, bucket_type)
        except b2.DuplicateBucketName:
            self.log.error('"{}" bucket already exists.'.format(bucket_name))
        else:
            self.log.info('"{}" bucket was created successfully.'.format(bucket_name))

    def delete_bucket(self, bucket_name):
        """Delete a B2 bucket.

        Args:
            bucket_name (str): The name of the B2 bucket.
        """
        bucket = self._get_bucket(bucket_name)
        self.client.delete_bucket(bucket)
        self.log.info('"{}" bucket was deleted successfully.'.format(bucket_name))

    def upload(self, bucket_name, filepath):
        """Upload a B2 file.

        The file is stored under its base name. If a file with that name
        already exists in the bucket, nothing is uploaded and an error is
        logged.

        Args:
            bucket_name (str): The name of the B2 bucket.
            filepath (str): The full path of the file to be uploaded
        """
        bucket = self._get_bucket(bucket_name)
        filename = os.path.basename(filepath)
        if self.get_file_info(bucket_name, filename):
            self.log.error('"{}" already exists in "{}" bucket. The file was not uploaded.'.format(filename, bucket_name))
            return
        bucket.upload_file(filepath, filename)
        self.log.info('"{}" was added to the "{}" bucket.'.format(filename, bucket_name))

    def open(self, bucket_name, filename):
        """Return the content of a B2 file.

        Args:
            bucket_name (str): The name of the B2 bucket.
            filename (str): The name of the B2 file.

        Returns:
            Whatever ``open_file_from_url`` returns for the file's download
            url, or None (with an error logged) when the file is not found.
        """
        file_info = self.get_file_info(bucket_name, filename)
        if not file_info:
            self.log.error('"{}" does not exist in the "{}" bucket. Nothing was opened.'.format(filename, bucket_name))
            return None
        return self.client.open_file_from_url(file_info.download_url)

    def download(self, bucket_name, filename, dest='.'):
        """Download a B2 file.

        The file is written to ``dest/filename``. An existing target file is
        never overwritten; that case and a missing remote file are logged as
        errors.

        Args:
            bucket_name (str): The name of the B2 bucket.
            filename (str): The name of the B2 file.
            dest (str): Destination directory.
        """
        file_info = self.get_file_info(bucket_name, filename)
        if not file_info:
            self.log.error('"{}" does not exist in the "{}" bucket. Nothing was downloaded.'.format(filename, bucket_name))
            return

        outfile = os.path.join(dest, filename)
        if os.path.exists(outfile):
            self.log.error('"{}" target file already exists. Nothing was downloaded.'.format(outfile))
            return

        try:
            # Write to the path that was checked and is reported (outfile),
            # in binary mode so the content is stored byte-for-byte.
            with open(outfile, 'wb') as f:
                self.client.download_file_from_url(file_info.download_url, f)
        except Exception:
            # Remove a partial file; otherwise the exists-check above would
            # block every later retry.
            if os.path.exists(outfile):
                os.remove(outfile)
            raise
        self.log.info('"{}" was downloaded successfully.'.format(outfile))

    def delete(self, bucket_name, filename):
        """Delete a B2 file.

        Args:
            bucket_name (str): The name of the B2 bucket.
            filename (str): The name of the B2 file.
        """
        file_info = self.get_file_info(bucket_name, filename)
        if not file_info:
            self.log.error('"{}" does not exist in the "{}" bucket. Nothing was deleted.'.format(filename, bucket_name))
            return
        self.client.delete_file_version(file_info.file_id, filename)
        self.log.info('"{}" was deleted successfully.'.format(filename))

    def get_files(self, bucket_name):
        """Return a list of B2 files.

        TODO: convert camelcase keys from B2Api.

        Each file is represented by a dictionary including the following keys:
            * fileName
            * fileId

        Args:
            bucket_name (str): The name of the B2 bucket.
        """
        bucket = self._get_bucket(bucket_name)
        return bunchify(bucket.list_file_names()).files

    def list_files(self, bucket_name):
        """Print out a list of files from a bucket.

        Args:
            bucket_name (str): The name of the B2 bucket.
        """
        self.client.list_buckets()  # Needed to refresh bucket list, not sure why.
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Listing all files from "{}" bucket.'.format(bucket_name))
        for f in self.get_files(bucket_name):
            print('\t%s - %s' % (f.fileName, f.fileId))

    def list_buckets(self):
        """Print out a list of buckets.

        Returns:
            str: The bucket names joined with commas.
        """
        buckets = self.client.list_buckets()
        print('Listing all buckets.')
        all_bucket_names = []
        for b in buckets:
            # NOTE(review): name and type are taken from the second and third
            # comma-separated fields of repr(bucket); this depends on the
            # Bucket repr layout -- confirm, or switch to attributes.
            fields = repr(b).split(',')
            bucket_name = fields[1]
            bucket_type = fields[2]
            print('\t%s - %s' % (bucket_name, bucket_type))
            all_bucket_names.append(bucket_name)
        return ','.join(all_bucket_names)

    def get_bucket_id(self, bucket_name):
        """Get a bucket_id given the name of the bucket.

        Args:
            bucket_name (str): The name of the B2 bucket.

        Raises:
            b2.NonExistentBucket: Logged, then re-raised, when the bucket
                does not exist.
        """
        try:
            return self.client.get_bucket_by_name(bucket_name).id_
        except b2.NonExistentBucket:
            self.log.error('"{}" bucket does not exist.'.format(bucket_name))
            raise

    def get_file_info(self, bucket_name, filename):
        """Get info for a B2 file.

        Args:
            bucket_name (str): The name of the B2 bucket.
            filename (str): The name of the B2 file.

        Returns:
            False when a HEAD request to the file's download url does not
            answer 200; otherwise the file info with two extra attributes:
            ``download_url`` and ``file_id`` (renamed from ``fileId``).
        """
        bucket = self._get_bucket(bucket_name)
        url = bucket.get_download_url(filename)
        r = requests.head(url, headers=self.headers)
        # Any non-200 answer is reported as "no such file".
        if r.status_code != 200:
            return False
        file_id = r.headers['x-bz-file-id']
        file_info = bunchify(self.client.get_file_info(file_id))
        file_info.download_url = url
        file_info.file_id = file_info.pop('fileId')
        return file_info
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.