-
-
Save sblack4/57580950f6d8a741ed197e125cac32f6 to your computer and use it in GitHub Desktop.
Saves all messages from an AWS SQS queue into a folder, messages are TXT or JSON
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import boto3 | |
import json | |
import os | |
from datetime import datetime | |
parser = argparse.ArgumentParser( | |
description='Saves all messages from an AWS SQS queue into a folder.') | |
parser.add_argument( | |
'-q', '--queue-url', dest='queue_url', type=str, required=True, | |
help='The URL of the AWS SQS queue to save.') | |
parser.add_argument( | |
'-o', '--output', dest='output', type=str, default='queue-messages', | |
help='The output folder for saved messages.') | |
parser.add_argument( | |
'-t', '--type', dest='type', type=str, default='TXT', | |
help='Message type. JSON, TXT') | |
parser.add_argument( | |
'-d', '--delete', dest='delete', default=False, action='store_true', | |
help='Whether or not to delete saved messages from the queue.') | |
parser.add_argument( | |
'-v', '--visibility', dest='visibility', type=int, default=60, | |
help='The message visibility timeout for saved messages.') | |
args = parser.parse_args() | |
if not os.path.exists(args.output): | |
os.makedirs(args.output) | |
client = boto3.client('sqs') | |
count = 0 | |
while True: | |
response = client.receive_message( | |
QueueUrl=args.queue_url, | |
MaxNumberOfMessages=10, | |
AttributeNames=['ApproximateFirstReceiveTimestamp'], | |
MessageAttributeNames=['All'], | |
VisibilityTimeout=args.visibility | |
) | |
messages = response.get('Messages') | |
if not messages or len(messages) == 0: | |
break | |
for msg in messages: | |
d = datetime.fromtimestamp(int(msg['Attributes']['ApproximateFirstReceiveTimestamp'])/1000) | |
path = os.path.join(args.output, d.strftime('%Y-%m-%d')) | |
if not os.path.exists(path): | |
os.mkdir(path) | |
if args.type == 'JSON': | |
filename = os.path.join(path, msg['MessageId'] + ".json") | |
obj = json.loads(msg['Body']) | |
with open(filename, 'w') as f: | |
json.dump(obj, f, indent=2) | |
count += 1 | |
print(f'Saved message to {filename}') | |
if args.delete: | |
client.delete_message(QueueUrl=args.queue_url, ReceiptHandle=msg['ReceiptHandle']) | |
if args.type == 'TXT': | |
filename = os.path.join(path, msg['MessageId'] + ".txt") | |
obj = msg['Body'] | |
with open(filename, 'w') as f: | |
f.writelines(obj) | |
count += 1 | |
print(f'Saved message to {filename}') | |
if args.delete: | |
client.delete_message(QueueUrl=args.queue_url, ReceiptHandle=msg['ReceiptHandle']) | |
print(f'{count} messages saved') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment