Python script to run on AWS that finds all Apache Hudi Glue tables and adds their missing partitions via Athena, based on the Parquet files in S3.
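# Scan the Glue Data Catalog for Apache Hudi tables and, for every partition
# that exists as Parquet files in S3 but is missing from the catalog, run
# ALTER TABLE ... ADD PARTITION through Athena.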
import time

import boto3

# Athena needs an S3 location for query results; replace ACCOUNTID and REGION
# with your own values before running.
athena_output = 's3://aws-athena-query-results-ACCOUNTID-REGION'

athena = boto3.client('athena')
glue = boto3.client('glue')
s3 = boto3.client('s3')
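
# Glue search_tables is paginated: carry the NextToken in a kwargs dict and
# loop until a response comes back without one.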
glue_search_next = {}
while glue_search_next is not None:
    rTables = glue.search_tables(
        **{'Filters': [{
            'Key': 'InputFormat',
            'Value': 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
        }]},
        **glue_search_next
    )
    glue_search_next = {'NextToken': rTables['NextToken']} if 'NextToken' in rTables else None
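    # Process each table in this page, keeping only partitioned Hudi tables.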
    if 'TableList' in rTables:
        for srcTable in rTables['TableList']:
            # Uncomment to restrict the run to a single table:
            # if not (srcTable['DatabaseName'] == 'databaseName' and srcTable['Name'] == 'tableName'):
            #     continue
            if srcTable['StorageDescriptor']['InputFormat'] != 'org.apache.hudi.hadoop.HoodieParquetInputFormat':
                continue
            if srcTable['PartitionKeys'] == []:
                continue
            database = srcTable['DatabaseName']
            table = srcTable['Name']
            location = srcTable['StorageDescriptor']['Location']
            print(database)
            print(table)
            print(location)
            partKeys = [key['Name'] for key in srcTable['PartitionKeys']]
            print(partKeys)
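
            # Collect the partitions Glue already knows about, rendered as
            # 'key1=value1/key2=value2' fragments so they can be compared
            # against S3 object keys later.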
            partList = []
            glue_getpart_next = {}
            while glue_getpart_next is not None:
                rParts = glue.get_partitions(
                    **{
                        'DatabaseName': database,
                        'TableName': table
                    },
                    **glue_getpart_next
                )
                glue_getpart_next = {'NextToken': rParts['NextToken']} if 'NextToken' in rParts else None
                if 'Partitions' in rParts:
                    for srcPartition in rParts['Partitions']:
                        if len(partKeys) != len(srcPartition['Values']):
                            print(partKeys)
                            print(srcPartition['Values'])
                            print('ERROR: partition keys do not match values')
                            continue
                        part = []
                        for i in range(len(partKeys)):
                            part += [f'''{partKeys[i]}={srcPartition['Values'][i]}''']
                        partList += ['/'.join(part)]
            # print(partList)
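
            # Split the table location into bucket and key prefix (handles
            # both s3:// and s3a:// URIs) and normalize the trailing slash.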
            locationSplit = location.replace('s3://', '').replace('s3a://', '').split('/')
            bucket = locationSplit[0]
            prefix = '/'.join(locationSplit[1:])
            prefix = prefix if prefix[-1:] == '/' else prefix + '/'
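
            # Walk every object under the prefix; list_objects_v2 paginates
            # via ContinuationToken. Only .parquet data files are considered.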
            addPartition = []
            s3_list_next = {}
            while s3_list_next is not None:
                rFiles = s3.list_objects_v2(
                    **{
                        'Bucket': bucket,
                        'Prefix': prefix
                    },
                    **s3_list_next
                )
                s3_list_next = {'ContinuationToken': rFiles['NextContinuationToken']} if 'NextContinuationToken' in rFiles else None
                if 'Contents' in rFiles:
                    for srcFile in rFiles['Contents']:
                        if srcFile['Key'][-8:] != '.parquet':
                            continue
                        key = srcFile['Key'].replace(prefix, '')
                        values = key.split('/')[:-1]
                        if len(partKeys) != len(values):
                            print(partKeys)
                            print(values)
                            print('ERROR: partition keys do not match values')
                            continue
                        part = '/'.join(values)
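
                        # A path present in S3 but absent from the catalog is
                        # a partition that still has to be registered.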
                        if part not in partList:
                            if part not in addPartition:
                                print(f' new partition {part}')
                                addPartition += [part]
                                # Turn 'k1=v1/k2=v2' into "k1='v1',k2='v2'" for the PARTITION clause.
                                partCols = ','.join(part.split('/'))
                                partCols = partCols.replace('=', "='").replace(',', "',") + "'"
                                sql = f"ALTER TABLE {database}.{table} ADD IF NOT EXISTS PARTITION ({partCols}) LOCATION 's3://{bucket}/{prefix}{part}'"
                                print(' ' + sql)
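
                                # start_query_execution can fail with a
                                # ThrottlingException under load; retry with a
                                # linearly growing backoff, up to 100 attempts.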
                                count = 0
                                while True:
                                    count += 1
                                    try:
                                        r = athena.start_query_execution(
                                            QueryString=sql,
                                            QueryExecutionContext={
                                                'Database': database
                                            },
                                            ResultConfiguration={
                                                'OutputLocation': athena_output
                                            }
                                        )
                                        queryId = r['QueryExecutionId']
                                        break
                                    except Exception as e:
                                        if 'ThrottlingException' in str(e):
                                            print(str(e))
                                            time.sleep(1 * count)
                                        else:
                                            raise
                                    if count == 100:
                                        raise Exception('Too many retries')
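
                                # Poll the query until it leaves the QUEUED or
                                # RUNNING state, backing off between checks.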
                                count = 0
                                while True:
                                    count += 1
                                    r = athena.get_query_execution(
                                        QueryExecutionId=queryId
                                    )
                                    status = r['QueryExecution']['Status']['State']
                                    print(' ' + status)
                                    if status in ['QUEUED', 'RUNNING']:
                                        time.sleep(1 * count)
                                    else:
                                        if 'StateChangeReason' in r['QueryExecution']['Status']:
                                            print(' ' + r['QueryExecution']['Status']['StateChangeReason'])
                                        break
                                    if count == 100:
                                        raise Exception('Too many retries')
                                print('=================================================')