Last active
February 27, 2024 20:46
-
-
Save Intelrunner/d4eef85c6b8e49ac725bc8741e2ea01c to your computer and use it in GitHub Desktop.
GCP Cloud Function that Takes an EventARC Event of "InsertDataset" for BQ and immediately changes the billing model to physical for that dataset. Returns a simple log output.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functions_framework | |
from google.cloud import bigquery | |
def extract_dataset_name(full_path): | |
""" | |
Extracts the dataset name from a full BigQuery dataset resource path. | |
Args: | |
- full_path (str): The full resource path of the dataset, | |
formatted as 'projects/{project_id}/datasets/{dataset_id}'. | |
Returns: | |
- str: The name of the dataset. | |
""" | |
# Split the path by '/' | |
parts = full_path.split('/') | |
# The last element of the parts list is the dataset name | |
dataset_name = parts[-1] | |
project_name = parts[1] | |
return f"{project_name}.{dataset_name}" | |
def alter_dataset_storage_billing_model(dataset_name, billing_model): | |
""" | |
Alter the storage billing model of a BigQuery dataset using the ALTER SCHEMA query. | |
Parameters: | |
- dataset_name (str): The full dataset name (including project ID) in the format `project_id.dataset_id`. | |
- billing_model (str): The desired billing model, either 'REQUEST_PAY' or 'FLAT_RATE'. | |
Example usage: | |
alter_dataset_storage_billing_model('your-project-id.your-dataset-name', 'REQUEST_PAY') | |
Raises: | |
- google.api_core.exceptions.GoogleAPIError: If an error occurs while executing the query. | |
""" | |
# Initialize the BigQuery client | |
client = bigquery.Client() | |
# Construct the ALTER SCHEMA query | |
query = f""" | |
ALTER SCHEMA `{dataset_name}` | |
SET OPTIONS( | |
storage_billing_model = '{billing_model}' | |
); | |
""" | |
# Execute the query | |
query_job = client.query(query) | |
# Wait for the query to complete | |
query_job.result() # Waits for job to complete | |
print(f"Dataset '{dataset_name}' updated to use the '{billing_model}' billing model. ✨") | |
# CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger | |
# Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers! | |
@functions_framework.cloud_event | |
def hello_auditlog(cloudevent): | |
""" | |
Prints out the CloudEvent's type and subject properties. | |
Also prints out details from the protoPayload, which encapsulates a Cloud Audit Logging entry. | |
Parameters: | |
- cloudevent: The CloudEvent object containing the event data. | |
Returns: | |
- The result of calling the alter_dataset_storage_billing_model function. | |
""" | |
print(f"Event type: {cloudevent['type']}") | |
if 'subject' in cloudevent: | |
print(f"Subject: {cloudevent['subject']}") | |
payload = cloudevent.data.get("protoPayload") | |
if payload: | |
print(f"API method: {payload.get('methodName')}") | |
print(f"Resource name: {payload.get('resourceName')}") | |
print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}") | |
x = extract_dataset_name(payload.get('resourceName')) | |
# Finalizes the change of the dataset to Physical | |
return alter_dataset_storage_billing_model(x, 'PHYSICAL') | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment