Created
November 26, 2013 08:27
-
-
Save chao-he/7655058 to your computer and use it in GitHub Desktop.
beanstalk worker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python26 | |
from tornado.ioloop import IOLoop | |
from tornado.options import parse_command_line | |
from tornado.options import options,define | |
from tornado.escape import utf8 | |
from beanstalkt import Client as BeansClient | |
from beanstalkt import TimedOut | |
from urllib import quote_plus | |
from urlparse import urlparse | |
from collections import deque | |
import simplejson as json | |
import time | |
import sys | |
import logging | |
# Browser-like User-Agent string; nothing in the visible part of this file
# reads it.
UA = "Mozilla/5.0 (MSIE 9.0; Windows NT 6.1; Trident/5.0)"

# Seconds BsWorker.Setup waits after connect() before checking the connection.
CONNECT_TIMEOUT = 1

# Seconds between two consecutive BsWorker.Status log lines.
STATUS_CHECK_INTERVAL = 60

# Command-line options (tornado.options).  bs_input is the tube the worker
# watches, bs_output the tube it uses for puts, and bs_host / bs_port are
# handed to the beanstalk client constructor.
define("bs_input", default="__fetch_input__")
define("bs_output", default="__fetch_output__")
define("bs_host", default="localhost")
define("bs_port", default=8367)
class BsWorker(object):
    """Asynchronous beanstalk worker driven by the Tornado IOLoop.

    The worker watches the ``bs_input`` tube and uses the ``bs_output`` tube
    (see :meth:`Run`).  For every reserved job it calls :meth:`Process`,
    puts each body found in ``self._queue`` through the client, deletes the
    reserved job and then reserves the next one.

    Attributes:
        failures: number of failed process / put / delete steps.
        total_input: number of jobs reserved so far.
        bsclient: the beanstalk client used for all tube operations.
    """

    def __init__(self):
        # Counters reported periodically by Status().
        self.failures = 0
        self.total_input = 0
        # Bodies waiting to be put; drained after each Process() call.
        self._queue = deque()
        self.bsclient = BeansClient(options.bs_host, options.bs_port)

    def Setup(self, watch, ignore, using='default', cb=None):
        """Connect to beanstalk, configure the tubes, then call ``cb``.

        Args:
            watch: collection of tube names to watch (may be empty/None).
            ignore: collection of tube names to ignore (may be empty/None).
            using: tube name passed to ``use``; skipped when ``'default'``.
            cb: optional callable invoked once the tubes are configured.
                When ``using`` is not ``'default'`` it is passed to the
                client's ``use`` call as its callback.

        The connection state is checked CONNECT_TIMEOUT seconds after
        ``connect``; if the client is still closed the process exits with
        status 1.
        """
        # Work on private copies so the caller's collections are not
        # consumed by the pop() calls below.
        pending_watch = list(watch) if watch else []
        pending_ignore = list(ignore) if ignore else []

        def do_next(_=None):
            # Issue one tube command per invocation; each command's
            # callback re-enters do_next until nothing is left to do.
            try:
                if pending_watch:
                    self.bsclient.watch(pending_watch.pop(), do_next)
                elif pending_ignore:
                    self.bsclient.ignore(pending_ignore.pop(), do_next)
                elif using != 'default':
                    self.bsclient.use(using, cb)
                elif cb:
                    cb()
            except Exception:
                logging.error("setup error", exc_info=True)

        def is_connected(_=None):
            if self.bsclient.closed():
                logging.error("cannot connected to beanstalk, system exit")
                sys.exit(1)
            else:
                logging.info("system startup ....")
                # Resume the reserve loop whenever the client reconnects.
                self.bsclient.set_reconnect_callback(lambda _: self.DoWork())
                do_next()

        self.bsclient.connect(None)
        IOLoop.instance().add_timeout(time.time() + CONNECT_TIMEOUT,
                                      is_connected)

    def Run(self, _=None):
        """Set up the tubes from the command-line options and start working.

        Watches ``options.bs_input``, ignores ``'default'`` and uses
        ``options.bs_output``; once setup completes the reserve loop and the
        periodic status log are started.
        """
        def setup_cb(_=None):
            self.DoWork()
            self.Status()

        watch = set([options.bs_input])
        ignore = set(['default'])
        using = options.bs_output
        self.Setup(watch, ignore, using, setup_cb)

    def Status(self):
        """Log the counters and re-schedule itself.

        Runs again every STATUS_CHECK_INTERVAL seconds.
        """
        logging.info("input = %d, failures = %d",
                     self.total_input,
                     self.failures,
                     )
        IOLoop.instance().add_timeout(time.time() + STATUS_CHECK_INTERVAL,
                                      self.Status)

    def DoWork(self):
        """Reserve one job and run it through Process -> put -> delete.

        The chain is callback driven: a successful reserve calls
        :meth:`Process`, then every body in ``self._queue`` is put one at a
        time, then the reserved job is deleted and ``DoWork`` is scheduled
        again.  A reserve that times out or fails is simply retried.
        """
        def reserve_next():
            self.bsclient.reserve(timeout=1, callback=reserve_cb)

        def del_cb(pid, result):
            # The client reports errors by passing an exception instance
            # to the callback (same convention as handled in put()).
            if isinstance(result, Exception):
                self.failures += 1
                logging.error("task-%s: delete fail => %s", pid, result)
            else:
                logging.info("task-%s: deleted", pid)
            IOLoop.instance().add_callback(self.DoWork)

        def put(pid, jid):
            # jid is None on the first call, otherwise the result of the
            # previous put: a new job id or an exception instance.
            if jid is None:
                pass
            elif isinstance(jid, Exception):
                self.failures += 1
                logging.error("task-%d: put fail => %s", pid, jid)
            else:
                logging.info("task-%d: put OK, new id = %d", pid, jid)
            if self._queue:
                url = self._queue.popleft()
                self.bsclient.put(url, ttr=600,
                                  callback=lambda jid: put(pid, jid))
            else:
                # Nothing left to put: the input job is done.
                self.bsclient.delete(pid, callback=lambda _: del_cb(pid, _))

        def reserve_cb(job):
            if isinstance(job, dict):
                self.total_input += 1
                logging.info("task-%d: reserved", job["id"])
                try:
                    self.Process(job)
                except Exception:
                    # A failing Process() must not kill the reserve loop.
                    # The job is neither deleted nor followed by puts, so
                    # partially queued output is dropped as well.
                    self.failures += 1
                    logging.error("task-%d: process fail", job["id"],
                                  exc_info=True)
                    self._queue.clear()
                    reserve_next()
                    return
                put(job["id"], None)
            else:
                if not isinstance(job, TimedOut):
                    # No exception is being handled here, so exc_info would
                    # only add a bogus "None" traceback to the log.
                    logging.error("reserve fail: %s", str(job))
                reserve_next()

        reserve_next()

    def Process(self, job):
        """Handle one reserved job; intended to be overridden.

        ``job`` is the dict delivered by the client's reserve callback (its
        ``"id"`` key is read by DoWork).  Bodies appended to ``self._queue``
        here are put by DoWork before the job is deleted.  The base
        implementation does nothing.
        """
        pass
def main():
    """Schedule a BsWorker's Run on the Tornado IOLoop and start the loop."""
    loop = IOLoop.instance()
    worker = BsWorker()
    # Run() is deferred to the loop so setup happens once it is running.
    loop.add_callback(worker.Run)
    loop.start()


if __name__ == "__main__":
    # Parse the tornado command-line options (bs_host, bs_port, ...) before
    # the worker reads them.
    parse_command_line()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment