Creates time-based Glue partitions for a given time range.
Keep in mind that you don't need the data in place to add partitions, so you can create partitions for a whole year up front and add the data to S3 later.
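For instance, a single partition can be registered even though its S3 prefix is still empty. Below is a minimal sketch of that idea; the region, database, table, S3 path, and format/SerDe class names are placeholder assumptions, not values taken from the script that follows:

import boto3

glue = boto3.client('glue', 'us-east-1')  # assumed region, adjust as needed

# Register one partition whose S3 prefix may not contain any objects yet;
# the data can be uploaded to that location later.
glue.create_partition(
    DatabaseName='test',                    # hypothetical database
    TableName='raw_data_test',              # hypothetical table
    PartitionInput={
        'Values': ['2024', '1', '1', '0'],  # year, month, day, hour
        'StorageDescriptor': {
            'Location': 's3://my-bucket/raw/year=2024/month=1/day=1/hour=0',
            'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
            'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.OpenCSVSerde'}
        }
    }
)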
import json
import boto3
from dateutil import rrule
from datetime import datetime, timedelta

glue = boto3.client('glue', '--')  # update '--' with your region
s3 = boto3.client('s3')


def get_current_schema(table_name, database_name):
    # Read the table's storage descriptor so new partitions reuse its formats and SerDe
    response = glue.get_table(
        DatabaseName=database_name,
        Name=table_name)

    table_data = {}
    table_data['input_format'] = response['Table']['StorageDescriptor']['InputFormat']
    table_data['output_format'] = response['Table']['StorageDescriptor']['OutputFormat']
    table_data['table_location'] = response['Table']['StorageDescriptor']['Location']
    table_data['serde_info'] = response['Table']['StorageDescriptor']['SerdeInfo']
    table_data['partition_keys'] = response['Table']['PartitionKeys']

    return table_data


def create_partitions(data, database_name, table_name):
    # batch_create_partition accepts at most 100 partitions per call, so split into chunks
    break_list_in_chunks = lambda data, chunk: [data[x:x + chunk] for x in range(0, len(data), chunk)]

    for i, chunk in enumerate(break_list_in_chunks(data, 100)):
        print(i)
        create_partition_response = glue.batch_create_partition(
            DatabaseName=database_name,
            TableName=table_name,
            PartitionInputList=chunk
        )


def generate_partition_input(year, month, day, hour, s3_input, table_data):
    part_location = "{}/year={}/month={}/day={}/hour={}".format(s3_input, year, month, day, hour)

    input_dict = {
        'Values': [
            year, month, day, hour
        ],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info']
        }
    }

    return input_dict


def generate_partition_input_list(start, end, s3_input, table_data):
    input_list = []

    # One partition per hour between start and end
    for date in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
        year = str(date.year)
        month = str(date.month)
        day = str(date.day)
        hour = str(date.hour)

        input_list.append(generate_partition_input(year, month, day, hour, s3_input, table_data))

    return input_list


def lambda_handler(event, context):
    # Glue table location
    database_name = 'test'           # update
    table_name = 'raw_data_test'     # update

    # S3 location of the data
    s3_bucket_glue = '--'   # update
    s3_prefix_glue = '--'   # update
    s3_input_glue = 's3://' + s3_bucket_glue + '/' + s3_prefix_glue

    # Desired time range
    start_time = datetime.now() + timedelta(days=0)
    end_time = datetime.now() + timedelta(days=2)

    # Get Glue table metadata
    table_data = get_current_schema(table_name, database_name)

    # Generate list of partition input dicts
    data = generate_partition_input_list(start_time, end_time, s3_input_glue, table_data)

    # Batch insert partitions
    create_partitions(data, database_name, table_name)
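After the Lambda runs, one way to sanity-check the result is to list the partitions that were registered. A small sketch, assuming the same placeholder region, database, and table names as above:

import boto3

glue = boto3.client('glue', '--')  # update '--' with your region

# Page through the partitions on the table and print each partition's values and S3 location.
paginator = glue.get_paginator('get_partitions')
for page in paginator.paginate(DatabaseName='test', TableName='raw_data_test'):
    for partition in page['Partitions']:
        print(partition['Values'], partition['StorageDescriptor']['Location'])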
Thanks for the reply.
I have a smaller dataset as of now. It's a small project for educational purposes, but I'm trying to apply all the best practices I can.
Your code is completely clear... I was just unclear about how you iterate "polygons" in the "generate_partition_input_list()" method, and what the value part in polygon['polygon_slug'] is!
I will probably take the route of splitting the .csv into three (or more) files and creating the Glue partitions using Lambda, so that I can avoid running the expensive Glue Crawler.
Thanks !