# Setup: install the AWS SDK first with `pip install boto3`,
# then run this Python file.
Last active
June 11, 2025 22:20
-
-
Save fazlurr/8119f77248d60c59ea9eb66d02601f72 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import os | |
# Named AWS CLI/boto3 profiles used to open the two sessions.
SOURCE_PROFILE = 'source-account'
DEST_PROFILE = 'dest-account'
# Region passed to both boto3 sessions.
REGION = 'us-east-1'
# Directory that holds one JSON file per exported queue.
EXPORT_DIR = 'sqs_export'

# Create the export directory up front; exist_ok makes re-runs harmless.
os.makedirs(EXPORT_DIR, exist_ok=True)
def export_queues():
    """Export every SQS queue visible to the source profile to a JSON file.

    Opens a boto3 session with ``SOURCE_PROFILE`` and ``REGION``, then for
    each queue writes ``<EXPORT_DIR>/<queue name>.json`` containing the
    queue's name, all of its attributes and its tags.
    """
    session = boto3.Session(profile_name=SOURCE_PROFILE, region_name=REGION)
    sqs = session.client('sqs')

    # Idempotent: keeps the function usable even if the directory was
    # removed after the module was imported.
    os.makedirs(EXPORT_DIR, exist_ok=True)

    # A single ListQueues call returns at most 1000 URLs, so walk every page
    # instead of silently dropping the remainder.
    paginator = sqs.get_paginator('list_queues')
    for page in paginator.paginate():
        for queue_url in page.get('QueueUrls', []):
            # The queue name is the last path segment of the queue URL.
            queue_name = queue_url.split('/')[-1]
            print(f'Exporting {queue_name}')

            attrs = sqs.get_queue_attributes(
                QueueUrl=queue_url,
                AttributeNames=['All'],
            )['Attributes']
            tags = sqs.list_queue_tags(QueueUrl=queue_url).get('Tags', {})

            export_path = os.path.join(EXPORT_DIR, f'{queue_name}.json')
            with open(export_path, 'w', encoding='utf-8') as f:
                json.dump(
                    {'name': queue_name, 'attributes': attrs, 'tags': tags},
                    f,
                    indent=2,
                )
def import_queues():
    """Recreate the exported queues using the destination profile.

    Reads every ``*.json`` file in ``EXPORT_DIR``, strips the attributes that
    cannot be supplied at creation time, creates the queue with the remaining
    attributes and then applies the exported tags (if any).
    """
    session = boto3.Session(profile_name=DEST_PROFILE, region_name=REGION)
    sqs = session.client('sqs')

    # Attributes reported by GetQueueAttributes that are not allowed during
    # creation.
    read_only_attrs = frozenset([
        'QueueArn',
        'CreatedTimestamp',
        'LastModifiedTimestamp',
        'ApproximateNumberOfMessages',
        'ApproximateNumberOfMessagesDelayed',
        'ApproximateNumberOfMessagesNotVisible',
    ])

    # Sorted for a deterministic creation order between runs.
    for file_name in sorted(os.listdir(EXPORT_DIR)):
        path = os.path.join(EXPORT_DIR, file_name)
        # Skip stray entries (editor/OS droppings, sub-directories) that
        # would otherwise make json.load fail and abort the whole import.
        if not file_name.endswith('.json') or not os.path.isfile(path):
            continue

        with open(path, encoding='utf-8') as f:
            data = json.load(f)

        queue_name = data['name']
        tags = data.get('tags') or {}
        # NOTE(review): the remaining attributes are passed through verbatim;
        # values that embed ARNs (e.g. RedrivePolicy, Policy) presumably
        # still point at the source account -- confirm whether they need
        # rewriting for the destination.
        attrs = {
            key: value
            for key, value in data['attributes'].items()
            if key not in read_only_attrs
        }

        print(f'Creating {queue_name} in destination account...')
        response = sqs.create_queue(
            QueueName=queue_name,
            Attributes=attrs,
        )
        dest_url = response['QueueUrl']

        if tags:
            sqs.tag_queue(QueueUrl=dest_url, Tags=tags)
if __name__ == "__main__":
    # Run the two phases in order, announcing each one before it starts.
    steps = (
        ("1. Exporting from source account...", export_queues),
        ("2. Importing into destination account...", import_queues),
    )
    for banner, run_step in steps:
        print(banner)
        run_step()
    print("✅ Done.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment