Created
October 20, 2021 08:04
-
-
Save rcarmo/823bd16c87cadd9e692285484a59ef1a to your computer and use it in GitHub Desktop.
Azure Blob Storage Tiering for Synology Backups:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from typing import Iterator | |
from requests import Session, Response | |
from base64 import b64encode, b64decode | |
from datetime import datetime, timedelta, timezone | |
from email.utils import formatdate, parsedate_to_datetime | |
from hashlib import sha256, md5 | |
from hmac import HMAC | |
from lxml.etree import Element, tostring, fromstring, iterwalk | |
from os import environ | |
from logging import getLogger, basicConfig | |
import azure.functions as func | |
log = getLogger(__name__) | |
basicConfig(format = 'time=%(asctime)s loc=%(funcName)s:%(lineno)d msg="%(message)s"', | |
level = environ.get('LOGLEVEL','DEBUG')) | |
class BlobClient: | |
account = None | |
auth = None | |
session = None | |
def __init__(self, account: str, auth=None, session=None) -> None: | |
"""Create a BlobClient instance""" | |
self.account = account | |
if auth is not None: | |
self.auth = b64decode(auth) | |
if session is None: | |
session = Session() | |
self.session = session | |
def close(self) -> None: | |
self.session.close() | |
def _headers(self, headers={}, date=None) -> dict: | |
"""Default headers for REST requests""" | |
if not date: | |
date = formatdate(usegmt=True) # if you don't use GMT, the API breaks | |
return { | |
'x-ms-date': date, | |
'x-ms-version': '2018-03-28', | |
'Content-Type': 'application/octet-stream', | |
'Connection': 'Keep-Alive', | |
**headers | |
} | |
#'x-ms-blob-type': 'BlockBlob', # TODO: support also append blobs later | |
#'x-ms-blob-content-disposition': 'attachment; filename="fname.ext"', | |
#'Content-Type': 'text/plain; charset=UTF-8', | |
#'x-ms-meta-createdBy': 'aioazstorage', | |
def _sign_for_blobs(self, verb: str, canonicalized: str, headers={}, payload='') -> dict: | |
"""Compute SharedKeyLite authorization header and add standard headers""" | |
headers = self._headers(headers) | |
signing_headers = sorted(filter(lambda x: 'x-ms' in x, headers.keys())) | |
canon_headers = "\n".join("{}:{}".format(k, headers[k]) for k in signing_headers) | |
sign = "\n".join([verb, '', headers['Content-Type'], '', canon_headers, canonicalized]).encode('utf-8') | |
return { | |
'Authorization': 'SharedKeyLite {}:{}'.format(self.account, \ | |
b64encode(HMAC(self.auth, sign, sha256).digest()).decode('utf-8')), | |
'Content-Length': str(len(payload)), | |
**headers | |
} | |
def createContainer(self, container_name) -> Response: | |
"""Create a new Container""" | |
canon = f'/{self.account}/{container_name}' | |
uri = f'https://{self.account}.blob.core.windows.net/{container_name}?restype=container' | |
return self.session.put(uri, headers=self._sign_for_blobs("PUT", canon)) | |
def deleteContainer(self, container_name) -> Response: | |
canon = f'/{self.account}/{container_name}' | |
uri = f'https://{self.account}.blob.core.windows.net/{container_name}?restype=container' | |
return self.session.delete(uri, headers=self._sign_for_blobs("DELETE", canon)) | |
def listContainers(self, marker=None) -> Iterator[dict]: | |
canon = f'/{self.account}/?comp=list' | |
if marker is None: | |
uri = f'https://{self.account}.blob.core.windows.net/?comp=list' | |
else: | |
uri = f'https://{self.account}.blob.core.windows.net/?comp=list&marker={marker}' | |
res = self.session.get(uri, headers=self._sign_for_blobs("GET", canon)) | |
if res.ok: | |
doc = fromstring(res.text.encode('utf-8')) | |
for container in doc.xpath("//Container"): | |
item = { | |
"name": container.find("Name").text | |
} | |
for prop in container.xpath("./Properties/*"): | |
if prop.tag in ["Creation-Time","Last-Modified","Etag","Content-Length","Content-Type","Content-Encoding","Content-MD5","Cache-Control"]: | |
if prop.tag in ["Last-Modified", "DeletedTime"]: | |
item[prop.tag.lower()] = parsedate_to_datetime(prop.text) | |
else: | |
item[prop.tag.lower()] = prop.text | |
yield item | |
tag = doc.find("NextMarker") | |
if tag is not None: | |
if tag.text: | |
del res | |
del doc | |
for item in self.listContainers(tag.text): | |
yield item | |
else: | |
log.info("not ok") | |
log.info(res.status_code) | |
log.info(res.text) | |
def listBlobs(self, container_name, marker=None) -> Iterator[dict]: | |
canon = f'/{self.account}/{container_name}?comp=list' | |
if marker is None: | |
uri = f'https://{self.account}.blob.core.windows.net/{container_name}?restype=container&comp=list&include=metadata' | |
else: | |
uri = f'https://{self.account}.blob.core.windows.net/{container_name}?restype=container&comp=list&include=metadata&marker={marker}' | |
res = self.session.get(uri, headers=self._sign_for_blobs("GET", canon)) | |
if res.ok: | |
doc = fromstring(res.text.encode('utf-8')) | |
for blob in doc.xpath("//Blob"): | |
item = { | |
"name": blob.find("Name").text | |
} | |
for prop in blob.xpath("./Properties/*"): | |
if prop.tag in ["AccessTier","Creation-Time","Last-Modified","Etag","Content-Length","Content-Type","Content-Encoding","Content-MD5","Cache-Control"] and prop.text: | |
if prop.tag in ["Last-Modified", "Creation-Time"]: | |
item[prop.tag.lower()] = parsedate_to_datetime(prop.text) | |
elif prop.tag in ["Content-Length"]: | |
item[prop.tag.lower()] = int(prop.text) | |
elif prop.tag in ["Content-MD5"]: | |
item[prop.tag.lower()] = b64decode(prop.text.encode('utf-8')) | |
else: | |
item[prop.tag.lower()] = prop.text | |
yield item | |
tag = doc.find("NextMarker") | |
if tag is not None: | |
if tag.text: | |
del res | |
del doc | |
for item in self.listBlobs(container_name, tag.text): | |
yield item | |
else: | |
log.error(res.status_code) | |
log.error(res.text) | |
def putBlob(self, container_name: str, blob_path: str, payload, mimetype="application/octet-stream") -> Response: | |
"""Upload a blob""" | |
canon = f'/{self.account}/{container_name}/{blob_path}' | |
uri = f'https://{self.account}.blob.core.windows.net/{container_name}/{blob_path}' | |
headers = { | |
'x-ms-blob-type': 'BlockBlob', | |
'x-ms-blob-content-type': mimetype, | |
'Content-Type': mimetype | |
} | |
return self.session.put(uri, data=payload, headers=self._sign_for_blobs("PUT", canon, headers, payload)) | |
def setBlobTier(self, container_name: str, blob_path: str, tier: str) -> Response: | |
canon = f'/{self.account}/{container_name}/{blob_path}?comp=tier' | |
uri = f'https://{self.account}.blob.core.windows.net/{container_name}/{blob_path}?comp=tier' | |
headers = { | |
'x-ms-access-tier': tier | |
} | |
return self.session.put(uri, headers=self._sign_for_blobs("PUT", canon, headers)) | |
def main(mytimer: func.TimerRequest) -> None: | |
now = datetime.now(timezone.utc) | |
cool_window = now + timedelta(hours=-24) | |
archive_window = now + timedelta(days=-30) | |
log.info(f'Starting run {now}') | |
if mytimer.past_due: | |
log.info('The timer is past due!') | |
client = BlobClient(environ.get("STORAGE_ACCOUNT_NAME"), environ.get("STORAGE_ACCOUNT_KEY")) | |
for container in client.listContainers(): | |
if container['name'] in environ.get("INSPECT_CONTAINERS", "homes"): | |
for blob in client.listBlobs(container['name']): | |
if "HyperBackup.hbk/Pool/0" in blob['name']: | |
if "index" in blob['name']: | |
if blob['accesstier'] != "Cool": | |
if blob['last-modified'] < cool_window: | |
log.info(f"{blob['last-modified']} <-> {cool_window} for {blob}") | |
res = client.setBlobTier(container['name'], blob['name'], "Cool") | |
if not res.ok: | |
log.info(f"error setting tier: {res.text}") | |
log.info('ending run') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment