The reservation pattern is a pattern for dealing with message reissues. That is, when the sender of a message sends the same message twice. Because of the laws of distributed systems, it is impossible for a sender to know if their message was received with 100% confidence. Senders will often approach this problem by utilizing "at-least-once" delivery. In other words, they may send the same message more than once.
In order to ensure that the receiving component does not handle the duplicated messages more than once, they must be deduplicated. It is the receiver's responsibility to ensure idempotence because the sender cannot.
To employ the reservation pattern one should attempt to write a message to a
stream with an expected version of -1
, which signifies that the stream must
not exist at the time of writing. If the write succeeds, then it is guaranteed
that that message was the only one written and it is considered to be reserved.
If it fails due to an expected version error, then that message has already been
written and the error is ignored.
One may either copy the message to the reservation stream or write another message to reservation stream. The latter is used for command messages that are meant to initiate the stream, e.g. Open an account. Otherwise, the former pattern is employed to deduplicate command messages such as Withdraw from an account.
# Two consumers must be started, one for the command category and
# another for the reservation stream
# Handles someCategory:command-*
class SomeHandler
# ...
handle SomeCommand do |some_command|
# Copy the message, following the original
some_command = SomeCommand.follow(some_command)
# The command must provide something that can be used to deduplicate, which
# may either be something natural to the command or synthetic (i.e.
# generated by the sender)
some_idempotence_key = some_command.some_idempotence_key
# The stream name must be different than the currently consumed stream
stream_name = stream_name(some_idempotence_key, "someReservationCategory")
begin
# write.initial(...) is identical to write.(..., expected_version: -1)
write.initial(some_command, stream_name)
rescue MessageStore::ExpectedVersion::Error
# The message was already written, so this error is ignored
end
end
end
# Handles someReservationCategory-*
class SomeReservationHandler
# ...
handle SomeCommand do |some_command|
# Handle SomeCommand normally
end
end