Created
October 17, 2020 10:21
-
-
Save priyanshujain/693f1073fb7cd3dd5a21a4c88c773363 to your computer and use it in GitHub Desktop.
Google Cloud Storage URL Signer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import datetime | |
import sys | |
import time | |
from hashlib import md5 | |
import requests | |
from django.conf import settings | |
import Crypto.Hash.SHA256 as SHA256 | |
import Crypto.PublicKey.RSA as RSA | |
import Crypto.Signature.PKCS1_v1_5 as PKCS1_v1_5 | |
# The Google Cloud Storage API endpoint. You should not need to change this. | |
GCS_API_ENDPOINT = "https://storage.googleapis.com" | |
class CloudStorageURLSigner(object): | |
"""Contains methods for generating signed URLs for Google Cloud Storage.""" | |
def __init__( | |
self, | |
key, | |
client_id_email, | |
gcs_api_endpoint, | |
expiration=None, | |
session=None, | |
): | |
"""Creates a CloudStorageURLSigner that can be used to access signed URLs. | |
Args: | |
key: A PyCrypto private key. | |
client_id_email: GCS service account email. | |
gcs_api_endpoint: Base URL for GCS API. | |
expiration: An instance of datetime.datetime containing the time when the | |
signed URL should expire. | |
session: A requests.session.Session to use for issuing requests. If not | |
supplied, a new session is created. | |
""" | |
self.key = key | |
self.client_id_email = client_id_email | |
self.gcs_api_endpoint = gcs_api_endpoint | |
self.expiration = expiration or ( | |
datetime.datetime.now() + datetime.timedelta(days=1) | |
) | |
self.expiration = int(time.mktime(self.expiration.timetuple())) | |
self.session = session or requests.Session() | |
def _Base64Sign(self, plaintext): | |
"""Signs and returns a base64-encoded SHA256 digest.""" | |
plaintext = plaintext.encode("utf-8") | |
shahash = SHA256.new(plaintext) | |
signer = PKCS1_v1_5.new(self.key) | |
signature_bytes = signer.sign(shahash) | |
return base64.b64encode(signature_bytes) | |
def _MakeSignatureString(self, verb, path, content_md5, content_type): | |
"""Creates the signature string for signing according to GCS docs.""" | |
signature_string = ( | |
"{verb}\n" | |
"{content_md5}\n" | |
"{content_type}\n" | |
"{expiration}\n" | |
"{resource}" | |
) | |
return signature_string.format( | |
verb=verb, | |
content_md5=content_md5, | |
content_type=content_type, | |
expiration=self.expiration, | |
resource=path, | |
) | |
def _MakeUrl(self, verb, path, content_type="", content_md5=""): | |
"""Forms and returns the full signed URL to access GCS.""" | |
base_url = "%s%s" % (self.gcs_api_endpoint, path) | |
signature_string = self._MakeSignatureString( | |
verb, path, content_md5, content_type | |
) | |
signature_signed = self._Base64Sign(signature_string) | |
query_params = { | |
"GoogleAccessId": self.client_id_email, | |
"Expires": str(self.expiration), | |
"Signature": signature_signed, | |
} | |
return base_url, query_params | |
def Get(self, path): | |
"""Performs a GET request. | |
Args: | |
path: The relative API path to access, e.g. '/bucket/object'. | |
Returns: | |
An instance of requests.Response containing the HTTP response. | |
""" | |
base_url, query_params = self._MakeUrl("GET", path) | |
return self.session.get(base_url, params=query_params) | |
def generate_signed_url(file_name): | |
try: | |
keytext = settings.GS_CREDENTIALS.signer._key._save_pkcs1_pem().decode( | |
"utf-8" | |
) | |
except IOError as e: | |
sys.exit("Error while reading private key: %s" % e) | |
private_key = RSA.importKey(keytext) | |
signer = CloudStorageURLSigner( | |
private_key, | |
settings.GS_CREDENTIALS.service_account_email, | |
GCS_API_ENDPOINT, | |
) | |
file_path = "/%s/%s" % (settings.GS_BUCKET_NAME, file_name) | |
r = signer.Get(file_path) | |
return r.request.url |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment