Skip to content

Instantly share code, notes, and snippets.

@jadhavmanoj
Last active April 2, 2019 07:53
Show Gist options
  • Save jadhavmanoj/1950c0b3b2ff31478bc942762ac20f27 to your computer and use it in GitHub Desktop.
Save jadhavmanoj/1950c0b3b2ff31478bc942762ac20f27 to your computer and use it in GitHub Desktop.
Add a Celery task to an SQS queue without delay from an AWS Lambda function (the equivalent of calling apply_async).
# Author: Manoj jadhav
from __future__ import print_function
import json
import urllib
import boto3
import uuid
import base64
# SQS client created once at import time and used by lambda_handler below.
sqs = boto3.client("sqs")
# Placeholder value: replace with the URL of the target SQS queue.
QUEUE_URL = "<SQS_QUEUE_URL>"
# NOTE(review): the bare string below is not read by any code in this file.
# It looks like a sample JSON payload (task name plus positional args) --
# presumably the intended shape of the Lambda event; confirm, since
# lambda_handler currently hard-codes its task name and arguments.
"""
{
"task_name": "tasks.your_task_name",
"task_args": [
"your", "task", "args"
]
}
"""
def _b64_json(payload):
    """Serialize *payload* to JSON and return it base64-encoded as text."""
    # b64encode only accepts bytes on Python 3, and its bytes result is
    # neither JSON-serializable nor a valid SQS MessageBody, so convert
    # explicitly in both directions.
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def lambda_handler(event, context):
    """Send a Celery task message for ``notification.send_message`` to SQS.

    Builds a message envelope with a base64-encoded JSON body, base64-encodes
    the whole envelope, and sends it to the queue at ``QUEUE_URL`` using the
    module-level ``sqs`` client.

    Args:
        event: Lambda invocation payload. It is only logged; the task name
            and arguments are hard-coded in this function.
        context: Lambda runtime context (unused).

    Raises:
        Exception: If the SQS response does not carry a ``MessageId``.
    """
    print("Received event: " + json.dumps(event, indent=4))

    task_name = "notification.send_message"
    # Positional args are sent as a list (empty here) rather than None, so a
    # consumer that unpacks them with *args does not fail on a JSON null.
    task_args = []
    task_kwargs = {}

    # generate celery task message
    msg_id = str(uuid.uuid4())
    msg_envelope = {
        "content-encoding": "utf-8",
        "content-type": "application/json",
        "headers": {},
        "properties": {
            "body_encoding": "base64",
            "correlation_id": msg_id,
            "delivery_info": {
                "exchange": None,
                "routing_key": None,
            },
            "delivery_tag": None,
        },
    }
    msg_body = {
        "task": task_name,
        "args": task_args,
        "kwargs": task_kwargs,
        "id": msg_id,
        "retries": 0,
    }

    # package celery task message
    msg_envelope["body"] = _b64_json(msg_body)
    msg = _b64_json(msg_envelope)

    # send message to sqs
    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=msg,
    )

    # .get() so a response without the key reaches the descriptive error
    # below instead of surfacing as a bare KeyError.
    if not response.get("MessageId"):
        raise Exception("Task message unable to send: " + json.dumps(msg_body, indent=4))
    print("Task message sent to sqs: " + json.dumps(msg_body, indent=4))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment