|
#!/usr/bin/env python3 |
|
# -*- coding: utf-8 -*- |
|
|
|
import os |
|
import os.path |
|
import json |
|
import ipaddress |
|
import flask |
|
import google.auth |
|
from datetime import timedelta |
|
from google.auth.transport import requests |
|
from google.auth import compute_engine |
|
from google.oauth2 import service_account |
|
from google.cloud import storage |
|
|
|
|
|
# The bucket is resolved once, at import time, from the BUCKET_NAME
# environment variable; requests only supply the object path, never the
# bucket (forced for security purposes). Indexing os.environ raises KeyError
# when the variable is not set.
bucket = os.environ["BUCKET_NAME"]
|
|
|
|
|
def vpn_bypass(request):
    """Redirect the caller to the bucket object named by the request path.

    Callers whose address belongs to an authorized network are redirected to
    a short-lived signed URL. Everyone else is redirected to the
    authenticated ``storage.cloud.google.com`` URL of the same object.

    Args:
        request (flask.Request): HTTP request object. The object name is the
            request path without its leading ``/``; the caller address is
            read from the ``X-Forwarded-For`` header.
            https://werkzeug.palletsprojects.com/en/1.0.x/wrappers/

    Returns:
        The redirect response built by ``flask.redirect``.
    """
    from urllib.parse import quote

    # Notice: not working with remote_addr.
    # NOTE(review): this header can be influenced by the client; confirm how
    # the hosting front end populates it before trusting it further.
    forwarded_for = request.headers.get("X-Forwarded-For", "").strip()
    path = request.path[1:]  # Drop first / separator

    print(f"Check if {forwarded_for} can bypass access to {bucket} resource {path}")

    # Only a header holding exactly one valid address may grant the bypass.
    # A missing, malformed or multi-valued header ("a, b") falls back to the
    # authenticated URL instead of crashing the function.
    try:
        ipaddress.ip_address(forwarded_for)
    except ValueError:
        authorized = False
    else:
        authorized = from_authorized_network(forwarded_for)

    if authorized:
        return flask.redirect(get_signed_url(bucket, path))

    # request.path is already percent-decoded, so re-encode it: otherwise
    # characters such as '#', '?' or '%' in an object name would change the
    # meaning of the redirect URL.
    return flask.redirect(
        f"https://storage.cloud.google.com/{bucket}/{quote(path)}"
    )
|
|
|
|
|
# TODO: Specific to the context of this gist, update it with your own way to fetch the IP list. |
|
def from_authorized_network(ip_address):
    """Tell whether an address belongs to one of the authorized networks.

    The networks are loaded from ``ip_list.json`` (relative to the current
    working directory). The file holds a list of items, each with a
    ``location`` and an ``ip_list``; every ``ip_list`` entry has a ``staff``
    value that is either a list of network ranges or empty/null.

    Args:
        ip_address: The address to check, in any form accepted by
            ``ipaddress.ip_address`` (typically a string).

    Returns:
        bool: True as soon as one configured range contains the address,
        False otherwise, including when ``ip_address`` is not a valid single
        IP address.

    Raises:
        ValueError: If a configured range is not a valid network.
        KeyError: If an item of ``ip_list.json`` lacks an expected key.
    """
    try:
        address = ipaddress.ip_address(ip_address)
    except ValueError:
        # An unparsable address can never be inside an authorized network.
        return False

    # Load all known IP ranges of authorized networks; the context manager
    # guarantees the file handle is closed.
    with open("ip_list.json", encoding="utf-8") as ip_list_file:
        ip_list = json.load(ip_list_file)

    # Walk the items directly instead of building a dict keyed by location:
    # several items may share a location and none of their ranges must be lost.
    for item in ip_list:
        location = item["location"]
        for entry in item["ip_list"]:
            for ip_range in entry["staff"] or []:
                if address in ipaddress.ip_network(ip_range):
                    print(f"{address} found in {location}'s {ip_range} network")
                    return True

    return False
|
|
|
# Generate a signed url to access this resource |
|
# https://cloud.google.com/storage/docs/access-control/signed-urls |
|
# Inspired by https://gist.github.com/jezhumble/91051485db4462add82045ef9ac2a0ec |
|
# See also https://cloud.google.com/storage/docs/access-control/signing-urls-manually |
|
def get_signed_url(bucket, path, expiresMn=10):
    """Build a time-limited V4 signed GET URL for one object of a bucket.

    Args:
        bucket: Name of the bucket holding the object.
        path: Name of the object inside the bucket.
        expiresMn: Validity of the signed URL, in minutes.

    Returns:
        The signed URL produced by ``generate_signed_url``.
    """
    # A JSON key file is required: attempts with google.auth.default
    # credentials always failed.
    sa_credentials = service_account.Credentials.from_service_account_file(
        "service-account.json",
        scopes=["https://www.googleapis.com/auth/devstorage.read_only"],
    )

    # The storage bucket is assumed to live in the same project as this
    # function, so the project comes from the default credentials lookup.
    project_id = google.auth.default()[1]
    target_blob = (
        storage.Client(project_id, sa_credentials)
        .bucket(bucket)
        .blob(path)
    )

    # Credentials handed to generate_signed_url for signing, bound to the
    # e-mail address of the service account loaded above.
    signer = compute_engine.IDTokenCredentials(
        requests.Request(),
        "",
        service_account_email=sa_credentials.service_account_email,
    )

    return target_blob.generate_signed_url(
        version="v4",
        method="GET",
        expiration=timedelta(minutes=expiresMn),
        credentials=signer,
    )