Add Glue Partitions with AWS Lambda

A Lambda function that batch-creates hourly, per-polygon partitions on an existing AWS Glue table for a chosen date range, so the table's partitions are registered before the corresponding data lands in S3.
import json

import boto3
from dateutil import rrule
from datetime import datetime, timedelta

glue = boto3.client('glue', region_name='--')  # update with your region
s3 = boto3.client('s3')
def get_current_schema(table_name, database_name):
    """Read the storage metadata of an existing Glue table so that new
    partitions can be registered with a matching format."""
    response = glue.get_table(
        DatabaseName=database_name,
        Name=table_name)

    table_data = {
        'input_format': response['Table']['StorageDescriptor']['InputFormat'],
        'output_format': response['Table']['StorageDescriptor']['OutputFormat'],
        'table_location': response['Table']['StorageDescriptor']['Location'],
        'serde_info': response['Table']['StorageDescriptor']['SerdeInfo'],
        'partition_keys': response['Table']['PartitionKeys'],
    }
    return table_data
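
# For a typical Parquet-backed table, the values returned above look roughly
# like this (illustrative only; your table will differ):
#   input_format:  org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
#   output_format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
#   serde_info:    {'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'}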
def create_partitions(data, database_name, table_name):
    """Register partitions in batches of 100, the maximum that
    batch_create_partition accepts per call."""
    break_list_in_chunks = lambda data, chunk: [data[x:x + chunk] for x in range(0, len(data), chunk)]

    for i, chunk in enumerate(break_list_in_chunks(data, 100)):
        print('Creating partition batch {}'.format(i))
        create_partition_response = glue.batch_create_partition(
            DatabaseName=database_name,
            TableName=table_name,
            PartitionInputList=chunk
        )
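
# Note: batch_create_partition reports per-partition failures (including
# AlreadyExistsException for partitions created on a previous run) in the
# response rather than raising. A minimal check could look like:
#   errors = create_partition_response.get('Errors', [])
#   if errors:
#       print('{} partitions failed, e.g. {}'.format(len(errors), errors[0]))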
def generate_partition_input(year, month, day, hour, polygon_slug, s3_input,
                             table_data):
    """Build one PartitionInput dict for batch_create_partition."""
    part_location = "{}/year={}/month={}/day={}/hour={}/polygon_slug={}".format(
        s3_input, year, month, day, hour, polygon_slug)

    input_dict = {
        'Values': [
            year, month, day, hour, polygon_slug
        ],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info']
        }
    }
    return input_dict
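
# The partition location mirrors the Hive-style key layout in S3, e.g.
# s3://my-bucket/my-prefix/year=2021/month=3/day=5/hour=14/polygon_slug=downtown-east
# (hypothetical values). Note that months, days, and hours are not
# zero-padded here, so the S3 keys must follow the same unpadded convention.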
def generate_partition_input_list(start, end, s3_input, polygons, table_data):
    """Produce one PartitionInput per hour in [start, end] per polygon."""
    input_list = []

    for date in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
        year = str(date.year)
        month = str(date.month)
        day = str(date.day)
        hour = str(date.hour)
        for polygon in polygons:
            input_list.append(generate_partition_input(
                year, month, day, hour, polygon['polygon_slug'],
                s3_input, table_data))

    return input_list
def read_polygon(bucket, key):
    """Load the polygon index JSON from S3 and return its 'polygons' list."""
    response = s3.get_object(Bucket=bucket, Key=key)
    payload = json.loads(response['Body'].read().decode('utf-8'))
    return payload['polygons']
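
# The polygon index file is assumed to look like this (hypothetical slugs;
# only the top-level 'polygons' key and the 'polygon_slug' field are used):
# {
#   "polygons": [
#     {"polygon_slug": "downtown-east"},
#     {"polygon_slug": "downtown-west"}
#   ]
# }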
def lambda_handler(event, context):
    # Glue table location
    database_name = 'test'  # update
    table_name = 'raw_data_test'  # update

    # S3 input location
    s3_bucket_glue = '--'  # update
    s3_prefix_glue = '--'  # update
    s3_input_glue = 's3://' + s3_bucket_glue + '/' + s3_prefix_glue

    # Supported polygons index
    s3_bucket_polygon = '--'  # update
    s3_key_polygon = '--.json'  # update

    # Desired time range: from now until two days ahead
    start_time = datetime.now()
    end_time = datetime.now() + timedelta(days=2)

    # Get polygons
    polygons = read_polygon(s3_bucket_polygon, s3_key_polygon)

    # Get Glue table metadata
    table_data = get_current_schema(table_name, database_name)

    # Generate partition list of dicts
    data = generate_partition_input_list(start_time, end_time, s3_input_glue,
                                         polygons, table_data)

    # Batch insert partitions
    create_partitions(data, database_name, table_name)
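
# Local smoke test (hypothetical; deployed on AWS, the function is invoked
# as a normal Lambda handler and ignores the event payload entirely):
if __name__ == '__main__':
    lambda_handler({}, None)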