Created
January 21, 2025 13:43
-
-
Save ThePlenkov/ec4310d5781ebc753fb81dc7816b6ec7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from awsglue.transforms import * | |
from awsglue.utils import getResolvedOptions | |
from pyspark.context import SparkContext | |
from awsglue.context import GlueContext | |
from awsglue.job import Job | |
import boto3 | |
import json | |
## @params: [JOB_NAME, SECRET_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SECRET_NAME'])
glue_database = 'default'  # Glue database name handed to replicate_table below

# Spark / Glue bootstrap: the GlueContext wraps the SparkContext, and the Job
# is initialised with the resolved job arguments before any data is touched.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Credentials: the secret named by --SECRET_NAME holds the HANA service key as
# a JSON string; the parsed dict is the base for every query's connection options.
secret_name = args['SECRET_NAME']
secrets_manager_client = boto3.client('secretsmanager')
secret_response = secrets_manager_client.get_secret_value(SecretId=secret_name)
secret_string = secret_response['SecretString']
hana_service_key = json.loads(secret_string)
# Run an ad-hoc SQL statement against HANA through the custom JDBC connector.
def query(query, connection_type="custom.jdbc"):
    """Execute *query* on HANA and return the result as a Glue DynamicFrame.

    The connection options consist of every entry of the module-level
    ``hana_service_key`` dict (parsed from the Secrets Manager secret), plus:

    * ``query``     -- the SQL text to execute;
    * ``className`` -- the JDBC driver class taken from the key's ``driver`` entry.

    Args:
        query: SQL statement to run.
        connection_type: Glue connection type, ``"custom.jdbc"`` by default.

    Returns:
        The DynamicFrame built by ``glueContext.create_dynamic_frame.from_options``.

    Raises:
        KeyError: if the service key has no ``driver`` entry.
    """
    # Build a fresh dict each call so the shared service key is never mutated.
    options = {
        **hana_service_key,
        "query": query,
        "className": hana_service_key["driver"],
    }
    return glueContext.create_dynamic_frame.from_options(
        connection_type=connection_type,
        connection_options=options,
    )
# Print the schema the HANA connection resolves to by default.
current_schema = query("select CURRENT_SCHEMA from DUMMY")
current_schema.show()

# Gather the names of every table that lives in that current schema.
tables = query("select TABLE_NAME from TABLES where SCHEMA_NAME in ( select CURRENT_SCHEMA from DUMMY )")
tables.show()
tables_list = [row.TABLE_NAME for row in tables.toDF().collect()]
def quote_hana_identifier(name):
    """Return *name* as a double-quoted SAP HANA delimited identifier.

    Embedded double quotes are escaped by doubling them, so the result can be
    spliced into SQL as exactly one identifier.
    """
    return '"' + name.replace('"', '""') + '"'


def replicate_table(table_name, glue_database, s3_base_path="s3://glue-replicate-hdi"):
    """Copy one HANA table to S3 as Parquet files.

    Args:
        table_name: Table name exactly as stored in HANA's ``TABLES`` view.
            It may be mixed-case or contain characters such as ``.`` or ``::``.
        glue_database: Currently unused; kept for interface compatibility.
            NOTE(review): data is only written to S3, and no Glue Data Catalog
            table is created or updated here.
        s3_base_path: S3 prefix under which a ``<table_name>/`` folder is
            written. Replace the default with your own bucket.

    Tables that return no rows are skipped and nothing is written for them.
    """
    print(f"Replicating table: {table_name}")
    # Quote the name: HANA upper-cases undelimited identifiers, so an unquoted
    # name fails for mixed-case tables or names with special characters
    # (for example HDI-style "namespace::TABLE" names).
    table_data = query(f"SELECT * FROM {quote_hana_identifier(table_name)}")

    if table_data.count() == 0:
        print(f"Skipping table '{table_name}' as it has no data.")
        return

    # rstrip avoids a double slash if the caller passes a trailing '/'.
    glueContext.write_dynamic_frame.from_options(
        frame=table_data,
        connection_type="s3",
        connection_options={
            "path": f"{s3_base_path.rstrip('/')}/{table_name}/",
        },
        format="parquet",
    )
# Copy every table found in the current schema, then commit the Glue job.
for table_name in tables_list:
    replicate_table(table_name, glue_database)

job.commit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment