Skip to content

Instantly share code, notes, and snippets.

@chao-he
Created November 26, 2013 08:27
Show Gist options
  • Save chao-he/7655058 to your computer and use it in GitHub Desktop.
Save chao-he/7655058 to your computer and use it in GitHub Desktop.
beanstalk worker
#!/usr/bin/python26
from tornado.ioloop import IOLoop
from tornado.options import parse_command_line
from tornado.options import options,define
from tornado.escape import utf8
from beanstalkt import Client as BeansClient
from beanstalkt import TimedOut
from urllib import quote_plus
from urlparse import urlparse
from collections import deque
import simplejson as json
import time
import sys
import logging
UA = "Mozilla/5.0 (MSIE 9.0; Windows NT 6.1; Trident/5.0)"
CONNECT_TIMEOUT = 1
STATUS_CHECK_INTERVAL = 60
define("bs_input", "__fetch_input__")
define("bs_output", "__fetch_output__")
define("bs_host", "localhost")
define("bs_port", 8367)
class BsWorker(object):
def __init__(self):
self.failures = 0
self.total_input = 0
self._queue = deque()
self.bsclient = BeansClient(options.bs_host, options.bs_port)
def Setup(self, watch, ignore, using='default', cb=None):
def do_next(_=None):
try:
if watch:
self.bsclient.watch(watch.pop(), do_next)
elif ignore:
self.bsclient.ignore(ignore.pop(), do_next)
elif using != 'default':
self.bsclient.use(using, cb)
elif cb:
cb()
except Exception,e:
logging.error("setup error", exc_info=True)
def is_connected(_=None):
if self.bsclient.closed():
logging.error("cannot connected to beanstalk, system exit")
sys.exit(1)
else:
logging.info("system startup ....")
self.bsclient.set_reconnect_callback(lambda _:self.DoWork())
do_next()
self.bsclient.connect(None)
IOLoop.instance().add_timeout(time.time() + CONNECT_TIMEOUT, is_connected)
def Run(self, _=None):
def setup_cb(_=None):
self.DoWork()
self.Status()
watch = set([options.bs_input])
ignore = set(['default'])
using = options.bs_output
self.Setup(watch, ignore, using, setup_cb)
def Status(self):
logging.info("input = %d, failures = %d",
self.total_input,
self.failures,
)
IOLoop.instance().add_timeout(time.time() + STATUS_CHECK_INTERVAL, self.Status)
def DoWork(self):
def del_cb(pid, _):
logging.info("task-%s: deleted", pid)
IOLoop.instance().add_callback(self.DoWork)
def put(pid, jid):
if jid is None:
pass
elif isinstance(jid, Exception):
logging.error("task-%d: put fail => %s", pid, jid)
else:
logging.info("task-%d: put OK, new id = %d", pid, jid)
if self._queue:
url = self._queue.popleft()
self.bsclient.put(url, ttr=600, callback=lambda jid: put(pid, jid))
else:
self.bsclient.delete(pid, callback=lambda _:del_cb(pid, _))
def reserve_cb(job):
if isinstance(job, dict):
logging.info("task-%d: reserved", job["id"])
self.Process(job)
put(job["id"], None)
else:
if not isinstance(job, TimedOut):
logging.error("reserve fail: %s", str(job), exc_info=True)
self.bsclient.reserve(timeout=1, callback=reserve_cb)
self.bsclient.reserve(timeout=1, callback=reserve_cb)
def Process(self, job):
pass
def main():
IOLoop.instance().add_callback(BsWorker().Run)
IOLoop.instance().start()
if __name__ == "__main__":
parse_command_line()
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment