Skip to content

Instantly share code, notes, and snippets.

@bernerdschaefer
Created August 30, 2012 14:45
Show Gist options
  • Save bernerdschaefer/3529920 to your computer and use it in GitHub Desktop.
Save bernerdschaefer/3529920 to your computer and use it in GitHub Desktop.
A priority queue backed by a binary heap
{BinaryHeap} = require "binary-heap"
# A non-blocking priority queue, backed by a binary heap. Supports optional
# timeouts for pop() calls.
#
# An example:
# queue = new Queue (x, y) -> x.priority < y.priority
#
# queue.pop 0, (item) -> console.log('worker 1:', item.body)
# queue.pop 0, (item) -> console.log('worker 2:', item.body)
# queue.pop 1000, (item, timedOut) ->
# if timedOut
# console.log('worker 3: timed out')
# else
# console.log('worker 3:', item.body)
# queue.pop 2000, (item, timedOut) ->
# if timedOut
# console.log('worker 4: timed out')
# else
# console.log('worker 4:', item.body)
#
# queue.push body: "item 1", priority: 10
# queue.push body: "item 2", priority: 1
# setTimeout (-> queue.push(body: "item 3", priority: 1)) , 1500
#
# The example will output:
# worker 1: item 1
# worker 2: item 2
# worker 3: timed out
# worker 4: item 3
#
class Queue
constructor: (compare) ->
@heap = new BinaryHeap(compare)
@listeners = []
push: (items...) ->
@heap.push(items...)
@notifyListeners()
# Timeout should be numeric, or null. Callback should accept two arguments:
# the first will be an item from the queue unless a timeout occurred. If
# that's the case, the second argument will be true.
#
# queue.pop 1000, (item, timedOut) ->
# return console.log("timed out") if timedOut
# console.log("got item from queue:", item)
pop: (timeout, callback) ->
if item = @heap.pop()
# There was an item already available on the queue, so we capture it and
# trigger the callback on the next tick.
process.nextTick ->
callback(item)
else if timeout > 0
success_callback = (item) ->
clearTimeout(_timeout)
callback(item)
timeout_callback = =>
index = @listeners.indexOf(success_callback)
return if index < 0
@listeners.splice(index, 1)
callback(null, true)
@listeners.push success_callback
_timeout = setTimeout(timeout_callback, timeout)
else
# No timeout, so push the callback onto the listener list directly.
@listeners.push callback
null
notifyListeners: ->
while @heap.list.length > 0 and callback = @listeners.shift()
item = @heap.pop()
process.nextTick ->
callback(item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment