Created
February 27, 2019 01:54
-
-
Save lucasnad27/5453ad54411ffa54ce74198418df70cb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum | |
import rq | |
from redis import Redis | |
import settings | |
class Priority(Enum): | |
low = 'low' | |
default = 'default' | |
high = 'high' | |
def publish(fn, priority=Priority.default, skip_duplicate=False, *args, **kwargs): | |
"""Publishes a message to the queue""" | |
redis_conn = Redis(host=settings.WORKER_REDIS_HOST) | |
q = rq.Queue(priority.value, connection=redis_conn) | |
if skip_duplicate: | |
# check to make sure there isn't already a job in the queue | |
raise NotImplementedError | |
q.enqueue(fn, *args, **kwargs) | |
def consume(priority): | |
"""Consumes messages from a specified queue""" | |
conn = Redis(host=settings.WORKER_REDIS_HOST) | |
with rq.Connection(conn): | |
worker = rq.Worker(priority.value) | |
worker.work() | |
""" | |
# existing imlementsation | |
for customer in orm.Customer.query.all(): | |
queue.enqueue( | |
send_customer_listings_count_report, | |
kwargs={"geoid": customer.geoid, "email": customer.email}, | |
timeout=(60 * 5), # 5 minutes | |
) | |
# becomes | |
for customer in orm.Customer.query.all(): | |
worker.publish( | |
send_customer_listings_count_report, | |
kwargs={"geoid": customer.geoid, "email": customer.email}, | |
) | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It's meant to stop us from repeating the same crawl more than once based on the description of the task. I don't really like this pattern, seems a bit hacky to stop us from having long running workers step on top of each other. But I'd like to avoid this check unless we explicitly want to check for a duplicate