Skip to content

Instantly share code, notes, and snippets.

@priyanshujain
Created October 17, 2020 10:21
Show Gist options
  • Save priyanshujain/693f1073fb7cd3dd5a21a4c88c773363 to your computer and use it in GitHub Desktop.
Save priyanshujain/693f1073fb7cd3dd5a21a4c88c773363 to your computer and use it in GitHub Desktop.
Google Cloud Storage URL Signer
import base64
import datetime
import sys
import time
from hashlib import md5
import requests
from django.conf import settings
import Crypto.Hash.SHA256 as SHA256
import Crypto.PublicKey.RSA as RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS1_v1_5
# The Google Cloud Storage API endpoint. You should not need to change this.
GCS_API_ENDPOINT = "https://storage.googleapis.com"
class CloudStorageURLSigner(object):
"""Contains methods for generating signed URLs for Google Cloud Storage."""
def __init__(
self,
key,
client_id_email,
gcs_api_endpoint,
expiration=None,
session=None,
):
"""Creates a CloudStorageURLSigner that can be used to access signed URLs.
Args:
key: A PyCrypto private key.
client_id_email: GCS service account email.
gcs_api_endpoint: Base URL for GCS API.
expiration: An instance of datetime.datetime containing the time when the
signed URL should expire.
session: A requests.session.Session to use for issuing requests. If not
supplied, a new session is created.
"""
self.key = key
self.client_id_email = client_id_email
self.gcs_api_endpoint = gcs_api_endpoint
self.expiration = expiration or (
datetime.datetime.now() + datetime.timedelta(days=1)
)
self.expiration = int(time.mktime(self.expiration.timetuple()))
self.session = session or requests.Session()
def _Base64Sign(self, plaintext):
"""Signs and returns a base64-encoded SHA256 digest."""
plaintext = plaintext.encode("utf-8")
shahash = SHA256.new(plaintext)
signer = PKCS1_v1_5.new(self.key)
signature_bytes = signer.sign(shahash)
return base64.b64encode(signature_bytes)
def _MakeSignatureString(self, verb, path, content_md5, content_type):
"""Creates the signature string for signing according to GCS docs."""
signature_string = (
"{verb}\n"
"{content_md5}\n"
"{content_type}\n"
"{expiration}\n"
"{resource}"
)
return signature_string.format(
verb=verb,
content_md5=content_md5,
content_type=content_type,
expiration=self.expiration,
resource=path,
)
def _MakeUrl(self, verb, path, content_type="", content_md5=""):
"""Forms and returns the full signed URL to access GCS."""
base_url = "%s%s" % (self.gcs_api_endpoint, path)
signature_string = self._MakeSignatureString(
verb, path, content_md5, content_type
)
signature_signed = self._Base64Sign(signature_string)
query_params = {
"GoogleAccessId": self.client_id_email,
"Expires": str(self.expiration),
"Signature": signature_signed,
}
return base_url, query_params
def Get(self, path):
"""Performs a GET request.
Args:
path: The relative API path to access, e.g. '/bucket/object'.
Returns:
An instance of requests.Response containing the HTTP response.
"""
base_url, query_params = self._MakeUrl("GET", path)
return self.session.get(base_url, params=query_params)
def generate_signed_url(file_name):
try:
keytext = settings.GS_CREDENTIALS.signer._key._save_pkcs1_pem().decode(
"utf-8"
)
except IOError as e:
sys.exit("Error while reading private key: %s" % e)
private_key = RSA.importKey(keytext)
signer = CloudStorageURLSigner(
private_key,
settings.GS_CREDENTIALS.service_account_email,
GCS_API_ENDPOINT,
)
file_path = "/%s/%s" % (settings.GS_BUCKET_NAME, file_name)
r = signer.Get(file_path)
return r.request.url
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment