Skip to content

Instantly share code, notes, and snippets.

@JavascriptMick
Created December 28, 2021 02:29
Show Gist options
  • Save JavascriptMick/56141fd06286b0aea7616836724d4a7c to your computer and use it in GitHub Desktop.
Save JavascriptMick/56141fd06286b0aea7616836724d4a7c to your computer and use it in GitHub Desktop.
Simple Python SQS Unordered Job Queue Implementation

Instructions

  1. Create an AWS account
  2. Create an SQS queue (standard type, not FIFO)
  3. Ensure that the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
  4. Use requirements.txt to install the dependencies (no, I'm not using Celery at all; this is just a shortcut for pulling in the right dependencies)
  5. See usage.py for how to enqueue jobs
  6. Run the worker to process the queued jobs:

python3 worker.py

import os
import json
import boto3
class SQSJob:
    """A unit of work paired with the SQS message that delivered it.

    Attributes:
        job_params: The job parameters supplied by whoever built the job.
        sqs_message: The originating message object; the job holds on to it
            so the consumer can act on the message after handling the job.
    """

    def __init__(self, job_params, sqs_message):
        """Store the job parameters together with their source message."""
        self.job_params = job_params
        self.sqs_message = sqs_message

    def __repr__(self):
        """Return a readable representation for logs and debugging."""
        return (
            f'{type(self).__name__}('
            f'job_params={self.job_params!r}, '
            f'sqs_message={self.sqs_message!r})'
        )
class SQSJobQueue:
    """Unordered job queue backed by an SQS queue.

    Jobs are enqueued as JSON message bodies. On the consuming side, each
    receive call may return several messages; they are buffered in
    ``local_queue`` so that subsequent ``get_next_job`` calls are served
    locally before SQS is contacted again.
    """

    def __init__(self, region_name, sqs_queue_name, max_number_of_messages=5, wait_time_seconds=5):
        """Connect to SQS and look up the queue by name.

        Args:
            region_name: AWS region passed to ``boto3.resource``.
            sqs_queue_name: Name of the queue to look up.
            max_number_of_messages: Passed as ``MaxNumberOfMessages`` on
                every receive call.
            wait_time_seconds: Passed as ``WaitTimeSeconds`` on every
                receive call.

        Raises:
            Exception: If ``AWS_ACCESS_KEY_ID`` or ``AWS_SECRET_ACCESS_KEY``
                is missing from the environment.
        """
        self.local_queue = []
        self.max_number_of_messages = max_number_of_messages
        self.wait_time_seconds = wait_time_seconds

        # Fail fast with a readable message before attempting to connect.
        required_vars = ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
        if not all(name in os.environ for name in required_vars):
            raise Exception('FATAL: Environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are not available')

        # Connect to SQS and get the queue.
        self.sqs = boto3.resource('sqs', region_name=region_name)
        self.queue = self.sqs.get_queue_by_name(QueueName=sqs_queue_name)

    def enqueue_job(self, params):
        """Send ``params`` to the queue as a JSON message body.

        Args:
            params: JSON-serialisable job parameters.

        Returns:
            The ``MessageId`` entry of the send response.
        """
        response = self.queue.send_message(MessageBody=json.dumps(params), MessageAttributes={})
        return response['MessageId']

    def get_next_job(self):
        """Return the next job, or ``None`` if none is available.

        Buffered jobs are returned first (most recently buffered first).
        When the buffer is empty, one receive call is made and every message
        with a valid JSON body is buffered as an ``SQSJob``.

        A message whose body is not valid JSON is reported and skipped rather
        than raised, so one bad message cannot crash the caller or discard
        the rest of the batch. The skipped message is not deleted here.
        """
        if self.local_queue:
            return self.local_queue.pop()

        print('fetching messages from queue')
        messages = self.queue.receive_messages(
            MaxNumberOfMessages=self.max_number_of_messages,
            WaitTimeSeconds=self.wait_time_seconds,
        )
        print(f'found {len(messages)} messages')

        for sqs_message in messages:
            try:
                job_params = json.loads(sqs_message.body)
            except json.JSONDecodeError as err:
                message_id = getattr(sqs_message, 'message_id', '<unknown>')
                print(f'skipping message {message_id}: body is not valid JSON ({err})')
                continue
            self.local_queue.append(SQSJob(job_params, sqs_message))

        # The buffer can still be empty: nothing received, or nothing valid.
        if not self.local_queue:
            return None
        return self.local_queue.pop()
from sqsjobqueue import SQSJobQueue
import time
# Example producer: connect to the queue, then submit a single job.
jobs = SQSJobQueue('your_region', 'your_queue')

# basically a dict with whatever you need to do the work
job_params = {
    'some_id': 'abc',
    'maybe_you_might_like_to_add_time': time.time(),
}
jobs.enqueue_job(job_params)
import time
from sqsjobqueue import SQSJobQueue
# Example consumer: poll the queue forever and handle one job at a time.
jobs = SQSJobQueue('your_region', 'your_queue_name')  # use your preferred region e.g. ap-southeast-2

print("--------- Polling for Jobs ----------")
while True:
    job = jobs.get_next_job()
    if job is None:
        # No job came back from this poll; ask again.
        continue

    # Do work here (the sleep is a placeholder standing in for real work)
    print(f'working on {job.job_params}')
    time.sleep(10)

    # If you finish successfully delete message from queue
    job.sqs_message.delete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment