One of the things that I was sure must exist, but had never found anywhere until this point, was a distributed time delay: a way to add data to a bucket/queue/dataset/what have you that then allows work to be done on that data after a predetermined delay. The delay should be independent for each item (supporting a different delay per item automatically supports the same delay for every item).
When working in a single service, using a sleep command is often good enough, but when you have to wait longer than about a minute, you should be using a separate (ideally distributed) tool.
The reason for this, in my opinion, is that you should not trust your service to stay up for the whole time between the start of the delay and its end.
This gist explains, roughly, how to create a time delay queue in RabbitMQ (and thus AMQP in general), and includes the code required to do so in Terraform.
This work is built from/inspired by a talk by @adamralph called "Where we're going... we don't need batch jobs".
There is an existing extension to RabbitMQ that theoretically allows this already. However, the recommendation with that extension is not to delay more than a day or two, and it is not a supported extension, which brings some extra difficulty in installing and supporting it.
The goal of this gist is to allow you to
Given the large number of bindings required for a time delay queue (about log2(max_time_delay)^2 / 2), it is imperative that you construct this queue set using Infrastructure as Code in some form. Doing so by hand is only really feasible for maximum delays so short that an in-program delay would have been acceptable anyway.
The Terraform should be a drop-in module, but this document should also explain how to construct the bindings in a way that lets you duplicate the Terraform in your IaC of choice.
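To put a number on that estimate, here is a minimal sketch that counts the bindings exactly. The counting rule is my reading of the bindings described in this document (one ingest binding per queue, plus, for each queue's exchange, one binding per smaller queue and one "done" binding); the function name `binding_count` is made up for illustration.

```python
def binding_count(max_delay_seconds: int, include_zero_delay: bool = False) -> int:
    """Count the bindings needed to cover delays of 0..max_delay_seconds."""
    # One delay queue per binary digit of the maximum delay.
    n = max_delay_seconds.bit_length()
    # The ingest exchange binds to every delay queue (plus, optionally,
    # straight to the done queue for zero-delay messages).
    ingest = n + (1 if include_zero_delay else 0)
    # The exchange behind the i-th smallest queue (i = 0, 1, ...) binds to
    # the i smaller queues, plus one "done" binding.
    per_exchange = sum(i + 1 for i in range(n))
    return ingest + per_exchange


print(binding_count(15))         # 14
print(binding_count(2**25 - 1))  # 350 (a maximum delay of a bit over a year)
```

The exact count works out to n(n + 3) / 2 for n queues, which is where the "roughly n squared over 2" estimate comes from.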
The core of this design is a series of dead-letter exchanges and time-to-live queues. There is one queue for each power of 2 up to the maximum delay time. Combining these queues in the correct order (given by the binary representation of the desired delay) allows for delaying any message by any number of seconds. (A minor change would allow for delays of any number of milliseconds; however, the accuracy of this system is +/- 1 second, so a delay that precise is not the best idea.)
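As a small illustration of the binary idea, the sketch below lists which queues a message with a given delay would sit in, largest first. The helper name `delay_hops` is hypothetical and not part of the Terraform.

```python
def delay_hops(delay_seconds: int) -> list[int]:
    """Return the queue TTLs (in seconds) a message visits, largest first."""
    if delay_seconds < 0:
        raise ValueError("delay must be non-negative")
    hops = []
    # Start at the highest set bit and walk down to the 1-second queue.
    bit = 1 << max(delay_seconds.bit_length() - 1, 0)
    while bit:
        if delay_seconds & bit:
            hops.append(bit)
        bit >>= 1
    return hops


print(delay_hops(13))  # [8, 4, 1]
```

A 13-second delay is binary `1101`, so the message waits in the 8, 4 and 1 second queues and skips the 2 second queue.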
Steps
1. Messages are published to an ingest exchange.
2. The ingest exchange sends the message to one of N time-delay queues.
3. The message sits in the time-delay queue until its time in the queue surpasses that queue's TTL.
4. The queue dead-letters the message to the matching time-delay-done exchange.
5. This exchange, in the same manner as step 2, sends the message to the next required time-delay queue.
6. Repeat steps 3-5 until the message has no more queues that it needs to be delayed in.
7. Send the message to the "done" queue to be consumed.
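Steps 3 and 4 come down to two queue arguments in RabbitMQ: `x-message-ttl` (in milliseconds) and `x-dead-letter-exchange`. The sketch below builds that argument map for one delay queue. The function name is hypothetical, and the `<seconds>.done` exchange naming follows the names used later in this document; your client library or IaC tool will have its own way of passing these arguments.

```python
def delay_queue_arguments(seconds: int) -> dict:
    """Arguments for the queue that holds messages for `seconds` seconds."""
    return {
        # RabbitMQ expresses per-queue message TTL in milliseconds.
        "x-message-ttl": seconds * 1000,
        # Expired messages are dead-lettered to the matching done exchange.
        "x-dead-letter-exchange": f"{seconds}.done",
    }


print(delay_queue_arguments(8))
# {'x-message-ttl': 8000, 'x-dead-letter-exchange': '8.done'}
```

Note that `x-dead-letter-routing-key` is deliberately left unset here: without it, RabbitMQ dead-letters the message with its original routing key, and the design relies on that key surviving each hop.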
At a glance, it is difficult to see how to implement the web of bindings.
The first key is to examine each exchange independently. An exchange attached to a queue that delays for N seconds only requires bindings to queues that delay for less than N seconds.
The second difficulty is sending the message to exactly one queue. To solve this, note that the next queue a message should go to is the one with the largest delay it still needs (as messages can only move on to queues with shorter delays). In other words, a message should only go to a given queue if it would have gone to no queue with a larger time delay.
The method of creating these bindings is to use topic exchanges, and to encode the desired delay in the message's routing key as dot-separated binary digits (e.g. `0.1.1.0.1`).
Then, by using an assortment of single-digit wildcards (`*`) and full-match wildcards (`#`), messages can be appropriately matched.
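On the publishing side, building such a key is a one-liner. This is a minimal sketch; `routing_key` and its `digits` parameter (the number of delay queues) are names I made up for illustration.

```python
def routing_key(delay_seconds: int, digits: int) -> str:
    """Encode a delay as dot-separated binary digits, zero-padded to `digits`."""
    if not 0 <= delay_seconds < (1 << digits):
        raise ValueError(f"delay must fit in {digits} binary digits")
    # Format as zero-padded binary, then put a dot between every digit.
    return ".".join(format(delay_seconds, f"0{digits}b"))


print(routing_key(13, 5))  # 0.1.1.0.1
print(routing_key(9, 4))   # 1.0.0.1
```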
Keys are always in the form of a `#`, 0 or more `0`s, up to one `1`, and 0 or more `*`s, all characters separated by `.`.
To determine the keys on a given exchange, take the number of now-important digits (1 fewer than the digit position of the queue currently being used) and use that as the total number of `1`, `0` and `*` characters. Start with no `0`s and add one for each queue you skip.
This produces the following keys from the `8.done` exchange:
- `#.1.*.*` to send to the `4` queue
- `#.0.1.*` to send to the `2` queue
- `#.0.0.1` to send to the `1` queue
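The rule above can be sketched as a small generator. This is an illustration, not the Terraform itself; `next_step_bindings` is a hypothetical name, and queues are identified by their delay in seconds.

```python
def next_step_bindings(queue_seconds: int) -> dict[str, int]:
    """Map binding key -> target queue delay for the `<queue_seconds>.done` exchange."""
    if queue_seconds < 1 or queue_seconds & (queue_seconds - 1):
        raise ValueError("queue delay must be a power of two")
    # Digits that still matter once this queue's own digit has been used up.
    remaining = queue_seconds.bit_length() - 1
    bindings = {}
    for skipped in range(remaining):
        stars = remaining - skipped - 1
        # '#', then one '0' per skipped queue, a single '1', then wildcards.
        key = ".".join(["#"] + ["0"] * skipped + ["1"] + ["*"] * stars)
        bindings[key] = 1 << stars
    return bindings


print(next_step_bindings(8))
# {'#.1.*.*': 4, '#.0.1.*': 2, '#.0.0.1': 1}
```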
The ingest exchange keys are mostly no different from the delay queue keys. The only difference is the length of the keys: they must be long enough to route to the longest delay queue.
Should the queues be set up to have a max delay of 15 seconds, the following bindings would exist:
- `#.1.*.*.*` to send to the `8` queue
- `#.0.1.*.*` to send to the `4` queue
- `#.0.0.1.*` to send to the `2` queue
- `#.0.0.0.1` to send to the `1` queue
- `#.0.0.0.0` to send directly to the done queue (this is optional and not included in the Terraform, but should be done to make sure no messages are dropped silently)
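The same pattern, sketched for the ingest exchange. `ingest_bindings` is a hypothetical helper; it includes the optional all-zero binding to the done queue, which (as noted above) the Terraform does not.

```python
def ingest_bindings(max_delay_seconds: int) -> dict[str, str]:
    """Map binding key -> destination name for the ingest exchange."""
    # One digit per delay queue: 15 seconds needs 4 digits (8, 4, 2, 1).
    digits = max_delay_seconds.bit_length()
    bindings = {}
    for skipped in range(digits):
        stars = digits - skipped - 1
        key = ".".join(["#"] + ["0"] * skipped + ["1"] + ["*"] * stars)
        bindings[key] = str(1 << stars)
    # Optional: zero-delay messages go straight to the done queue.
    bindings[".".join(["#"] + ["0"] * digits)] = "done"
    return bindings


for key, target in ingest_bindings(15).items():
    print(key, "->", target)
```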
(Though effectively no different, these bindings have been separated out from the next-step keys in the Terraform for clarity.)
For the done keys, the message must have no more delays to go through. Thus the key is all `0`s for the remaining digits, producing the following keys:
- `#.0.0.0` when in the `8.done` exchange
- `#.0.0` when in the `4.done` exchange
- `#.0` when in the `2.done` exchange
- `#` when in the `1.done` exchange
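Sketched in code, the done key for an exchange is just a `#` followed by one `0` per remaining digit (the function name `done_binding` is made up for illustration):

```python
def done_binding(queue_seconds: int) -> str:
    """Binding key from the `<queue_seconds>.done` exchange to the done queue."""
    # Digits below this queue's own digit; all must be 0 for the message to be done.
    remaining = queue_seconds.bit_length() - 1
    return ".".join(["#"] + ["0"] * remaining)


print([done_binding(s) for s in (8, 4, 2, 1)])
# ['#.0.0.0', '#.0.0', '#.0', '#']
```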
See the following diagram for how the keys are all created.
```mermaid
sequenceDiagram
Ingest -->> Q8: #35;.1.*.*.*
Ingest -->> Q4: #35;.0.1.*.*
Ingest -->> Q2: #35;.0.0.1.*
Ingest -->> Q1: #35;.0.0.0.1
Q8 -->> Q4: #35;.1.*.*
Q8 -->> Q2: #35;.0.1.*
Q8 -->> Q1: #35;.0.0.1
Q8 -->> Done: #35;.0.0.0
Q4 -->> Q2: #35;.1.*
Q4 -->> Q1: #35;.0.1
Q4 -->> Done: #35;.0.0
Q2 -->> Q1: #35;.1
Q2 -->> Done: #35;.0
Q1 -->> Done: #35;
```
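To convince yourself that the bindings in the diagram send every message to exactly one destination at every step, here is a small offline simulation. It is only a sketch: `topic_matches` is my own re-implementation of topic-exchange matching (`*` is exactly one word, `#` is zero or more words), not RabbitMQ's code, and `bindings_for` and `route` are hypothetical helpers.

```python
def topic_matches(pattern: str, key: str) -> bool:
    """Topic-style match: '*' is exactly one word, '#' is zero or more words."""
    p, k = pattern.split("."), key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # Let '#' absorb zero or more words, then match the rest.
            return any(match(i + 1, j2) for j2 in range(j, len(k) + 1))
        if j < len(k) and p[i] in ("*", k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def bindings_for(digits_remaining: int) -> dict[str, str]:
    """Bindings of an exchange that still has `digits_remaining` digits to inspect."""
    table = {}
    for skipped in range(digits_remaining):
        stars = digits_remaining - skipped - 1
        key = ".".join(["#"] + ["0"] * skipped + ["1"] + ["*"] * stars)
        table[key] = str(1 << stars)
    table[".".join(["#"] + ["0"] * digits_remaining)] = "done"
    return table


def route(key: str, digits: int) -> list[str]:
    """Follow a routing key from the ingest exchange to the done queue."""
    path, remaining = [], digits
    while True:
        targets = [t for pat, t in bindings_for(remaining).items()
                   if topic_matches(pat, key)]
        if len(targets) != 1:
            raise RuntimeError(f"expected exactly one destination, got {targets}")
        path.append(targets[0])
        if targets[0] == "done":
            return path
        # The next exchange only cares about the digits below this queue's digit.
        remaining = int(targets[0]).bit_length() - 1


print(route("1.1.0.1", 4))  # ['8', '4', '1', 'done']
```

Running `route` over every 4-digit key and summing the queue delays along the path gives back the delay encoded in the key, which is a cheap sanity check before applying bindings to a real broker.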