Created February 27, 2019 01:54
from enum import Enum

import rq
from redis import Redis

import settings


class Priority(Enum):
    low = 'low'
    default = 'default'
    high = 'high'


def publish(fn, priority=Priority.default, skip_duplicate=False, *args, **kwargs):
    """Publishes a message to the queue"""
    redis_conn = Redis(host=settings.WORKER_REDIS_HOST)
    q = rq.Queue(priority.value, connection=redis_conn)
    if skip_duplicate:
        # check to make sure there isn't already a job in the queue
        raise NotImplementedError
    q.enqueue(fn, *args, **kwargs)


def consume(priority):
    """Consumes messages from a specified queue"""
    conn = Redis(host=settings.WORKER_REDIS_HOST)
    with rq.Connection(conn):
        worker = rq.Worker(priority.value)
        worker.work()
""" | |
# existing imlementsation | |
for customer in orm.Customer.query.all(): | |
queue.enqueue( | |
send_customer_listings_count_report, | |
kwargs={"geoid": customer.geoid, "email": customer.email}, | |
timeout=(60 * 5), # 5 minutes | |
) | |
# becomes | |
for customer in orm.Customer.query.all(): | |
worker.publish( | |
send_customer_listings_count_report, | |
kwargs={"geoid": customer.geoid, "email": customer.email}, | |
) | |
""" |
@lucasnad27 can you please give an example of skip_duplicate? I'd like to understand why we need this feature.
This "feature" already exists by default. Not sure what I want to do with it yet...
from rq.registry import StartedJobRegistry

# check for a duplicate that is already running
registry = StartedJobRegistry(q.name, connection=conn)
for job_id in registry.get_job_ids():
    job = q.fetch_job(job_id)
    if job is not None and job.description == kwargs["description"]:
        return job

# check for a duplicate still waiting in the queue
for job in q.jobs:
    if job.description == kwargs["description"]:
        return job
It's meant to stop us from repeating the same crawl more than once, keyed on the task's description. I don't really like this pattern; it feels a bit hacky as a way to keep long-running workers from stepping on each other. But I'd like to avoid the check unless we explicitly ask for it, hence the skip_duplicate flag. A sketch of how it could slot into publish() follows.
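A minimal sketch of how that check could replace the NotImplementedError branch in publish(). It assumes callers pass a unique description kwarg to identify the task (rq's enqueue accepts description); this is an illustration, not the gist's final implementation:

from rq.registry import StartedJobRegistry

def publish(fn, priority=Priority.default, skip_duplicate=False, *args, **kwargs):
    """Publishes a message to the queue, optionally skipping duplicates."""
    redis_conn = Redis(host=settings.WORKER_REDIS_HOST)
    q = rq.Queue(priority.value, connection=redis_conn)
    if skip_duplicate:
        # assumption: callers pass a unique `description` kwarg per task
        description = kwargs.get("description")
        # skip if an identical job is already running...
        registry = StartedJobRegistry(q.name, connection=redis_conn)
        for job_id in registry.get_job_ids():
            job = q.fetch_job(job_id)
            if job is not None and job.description == description:
                return job
        # ...or is still waiting in the queue
        for job in q.jobs:
            if job.description == description:
                return job
    return q.enqueue(fn, *args, **kwargs)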