Skip to content

Instantly share code, notes, and snippets.

@ThePlenkov
Created January 21, 2025 13:43
Show Gist options
  • Save ThePlenkov/ec4310d5781ebc753fb81dc7816b6ec7 to your computer and use it in GitHub Desktop.
Save ThePlenkov/ec4310d5781ebc753fb81dc7816b6ec7 to your computer and use it in GitHub Desktop.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import json
## @params: [JOB_NAME, SECRET_NAME]
# Job arguments resolved from the command line.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SECRET_NAME'])
secret_name = args['SECRET_NAME']

# Value handed to replicate_table() as its glue_database argument.
glue_database = 'default'

# Bootstrap Spark and Glue, then initialise the job with its name and arguments.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Fetch the HANA service key from Secrets Manager: read the secret's string
# payload and decode it as JSON. query() uses the decoded object as the base
# for its connection options.
secrets_manager_client = boto3.client('secretsmanager')
secret_response = secrets_manager_client.get_secret_value(SecretId=secret_name)
secret_string = secret_response['SecretString']
hana_service_key = json.loads(secret_string)
# Run an arbitrary SQL statement through Glue and hand back the result
def query(query, connection_type="custom.jdbc"):
    """Load the result of a SQL statement via ``glueContext``.

    Args:
        query: SQL text to execute. Note that inside this function the
            parameter shadows the function's own name.
        connection_type: Glue connection type passed through unchanged.

    Returns:
        Whatever ``glueContext.create_dynamic_frame.from_options`` returns
        for the assembled options.

    Raises:
        KeyError: If the module-level ``hana_service_key`` has no
            ``"driver"`` entry.
    """
    # Every entry of the service key is forwarded as a connection option; the
    # SQL text and the driver class name are layered on top, overriding any
    # same-named keys. The module-level dict itself is left untouched.
    connection_options = {
        **hana_service_key,
        "query": query,
        "className": hana_service_key["driver"],
    }
    return glueContext.create_dynamic_frame.from_options(
        connection_type=connection_type,
        connection_options=connection_options,
    )
# Print the schema the connection is currently using
current_schema = query("select CURRENT_SCHEMA from DUMMY")
current_schema.show()

# Look up the names of all tables in that schema and print them
tables = query("select TABLE_NAME from TABLES where SCHEMA_NAME in ( select CURRENT_SCHEMA from DUMMY )")
tables.show()

# Collect the result rows and keep only each row's TABLE_NAME value
table_rows = tables.toDF().collect()
tables_list = [table_row['TABLE_NAME'] for table_row in table_rows]
def replicate_table(table_name, glue_database, s3_base_path="s3://glue-replicate-hdi"):
    """Copy one source table to S3 as Parquet.

    Args:
        table_name: Exact table name as stored in the source catalog (for
            example a value read from ``TABLES.TABLE_NAME``). It is matched
            case-sensitively because it is sent as a quoted identifier.
        glue_database: Currently unused. Kept so existing callers keep
            working; this function writes files to S3 only and does not
            register anything in the Glue Data Catalog.
        s3_base_path: S3 prefix under which a ``<table_name>/`` folder is
            written. Defaults to the bucket that used to be hard-coded.

    Returns:
        None. Tables with no rows are skipped and nothing is written.
    """
    print(f"Replicating table: {table_name}")

    # Send the name as a delimited identifier. Unquoted, the database would
    # fold it to upper case and reject characters such as "::" or ".", so
    # mixed-case or namespaced table names could not be read. An embedded
    # double quote is escaped by doubling it.
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    table_data = query(f"SELECT * FROM {quoted_name}")

    # Skip empty tables so no empty output is written.
    # NOTE(review): count() presumably reads the whole table once before the
    # write reads it again — confirm whether a cheaper emptiness check is
    # worth it for large tables.
    if table_data.count() == 0:
        print(f"Skipping table '{table_name}' as it has no data.")
        return

    # Write the rows as Parquet under <s3_base_path>/<table_name>/.
    glueContext.write_dynamic_frame.from_options(
        frame=table_data,
        connection_type="s3",
        connection_options={
            "path": f"{s3_base_path.rstrip('/')}/{table_name}/",
        },
        format="parquet",
    )
# Replicate every table name collected in tables_list, one after another
for source_table in tables_list:
    replicate_table(table_name=source_table, glue_database=glue_database)

# Commit the job
job.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment