Created
August 30, 2012 14:45
-
-
Save bernerdschaefer/3529920 to your computer and use it in GitHub Desktop.
A priority queue backed by a binary heap
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{BinaryHeap} = require "binary-heap" | |
# A non-blocking priority queue, backed by a binary heap. Supports optional
# timeouts for pop() calls.
#
# An example:
#   queue = new Queue (x, y) -> x.priority < y.priority
#
#   queue.pop 0, (item) -> console.log('worker 1:', item.body)
#   queue.pop 0, (item) -> console.log('worker 2:', item.body)
#   queue.pop 1000, (item, timedOut) ->
#     if timedOut
#       console.log('worker 3: timed out')
#     else
#       console.log('worker 3:', item.body)
#   queue.pop 2000, (item, timedOut) ->
#     if timedOut
#       console.log('worker 4: timed out')
#     else
#       console.log('worker 4:', item.body)
#
#   queue.push body: "item 1", priority: 10
#   queue.push body: "item 2", priority: 1
#   setTimeout (-> queue.push(body: "item 3", priority: 1)), 1500
#
# The example will output:
#   worker 1: item 1
#   worker 2: item 2
#   worker 3: timed out
#   worker 4: item 3
#
# Priority queue whose pop() never blocks: an item is handed to the supplied
# callback on a later tick, either immediately (if one is queued) or once a
# subsequent push() provides one.
class Queue
  # compare - ordering function, passed unchanged to the BinaryHeap constructor.
  constructor: (compare) ->
    @heap = new BinaryHeap(compare)
    # Callbacks from pop() calls that are still waiting for an item, in
    # arrival order.
    @listeners = []

  # Adds one or more items to the heap, then hands queued items to any
  # waiting listeners.
  push: (items...) ->
    @heap.push(items...)
    @notifyListeners()

  # Timeout should be numeric, or null. Callback should accept two arguments:
  # the first will be an item from the queue unless a timeout occurred. If
  # that's the case, the second argument will be true.
  #
  #   queue.pop 1000, (item, timedOut) ->
  #     return console.log("timed out") if timedOut
  #     console.log("got item from queue:", item)
  #
  # Always returns null.
  pop: (timeout, callback) ->
    # Test the heap's length rather than the truthiness of the popped value,
    # so that falsy items (0, "", false) are delivered instead of being
    # removed from the heap and then mistaken for an empty queue.
    if @heap.list.length > 0
      # There was an item already available on the queue, so we capture it and
      # trigger the callback on the next tick.
      item = @heap.pop()
      process.nextTick ->
        callback(item)
    else if timeout > 0
      timer = null
      # Registered as the listener: cancels the pending timeout before
      # delivering the item.
      onItem = (item) ->
        clearTimeout(timer)
        callback(item)
      # Runs when the timeout elapses: withdraws the listener and reports the
      # timeout. Does nothing if the listener is no longer registered.
      onTimeout = =>
        index = @listeners.indexOf(onItem)
        return if index < 0
        @listeners.splice(index, 1)
        callback(null, true)
      @listeners.push onItem
      timer = setTimeout(onTimeout, timeout)
    else
      # No timeout, so push the callback onto the listener list directly.
      @listeners.push callback
    null

  # Pairs queued items with waiting listeners, oldest listener first, until
  # either the heap or the listener list runs out. Each callback is invoked on
  # a later tick.
  notifyListeners: ->
    while @heap.list.length > 0 and callback = @listeners.shift()
      item = @heap.pop()
      # `callback` and `item` are function-scoped and reassigned on every
      # iteration (and `callback` by the final loop test), so the deferred
      # call must capture this iteration's values; otherwise every tick
      # would see whatever the variables hold after the loop ends.
      do (callback, item) ->
        process.nextTick ->
          callback(item)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment