@Vanlightly
Created July 23, 2020 15:03
------------------- MODULE rabbit_leaderless_rebalancing -------------------
EXTENDS TLC, Sequences, Integers, FiniteSets, Naturals
CONSTANTS Q,  \* set of queues, e.g. { q1, q2, q3, q4, q5 }
          A   \* set of apps, e.g. { a1, a2, a3 }
(*
Models an algorithm that balances the consumption of a group of Q queues over a group of
A applications, using the Single Active Consumer (SAC) feature of RabbitMQ.
RabbitMQ itself does not have Kafka-like consumer group functionality; it only has SAC.
However, a group of consuming applications can take individual actions that together achieve
balanced consumption.
Each consuming application detects how many active consumers it has and then calculates
the ideal number of active consumers it should have. When it has too many active consumers,
an application "releases" N queues. In the real world, this means it cancels its consumers on
those N queues and then resubscribes to them.
Each SAC queue then selects the next consumer in its "subscriber queue". This means that rebalancing
can take some time, as there can be cascades of releases until balance is achieved.
Safety is assured because all consumers in the group always subscribe to all queues. The main issue
with this algorithm is its liveness: those properties are what we are most concerned with,
and they are the most likely reason not to choose this algorithm at all.
Example:
Initial state:
- q1
- q2
State 1 (a1 subscribes to q1 and is made active):
- q1 -> a1
State 2 (a1 subscribes to q2 and is made active):
- q1 -> a1
- q2 -> a1
State 3 (a2 subscribes to q1, added to subscriber queue):
- q1 -> a1 [a2]
- q2 -> a1
State 4 (a2 subscribes to q2, added to subscriber queue):
- q1 -> a1 [a2]
- q2 -> a1 [a2]
State 5 (a1 releases q2):
- q1 -> a1 [a2]
- q2 -> - [a2]
State 6 (a2 made active on q2):
- q1 -> a1 [a2]
- q2 -> a2
Balance achieved
State 7 (a1 resubscribes to q2, added to subscriber queue):
- q1 -> a1 [a2]
- q2 -> a2 [a1]
Deadlock (no more to do - steady state)
*)
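VARIABLES subscriber_queue, \* per queue, the First Subscribe, First Active ordering of waiting consumers used by SAC
          active,           \* the active consumer of each queue (0 = none)
          app,              \* the id assigned to each app (0 = not started); ids make release decisions deterministic
          id                \* the next id to assign to an app that starts
\* Index of the TLC register that counts Release steps
release_counter == 0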
Init ==
    /\ subscriber_queue = [q \in Q |-> <<>>]
    /\ active = [q \in Q |-> 0]
    /\ app = [a \in A |-> 0]
    /\ id = 1
    /\ TLCSet(release_counter, 0)
AppOrNone ==
    A \union { 0 }
TypeOK ==
    /\ subscriber_queue \in [Q -> Seq(A)]
    /\ active \in [Q -> AppOrNone]
    /\ app \in [A -> Nat]
    /\ id \in Nat
StoppedApps ==
    { a \in A : app[a] = 0 }
StartedApps ==
    { a \in A : app[a] # 0 }
\* A stopped app starts and is assigned an id
Start ==
    \E a \in StoppedApps :
        \* enabling conditions
        /\ app[a] = 0
        \* actions
        /\ app' = [app EXCEPT ![a] = id]
        /\ id' = id + 1
        /\ UNCHANGED << subscriber_queue, active >>
\* An app that has started subscribes to one queue in the group
Subscribe ==
    \E a \in StartedApps :
        \E q \in Q :
            \* enabling conditions: a is neither waiting in q's subscriber queue nor active on q
            /\ ~\E i \in DOMAIN subscriber_queue[q] : subscriber_queue[q][i] = a
            /\ active[q] # a
            \* actions
            /\ subscriber_queue' = [subscriber_queue EXCEPT ![q] = Append(@, a)]
            /\ UNCHANGED << active, app, id >>
\* The number of active consumers the application (a) has
AppActiveCount(a) ==
    Cardinality({ q \in Q : active[q] = a })
\* The position of app a among the apps that have at least one active consumer, ordered by
\* active-consumer count in descending order, with ties broken by id (larger ids first).
\* Required so that every app deterministically makes the same decision about when to release a queue.
Position(a) ==
    IF AppActiveCount(a) = 0 THEN -1
    ELSE
        Cardinality({
            a1 \in StartedApps :
                LET a_active  == AppActiveCount(a)
                    a1_active == AppActiveCount(a1)
                IN
                    /\ a # a1
                    /\ a1_active > 0
                    /\ \/ a1_active > a_active
                       \/ /\ a1_active = a_active
                          /\ app[a] < app[a1]
        })
\* Calculates the ideal number of active consumers this application should have:
\* queue_count \div app_count, plus one when the division leaves a remainder and the app's
\* position is below the remainder (apps with no active consumer have position -1).
IdealNumber(a) ==
    LET queue_count == Cardinality(Q)
        app_count   == Cardinality(StartedApps)
    IN
        LET ideal     == queue_count \div app_count
            remainder == queue_count % app_count
            position  == Position(a)
        IN
            IF remainder = 0 THEN ideal
            ELSE
                IF remainder >= position + 1 THEN
                    ideal + 1
                ELSE
                    ideal
\* An app releases one of the queues it is active on when it has more active consumers
\* than its ideal number. In the real world a release cancels the consumer and resubscribes;
\* in this spec the resubscription is a later Subscribe step.
Release ==
    \E a \in StartedApps :
        \E q \in Q :
            \* enabling conditions
            /\ active[q] = a
            /\ AppActiveCount(a) > IdealNumber(a)
            \* actions
            /\ subscriber_queue' = [subscriber_queue EXCEPT ![q] = SelectSeq(@, LAMBDA a1: a1 # a)]
            /\ TLCSet(release_counter, TLCGet(release_counter) + 1)
            /\ active' = [active EXCEPT ![q] = 0]
            /\ UNCHANGED << app, id >>
\* The SAC queue assigns active status to the next consumer in the subscriber queue
MakeActive ==
    \E a \in StartedApps :
        \E q \in Q :
            \* enabling conditions
            /\ Len(subscriber_queue[q]) > 0
            /\ Head(subscriber_queue[q]) = a
            /\ active[q] = 0
            \* actions
            /\ active' = [active EXCEPT ![q] = a]
            /\ subscriber_queue' = [subscriber_queue EXCEPT ![q] = SelectSeq(@, LAMBDA a1: a1 # a)]
            /\ UNCHANGED << app, id >>
Next ==
    \/ Start
    \/ Subscribe
    \/ Release
    \/ MakeActive
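\* When checked as an invariant, Inv prints the release counter and evaluates to FALSE
\* once model checking has run for more than 60 seconds, so TLC stops with a violation.
\* Until then it evaluates id \in Nat, an invariant of the system that we know holds.
\* Referencing the variable id lifts Inv from a constant-level to a state-level
\* expression, because TLC evaluates constant expressions eagerly at startup.
\* Alternatively, run TLC with -Dtlc2.tool.impl.SpecProcessor.vetoed=Inv
\* TODO: Load TLC module override with level=1 (state).
Inv == IF TLCGet("duration") > 60 THEN Print(TLCGet(release_counter), FALSE)
       ELSE id \in Nat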
=============================================================================
\* Modification History
\* Last modified Thu Jul 23 09:58:17 CEST 2020 by Jack
\* Created Tue Jul 21 17:02:05 CEST 2020 by Jack
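The arithmetic in `Position` and `IdealNumber` can be checked outside TLC. The Python sketch below is a hypothetical re-implementation: the names `active_counts`, `position` and `ideal_number`, and the use of `None` instead of the spec's 0 for "no active consumer", are assumptions for illustration. It orders apps by active-consumer count, descending, with ties broken by id as the comment on `Position` describes. When every started app has at least one active consumer, the positions are 0 to n - 1, so the first `remainder` apps get `ideal + 1`, the rest get `ideal`, and the targets add up to the number of queues.

```python
from collections import Counter


def active_counts(active, started):
    """Number of queues each started app is active on (mirrors AppActiveCount).

    `active` maps queue -> app, with None meaning "no active consumer".
    """
    counts = Counter(a for a in active.values() if a is not None)
    return {a: counts.get(a, 0) for a in started}


def position(a, counts, ids):
    """Rank of app `a` among apps with at least one active consumer.

    Higher active counts come first; ties go to the larger id (the
    app[a] < app[a1] clause). Returns -1 if `a` has no active consumer.
    """
    mine = counts[a]
    if mine == 0:
        return -1
    return sum(
        1
        for other, theirs in counts.items()
        if other != a
        and theirs > 0
        and (theirs > mine or (theirs == mine and ids[a] < ids[other]))
    )


def ideal_number(a, queues, counts, ids):
    """Ideal number of active consumers for app `a` (mirrors IdealNumber).

    `counts` must contain every started app, so len(counts) is the app count.
    """
    ideal, remainder = divmod(len(queues), len(counts))
    if remainder == 0:
        return ideal
    return ideal + 1 if remainder >= position(a, counts, ids) + 1 else ideal


queues = ["q1", "q2", "q3", "q4", "q5"]
ids = {"a1": 1, "a2": 2}
active = {"q1": "a1", "q2": "a1", "q3": "a1", "q4": "a2", "q5": "a2"}
counts = active_counts(active, ids)
targets = {a: ideal_number(a, queues, counts, ids) for a in ids}
print(targets)  # {'a1': 3, 'a2': 2}
```

With a tie, for example a1 and a2 both active on two queues while q5 has no active consumer, the id tie-break gives a2 a target of 3 and a1 a target of 2, so the targets still add up to five.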
@lemmy

lemmy commented Jul 23, 2020


I personally prefer to collect behavior-level stats with an ordinary TLA+ variable, but with tlaplus/tlaplus@557c674 you can count releases per diameter in a TLC register:

Init ==
   /\ ...
   /\ TLCSet(release_counter, [i \in 1..10 |-> 0]) \* single worker with num=10

Release ==
  /\ ...
  /\ TLCSet(release_counter, [ TLCGet(release_counter) EXCEPT ![TLCGet("diameter")] = @ + 1 ])

PostCondition ==
  Print(TLCGet(release_counter), TRUE)
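For a rough cross-check of such per-depth counts outside TLC, the following Python sketch runs an explicit-state breadth-first search over the four actions of the spec for the two-queue, two-app configuration of the walk-through. It counts Release transitions by the BFS depth of the state they leave. This is only a hypothetical analogue of the register above, not TLC. The constants `N_QUEUES` and `N_APPS`, the tuple state encoding (apps numbered from 1, queues from 0, and 0 meaning "not started" or "no active consumer"), numbering the initial state as depth 1, and the id tie-break from the comment on `Position` are all assumptions. TLC's per-worker registers and its notion of diameter may not line up exactly with these depths.

```python
from collections import Counter, deque

N_QUEUES, N_APPS = 2, 2  # the walk-through's {q1, q2} and {a1, a2}

# A state is (app, next_id, subscriber_queue, active), mirroring the spec's
# variables. Apps are numbered 1..N_APPS and queues 0..N_QUEUES-1;
# app[i] == 0 means app i+1 is stopped, active[q] == 0 means no active consumer.


def active_count(active, a):
    return sum(1 for x in active if x == a)


def ideal_number(a, app, active):
    started = [b for b in range(1, N_APPS + 1) if app[b - 1] != 0]
    ideal, remainder = divmod(N_QUEUES, len(started))
    if remainder == 0:
        return ideal
    mine = active_count(active, a)
    pos = -1 if mine == 0 else sum(
        1
        for b in started
        if b != a
        and active_count(active, b) > 0
        and (active_count(active, b) > mine
             or (active_count(active, b) == mine and app[a - 1] < app[b - 1]))
    )
    return ideal + 1 if remainder >= pos + 1 else ideal


def replace(tup, i, value):
    """Return a copy of tuple `tup` with element i set to `value`."""
    return tup[:i] + (value,) + tup[i + 1:]


def successors(state):
    """Yield (action, next_state) for Start, Subscribe, Release and MakeActive."""
    app, next_id, subq, active = state
    for a in range(1, N_APPS + 1):
        if app[a - 1] == 0:  # Start: only stopped apps can take this step
            yield "Start", (replace(app, a - 1, next_id), next_id + 1, subq, active)
            continue
        for q in range(N_QUEUES):
            without_a = tuple(x for x in subq[q] if x != a)
            if a not in subq[q] and active[q] != a:  # Subscribe
                yield "Subscribe", (app, next_id, replace(subq, q, subq[q] + (a,)), active)
            if active[q] == a and active_count(active, a) > ideal_number(a, app, active):
                # Release: drop a from q's subscriber queue and clear the active consumer
                yield "Release", (app, next_id, replace(subq, q, without_a), replace(active, q, 0))
            if subq[q] and subq[q][0] == a and active[q] == 0:  # MakeActive
                yield "MakeActive", (app, next_id, replace(subq, q, without_a), replace(active, q, a))


def releases_by_depth():
    """BFS from the initial state; count Release transitions by source-state depth."""
    init = ((0,) * N_APPS, 1, ((),) * N_QUEUES, (0,) * N_QUEUES)
    seen = {init}
    frontier = deque([(init, 1)])  # the initial state is at depth 1
    releases = Counter()
    while frontier:
        state, depth = frontier.popleft()
        for action, nxt in successors(state):
            if action == "Release":
                releases[depth] += 1
            if nxt not in seen:
                seen.add(nxt)
                frontier.append((nxt, depth + 1))
    return releases, len(seen)


if __name__ == "__main__":
    releases, n_states = releases_by_depth()
    print(n_states, "distinct states")
    for depth in sorted(releases):
        print("depth", depth, "releases", releases[depth])
```

Here the counter is an ordinary program variable rather than a TLC register. With two queues and two apps, no release can be enabled before both apps have started and one app has subscribed to and become active on both queues. That takes six steps, so the first releases are counted at depth 7.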
