Last active
May 14, 2022 05:51
-
-
Save LyleScott/cd79d3fae54781bb4167ffcb1bc2252d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Example of how to generate an HTTPS link to a GCS file that expires a given
number of seconds ({expiration}) from now.

Receivers of the link do not need a Google account or any special access.

Dependencies:
    pip install google-auth
"""
import argparse | |
import binascii | |
import collections | |
import datetime | |
import hashlib | |
import sys | |
from datetime import datetime, timezone | |
from urllib.parse import quote | |
from google.oauth2 import service_account | |
def generate_signed_url(
    service_account_file: str, bucket_name: str, object_name: str, expiration: int = 600
) -> str:
    """Return a V4-signed HTTPS URL that allows a GET of one Cloud Storage object.

    The URL uses the virtual-hosted-style endpoint
    ``https://{bucket_name}.storage.googleapis.com/{object_name}`` and is
    signed with the private key of the given service account.

    Args:
        service_account_file: Path to the service account key file that is
            loaded to obtain the signer and the service account email.
        bucket_name: Name of the bucket that holds the object.
        object_name: Name (path) of the object inside the bucket.
        expiration: Lifetime of the URL in seconds, counted from the moment
            this function runs. Must be between 1 and 604800 (7 days).

    Returns:
        The signed URL, including the ``x-goog-signature`` query parameter.

    Raises:
        ValueError: If ``expiration`` is greater than 604800 or not positive.
    """
    max_expiration = 604800  # 7 days, in seconds
    if expiration > max_expiration:
        raise ValueError("Expiration can't be longer than 604800 seconds (7 days).")
    if expiration <= 0:
        # A zero or negative lifetime would yield a URL that is already expired.
        raise ValueError("Expiration must be a positive number of seconds.")

    # Both timestamps derive from the same instant so that the credential
    # scope date always matches the request date.
    now = datetime.now(tz=timezone.utc)
    request_timestamp = now.strftime("%Y%m%dT%H%M%SZ")
    datestamp = now.strftime("%Y%m%d")

    google_credentials = service_account.Credentials.from_service_account_file(
        service_account_file
    )
    client_email = google_credentials.service_account_email
    credential_scope = f"{datestamp}/auto/storage/goog4_request"
    credential = f"{client_email}/{credential_scope}"

    host = f"{bucket_name}.storage.googleapis.com"
    headers = {"host": host}

    # Canonical headers: sorted by name, lower-cased, one "name:value\n" each.
    normalized_headers = [
        (str(name).lower(), str(value).lower())
        for name, value in sorted(headers.items())
    ]
    canonical_headers = "".join(
        f"{name}:{value}\n" for name, value in normalized_headers
    )
    signed_headers = ";".join(name for name, _ in normalized_headers)

    query_parameters = {
        "X-Goog-Algorithm": "GOOG4-RSA-SHA256",
        "X-Goog-Credential": credential,
        "X-Goog-Date": request_timestamp,
        "X-Goog-Expires": expiration,
        "X-Goog-SignedHeaders": signed_headers,
    }
    # Canonical query string: sorted by key, with every character outside the
    # unreserved set percent-encoded (safe="" also encodes "/").
    canonical_query_string = "&".join(
        f"{quote(str(key), safe='')}={quote(str(value), safe='')}"
        for key, value in sorted(query_parameters.items())
    )

    # "/" is kept literal so that object names containing path separators
    # map onto the URL path unchanged.
    escaped_object_name = quote(object_name, safe=b"/~")
    canonical_uri = f"/{escaped_object_name}"

    canonical_request = "\n".join(
        [
            "GET",
            canonical_uri,
            canonical_query_string,
            canonical_headers,
            signed_headers,
            "UNSIGNED-PAYLOAD",
        ]
    )
    canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest()

    string_to_sign = "\n".join(
        [
            "GOOG4-RSA-SHA256",
            request_timestamp,
            credential_scope,
            canonical_request_hash,
        ]
    )

    # signer.sign() signs using RSA-SHA256 with PKCS1v15 padding
    signature = binascii.hexlify(
        google_credentials.signer.sign(string_to_sign)
    ).decode()

    return (
        f"https://{host}{canonical_uri}"
        f"?{canonical_query_string}"
        f"&x-goog-signature={signature}"
    )
if __name__ == "__main__":
    # Command-line entry point: read four positional arguments, build the
    # signed URL and print it to stdout. The module docstring doubles as the
    # --help description and is shown verbatim (raw formatter).
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    # (name, keyword options) for each positional argument, in CLI order.
    positional_arguments = (
        (
            "service_account_file",
            {"help": "Path to your Google service account keyfile."},
        ),
        ("bucket_name", {"help": "Your Cloud Storage bucket name."}),
        ("object_name", {"help": "Your Cloud Storage object name."}),
        ("expiration", {"type": int, "help": "Expiration time."}),
    )
    for argument_name, argument_options in positional_arguments:
        cli.add_argument(argument_name, **argument_options)

    options = cli.parse_args()

    # argparse has already converted `expiration` to int via type=int.
    print(
        generate_signed_url(
            service_account_file=options.service_account_file,
            bucket_name=options.bucket_name,
            object_name=options.object_name,
            expiration=options.expiration,
        )
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment