Last active
          September 15, 2025 09:23 
        
      - 
      
- 
        Save t04glovern/04f6f2934353eb1d0fffd487e9b9b6a3 to your computer and use it in GitHub Desktop. 
    Creates a sample Iceberg table in Athena, allowing you to try out Iceberg easily. This script is geared towards people who are new to the AWS variety of Iceberg and keen to try some of the unique features of Iceberg.
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env python3 | |
| """ | |
| This script generates sample data, uploads it to an S3 bucket, and creates Iceberg and Athena tables. | |
| It also creates IAM roles and policies for optimization and statistics generation if specified. | |
| There is also an option to create a Firehose delivery stream and insert random records into it. | |
| Install: | |
| python3 -m venv .venv | |
| source .venv/bin/activate | |
| pip3 install boto3 | |
| curl https://gist.githubusercontent.com/t04glovern/04f6f2934353eb1d0fffd487e9b9b6a3/raw \ | |
| > lets-try-iceberg.py \ | |
| && chmod +x lets-try-iceberg.py | |
| Usage: | |
| ./lets-try-iceberg.py \ | |
| [--table <table-name>] \ | |
| [--bucket <bucket-name>] \ | |
| [--delete] \ | |
| [--run-queries] \ | |
| [--optimization] \ | |
| [--statistics] \ | |
| [--firehose] \ | |
| [--firehose-load] | |
| Output Files: | |
| 1-athena-iceberg-create-table.sql - CREATE TABLE statement | |
| 2-athena-create-temp-table.sql - CREATE EXTERNAL TABLE statement | |
| 3-insert-into-iceberg-from-temp-table.sql - INSERT INTO statement | |
| 4-cleanup-temp-table.sql - DROP TABLE statement | |
| 5-cleanup-iceberg-table.sql - DROP TABLE statement | |
| """ | |
| import argparse | |
| import logging | |
| import random | |
| import os | |
| import configparser | |
| from datetime import datetime | |
| from typing import Dict, Tuple, Union | |
| import gzip | |
| import json | |
| import time | |
| import boto3 | |
| from botocore.exceptions import ClientError | |
# Emit INFO-level (and above) records through the root logger.
logging.basicConfig(level=logging.INFO)

# Region passed to every regional client created below.
aws_region: str = "us-west-2"

# Regional service clients.
s3 = boto3.client("s3", region_name=aws_region)
firehose = boto3.client("firehose", region_name=aws_region)
logs = boto3.client("logs", region_name=aws_region)
athena = boto3.client("athena", region_name=aws_region)

# IAM and STS clients are created without an explicit region.
iam = boto3.client("iam")
sts = boto3.client("sts")

# Account ID of the calling credentials; interpolated into ARNs further down.
account_id = sts.get_caller_identity()["Account"]

# Relative path of the file that remembers the bucket/table between runs.
CONFIG_FILE = ".lets-try-iceberg.conf"
def load_config():
    """Return a ConfigParser loaded from CONFIG_FILE.

    When CONFIG_FILE does not exist the parser is returned empty.
    """
    parser = configparser.ConfigParser()
    if not os.path.exists(CONFIG_FILE):
        return parser
    parser.read(CONFIG_FILE)
    return parser
def save_config(bucket_name: str, table_name: str):
    """Write the bucket and table names to CONFIG_FILE.

    Both values are stored under the DEFAULT section with the keys
    ``bucket`` and ``table``; any existing file is overwritten.
    """
    parser = configparser.ConfigParser()
    parser.read_dict({"DEFAULT": {"bucket": bucket_name, "table": table_name}})
    with open(CONFIG_FILE, "w") as handle:
        parser.write(handle)
def delete_config():
    """Remove CONFIG_FILE if it is present; do nothing otherwise."""
    if not os.path.exists(CONFIG_FILE):
        return
    os.remove(CONFIG_FILE)
def run_athena_query(query, query_output_bucket=None):
    """Run ``query`` in Athena and block until it reaches a terminal state.

    The query is started in the ``default`` database using the ``primary``
    workgroup, then polled every five seconds.

    Args:
        query: SQL text to execute.
        query_output_bucket: Optional bucket name. When given, results are
            written under ``s3://<bucket>/athena-query-results/``. When
            omitted, no ResultConfiguration is sent with the request.

    Raises:
        RuntimeError: The query finished as FAILED or CANCELLED. The message
            includes Athena's ``StateChangeReason`` when one is reported.
        Exception: Anything raised by the Athena client is logged and
            re-raised unchanged.
    """
    request = {
        "QueryString": query,
        "QueryExecutionContext": {"Database": "default"},
        "WorkGroup": "primary",
    }
    if query_output_bucket:
        request["ResultConfiguration"] = {
            "OutputLocation": f"s3://{query_output_bucket}/athena-query-results/",
        }

    try:
        logging.debug(f"Executing query: {query}")
        query_execution_id = athena.start_query_execution(**request)["QueryExecutionId"]

        while True:
            status = athena.get_query_execution(
                QueryExecutionId=query_execution_id
            )["QueryExecution"]["Status"]
            state = status["State"]

            if state == "SUCCEEDED":
                logging.debug("Query executed successfully.")
                return
            if state in ("FAILED", "CANCELLED"):
                # Surface Athena's own explanation; without it the caller
                # only learns that "something" failed.
                reason = status.get("StateChangeReason", "no reason reported")
                raise RuntimeError(
                    f"Query execution failed. Status: {state}. Reason: {reason}"
                )

            logging.debug(f"Query execution in progress. Current status: {state}")
            time.sleep(5)
    except Exception as err:
        # Single logging point for every failure (start, poll, or bad state),
        # so a failed query is no longer reported twice.
        logging.exception(f"Error during query execution: {err}")
        raise
def generate_random_json(
    id: str,
    timestamp: datetime,
    speed: float,
    temperature: float,
    location: Dict[str, float],
) -> Tuple[
    Dict[str, Union[str, float, Dict[str, float]]], float, float, Dict[str, float]
]:
    """Build one sample record by randomly perturbing the given readings.

    Args:
        id: Identifier stored verbatim in the record.
        timestamp: Stored in the record as ``timestamp.isoformat()``.
        speed: Base speed; a random integer in [-5, 5] is added.
        temperature: Base temperature; a random offset in [-0.5, 0.5] is
            added and the result is rounded to two decimals.
        location: Mapping with ``lat`` and ``lng``; each is shifted by a
            random offset in [-0.0001, 0.0001]. The mapping passed in is
            NOT modified.

    Returns:
        ``(record, speed, temperature, location)`` where the last three are
        the perturbed values, suitable for feeding into the next call.
    """
    speed += random.randint(-5, 5)
    temperature = round(temperature + random.uniform(-0.5, 0.5), 2)

    # Work on a copy: mutating the caller's dict would make every record
    # produced from the same starting location share (and keep changing)
    # one dict object.
    moved = dict(location)
    moved["lat"] += random.uniform(-0.0001, 0.0001)
    moved["lng"] += random.uniform(-0.0001, 0.0001)

    record = {
        "id": id,
        "timestamp": timestamp.isoformat(),
        "speed": speed,
        "temperature": temperature,
        "location": moved,
    }
    return record, speed, temperature, moved
def generate_and_upload_jsonl(bucket_name: str, sample_count: int = 1000000):
    """Write gzipped JSON-lines sample data locally and upload it to S3.

    ``sample_count`` records are generated with ``generate_random_json``
    (IDs cycle through "1".."5"), written to ``samples.jsonl.gz`` in the
    working directory, and uploaded to ``sample-data/samples.jsonl.gz`` in
    ``bucket_name``.
    """
    archive_name = "samples.jsonl.gz"
    object_key = f"sample-data/{archive_name}"

    with gzip.open(archive_name, "wt", encoding="UTF-8") as archive:
        for index in range(sample_count):
            # Only the record itself is needed; the perturbed readings that
            # are returned alongside it are discarded.
            record = generate_random_json(
                id=str(index % 5 + 1),
                timestamp=datetime.now(),
                speed=random.randint(0, 100),
                temperature=random.uniform(-20, 40),
                location={
                    "lat": random.uniform(-90, 90),
                    "lng": random.uniform(-180, 180),
                },
            )[0]
            archive.write(json.dumps(record) + "\n")

    with open(archive_name, "rb") as payload:
        s3.upload_fileobj(payload, bucket_name, object_key)

    logging.info(f"Uploaded {archive_name} to s3://{bucket_name}/{object_key}")
def create_bucket(bucket_name: str):
    """Ensure ``bucket_name`` exists, creating it in ``aws_region`` if needed.

    Only a "not found" answer from ``head_bucket`` triggers creation. Any
    other client error (for example 403 for a bucket owned by another
    account) is re-raised instead of being mistaken for a missing bucket.

    Raises:
        botocore.exceptions.ClientError: ``head_bucket`` failed for a reason
            other than the bucket being absent, or ``create_bucket`` failed.
    """
    try:
        s3.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        code = err.response.get("Error", {}).get("Code", "")
        if code not in ("404", "NoSuchBucket", "NotFound"):
            raise

        logging.info(f"Bucket {bucket_name} does not exist, creating it...")
        create_args = {"Bucket": bucket_name}
        # S3 rejects an explicit LocationConstraint of us-east-1; that region
        # is selected by omitting the configuration entirely.
        if aws_region != "us-east-1":
            create_args["CreateBucketConfiguration"] = {
                "LocationConstraint": aws_region
            }
        s3.create_bucket(**create_args)
    else:
        logging.info(f"Bucket {bucket_name} already exists, using it...")
def create_iceberg_query(bucket_name: str, table_name: str, run_query: bool = False):
    """Write the Iceberg CREATE TABLE statement to disk and optionally run it.

    The statement is saved as ``1-athena-iceberg-create-table.sql``. When
    ``run_query`` is true it is also executed through ``run_athena_query``
    with ``bucket_name`` as the query-result bucket.
    """
    statement = f"""CREATE TABLE IF NOT EXISTS {table_name} (
`id` string,
`timestamp` timestamp,
`speed` int,
`temperature` float,
`location` struct < lat: float, lng: float >
)
PARTITIONED BY (
id
)
LOCATION 's3://{bucket_name}/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'vacuum_max_snapshot_age_seconds'='60',
'vacuum_max_metadata_files_to_keep'='5'
);
"""
    with open("1-athena-iceberg-create-table.sql", "w") as out:
        out.write(statement)

    if not run_query:
        return
    logging.info("Running Iceberg table creation query...")
    run_athena_query(statement, bucket_name)
def create_athena_temp_table_sql(bucket_name: str, table_name: str, run_query: bool = False):
    """Write the temporary external-table DDL to disk and optionally run it.

    The table is named ``<table_name>_sample_data`` and points at
    ``s3://<bucket_name>/sample-data/``. The statement is saved as
    ``2-athena-create-temp-table.sql``; when ``run_query`` is true it is also
    executed through ``run_athena_query``.
    """
    statement = f"""CREATE EXTERNAL TABLE IF NOT EXISTS {table_name}_sample_data (
`id` string,
`timestamp` timestamp,
`speed` int,
`temperature` float,
`location` struct<lat:float, lng:float>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ( "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSSSSSZZ" )
LOCATION 's3://{bucket_name}/sample-data/'
"""
    with open("2-athena-create-temp-table.sql", "w") as out:
        out.write(statement)

    if not run_query:
        return
    logging.info("Running temporary table creation query...")
    run_athena_query(statement, bucket_name)
def create_insert_from_temp_to_iceberg_sql(bucket_name: str, table_name: str, run_query: bool = False):
    """Write the INSERT that copies the sample table into the Iceberg table.

    The statement is saved as ``3-insert-into-iceberg-from-temp-table.sql``;
    when ``run_query`` is true it is also executed through
    ``run_athena_query`` with ``bucket_name`` as the query-result bucket.
    """
    statement = f"""INSERT INTO {table_name}
SELECT * FROM {table_name}_sample_data
"""
    with open("3-insert-into-iceberg-from-temp-table.sql", "w") as out:
        out.write(statement)

    if not run_query:
        return
    logging.info("Running INSERT INTO Iceberg table query...")
    run_athena_query(statement, bucket_name)
def create_cleanup_table_sql(bucket_name: str, table_name: str, run_query: bool = False):
    """Write both DROP TABLE statements to disk and optionally run them.

    The sample table (``<table_name>_sample_data``) is handled first, then
    the Iceberg table. Each statement is written to its own file and, when
    ``run_query`` is true, executed before the next file is written.
    """
    steps = (
        (
            "4-cleanup-temp-table.sql",
            f"DROP TABLE IF EXISTS {table_name}_sample_data;\n",
            "Running temporary table cleanup query...",
        ),
        (
            "5-cleanup-iceberg-table.sql",
            f"DROP TABLE IF EXISTS {table_name};\n",
            "Running Iceberg table cleanup query...",
        ),
    )
    for filename, statement, announcement in steps:
        with open(filename, "w") as out:
            out.write(statement)
        if run_query:
            logging.info(announcement)
            run_athena_query(statement, bucket_name)
def wait_for_role_to_propagate(role_arn, retries=6, delay=5):
    """Poll IAM until the role named by ``role_arn`` can be fetched.

    Args:
        role_arn: ARN of the role; the text after the last ``/`` is used as
            the role name.
        retries: Maximum number of ``get_role`` attempts.
        delay: Seconds to sleep between attempts.

    Returns:
        None. If the role never becomes visible an error is logged and the
        function returns anyway, so callers keep their best-effort behaviour.
    """
    role_name = role_arn.split("/")[-1]
    for attempt in range(1, retries + 1):
        try:
            iam.get_role(RoleName=role_name)
        except iam.exceptions.NoSuchEntityException:
            # Sleeping after the final attempt would only delay the caller.
            if attempt < retries:
                logging.warning(
                    f"Role {role_arn} not available yet, retrying ({attempt}/{retries})..."
                )
                time.sleep(delay)
        else:
            logging.info(f"Role {role_arn} is now available.")
            return

    # Previously the loop fell through silently; make the timeout visible.
    logging.error(
        f"Role {role_arn} still not available after {retries} attempts; continuing anyway."
    )
def create_iam_role_and_policy(role_name: str, policy_name: str, assume_role_policy_document: dict, policy_document: dict):
    """Create (or refresh) a role and a customer-managed policy, then attach them.

    * The role is created with ``assume_role_policy_document`` only when
      ``get_role`` reports it missing.
    * If the policy already exists, a new default version is created from
      ``policy_document`` and every non-default version is deleted;
      otherwise the policy is created.
    * Finally the policy is attached to the role.
    """
    # --- role ---------------------------------------------------------
    try:
        iam.get_role(RoleName=role_name)
    except iam.exceptions.NoSuchEntityException:
        iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
        )
        logging.info(f"Created IAM role {role_name}")
    else:
        logging.info(f"IAM role {role_name} already exists")

    # --- policy -------------------------------------------------------
    expected_arn = f"arn:aws:iam::{account_id}:policy/{policy_name}"
    try:
        policy = iam.get_policy(PolicyArn=expected_arn)
        logging.info(f"IAM policy {policy_name} already exists, updating it...")
        current_arn = policy["Policy"]["Arn"]

        # Publish the new document as the default version...
        iam.create_policy_version(
            PolicyArn=current_arn,
            PolicyDocument=json.dumps(policy_document),
            SetAsDefault=True,
        )
        # ...then drop every version that is no longer the default.
        for version in iam.list_policy_versions(PolicyArn=current_arn)["Versions"]:
            if version["IsDefaultVersion"]:
                continue
            iam.delete_policy_version(
                PolicyArn=current_arn,
                VersionId=version["VersionId"],
            )
    except iam.exceptions.NoSuchEntityException:
        policy = iam.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_document),
        )
        logging.info(f"Created IAM policy {policy_name}")

    # --- attachment ---------------------------------------------------
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy["Policy"]["Arn"])
def create_iam_role_and_policy_iceberg_optimization(bucket_name: str, table_name: str):
    """Create the role/policy pair used for Iceberg table optimization.

    The role trusts ``glue.amazonaws.com``. The policy grants object and
    list access on ``bucket_name``, Glue get/update on ``default.<table_name>``,
    CloudWatch Logs writes to the three ``/aws-glue/iceberg-*`` log groups,
    and ``iam:PassRole`` on the role itself.
    """
    role_name = "lets-try-iceberg-optimization-role"
    policy_name = "lets-try-iceberg-optimization-policy"

    glue_prefix = f"arn:aws:glue:{aws_region}:{account_id}"
    log_group_prefix = f"arn:aws:logs:{aws_region}:{account_id}:log-group:/aws-glue"

    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }

    s3_objects = {
        "Effect": "Allow",
        "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
        "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
    }
    s3_listing = {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{bucket_name}"],
    }
    glue_table = {
        "Effect": "Allow",
        "Action": ["glue:UpdateTable", "glue:GetTable"],
        "Resource": [
            f"{glue_prefix}:table/default/{table_name}",
            f"{glue_prefix}:database/default",
            f"{glue_prefix}:catalog",
        ],
    }
    cloudwatch_logs = {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
        ],
        "Resource": [
            f"{log_group_prefix}/{task}/logs:*"
            for task in (
                "iceberg-compaction",
                "iceberg-retention",
                "iceberg-orphan-file-deletion",
            )
        ],
    }
    pass_role = {
        "Effect": "Allow",
        "Action": ["iam:PassRole"],
        "Resource": [f"arn:aws:iam::{account_id}:role/{role_name}"],
    }

    permissions = {
        "Version": "2012-10-17",
        "Statement": [s3_objects, s3_listing, glue_table, cloudwatch_logs, pass_role],
    }
    create_iam_role_and_policy(role_name, policy_name, trust_policy, permissions)
def create_iam_role_and_policy_statistics(bucket_name: str, table_name: str):
    """Create the role/policy pair used for statistics generation.

    The role trusts ``glue.amazonaws.com``. The custom policy grants object
    and list access on ``bucket_name``, Glue get/update on
    ``default.<table_name>``, CloudWatch Logs writes under ``/aws-glue`` and
    ``iam:PassRole`` on the role itself. The AWS-managed
    ``AWSGlueServiceRole`` policy is attached to the role as well.
    """
    role_name = "lets-try-iceberg-statistics-role"
    policy_name = "lets-try-iceberg-statistics-policy"

    glue_prefix = f"arn:aws:glue:{aws_region}:{account_id}"

    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }

    s3_objects = {
        "Effect": "Allow",
        "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
        "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
    }
    s3_listing = {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{bucket_name}"],
    }
    glue_table = {
        "Effect": "Allow",
        "Action": ["glue:UpdateTable", "glue:GetTable"],
        "Resource": [
            f"{glue_prefix}:table/default/{table_name}",
            f"{glue_prefix}:database/default",
            f"{glue_prefix}:catalog",
        ],
    }
    cloudwatch_logs = {
        "Effect": "Allow",
        "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
        "Resource": [f"arn:aws:logs:{aws_region}:{account_id}:log-group:/aws-glue:*"],
    }
    pass_role = {
        "Effect": "Allow",
        "Action": ["iam:PassRole"],
        "Resource": [f"arn:aws:iam::{account_id}:role/{role_name}"],
    }

    permissions = {
        "Version": "2012-10-17",
        "Statement": [s3_objects, s3_listing, glue_table, cloudwatch_logs, pass_role],
    }
    create_iam_role_and_policy(role_name, policy_name, trust_policy, permissions)

    # In addition to the custom policy, attach the AWS-managed Glue service policy.
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    )
def create_iam_role_and_policy_firehose(bucket_name: str, table_name: str) -> str:
    """Create the role/policy pair for the Firehose stream and return the role ARN.

    The role trusts ``firehose.amazonaws.com``. The policy covers S3 access
    to ``bucket_name``, Glue access to ``default.<table_name>``, read access
    to a Kinesis stream named like the delivery stream, and
    ``logs:PutLogEvents`` on the two log groups the stream may write to.
    After creation the function sleeps five seconds before returning.
    """
    role_name = "lets-try-iceberg-firehose-role"
    policy_name = "lets-try-iceberg-firehose-policy"
    firehose_stream_name = "lets-try-iceberg-stream"
    log_group_name = "lets-try-iceberg-log-group"

    glue_prefix = f"arn:aws:glue:{aws_region}:{account_id}"
    log_group_prefix = f"arn:aws:logs:{aws_region}:{account_id}:log-group"

    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "firehose.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }

    s3_access = {
        "Effect": "Allow",
        "Action": [
            "s3:AbortMultipartUpload",
            "s3:GetBucketLocation",
            "s3:GetObject",
            "s3:ListBucket",
            "s3:ListBucketMultipartUploads",
            "s3:PutObject",
            "s3:DeleteObject",
        ],
        "Resource": [
            f"arn:aws:s3:::{bucket_name}",
            f"arn:aws:s3:::{bucket_name}/*",
        ],
    }
    glue_access = {
        "Effect": "Allow",
        "Action": ["glue:UpdateTable", "glue:GetTable", "glue:GetDatabase"],
        "Resource": [
            f"{glue_prefix}:table/default/{table_name}",
            f"{glue_prefix}:database/default",
            f"{glue_prefix}:catalog",
        ],
    }
    kinesis_access = {
        "Effect": "Allow",
        "Action": ["kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards"],
        # Note: a single ARN string here, not a list.
        "Resource": f"arn:aws:kinesis:{aws_region}:{account_id}:stream/{firehose_stream_name}",
    }
    logs_access = {
        "Effect": "Allow",
        "Action": ["logs:PutLogEvents"],
        "Resource": [
            f"{log_group_prefix}:/aws/kinesisfirehose/{firehose_stream_name}:*",
            f"{log_group_prefix}:{log_group_name}:*",
        ],
    }

    permissions = {
        "Version": "2012-10-17",
        "Statement": [s3_access, glue_access, kinesis_access, logs_access],
    }
    create_iam_role_and_policy(role_name, policy_name, trust_policy, permissions)

    # Give IAM a moment to propagate the new role before it is used.
    time.sleep(5)
    return f"arn:aws:iam::{account_id}:role/{role_name}"
def create_log_group_and_stream(log_group_name: str, log_stream_name: str):
    """Create a CloudWatch log group and a stream inside it.

    A ``ResourceAlreadyExistsException`` from either call is treated as
    success and only logged.
    """
    try:
        logs.create_log_group(logGroupName=log_group_name)
    except logs.exceptions.ResourceAlreadyExistsException:
        logging.info(f'Log group {log_group_name} already exists')
    else:
        logging.info(f'Created log group: {log_group_name}')

    try:
        logs.create_log_stream(
            logGroupName=log_group_name,
            logStreamName=log_stream_name,
        )
    except logs.exceptions.ResourceAlreadyExistsException:
        logging.info(f'Log stream {log_stream_name} already exists')
    else:
        logging.info(f'Created log stream: {log_stream_name}')
def create_firehose_delivery_stream(bucket_name: str, table_name: str, role_arn: str):
    """Create the DirectPut Firehose stream that delivers into the Iceberg table.

    The log group/stream are created first. If the delivery stream already
    exists the function logs its ARN and returns. Otherwise it waits for
    ``role_arn`` to become visible and creates the stream; a creation
    failure is logged, not raised.
    """
    firehose_stream_name = "lets-try-iceberg-stream"
    log_group_name = "lets-try-iceberg-log-group"
    log_stream_name = "iceberg"

    create_log_group_and_stream(log_group_name, log_stream_name)

    # Nothing to do when the stream is already there.
    try:
        described = firehose.describe_delivery_stream(DeliveryStreamName=firehose_stream_name)
        existing_arn = described["DeliveryStreamDescription"]["DeliveryStreamARN"]
        logging.info(f'Firehose delivery stream {firehose_stream_name} already exists: {existing_arn}')
        return
    except firehose.exceptions.ResourceNotFoundException:
        pass

    wait_for_role_to_propagate(role_arn)

    try:
        iceberg_destination = {
            'DestinationTableConfigurationList': [
                {
                    'DestinationTableName': f'{table_name}',
                    'DestinationDatabaseName': 'default',
                },
            ],
            'CloudWatchLoggingOptions': {
                'Enabled': True,
                'LogGroupName': f'{log_group_name}',
                'LogStreamName': f'{log_stream_name}',
            },
            'ProcessingConfiguration': {'Enabled': False},
            'BufferingHints': {'IntervalInSeconds': 5},
            'RoleARN': role_arn,
            'CatalogConfiguration': {
                'CatalogARN': f'arn:aws:glue:{aws_region}:{account_id}:catalog',
            },
            'S3Configuration': {
                'RoleARN': role_arn,
                'BucketARN': f'arn:aws:s3:::{bucket_name}',
            },
        }
        created = firehose.create_delivery_stream(
            DeliveryStreamName=firehose_stream_name,
            DeliveryStreamType='DirectPut',
            IcebergDestinationConfiguration=iceberg_destination,
        )
        logging.info(f'Created Firehose delivery stream: {created["DeliveryStreamARN"]}')
    except Exception as e:
        # Best effort: report the failure and let the caller carry on.
        logging.error(f'Failed to create Firehose delivery stream: {e}')
def random_insert_to_firehose(table_name: str, sample_count: int = 1000):
    """Send ``sample_count`` random records to the Firehose delivery stream.

    Each record is wrapped as ``{"ADF_Record": ..., "ADF_Metadata": ...}``
    with metadata naming ``table_name`` in the ``default`` database and the
    ``INSERT`` operation, then sent with one ``put_record`` call. A failed
    put is logged and the loop continues.
    """
    delivery_stream_name = "lets-try-iceberg-stream"

    # Routing metadata is identical for every record, so build it once.
    routing = {
        "OTF_Metadata": {
            "DestinationTableName": table_name,
            "DestinationDatabaseName": "default",
            "Operation": "INSERT",
        }
    }

    for index in range(sample_count):
        reading = generate_random_json(
            id=str(index % 5 + 1),
            timestamp=datetime.now(),
            speed=random.randint(0, 100),
            temperature=random.uniform(-20, 40),
            location={
                "lat": random.uniform(-90, 90),
                "lng": random.uniform(-180, 180),
            },
        )[0]

        payload = json.dumps({"ADF_Record": reading, "ADF_Metadata": routing})

        try:
            result = firehose.put_record(
                DeliveryStreamName=delivery_stream_name,
                Record={'Data': payload},
            )
            logging.info(f'Successfully put record to Firehose: {result["RecordId"]}')
        except Exception as e:
            logging.error(f"Failed to put record to Firehose: {e}")
def _client_error_code(err) -> str:
    """Return the service error code carried by a botocore ClientError."""
    return err.response.get("Error", {}).get("Code", "")


def _delete_role_and_policy(role_name: str, policy_name: str) -> None:
    """Delete one IAM role and its customer-managed policy, tolerating absence."""
    policy_arn = f"arn:aws:iam::{account_id}:policy/{policy_name}"

    # Role: IAM refuses to delete a role while ANY managed policy is still
    # attached (the statistics role also carries AWSGlueServiceRole), so
    # detach everything that is attached rather than just our own policy.
    try:
        pages = iam.get_paginator("list_attached_role_policies").paginate(RoleName=role_name)
        attached = [entry for page in pages for entry in page["AttachedPolicies"]]
        for entry in attached:
            iam.detach_role_policy(RoleName=role_name, PolicyArn=entry["PolicyArn"])
            logging.info(f"Detached policy {entry['PolicyName']} from role {role_name}")
        iam.delete_role(RoleName=role_name)
        logging.info(f"Deleted IAM role: {role_name}")
    except ClientError as e:
        if _client_error_code(e) != "NoSuchEntity":
            raise
        logging.info(f"IAM role does not exist: {role_name}")

    # Policy: handled separately so a missing role no longer leaves the
    # policy behind. Non-default versions must go before the policy can.
    try:
        for version in iam.list_policy_versions(PolicyArn=policy_arn)["Versions"]:
            if not version["IsDefaultVersion"]:
                iam.delete_policy_version(PolicyArn=policy_arn, VersionId=version["VersionId"])
        iam.delete_policy(PolicyArn=policy_arn)
        logging.info(f"Deleted IAM policy: {policy_name}")
    except ClientError as e:
        if _client_error_code(e) != "NoSuchEntity":
            raise
        logging.info(f"IAM policy does not exist: {policy_name}")


def _empty_and_delete_bucket(bucket_name: str) -> None:
    """Remove every object from the bucket, then the bucket; tolerate absence."""
    try:
        emptied = False
        # list_objects_v2 returns at most 1000 keys per call, so walk all
        # pages; each page also fits delete_objects' 1000-key limit.
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket_name):
            keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
            if keys:
                s3.delete_objects(Bucket=bucket_name, Delete={"Objects": keys})
                emptied = True
        if emptied:
            logging.info(f"Emptied bucket: {bucket_name}")
        s3.delete_bucket(Bucket=bucket_name)
        logging.info(f"Deleted bucket: {bucket_name}")
    except ClientError as e:
        if _client_error_code(e) != "NoSuchBucket":
            raise
        logging.info(f"S3 bucket {bucket_name} does not exist.")


def delete_resources(bucket_name, table_name):
    """Tear down everything this script may have created.

    Order: Firehose stream, log stream and log group, Athena tables, IAM
    roles and policies, the S3 bucket (emptied first), and finally the local
    config file. Resources that are already gone are logged and skipped.

    Args:
        bucket_name: Bucket to empty and delete; also used as the Athena
            result location for the DROP TABLE queries.
        table_name: Iceberg table name; ``<table_name>_sample_data`` is
            dropped as well.

    Raises:
        Exception: Any unexpected error is logged and re-raised. A failure
            to drop the tables is only logged.
    """
    try:
        # 1. Firehose stream
        firehose_stream_name = "lets-try-iceberg-stream"
        try:
            firehose.delete_delivery_stream(
                DeliveryStreamName=firehose_stream_name, AllowForceDelete=True
            )
            logging.info(f"Deleted Firehose stream: {firehose_stream_name}")
        except ClientError as e:
            if _client_error_code(e) != "ResourceNotFoundException":
                raise
            logging.info(f"Firehose stream {firehose_stream_name} does not exist.")

        # 2. Log stream, then log group
        log_group_name = "lets-try-iceberg-log-group"
        log_stream_name = "iceberg"
        try:
            logs.delete_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
            logging.info(f"Deleted Log stream: {log_stream_name}")
        except ClientError as e:
            if _client_error_code(e) != "ResourceNotFoundException":
                raise
            logging.info(f"Log stream {log_stream_name} does not exist.")
        try:
            logs.delete_log_group(logGroupName=log_group_name)
            logging.info(f"Deleted Log group: {log_group_name}")
        except ClientError as e:
            if _client_error_code(e) != "ResourceNotFoundException":
                raise
            logging.info(f"Log group {log_group_name} does not exist.")

        # 3. Iceberg table and sample table (best effort)
        try:
            create_cleanup_table_sql(bucket_name, table_name, run_query=True)
        except Exception as e:
            logging.error(f"Tables either do not exist or could not be deleted: {e}")

        # 4. IAM roles and policies
        role_policies = (
            ("lets-try-iceberg-optimization-role", "lets-try-iceberg-optimization-policy"),
            ("lets-try-iceberg-statistics-role", "lets-try-iceberg-statistics-policy"),
            ("lets-try-iceberg-firehose-role", "lets-try-iceberg-firehose-policy"),
        )
        for role_name, policy_name in role_policies:
            _delete_role_and_policy(role_name, policy_name)

        # 5. S3 bucket
        _empty_and_delete_bucket(bucket_name)

        # 6. Local config file
        delete_config()
    except Exception as e:
        logging.error(f"Error while deleting resources: {e}")
        raise
def main():
    """Parse the command line and run the requested setup or teardown steps.

    Table and bucket default to the values in the saved config (falling
    back to ``lets_try_iceberg`` and a randomly named bucket). The bucket is
    ensured to exist, then either everything is deleted (``--delete``) or
    the SQL files, optional sample data, IAM roles and Firehose resources
    are created according to the flags.
    """
    config = load_config()

    parser = argparse.ArgumentParser(description="Iceberg - Sample Table Creation")
    parser.add_argument(
        "--table",
        type=str,
        help="The table name to use. If not provided, it will be loaded from the config or default to 'lets_try_iceberg'.",
        default=config.get('DEFAULT', 'table', fallback="lets_try_iceberg"),
    )
    parser.add_argument(
        "--bucket",
        type=str,
        help="The S3 bucket name to store generated data. If not provided, it will be loaded from the config or a random one will be generated.",
        default=config.get('DEFAULT', 'bucket', fallback=None),
    )

    # All remaining options are plain on/off switches.
    switches = (
        ("--delete", "Delete all resources (Firehose stream, Log group/stream, Iceberg table, IAM roles, S3 bucket)"),
        ("--run-queries", "If provided, runs the Athena queries."),
        ("--optimization", "If provided, creates the optimization IAM role and policy."),
        ("--statistics", "If provided, creates the statistics IAM role and policy."),
        ("--firehose", "If provided, creates the firehose IAM role and policy and sets up a delivery stream."),
        ("--firehose-load", "If provided, inserts random data into the Firehose delivery stream."),
    )
    for flag, description in switches:
        parser.add_argument(flag, action="store_true", help=description)

    args = parser.parse_args()
    table_name = args.table
    bucket_name = args.bucket or f"iceberg-sample-data-{random.randint(100000, 999999)}"

    create_bucket(bucket_name)

    if args.delete:
        delete_resources(bucket_name, table_name)
        return

    save_config(bucket_name, table_name)
    create_iceberg_query(bucket_name, table_name, args.run_queries)
    create_cleanup_table_sql(bucket_name, table_name)

    # Optimization and statistics each load the sample data before creating
    # their own IAM role/policy.
    role_builders = (
        (args.optimization, create_iam_role_and_policy_iceberg_optimization),
        (args.statistics, create_iam_role_and_policy_statistics),
    )
    for requested, build_role in role_builders:
        if not requested:
            continue
        generate_and_upload_jsonl(bucket_name)
        create_athena_temp_table_sql(bucket_name, table_name, args.run_queries)
        create_insert_from_temp_to_iceberg_sql(bucket_name, table_name, args.run_queries)
        build_role(bucket_name, table_name)

    if args.firehose:
        role_arn = create_iam_role_and_policy_firehose(bucket_name, table_name)
        create_firehose_delivery_stream(bucket_name, table_name, role_arn)

    if args.firehose_load:
        random_insert_to_firehose(table_name)


if __name__ == "__main__":
    main()
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment