Last active
December 19, 2015 22:59
-
-
Save gshutler/6031728 to your computer and use it in GitHub Desktop.
Prioritised queue consumption with multiple queues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
queues = [] | |
queues << queue_consumer("high") | |
queues << queue_consumer("normal") | |
def next_message | |
# look for a message from each queue in priority order | |
queues.each do |queue| | |
found, message = queue.non_blocking_dequeue | |
return [found, message] if found | |
end | |
# block on the highest priority queue for a while to avoid pounding | |
# what is a sensible time depends on your case | |
queues.first.blocking_dequeue(500ms) | |
end | |
while true do # make this a changeable flag to stop consuming gracefully | |
found, message = next_message | |
if found | |
process_message(message) | |
ack_message(message) | |
end | |
end |
queue
was me failing, as was nack
-ing rather than ack
-ing. I've edited the example.
Yes, "low" is only dequeued when there is no "high". However, <5% of our messages are "high" so they don't starve the "low". Much like backlog priority, if everything's high priority, there is no priority.
We actually have 3 priority queues:
- High (5%) - do as soon as possible as someone is actively waiting for the result
- Normal (80%) - the default level
- Background (15%) - housekeeping-type tasks that can cause contention but can happen whenever resources are free
Another thought, if contention isn't your reason for prioritising (so you can have multiple consumers), you need to be careful of the prefetch configuration. Otherwise one consumer can prefetch all the "high" tasks whilst all the others work on the remaining "low" tasks.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks Gary. great example.
So, your 'low' queue only gets dequeued when there are no 'high' messages? A constant stream of high messages will effectively lock out the low message consumer.
I know Ruby at all, but isn't the queue variable on line 14 out of scope?
Aside: why are you nacking rather than acking messages?
Interesting that people do different things when they talk about priority. Some do as you do, and always process high priority messages first. Other people want some kind of weighting, so that, say, 5 high priority messages get processed for each low one. Trying to devise some kind of generic strategy for EasyNetQ is quite a headache.