Skip to content

Instantly share code, notes, and snippets.

@jg75
Created April 6, 2018 13:49
Show Gist options
  • Save jg75/5e79bcf54acee6d9f849d964b7ba95d3 to your computer and use it in GitHub Desktop.
Save jg75/5e79bcf54acee6d9f849d964b7ba95d3 to your computer and use it in GitHub Desktop.
"""Listens to an SNS Queue and pops messages."""
from multiprocessing import Process
import boto3
sqs = boto3.client("sqs")
queue_url = ""
visibility_timeout = 30
wait_percent = 0.8
max_wait = 3
def handle_message(message, visibility_timeout=0, delete=True):
"""Print message body & delete the message from the queue."""
group_id = message['Attributes']['MessageGroupId']
print("handling message: {}".format(group_id))
# Process message
# ...
# Done processing
if delete:
print("deleting message")
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message["ReceiptHandle"]
)
def main():
"""Listen to a queue for new messages."""
print("Listening for messages on the Queue...")
while True:
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=["MessageGroupId"],
WaitTimeSeconds=10
)
messages = response.get("Messages", [])
if not messages:
continue
for message in response.get("Messages", []):
# Monitor the process and update the message visibility
# if it looks like it's taking too long to complete.
# At some point it's going to have to just give up though.
# At what point is that?
waits = 1
receipt_handle = message['ReceiptHandle']
process = Process(
target=handle_message,
args=(message, delete=False)
)
process.start()
while process.is_alive() and waits <= max_waits:
process.join(int(visibility_timeout * wait_percent))
if process.is_alive():
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimout=visibility_timeout
)
if process.is_alive():
process.terminate()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment