leaky bucket queue - redis 2.6 + lua + python
redis_leaky_bucket.py
# cribbed from http://vimeo.com/52569901 (Twilio carrier call origination moderation)
# The idea is that many fan-in queues can enqueue at any rate, but
# dequeue needs to happen in a rate-controlled manner without allowing
# any individual input queue to starve other queues.
# http://en.wikipedia.org/wiki/Leaky_bucket (second sense, "This version is
# referred to here as the leaky bucket as a queue.")
#
# requires:
#   redis 2.6+
#   redis-py>=2.7.0
#   anyjson
from uuid import uuid4
from time import time

from anyjson import loads, dumps
import redis

# this is obviously dumb - change to take a param -
# but it's good for single-file exposition.
c = redis.Redis()
class QueueEmpty(Exception):
    pass

def define_rate_limits(arena, limits):
    """
    limits: { queue_name -> allowed calls per second }
    """
    c.hmset('rate_limits', dict([("%s:%s" % (arena, k), v) for k, v in limits.items()]))

def get_time():
    # epoch milliseconds; all times handed to the scripts are in ms.
    return int(time() * 1000)

def enqueue(arena, queue, payload):
    enqueue_script(args=[arena, queue, dumps(payload)])

def dequeue(arena, reservation_seconds=10):
    txnid = uuid4().hex
    item = dequeue_script(args=[arena, get_time(), txnid, reservation_seconds])
    if item is None:
        raise QueueEmpty()
    return loads(item), txnid

def commit(txnid):
    return commit_script(args=[txnid])

def reap():
    return reap_script(args=[get_time()])
enqueue_script = c.register_script("""
-- e.g. evalsha <sha> 0 arena_namespace some_queue payload
if #ARGV < 3 then
    return error("USAGE: arena queue payload")
end
local arena_name = ARGV[1]
local queue_name = arena_name .. ":" .. ARGV[2]
local payload = ARGV[3]
local rate_queues = arena_name .. ":queues"
-- schedule the job.
local queue_length = redis.call('RPUSH', queue_name, payload)
if queue_length == 1 then
    -- new queue, so add to rate list with immediate start time.
    redis.call('ZADD', rate_queues, 0, queue_name)
end
return queue_length
""")
dequeue_script = c.register_script("""
-- e.g. evalsha <sha> 0 arena_namespace current_millis txnid 10
if #ARGV < 4 then
    return error("USAGE: arena current_time txnid reservation_duration")
end
local arena_name = ARGV[1]
local current_time = tonumber(ARGV[2])
local txnid = ARGV[3]
local reservation_duration = tonumber(ARGV[4])
local rate_queues = arena_name .. ":queues"
while true do  -- loop until there are no ready queues or we find a non-empty queue.
    -- pick a queue with a ready time from the dawn of time until now, but only one queue.
    local queue_names = redis.call("ZRANGEBYSCORE", rate_queues, 0, current_time, "LIMIT", 0, 1)
    if #queue_names == 0 then
        -- no queue is allowed to run yet.
        return false
    end
    local queue_name = queue_names[1]
    local item = redis.call("LPOP", queue_name)
    if item == false then
        -- the queue was empty; remove it from the managed queues.
        redis.call("ZREM", rate_queues, queue_name)
    else
        -- rate_limits is an assumed hash with calls allowed per queue per second;
        -- a queue with no entry there will error here, so define limits first.
        -- for sharded throughput, set it to overall_limit / #shards.
        local rate_delay = 1000 / tonumber(redis.call("HGET", "rate_limits", queue_name))
        -- reschedule the next check of this queue.
        redis.call("ZADD", rate_queues, current_time + rate_delay, queue_name)
        -- add the item to in-flight transactions.
        redis.call("HMSET", "txn:" .. txnid, "data", item, "queue_name", queue_name)
        -- and schedule transaction reaping if not committed in time
        -- (reservation_duration is in seconds; sorted-set scores are in ms).
        redis.call("ZADD", "transactions", current_time + reservation_duration * 1000, txnid)
        return item
    end
end
""")
commit_script = c.register_script("""
-- e.g. evalsha <sha> 0 previous_uuid
if #ARGV < 1 then
    return error("USAGE: txnid")
end
local txnid = ARGV[1]
redis.call("DEL", "txn:" .. txnid)
return redis.call("ZREM", "transactions", txnid)
""")
reap_script = c.register_script("""
-- e.g. evalsha <sha> 0 current_millis
if #ARGV < 1 then
    return error("USAGE: current_time")
end
local time = tonumber(ARGV[1])
local txns = "transactions"
-- find expired transactions.
local expired_txns = redis.call("ZRANGEBYSCORE", txns, 0, time)
for i, txnid in ipairs(expired_txns) do
    local item_key = "txn:" .. txnid
    local data, queue_name = unpack(redis.call("HMGET", item_key, "data", "queue_name"))
    -- add the txn back to the head of the input queue.
    redis.call("LPUSH", queue_name, data)
    -- if the queue drained and was dropped from the rate schedule while this
    -- item was in flight, re-add it with an immediate start time; otherwise a
    -- reaped item on an unscheduled queue would be stranded forever.
    local rate_queues = string.match(queue_name, "^([^:]+):") .. ":queues"
    if redis.call("ZSCORE", rate_queues, queue_name) == false then
        redis.call("ZADD", rate_queues, 0, queue_name)
    end
    -- remove the in-flight transaction.
    redis.call("DEL", item_key)
end
-- remove the expired transactions from the reaping set.
return redis.call("ZREMRANGEBYSCORE", txns, 0, time)
""")
# example: define per-site crawl limits and enqueue seed URLs.
import redis_leaky_bucket

# crawl github at most 10/s, wikipedia at most 5/s.
redis_leaky_bucket.define_rate_limits('site_crawl', {'github.com': 10, 'wikipedia.org': 5})
redis_leaky_bucket.enqueue('site_crawl', 'wikipedia.org', "http://www.wikipedia.org")
redis_leaky_bucket.enqueue('site_crawl', 'github.com', "http://www.github.com")
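
Note that dequeue raises QueueEmpty both when a queue is truly empty and when it is merely rate-limited, so a drain loop has to poll. A sketch that makes the pacing visible, assuming the limits defined above (the 10ms poll interval and 40-iteration bound are arbitrary):

import time
import redis_leaky_bucket

last = time.time()
for _ in xrange(40):
    try:
        url, txnid = redis_leaky_bucket.dequeue('site_crawl')
    except redis_leaky_bucket.QueueEmpty:
        # either truly empty or just rate-limited; poll again shortly.
        time.sleep(0.01)
        continue
    now = time.time()
    print "%.3fs gap: %s" % (now - last, url)  # ~0.1s gaps for github.com at 10/s
    last = now
    redis_leaky_bucket.commit(txnid)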
# crawl worker: run as many of these as you want.
import time, requests, lxml.html, urlparse
import redis_leaky_bucket

interesting_domains = set(['github.com', 'wikipedia.org'])

while True:
    try:
        url, txnid = redis_leaky_bucket.dequeue('site_crawl', reservation_seconds=30)
    except redis_leaky_bucket.QueueEmpty:
        print "nothing to do, sleeping."
        time.sleep(1)
        continue
    response = requests.get(url)
    if response.status_code != 200:
        # drop failed fetches rather than retrying them.
        redis_leaky_bucket.commit(txnid)
        continue
    tree = lxml.html.fromstring(response.content)
    tree.make_links_absolute(url)
    for elem, attr, val, _ in tree.iterlinks():
        if not (elem.tag == 'a' and attr == 'href'):
            continue
        domain_parts = urlparse.urlparse(val).netloc.split('.')
        if domain_parts[0] == 'www':
            domain_parts = domain_parts[1:]
        domain = ".".join(domain_parts)
        if domain in interesting_domains:
            # really, you'd want a bloom filter or something to limit recrawls.
            # see https://github.com/seomoz/pyreBloom
            print "enqueue %s" % val
            redis_leaky_bucket.enqueue('site_crawl', domain, val)
        else:
            print "skipping %s" % domain
    redis_leaky_bucket.commit(txnid)
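
Nothing above actually invokes reap(), so expired reservations would never be requeued. A minimal reaper loop sketch; the one-second poll interval is an assumption, not part of the gist:

# reaper: run one of these alongside the workers.
import time
import redis_leaky_bucket

while True:
    reaped = redis_leaky_bucket.reap()  # returns the count of requeued txns
    if reaped:
        print "requeued %d expired reservations" % reaped
    time.sleep(1)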