Skip to content

Instantly share code, notes, and snippets.

@shalomb
Created August 6, 2024 08:06
Show Gist options
  • Save shalomb/97304ae89008bd3cf48be7835f43e759 to your computer and use it in GitHub Desktop.
Save shalomb/97304ae89008bd3cf48be7835f43e759 to your computer and use it in GitHub Desktop.
Sync DynamoDB table data
#!/usr/bin/env python3
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from botocore.exceptions import ClientError
# Configure logging. basicConfig only attaches a handler when the root logger
# has none (e.g. when this file is run as a plain script); otherwise it is a
# no-op and does not touch the level, so the explicit setLevel is still needed.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Module-level DynamoDB resource, created once at import and shared by every
# transfer (presumably also reused across warm Lambda invocations — confirm).
# The region comes from AWS_REGION and defaults to us-east-1.
dynamodb = boto3.resource('dynamodb', region_name=os.environ.get('AWS_REGION', 'us-east-1'))

# Source table name -> destination table name. lambda_handler copies every
# pair. This script never creates tables, so destinations must already exist.
table_mapping = {
    'source_dynamodb_table': 'target_dynamodb_table',
}
def scan_table(table):
    """Return every item in *table*, following scan pagination to the end.

    The first page comes from ``table.scan()``. While a response carries a
    ``LastEvaluatedKey``, that key is passed back as ``ExclusiveStartKey`` to
    fetch the next page. Each page's item count is logged at INFO.

    On a botocore ``ClientError`` the error is logged and an empty list is
    returned. ``ResourceNotFoundException`` gets its own message. Any items
    read before the failure are discarded.
    """
    try:
        page = table.scan()
        collected = page.get('Items', [])
        logger.info(f"Scanned {len(collected)} items from {table.table_name}")
        while 'LastEvaluatedKey' in page:
            page = table.scan(ExclusiveStartKey=page['LastEvaluatedKey'])
            page_items = page.get('Items', [])
            collected.extend(page_items)
            logger.info(f"Scanned additional {len(page_items)} items from {table.table_name}")
        return collected
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code != 'ResourceNotFoundException':
            logger.error(f"Error scanning table {table.table_name}: {err}")
        else:
            logger.error(f"Resource not found for table {table.table_name}: {err}")
        return []
def copy_item(destination_table, item):
    """Write one item to *destination_table* with ``put_item`` (best effort).

    Args:
        destination_table: DynamoDB ``Table`` resource to write to.
        item: The item mapping to store, as produced by ``scan_table``.

    Any exception raised by the write is logged with its traceback and then
    swallowed, so a single bad item does not stop the rest of the transfer.
    Always returns None.
    """
    try:
        destination_table.put_item(Item=item)
        logger.info(f"Successfully wrote item to {destination_table.table_name}")
    except Exception as e:
        # Deliberately broad: this runs on an executor worker thread, and the
        # caller keeps going after per-item failures. logger.exception keeps
        # the traceback, which str(e) alone loses for non-AWS errors
        # (e.g. serialization problems).
        logger.exception(f"Error writing item to {destination_table.table_name}: {e}")
def transfer_table_data(source_table_name, destination_table_name, max_workers=3):
    """Copy every item from one DynamoDB table into another.

    The source table is read in full with ``scan_table``. Each item is then
    written to the destination with ``copy_item`` on a thread pool.

    Args:
        source_table_name: Name of the table to read from.
        destination_table_name: Name of the table to write to.
        max_workers: Number of concurrent writer threads. The default of 3
            matches the previously hard-coded value. ThreadPoolExecutor
            rejects values below 1 with ValueError.

    Returns:
        None. Per-item failures are logged rather than raised.
    """
    source_table = dynamodb.Table(source_table_name)
    destination_table = dynamodb.Table(destination_table_name)
    items = scan_table(source_table)
    if not items:
        # Also reached when scan_table hit a ClientError and returned [].
        logger.info(f"No items to transfer from {source_table_name} to {destination_table_name}")
        return
    logger.info(f"Starting data transfer from {source_table_name} to {destination_table_name}")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(copy_item, destination_table, item) for item in items]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                # Safety net for anything copy_item lets escape.
                logger.error(f"Error during transfer: {e}")
    logger.info(f"Completed data transfer from {source_table_name} to {destination_table_name}")
def lambda_handler(event, context):
    """AWS Lambda entry point: copy every table pair in ``table_mapping``.

    ``event`` and ``context`` are accepted to satisfy the Lambda handler
    signature but are not read. The pairs are processed sequentially. The
    returned response is constant: it does not reflect whether any
    individual write failed.
    """
    for pair in table_mapping.items():
        transfer_table_data(*pair)
    response = {'statusCode': 200, 'body': 'Migration completed'}
    return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment