Created
April 22, 2009 14:43
-
-
Save ask/99830 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import settings | |
from django.core.management import setup_environ | |
setup_environ(settings) | |
from djangofeeds.messaging import consumer as feed_consumer | |
from djangofeeds.importers import FeedImporter | |
from UserList import UserList | |
import multiprocessing | |
import simplejson | |
import logging | |
import time | |
# Sleep interval (in seconds) between polls while the queue is empty.
QUEUE_WAKEUP_AFTER = 0.3

# While the queue stays empty, emit the "Waiting for queue." log message
# at most once every this many seconds.
EMPTY_MSG_EMIT_EVERY = 5
class EmptyQueue(Exception):
    """Raised when there are no messages waiting on the queue."""
class UnknownAction(Exception):
    """Raised when a queued message carries an unrecognized action.

    The offending message is rejected and the daemon moves on.
    """
class ProcessQueue(UserList):
    """Queue of running child processes, which starts waiting for the
    processes to finish when the queue limit is reached.

    :param limit: number of pending results to collect before blocking
        and draining the queue.
    :param done_msg: format string (taking the result value) describing a
        successfully finished task. NOTE(review): previously this argument
        was silently discarded; it is now stored as ``self.done_msg`` so
        callers/subclasses can use it when reporting completed results.
    """

    def __init__(self, limit, done_msg="Got %s"):
        UserList.__init__(self)  # initializes self.data = []
        self.limit = limit
        self.done_msg = done_msg  # bug fix: was accepted but never stored.

    def add(self, result):
        """Append an async ``result``; once ``limit`` results are pending,
        block on each one (``.get()``) and empty the queue."""
        self.data.append(result)
        if len(self.data) >= self.limit:
            # Renamed the loop variable: it used to shadow the ``result``
            # parameter. The return values are intentionally discarded;
            # ``.get()`` is called only to wait for (and surface errors
            # from) each child process.
            for pending in self.data:
                pending.get()
            self.data = []
def refresh_feed(feed_url, loglevel=logging.WARNING, logfile=None):
    """Refresh a djangofeed feed, supports multiprocessing.

    :param feed_url: URL of the feed to import/update.
    :keyword loglevel: logging level for the worker's logger.
    :keyword logfile: optional file the worker logs to (stderr otherwise).

    Returns ``feed_url`` so pool callers can tell which feed finished.
    """
    worker_logger = setup_logger(loglevel, logfile)
    FeedImporter(update_on_import=True,
                 logger=worker_logger).import_feed(feed_url)
    return feed_url
def setup_logger(loglevel=None, logfile=None):
    """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
    ``stderr`` is used.

    :keyword loglevel: level to set on the logger; when ``None`` the
        logger's current level is left untouched.
    :keyword logfile: path to a log file to attach a handler for.

    Returns logger object.
    """
    logger = multiprocessing.get_logger()
    if logfile:
        log_file_handler = logging.FileHandler(logfile)
        logger.addHandler(log_file_handler)
    else:
        multiprocessing.log_to_stderr()
    # Bug fix: the declared default is loglevel=None, but setLevel(None)
    # raises; only change the level when one was actually given.
    if loglevel is not None:
        logger.setLevel(loglevel)
    return logger
class RefreshFeedDaemon(object):
    """Refreshes feed_urls in the queue using a process pool.

    ``concurrency`` is the number of simultaneous processes.

    :keyword concurrency: worker process count (``None`` lets
        ``multiprocessing.Pool`` pick the CPU count).
    :keyword logfile: optional log file path (stderr when ``None``).
    :keyword loglevel: logging level for the daemon and its workers.
    """

    # Bug fix: ``__init__`` falls back to these via ``x or self.x``, but the
    # class attributes did not exist, so any falsy argument raised
    # AttributeError. Define sensible defaults.
    concurrency = None
    loglevel = logging.WARNING
    logfile = None

    def __init__(self, concurrency=None, logfile=None, loglevel=None):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        # Pass the resolved values, not the raw (possibly None) arguments.
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = multiprocessing.Pool(self.concurrency)

    def fetch_next_feed(self):
        """Fetch the next message off the queue and hand the feed import
        to the process pool.

        :raises EmptyQueue: when no message is waiting.
        :raises UnknownAction: when the message's action is not
            ``"import_feed"``; the message is rejected first.

        Returns the pool's async result object.
        """
        message = feed_consumer.fetch()
        if message is None:  # No messages waiting.
            raise EmptyQueue()
        data = simplejson.loads(message.body)
        action = data.get("action", "")
        if action != "import_feed":
            message.reject()
            raise UnknownAction(action)
        # The previous redundant re-check of ``action`` was removed: if we
        # got here, the action is "import_feed".
        feed_url = data["feed_url"]
        self.logger.info(">>> Importing feed: %s" % feed_url)
        try:
            result = self.pool.apply_async(refresh_feed, [
                feed_url, self.loglevel, self.logfile])
        except:  # deliberately bare: reject the message, then re-raise.
            message.reject()
            raise
        message.ack()
        return result

    def run(self):
        """Consume the queue forever, dispatching imports to the pool."""
        results = ProcessQueue(self.concurrency,
                               "Feed %s successfully refreshed")
        last_empty_emit = None
        while True:
            try:
                result = self.fetch_next_feed()
            except EmptyQueue:
                # Log "Waiting for queue." at most once per
                # EMPTY_MSG_EMIT_EVERY seconds while the queue is empty.
                if not last_empty_emit or \
                        time.time() > last_empty_emit + EMPTY_MSG_EMIT_EVERY:
                    self.logger.info("Waiting for queue.")
                    last_empty_emit = time.time()
                time.sleep(QUEUE_WAKEUP_AFTER)
                continue
            except UnknownAction as e:
                # Bug fix: the %s placeholder was never filled in.
                self.logger.info(
                    "Unknown action %s requeued and ignored." % e)
                continue
            results.add(result)
def main(concurrency=5, loglevel=logging.DEBUG, logfile="rdaemon.log"):
    """Build a :class:`RefreshFeedDaemon` with the given settings and run
    its main loop (blocks forever)."""
    RefreshFeedDaemon(concurrency=concurrency,
                      logfile=logfile,
                      loglevel=loglevel).run()
# Start the daemon only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment