|
import time |
|
|
|
import boto3 |
|
from botocore.exceptions import ClientError |
|
|
|
# Module-wide SQS service resource; requests go to the endpoint on localhost port 4576.
sqs = boto3.resource(
    "sqs",
    endpoint_url="http://localhost:4576",
)
|
|
|
|
|
def create_queue(name, attributes=None):
    """Create a queue called ``name`` through the module-level ``sqs`` resource.

    Args:
        name: Name passed as ``QueueName``.
        attributes: Optional mapping passed as ``Attributes``; any falsy value
            (including ``None``) is replaced by an empty dict.

    Returns:
        The queue object returned by ``sqs.create_queue``.

    Raises:
        ClientError: Re-raised after printing a failure notice.
    """
    attributes = attributes or {}

    try:
        queue = sqs.create_queue(QueueName=name, Attributes=attributes)
        # Reported inside the ``try`` so a ClientError raised here is handled the same way.
        print(f"Created queue '{name}' with URL = {queue.url}")
    except ClientError as error:
        print(f"Couldn't create queue named '{name}'.")
        raise error

    # Only reachable on success: the handler above always re-raises.
    return queue
|
|
|
|
|
def send_message(queue, message_body, message_attributes=None):
    """Send ``message_body`` to ``queue`` and hand back the send response.

    Args:
        queue: Object exposing ``send_message(MessageBody=..., MessageAttributes=...)``.
        message_body: Value passed as ``MessageBody``.
        message_attributes: Optional mapping passed as ``MessageAttributes``;
            any falsy value (including ``None``) is replaced by an empty dict.

    Returns:
        Whatever ``queue.send_message`` returns.

    Raises:
        ClientError: Re-raised after printing the body that could not be sent.
    """
    message_attributes = message_attributes or {}

    try:
        return queue.send_message(
            MessageBody=message_body,
            MessageAttributes=message_attributes,
        )
    except ClientError as error:
        print(f"Send message failed: {message_body}")
        raise error
|
|
|
|
|
def receive_messages(queue, max_number=10, wait_time=2):
    """Fetch a batch of messages from ``queue``, printing each one's ID and body.

    Args:
        queue: Object exposing ``receive_messages(**kwargs)``.
        max_number: Value passed as ``MaxNumberOfMessages``.
        wait_time: Value passed as ``WaitTimeSeconds``.

    Returns:
        The collection returned by ``queue.receive_messages``.

    Raises:
        ClientError: Re-raised after printing a failure notice naming the queue.
    """
    # All system attributes and all message attributes are requested on every call.
    request = {
        "AttributeNames": ["All"],
        "MessageAttributeNames": ["All"],
        "MaxNumberOfMessages": max_number,
        "WaitTimeSeconds": wait_time,
    }

    try:
        messages = queue.receive_messages(**request)
        for msg in messages:
            print(f"Received message: ID = {msg.message_id}, body = {msg.body}")
    except ClientError as error:
        print(f"Couldn't receive messages from queue: '{queue}'")
        raise error

    # Only reachable on success: the handler above always re-raises.
    return messages
|
|
|
|
|
# Consider adding randomness (jitter) here if concurrent clients could end up
# retrying in lockstep.
# See: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter
def get_exponential_backoff_wait_time(n, base_wait_time_sec, max_wait_time_sec):
    """Return the capped backoff delay for attempt ``n`` as an ``int``.

    The uncapped delay is ``2 ** n + base_wait_time_sec``. The result is the
    smaller of that value and ``max_wait_time_sec``, truncated by ``int()``.

    Args:
        n: Exponent applied to 2 (the attempt number at the call site).
        base_wait_time_sec: Amount added to the exponential term.
        max_wait_time_sec: Upper bound on the returned delay.

    Returns:
        The delay as an ``int``.
    """
    uncapped = 2 ** n + base_wait_time_sec
    # Same selection rule as ``min(uncapped, max_wait_time_sec)``: the cap wins
    # only when it is strictly smaller.
    capped = max_wait_time_sec if max_wait_time_sec < uncapped else uncapped
    return int(capped)
|
|
|
|
|
def main():
    """Run the backoff demo: keep re-receiving one message that is never deleted.

    A single message is placed on a freshly purged queue. Each time it is
    received, its visibility timeout is extended by an exponentially growing
    amount, until the receive count reaches the attempt limit. The measured and
    expected waits are then printed and the queue is purged again.
    """
    attempt_limit = 5
    base_timeout_sec = 5
    wait_cap_sec = 60

    # Start from an empty queue that holds exactly one test message.
    queue = create_queue(
        "backoff_test_queue",
        {"VisibilityTimeout": str(base_timeout_sec)},
    )
    queue.purge()
    send_message(queue, "test")

    started_ns = time.time_ns()

    num_attempts = 0
    wait_time = 0
    total_wait_time = 0

    # Poll until the message's receive count reaches the attempt limit.
    while num_attempts < attempt_limit:
        received = receive_messages(queue)

        # Nothing came back this round - poll again.
        if not received:
            continue

        msg = received[0]

        # A real consumer would process the message here and delete it on
        # success. This demo deliberately does nothing, simulating a failure,
        # so the message stays on the queue and is delivered again.

        # Printed before the counter is refreshed, so this shows the previous count.
        print(f"Attempt: {num_attempts}")
        num_attempts = int(msg.attributes["ApproximateReceiveCount"])

        wait_time = get_exponential_backoff_wait_time(
            num_attempts, base_timeout_sec, wait_cap_sec
        )
        total_wait_time += wait_time
        print(f"Wait time: {wait_time}")

        # The backoff step: set the message's visibility timeout to the new wait.
        msg.change_visibility(VisibilityTimeout=wait_time)

    elapsed_sec = (time.time_ns() - started_ns) // 1_000_000_000
    # The wait set after the final receive is never waited out, so exclude it.
    theoretical_wait_time_sec = total_wait_time - wait_time

    print(f"Max retries reached. Waited {elapsed_sec} sec. Expected wait {theoretical_wait_time_sec} sec")

    queue.purge()
|
|
|
|
|
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()