Last active
September 16, 2019 11:33
-
-
Save kenoir/30f542bc5e541bf4ba69458f086bdcea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from pprint import pprint | |
import uuid | |
import argparse | |
import boto3 | |
def assumed_role_session(role_arn):
    """Return a boto3 session that uses credentials for the given role.

    Asks STS to assume *role_arn* under a freshly generated session name and
    builds a ``boto3.Session`` from the ``Credentials`` in the response.
    """
    sts_response = boto3.client('sts').assume_role(
        RoleArn=role_arn,
        RoleSessionName=uuid.uuid1().hex,
    )
    temporary = sts_response['Credentials']

    return boto3.Session(
        aws_access_key_id=temporary['AccessKeyId'],
        aws_secret_access_key=temporary['SecretAccessKey'],
        aws_session_token=temporary['SessionToken'],
    )
def create_client(service_name, role_arn):
    """Build a *service_name* client from a session that has assumed *role_arn*."""
    return assumed_role_session(role_arn).client(service_name)
# ------------ | |
def _create_message_receipt(message): | |
return { | |
'MessageId': message['MessageId'], | |
'ReceiptHandle': message['ReceiptHandle'], | |
'MessageBody': json.loads(message['Body']), | |
} | |
def _create_entry_with_receipt(message_receipt): | |
return { | |
'receipt': { | |
'Id': message_receipt['MessageId'], | |
'ReceiptHandle': message_receipt['ReceiptHandle'], | |
}, | |
'entry': { | |
'Id': str(uuid.uuid1()), | |
'MessageBody': json.dumps(message_receipt['MessageBody']), | |
} | |
} | |
def _find_successful(entries_with_receipts, message_id): | |
return [ | |
entries_with_receipt['receipt'] | |
for entries_with_receipt in entries_with_receipts if entries_with_receipt['entry']['Id'] == message_id | |
] | |
def _flatten(list_of_lists): | |
return [val for sublist in list_of_lists for val in sublist] | |
def get_queue(client, queue_name):
    """Look up the queue called *queue_name* and return its ``QueueUrl``."""
    return client.get_queue_url(QueueName=queue_name)['QueueUrl']
def receive_messages(client, queue_url, max_messages=10, wait_time_seconds=1):
    """Receive one batch of messages from *queue_url*.

    Args:
        client: SQS client exposing ``receive_message``.
        queue_url: URL of the queue to read from.
        max_messages: Upper bound on messages returned by this call
            (SQS accepts 1-10).
        wait_time_seconds: How long SQS may wait for a message to arrive
            before answering (SQS accepts 0-20). ``0`` is a short poll.

    Returns:
        The list under ``Messages`` in the response, or ``[]`` when the
        response has no such key.
    """
    # A short poll (zero wait) samples only a subset of the SQS servers and
    # can come back empty while messages remain. A caller that stops on the
    # first empty batch would then quit early, so long-poll by default.
    response = client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=wait_time_seconds,
    )
    return response.get('Messages', [])
def parse_messages(messages):
    """Convert each received message into a message receipt, preserving order."""
    return list(map(_create_message_receipt, messages))
def send_messages(client, target_queue, message_receipts):
    """Send *message_receipts* to *target_queue* as a single batch.

    Each receipt is turned into a new batch entry with its own generated id.
    After the send, the ids reported under ``Successful`` are mapped back to
    the source messages they were built from.

    Args:
        client: SQS client exposing ``send_message_batch``.
        target_queue: URL of the queue to send to.
        message_receipts: Receipts as produced by ``parse_messages``.

    Returns:
        The ``receipt`` dicts (``Id`` and ``ReceiptHandle``) of the source
        messages whose copies were reported as sent, in the order the
        response lists them. Empty when there was nothing to send.
    """
    entries_with_receipts = [
        _create_entry_with_receipt(message_receipt)
        for message_receipt in message_receipts
    ]
    if not entries_with_receipts:
        # SQS rejects a batch request that contains no entries.
        return []

    # Index by generated entry id so each result is matched in one lookup
    # instead of rescanning every pair per successful id.
    receipts_by_entry_id = {
        pair['entry']['Id']: pair['receipt'] for pair in entries_with_receipts
    }

    response = client.send_message_batch(
        QueueUrl=target_queue,
        Entries=[pair['entry'] for pair in entries_with_receipts],
    )

    # Tolerate a response without 'Successful' (e.g. when every entry
    # failed) rather than raising KeyError.
    return [
        receipts_by_entry_id[result['Id']]
        for result in response.get('Successful', [])
        if result['Id'] in receipts_by_entry_id
    ]
def delete_messages(client, queue_url, deletable_receipts):
    """Delete the given receipts from *queue_url* in a single batch.

    Args:
        client: SQS client exposing ``delete_message_batch``.
        queue_url: URL of the queue the messages were received from.
        deletable_receipts: Dicts with ``Id`` and ``ReceiptHandle``.

    Returns:
        The ``delete_message_batch`` response. When there is nothing to
        delete, no request is made and an empty result with ``Successful``
        and ``Failed`` lists is returned instead.
    """
    entries = list(deletable_receipts)
    if not entries:
        # SQS rejects a batch request that contains no entries; this case
        # arises when none of the re-sent messages succeeded.
        return {'Successful': [], 'Failed': []}

    return client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
def sqs_redrive(client, source_queue, target_queue):
    """Move one batch of messages from *source_queue* to *target_queue*.

    Receives a batch from the source queue, re-sends it to the target queue,
    deletes from the source those messages whose copies were sent, and prints
    the received and deleted message ids.

    Args:
        client: SQS client used for every call.
        source_queue: Name of the queue to read from.
        target_queue: Name of the queue to send to.

    Returns:
        ``True`` if a batch was received, ``False`` if the receive came back
        empty.
    """
    source_queue_url = get_queue(client, source_queue)
    target_queue_url = get_queue(client, target_queue)

    messages = receive_messages(client, source_queue_url)
    if not messages:
        return False

    message_receipts = parse_messages(messages)
    deletable_receipts = send_messages(client, target_queue_url, message_receipts)

    # Skip the delete call when nothing was sent: SQS rejects an empty batch.
    if deletable_receipts:
        deletions = delete_messages(client, source_queue_url, deletable_receipts)
    else:
        deletions = {}

    received_ids = [message['MessageId'] for message in messages]
    # Tolerate a response without 'Successful' rather than raising KeyError.
    deleted_ids = [deletion['Id'] for deletion in deletions.get('Successful', [])]

    print("Received:")
    pprint(received_ids)
    print("Deleted:")
    pprint(deleted_ids)

    return True
def main():
    """Parse the command line and redrive batches until none is received.

    Takes three positional arguments: the source queue, the target queue and
    the role ARN to run as.
    """
    parser = argparse.ArgumentParser()
    positional_arguments = (
        ("source_queue", "Queue to read from"),
        ("target_queue", "Queue to send to"),
        ("role_arn", "Role ARN to run as"),
    )
    for argument_name, argument_help in positional_arguments:
        parser.add_argument(argument_name, help=argument_help, type=str)
    args = parser.parse_args()

    client = create_client('sqs', args.role_arn)

    # Keep moving batches until a receive comes back empty.
    while True:
        if not sqs_redrive(client, args.source_queue, args.target_queue):
            break
        print("--- Batch complete ---")

    print("Done.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment