Created
May 25, 2011 06:47
-
-
Save samsonjs/990471 to your computer and use it in GitHub Desktop.
Basic rate limiting queue for node
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//// Usage | |
var queue = createMessageQueue({ messagesPerMinute: 60 }) | |
queue.on('message', function(msg) { | |
console.log('message: ' + JSON.stringify(msg, null, 2)) | |
}) | |
queue.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' }) | |
//// Implementation | |
// The simplest thing I could think of. A rolling tally of # messages sent | |
// each second, spanning one minute total, stored in a queue. Every second | |
// we dequeue the oldest count and push a count of 0 which is then updated | |
// as messages come in. The total in the queue at any given time must be | |
// less than the specified maximum number of messages to send per minute. | |
// The default is a maximum of 20 messages per minute. | |
// | |
// When that limit is reached messages will be queued until the queue | |
// reaches the maximum specified size, after which messages are ignored. By | |
// default up to 10 messages will be queued. | |
// | |
// There is a volume knob that goes from 0 to 10 where 0 is mute and 10 is | |
// maximum chattiness. You can retrieve and set the volume so an external | |
// algorithm could adjust the volume as necessary. | |
var EventEmitter = require('events').EventEmitter | |
// a couple of utils | |
function mixin(a, b) { | |
for (var k in b) a[k] = b[k] | |
} | |
function clamp(n, min, max) { | |
if (n > max) return max | |
if (n < min) return min | |
return n | |
} | |
// the good stuff | |
var MaxVolume = 10 | |
, DefaultOptions = { messagesPerMinute: 20 // send at most 1 message every N minutes | |
, queueSize: 10 // queue up to N messages before ignoring new ones | |
} | |
function createMessageQueue(options) { | |
options = mixin(options || {}, DefaultOptions) | |
var absoluteMaxMessagesPerMinute = options.messagesPerMinute | |
, maxQueueSize = options.queueSize | |
, currentMaxMessagesPerMinute = absoluteMaxMessagesPerMinute | |
, messagesSentThisMinute = 0 | |
, messagesSentEachSecond = [0] // flows left -> right. unshift is enqueue, pop is dequeue. | |
, queue = [] | |
, limited = false | |
, volume = MaxVolume | |
, preMuteVolume = volume // used to restore the volume on unmute | |
function consume() { | |
if (self.isEmpty()) return | |
self.emit('message', queue.pop()) | |
messagesSentEachSecond[0] += 1 | |
messagesSentThisMinute += 1 | |
limit() | |
} | |
function limit() { | |
currentMaxMessagesPerMinute = Math.round((volume / MaxVolume) * absoluteMaxMessagesPerMinute) | |
limited = messagesSentThisMinute >= currentMaxMessagesPerMinute | |
if (!limited) consume() | |
} | |
var self = { | |
enqueue: function(msg) { | |
if (self.isFull()) return false | |
queue.push(msg) | |
if (!limited) consume() | |
return true | |
} | |
, isEmpty: function() { | |
return queue.length === 0 | |
} | |
, isFull: function() { | |
return queue.length >= maxQueueSize | |
} | |
, mute: function() { | |
if (volume > 0) { | |
preMuteVolume = volume | |
volume = 0 | |
} | |
} | |
, start: function() { | |
if (self.timeout) throw 'queue already started' | |
limited = false | |
volume = 10 | |
messagesSentThisMinute = 0 | |
messagesSentEachSecond = [0] | |
self.timeout = setTimeout(function() { | |
if (messagesSentEachSecond.length === 60) { | |
messagesSentThisMinute -= messagesSentEachSecond.pop() | |
} | |
messagesSentEachSecond.unshift(0) | |
limit() | |
}, 1000) | |
} | |
, stop: function() { | |
if (!self.timeout) return | |
clearTimeout(self.timeout) | |
delete self.timeout | |
queue = [] | |
messagesSentEachSecond = null | |
} | |
, volume: function(newVolume) { | |
if (typeof newVolume !== 'undefined') { | |
var n = +newVolume | |
if (typeof n !== 'number' || isNaN(n)) throw 'volume does not go to 11' | |
volume = clamp(n, 0, MaxVolume) | |
} | |
else { | |
return volume | |
} | |
} | |
, unmute: function() { | |
if (volume === 0) volume = preMuteVolume | |
} | |
} | |
EventEmitter.call(self) | |
self.start() | |
return self | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment