Skip to content

Instantly share code, notes, and snippets.

@kenoir
Last active September 16, 2019 11:33
Show Gist options
  • Save kenoir/30f542bc5e541bf4ba69458f086bdcea to your computer and use it in GitHub Desktop.
Save kenoir/30f542bc5e541bf4ba69458f086bdcea to your computer and use it in GitHub Desktop.
import json
from pprint import pprint
import uuid
import argparse
import boto3
def assumed_role_session(role_arn):
    """Return a boto3 session backed by temporary credentials for ``role_arn``.

    Calls STS ``assume_role`` with a uuid1-derived session name, then builds a
    ``boto3.Session`` from the access key, secret key and session token found
    under ``'Credentials'`` in the response.
    """
    sts = boto3.client('sts')
    assume_role_response = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=uuid.uuid1().hex,
    )
    creds = assume_role_response['Credentials']
    return boto3.Session(
        aws_access_key_id=creds['AccessKeyId'],
        aws_secret_access_key=creds['SecretAccessKey'],
        aws_session_token=creds['SessionToken'],
    )
def create_client(service_name, role_arn):
    """Create a ``service_name`` client from a session that assumed ``role_arn``."""
    return assumed_role_session(role_arn).client(service_name)
# ------------ SQS redrive helpers ------------
def _create_message_receipt(message):
return {
'MessageId': message['MessageId'],
'ReceiptHandle': message['ReceiptHandle'],
'MessageBody': json.loads(message['Body']),
}
def _create_entry_with_receipt(message_receipt):
return {
'receipt': {
'Id': message_receipt['MessageId'],
'ReceiptHandle': message_receipt['ReceiptHandle'],
},
'entry': {
'Id': str(uuid.uuid1()),
'MessageBody': json.dumps(message_receipt['MessageBody']),
}
}
def _find_successful(entries_with_receipts, message_id):
return [
entries_with_receipt['receipt']
for entries_with_receipt in entries_with_receipts if entries_with_receipt['entry']['Id'] == message_id
]
def _flatten(list_of_lists):
return [val for sublist in list_of_lists for val in sublist]
def get_queue(client, queue_name):
    """Return the ``'QueueUrl'`` that ``client.get_queue_url`` reports for ``queue_name``."""
    return client.get_queue_url(QueueName=queue_name)['QueueUrl']
def receive_messages(client, queue_url):
    """Receive one batch of up to 10 messages from ``queue_url``.

    Requests all attributes (``AttributeNames=['All']``). Returns the
    response's ``'Messages'`` list, or an empty list when the response has no
    such key.
    """
    response = client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=10,
    )
    return response.get('Messages', [])
def parse_messages(messages):
    """Convert each received message into a message receipt, preserving order."""
    return list(map(_create_message_receipt, messages))
def send_messages(client, target_queue, message_receipts):
    """Send a batch of message receipts to ``target_queue``.

    Each receipt is turned into a batch entry with a fresh id, and the entries
    are submitted with a single ``send_message_batch`` call.

    Args:
        client: SQS client used to send the batch.
        target_queue: URL of the queue to send to.
        message_receipts: receipts as produced by ``parse_messages``.

    Returns:
        The ``{'Id', 'ReceiptHandle'}`` receipts of the source messages whose
        entries the response lists under ``'Successful'``, in the order the
        response reports them. These are suitable as delete-batch entries.
        An empty list is returned when there is nothing to send or nothing
        succeeded.
    """
    if not message_receipts:
        # Nothing to send: do not issue a batch request with no entries.
        return []

    entries_with_receipts = [
        _create_entry_with_receipt(message_receipt)
        for message_receipt in message_receipts
    ]
    response = client.send_message_batch(
        QueueUrl=target_queue,
        Entries=[pair['entry'] for pair in entries_with_receipts],
    )

    # Index receipts by batch entry id so each successful id is one lookup
    # instead of a scan over the whole batch.
    receipts_by_entry_id = {
        pair['entry']['Id']: pair['receipt'] for pair in entries_with_receipts
    }

    # Read 'Successful' defensively: a response in which no entry succeeded
    # may not carry the key at all, and that must not raise KeyError here.
    return [
        receipts_by_entry_id[result['Id']]
        for result in response.get('Successful', [])
        if result['Id'] in receipts_by_entry_id
    ]
def delete_messages(client, queue_url, deletable_receipts):
    """Delete the given receipts from ``queue_url`` in one batch.

    Args:
        client: SQS client used to issue ``delete_message_batch``.
        queue_url: URL of the queue the messages were received from.
        deletable_receipts: list of ``{'Id', 'ReceiptHandle'}`` entries.

    Returns:
        The ``delete_message_batch`` response. When ``deletable_receipts`` is
        empty no request is made and an empty result
        (``{'Successful': [], 'Failed': []}``) is returned instead.
    """
    if not deletable_receipts:
        # SQS rejects a batch request that contains no entries, which would
        # otherwise happen whenever no message was sent successfully.
        return {'Successful': [], 'Failed': []}

    return client.delete_message_batch(
        QueueUrl=queue_url,
        Entries=deletable_receipts,
    )
def sqs_redrive(client, source_queue, target_queue):
    """Move one batch of messages from ``source_queue`` to ``target_queue``.

    Resolves both queue names to URLs, receives a batch from the source,
    sends it to the target, and deletes from the source only those messages
    whose send was reported as successful. The received and deleted message
    ids are printed.

    Args:
        client: SQS client used for every call.
        source_queue: name of the queue to read from.
        target_queue: name of the queue to send to.

    Returns:
        ``True`` if a batch was received (whether or not every message could
        be moved), ``False`` if the receive call returned no messages.
    """
    source_queue_url = get_queue(client, source_queue)
    target_queue_url = get_queue(client, target_queue)

    messages = receive_messages(client, source_queue_url)
    if not messages:
        return False

    message_receipts = parse_messages(messages)
    deletable_receipts = send_messages(client, target_queue_url, message_receipts)

    if deletable_receipts:
        deletions = delete_messages(client, source_queue_url, deletable_receipts)
    else:
        # No send succeeded, so there is nothing safe to delete; skip the
        # request rather than submitting an empty batch.
        deletions = {}

    received_ids = [message['MessageId'] for message in messages]
    # Tolerate a delete response without a 'Successful' list.
    deleted_ids = [
        deletion['Id'] for deletion in deletions.get('Successful', [])
    ]

    print("Received:")
    pprint(received_ids)
    print("Deleted:")
    pprint(deleted_ids)
    return True
def main():
    """Parse the command line and redrive batches until the source is empty.

    Expects three positional arguments: the source queue name, the target
    queue name and the role ARN to assume. Keeps calling ``sqs_redrive`` until
    it reports that no messages were received.
    """
    parser = argparse.ArgumentParser()
    positional_arguments = (
        ("source_queue", "Queue to read from"),
        ("target_queue", "Queue to send to"),
        ("role_arn", "Role ARN to run as"),
    )
    for argument_name, help_text in positional_arguments:
        parser.add_argument(argument_name, help=help_text, type=str)
    args = parser.parse_args()

    client = create_client('sqs', args.role_arn)
    while True:
        if not sqs_redrive(client, args.source_queue, args.target_queue):
            break
        print("--- Batch complete ---")
    print("Done.")
# Run the redrive only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment