Created August 6, 2024 08:06
-
-
Save shalomb/97304ae89008bd3cf48be7835f43e759 to your computer and use it in GitHub Desktop.
Sync DynamoDB table data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from botocore.exceptions import ClientError
# Logging: raise the root logger to INFO so the progress messages below are
# emitted; the module logs through this root logger directly.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# DynamoDB service resource used for every table in this module. The region is
# read from the AWS_REGION environment variable, falling back to us-east-1
# when that variable is unset.
dynamodb = boto3.resource(
    'dynamodb',
    region_name=os.environ.get('AWS_REGION', 'us-east-1'),
)

# Source table name -> destination table name. Each pair is copied in turn.
table_mapping = {
    'source_dynamodb_table': 'target_dynamodb_table',
}
def scan_table(table):
    """Return every item in *table* by following scan pagination.

    The first page comes from a plain ``table.scan()``. Each subsequent page is
    requested with ``ExclusiveStartKey`` set to the previous page's
    ``LastEvaluatedKey``, until a page arrives without that key.

    Args:
        table: DynamoDB Table object exposing ``scan()`` and ``table_name``.

    Returns:
        list: all scanned items. If any scan call raises ``ClientError``, the
        error is logged and an empty list is returned instead of re-raising.
        Items gathered before the failure are discarded.
    """
    try:
        page = table.scan()
        collected = page.get('Items', [])
        logger.info(f"Scanned {len(collected)} items from {table.table_name}")

        # Keep paging while DynamoDB reports there is more data to read.
        while 'LastEvaluatedKey' in page:
            page = table.scan(ExclusiveStartKey=page['LastEvaluatedKey'])
            page_items = page.get('Items', [])
            collected.extend(page_items)
            logger.info(f"Scanned additional {len(page_items)} items from {table.table_name}")

        return collected
    except ClientError as err:
        # A missing table gets its own message; every other client error
        # shares a generic one. Either way the caller receives [].
        if err.response['Error']['Code'] != 'ResourceNotFoundException':
            logger.error(f"Error scanning table {table.table_name}: {err}")
        else:
            logger.error(f"Resource not found for table {table.table_name}: {err}")
        return []
def copy_item(destination_table, item):
    """Write one *item* into *destination_table* using ``put_item``.

    Any exception raised by the write is logged, including its traceback, and
    then swallowed. This is deliberate per-item best effort, so one bad item
    does not stop the remaining writes.

    Args:
        destination_table: DynamoDB Table object exposing ``put_item()`` and
            ``table_name``.
        item: the item mapping to write, passed through unchanged as ``Item``.
    """
    try:
        destination_table.put_item(Item=item)
    except Exception as e:
        # Broad on purpose (best effort). logger.exception keeps the stack
        # trace that the previous logger.error call discarded.
        logger.exception(f"Error writing item to {destination_table.table_name}: {e}")
    else:
        logger.info(f"Successfully wrote item to {destination_table.table_name}")
def transfer_table_data(source_table_name, destination_table_name, max_workers=3):
    """Copy every item from one DynamoDB table into another.

    The source table is read in full with ``scan_table``. Each item is then
    written with ``copy_item`` on a thread pool.

    Args:
        source_table_name: name of the table to read from.
        destination_table_name: name of the table to write into.
        max_workers: number of concurrent writer threads. Defaults to 3, the
            value that was previously hard-coded.
    """
    source_table = dynamodb.Table(source_table_name)
    destination_table = dynamodb.Table(destination_table_name)

    items = scan_table(source_table)
    if not items:
        # Also reached when scan_table caught a ClientError and returned [].
        logger.info(f"No items to transfer from {source_table_name} to {destination_table_name}")
        return

    logger.info(f"Starting data transfer from {source_table_name} to {destination_table_name}")
    # NOTE(review): boto3 documents resource objects as not thread-safe, yet
    # destination_table is shared by every worker here. Confirm this is
    # acceptable, or switch to per-thread resources or a single batch_writer.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(copy_item, destination_table, item) for item in items]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                # Safety net for anything copy_item itself lets escape.
                logger.exception(f"Error during transfer: {e}")
    logger.info(f"Completed data transfer from {source_table_name} to {destination_table_name}")
def lambda_handler(event, context):
    """Copy every source -> destination pair listed in ``table_mapping``.

    This is the handler entry point, presumably for AWS Lambda given the
    ``(event, context)`` signature. Neither argument is used. Pairs are
    processed sequentially, in the mapping's iteration order.

    Returns:
        dict: ``{'statusCode': 200, 'body': 'Migration completed'}`` once every
        transfer call has returned without raising.
    """
    for src_name, dst_name in table_mapping.items():
        transfer_table_data(src_name, dst_name)
    response = {'statusCode': 200, 'body': 'Migration completed'}
    return response
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment