Created
February 29, 2024 07:49
-
-
Save mark-mishyn/89f8dbd09f24d28f99fc993e9ce577be to your computer and use it in GitHub Desktop.
SQS message processing using long polling; it is a very lightweight alternative to Celery
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import threading | |
import boto3 | |
from configurations.config_store import Config | |
from src.routes.frontapp.tasks import ( | |
add_tags_to_conversation, | |
reply_to_front_message, | |
) | |
from src.utils.aws import get_queue_url_by_name | |
# Module-wide SQS client, created once at import time for the configured region.
sqs = boto3.client("sqs", region_name=Config.AWS_REGION)

# Dispatch table keyed by each task function's own ``__name__``; a message
# selects its handler by putting that name in its "handler_function" field.
message_handler_map = {
    handler.__name__: handler
    for handler in (reply_to_front_message, add_tags_to_conversation)
}
def process_and_delete_message(message: dict, queue_url: str):
    """Dispatch one SQS message to its handler, then delete it from the queue.

    The message's ``"Body"`` is parsed as JSON and must contain two keys:
    ``"handler_function"`` (a key of ``message_handler_map``) and
    ``"payload"`` (passed as the single argument to that handler).

    Args:
        message: A single message dict as returned by ``receive_message``;
            its ``"Body"`` and ``"ReceiptHandle"`` entries are read.
        queue_url: URL of the queue the message was received from.

    Raises:
        KeyError: If a required key is missing or the handler name is not
            registered in ``message_handler_map``.
        json.JSONDecodeError: If the body is not valid JSON.

    Note:
        Deletion happens only after the handler returns; if anything above
        raises, the message is not deleted here.
    """
    envelope = json.loads(message["Body"])
    handler_name, handler_payload = (
        envelope["handler_function"],
        envelope["payload"],
    )

    # Resolve and run the registered handler for this message.
    handler = message_handler_map[handler_name]
    handler(handler_payload)

    # Reached only on success: acknowledge the message by deleting it.
    receipt_handle = message["ReceiptHandle"]
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
def long_poll_sqs():
    """Poll the configured queue forever, processing each message in a thread.

    The queue URL is resolved once from ``Config.FRONTAPP_QUEUE`` via
    ``get_queue_url_by_name``. Each iteration requests up to 10 messages
    and waits up to 20 seconds for them (long polling), then starts one
    ``threading.Thread`` per received message running
    ``process_and_delete_message``.

    The loop has no exit condition, so this function does not return
    normally. Started threads are not joined or otherwise tracked.
    """
    frontapp_queue_url = get_queue_url_by_name(Config.FRONTAPP_QUEUE, sqs)

    while True:
        batch = sqs.receive_message(
            QueueUrl=frontapp_queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        # "Messages" is absent when the poll times out with nothing to deliver.
        for received in batch.get("Messages", []):
            worker = threading.Thread(
                target=process_and_delete_message,
                args=(received, frontapp_queue_url),
            )
            worker.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment