|
import channels |
|
import time |
|
import redis |
|
import Queue |
|
import select |
|
from redis.exceptions import ( |
|
ConnectionError, |
|
TimeoutError, |
|
) |
|
from channels.log import setup_logger |
|
from django.core.management.base import BaseCommand |
|
|
|
|
|
# Logger shared by everything in this module.
logger = setup_logger('django.channels')

# Suffix appended to the channel layer's prefix to build the Redis list key
# that Worker blocks on.
CHANNEL_NAME = 'channels.delay'
# Seconds Worker waits in select() on the BLPOP socket when no task is queued.
SELECT_TIMEOUT = 5
# Timeout, in seconds, handed to the Redis BLPOP command.
BLPOP_TIMEOUT = 5
|
|
|
|
|
class Task(object):
    """A message scheduled for delivery once its delay has elapsed.

    Tasks order by ``scheduled_time`` so that a priority queue (or
    ``sorted``) yields the task that is due soonest first.
    """

    def __init__(self, channel, delay, content):
        """Record the task and compute the moment it becomes due.

        Args:
            channel: Destination channel identifier (stored as given).
            delay: Seconds to wait, counted from the moment of construction.
            content: Payload to deliver (stored as given).
        """
        self.channel = channel
        self.delay = delay
        self.content = content
        # Absolute wall-clock deadline, fixed once at construction time.
        self.scheduled_time = time.time() + delay

    def is_time(self):
        """Return True once the scheduled time has been reached."""
        return time.time() >= self.scheduled_time

    def timeout(self):
        """Return the seconds left until the task is due, never negative."""
        return max(self.scheduled_time - time.time(), 0)

    def __lt__(self, other):
        """Return True when this task is due strictly before ``other``.

        heapq (and therefore PriorityQueue) orders entries with ``<``.
        Python 3 ignores ``__cmp__`` entirely, so the ordering has to be
        provided as a rich comparison to work on both major versions.
        """
        return self.scheduled_time < other.scheduled_time

    def __cmp__(self, other):
        """Three-way comparison on ``scheduled_time`` (used by Python 2).

        Returns -1, 0 or 1. Written without the ``cmp`` builtin, which no
        longer exists on Python 3.
        """
        mine, theirs = self.scheduled_time, other.scheduled_time
        return (mine > theirs) - (mine < theirs)
|
|
|
|
|
class Worker(object):
    """Pull delayed tasks from Redis and send each one when it is due.

    A dedicated Redis connection is driven by hand (send the command,
    select() on its socket, read the reply) so that waiting for new tasks
    and waiting for the next due task can share a single loop.
    """

    def __init__(self, channel_layer):
        """Set up the Redis connections and the in-memory schedule.

        Args:
            channel_layer: Channel layer object; its first entry in
                ``hosts`` is used as the Redis URL, ``prefix`` is used to
                build the list key, and ``deserialize`` decodes stored
                tasks.
        """
        self.channel_layer = channel_layer
        host = channel_layer.hosts[0]
        prefix = channel_layer.prefix

        # Redis list that is polled with BLPOP for new task keys.
        self.channel_key = prefix + CHANNEL_NAME

        self.redis = redis.Redis.from_url(host)
        # Separate connection reserved for the hand-driven BLPOP.
        self.blpop_connection = self.redis.connection_pool.get_connection('BLPOP')
        # Tasks waiting for their due time, soonest first.
        self.queue = Queue.PriorityQueue()
        self.select_timeout = SELECT_TIMEOUT
        self.blpop_timeout = BLPOP_TIMEOUT

    def process_blpop_response(self, channel, task_key):
        """Load the task stored under ``task_key`` and schedule it.

        Args:
            channel: Name of the list the key was popped from (unused).
            task_key: Redis key whose value is the serialized task.
                NOTE(review): presumably written by the code that delays
                messages; confirm against the producer.

        Raises:
            TypeError: If the deserialized data does not match the
                arguments of ``Task``.
        """
        content = self.redis.get(task_key)
        if content:
            task_data = self.channel_layer.deserialize(content)
            task = Task(**task_data)
            self.queue.put(task)

    def run(self):
        """Run the worker loop; does not return.

        Each iteration makes sure one BLPOP is outstanding, waits on the
        connection socket until either a reply arrives or the earliest
        queued task becomes due, then handles whichever happened.
        """
        connection = self.blpop_connection
        # True while a BLPOP has been sent whose reply has not been read.
        # Only one command is kept in flight: pipelining a fresh BLPOP on
        # every iteration piles up replies on the connection, and replies
        # already buffered by the parser are invisible to select().
        blpop_pending = False

        while True:
            if not blpop_pending:
                try:
                    connection.send_command('BLPOP', self.channel_key, self.blpop_timeout)
                except (ConnectionError, TimeoutError) as e:
                    connection.disconnect()
                    logger.error(e)
                    # Back off before trying to reconnect.
                    time.sleep(5)
                    continue
                blpop_pending = True

            if self.queue.empty():
                next_task = None
                timeout = self.select_timeout
            else:
                # Take the earliest task out; it is put back below if it
                # turns out not to be due yet.
                next_task = self.queue.get()
                timeout = next_task.timeout()

            inp, _, _ = select.select([connection._sock, ], [], [], timeout)

            if inp:
                # Whether the read succeeds or fails, the outstanding
                # command is finished and a new one must be issued.
                blpop_pending = False
                try:
                    resp = connection.read_response()
                    # An empty reply means BLPOP timed out with nothing to pop.
                    if resp:
                        self.process_blpop_response(*resp)
                except (ConnectionError, TimeoutError) as e:
                    connection.disconnect()
                    logger.error(e)
                except TypeError:
                    logger.error("Invalid task format")
                except Exception as e:
                    logger.error(e)

            if next_task is not None:
                if next_task.is_time():
                    channels.Channel(next_task.channel).send(next_task.content)
                else:
                    # Woken early by a BLPOP reply: reschedule the task.
                    self.queue.put(next_task)
|
|
|
|
|
class Command(BaseCommand):
    """Management command that runs the delay worker."""

    def handle(self, *args, **options):
        """Build a Worker on the default channel layer and run it."""
        layers = channels.asgi.channel_layers
        Worker(layers[channels.DEFAULT_CHANNEL_LAYER]).run()