Created
December 6, 2019 16:06
-
-
Save get-data-/87a66ef48063b7b9ebdfa0ef339ef080 to your computer and use it in GitHub Desktop.
lambda handler for sqs msg handling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# handler.py | |
"""AWS Lambda function for transferring files from an S3 bucket to another | |
S3 bucket on a CreateObject event. | |
Required env vars: | |
:param SQS_QUEUE_URL: URL of SQS queue | |
Optional env vars: | |
:param LOGGING_LEVEL: string log level (default ERROR) | |
:param NUM_MESSAGES: int # of msg to retrieve per execution (default 1) | |
:validation NUM_MESSAGES: accepts int 1 - 10 | |
""" | |
import os | |
import boto3 | |
import logging | |
from botocore.exceptions import ClientError | |
logger = logging.getLogger() | |
logger.basicConfig( | |
level=logging.ERROR, | |
format='%(levelname)s: %(asctime)s: %(message)s') | |
logger.setLevel(os.getenv('LOGGING_LEVEL', 'ERROR')) | |
def main(event, context): | |
"""Retrieve messages from a dead letter queue to retry. | |
This is the Lambda entry point. It queries an sqs queue and retries failed | |
S3 PUT events caused by a delay in write consistency. (if successful), it | |
deletes messages from the queue after invoking the inbound-sync. | |
:param event: dict, the event payload delivered by Lambda. | |
:param context: a LambdaContext object - unused. | |
""" | |
# Assign this value before running the program | |
sqs_queue_url = os.getenv('SQS_QUEUE_URL', None) | |
if not sqs_queue_url: | |
raise ValueError('SQS_QUEUE_URL Environment variable missing') | |
num_messages = os.getenv('NUM_MESSAGES', 1) | |
logger.info(f"Inbound Sync DLQ Retry: Looking into queue") | |
# Retrieve SQS messages | |
msgs = retrieve_sqs_messages(sqs_queue_url, num_messages) | |
if msgs is not None: | |
for msg in msgs: | |
logging.info( | |
f'SQS: Message ID: {msg["MessageId"]}, ' | |
f'Contents: {msg["Body"]}') | |
# Remove the message from the queue | |
delete_sqs_message(sqs_queue_url, msg['ReceiptHandle']) | |
def retrieve_sqs_messages( | |
sqs_queue_url, num_msgs=1, wait_time=0, visibility_time=5): | |
"""Retrieve messages from an SQS queue | |
The retrieved messages are not deleted from the queue. | |
:param sqs_queue_url: String URL of existing SQS queue | |
:param num_msgs: Number of messages to retrieve (1-10) | |
:param wait_time: Number of seconds to wait if no messages in queue | |
:param visibility_time: Number of seconds to make retrieved messages | |
hidden from subsequent retrieval requests | |
:return: List of retrieved messages. If no messages are available, | |
returned list is empty. If error, returns None. | |
""" | |
# Validate number of messages to retrieve | |
if num_msgs < 1: | |
num_msgs = 1 | |
elif num_msgs > 10: | |
num_msgs = 10 | |
# Retrieve messages from an SQS queue | |
sqs_client = boto3.client('sqs') | |
try: | |
msgs = sqs_client.receive_message( | |
QueueUrl=sqs_queue_url, | |
MaxNumberOfMessages=num_msgs, | |
WaitTimeSeconds=wait_time, | |
VisibilityTimeout=visibility_time) | |
except ClientError as e: | |
logging.error(e) | |
# Return the list of retrieved messages | |
return msgs['Messages'] | |
def delete_sqs_message(sqs_queue_url, msg_receipt_handle): | |
"""Delete a message from an SQS queue | |
:param sqs_queue_url: String URL of existing SQS queue | |
:param msg_receipt_handle: Receipt handle value of retrieved message | |
""" | |
# Delete the message from the SQS queue | |
sqs_client = boto3.client('sqs') | |
sqs_client.delete_message( | |
QueueUrl=sqs_queue_url, | |
ReceiptHandle=msg_receipt_handle) | |
if __name__ == "__main__": | |
import sys | |
import json | |
try: | |
with open(sys.argv[1]) as f: | |
event = json.loads(f.read()) | |
handler = logging.StreamHandler(sys.stdout) | |
logger.addHandler(handler) | |
main(event, '') | |
except Exception as ex: | |
logger.exception(ex) | |
raise SystemExit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment