Skip to content

Instantly share code, notes, and snippets.

@ocadaruma
Last active November 9, 2022 13:44
Show Gist options
  • Save ocadaruma/226c2f4e65cb97aba3610341903955b0 to your computer and use it in GitHub Desktop.
Save ocadaruma/226c2f4e65cb97aba3610341903955b0 to your computer and use it in GitHub Desktop.
INIT Init
NEXT Next
CONSTANTS
NULL = NULL
Partitions = {1,2,3}
MaxPendingRecords = 2
INVARIANTS
NoRecordsAddedMoreThanMaxPendingRecords
AssignedPartitionsAlwaysSubsetOfContexts
NonRevokedContextAlwaysSubsetOfAssignedPartitions
---- MODULE DecatonProcessorSpec ----
EXTENDS FiniteSets, Sequences, Naturals
CONSTANTS
NULL,
Partitions,
MaxPendingRecords
VARIABLES
(*
* consumer's assigned partitions.
* partitions may be removed from this set as soon as invoking `onPartitionsRevoked` and
* may be added as soon as invoking `onPartitionsAssigned`
*)
assignedPartitions,
(* flag that indicates dynamic property reload is requested *)
reloadRequested,
(* Decaton's partition contexts *)
partitionContexts,
(*
* represents current ongoing rebalance that contains to-be-revoked partitions and
* to-be-assigned partitions.
* This is an abstraction of single rebalance round that neglects details like
* sending JoinGroup after revocation, assigning new partitions after got SyncGroup response,... etc.
* In EAGER rebalance, to-be-revoked partitions must always equal to assignedPartitions.
* In COOPERATIVE rebalance, to-be-revoked partitions might be empty and in this case,
* onPartitionRevoked callback will not be invoked.
*)
currentRebalance,
(* program counter *)
pc
(* Helpers *)
Max(a, b) ==
IF a < b THEN b ELSE a
(* Init state *)
Init ==
/\ pc = "L0"
/\ reloadRequested = FALSE
/\ assignedPartitions = Partitions
/\ partitionContexts = [p \in Partitions |-> [
revoked |-> FALSE,
pendingRecords |-> 0,
paused |-> FALSE
]]
/\ currentRebalance = NULL
(* Actions *)
(* Can happen anytime from watcher thread *)
RequestPropertyReload ==
/\ ~reloadRequested
/\ reloadRequested' = TRUE
/\ UNCHANGED <<pc, assignedPartitions, currentRebalance, partitionContexts>>
(* Can happen anytime when no rebalance is currently ongoing *)
InitiateRebalance ==
/\ currentRebalance = NULL
/\ \E revokingPartitions \in (SUBSET assignedPartitions),
addingPartitions \in (SUBSET Partitions):
currentRebalance' = [
revokingPartitions |-> revokingPartitions,
addingPartitions |-> addingPartitions]
/\ UNCHANGED <<pc, assignedPartitions, reloadRequested, partitionContexts>>
OnPartitionsRevoked ==
/\ pc = "L0"
/\ pc' = "L1"
/\ IF currentRebalance /= NULL
THEN /\ partitionContexts' = [
p \in (DOMAIN partitionContexts) |->
[partitionContexts[p] EXCEPT!.revoked = p \in currentRebalance.revokingPartitions]
]
/\ assignedPartitions' = assignedPartitions \ currentRebalance.revokingPartitions
/\ UNCHANGED <<currentRebalance, reloadRequested>>
ELSE UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance, reloadRequested>>
OnPartitionsAssigned ==
/\ pc = "L1"
/\ pc' = "L2"
/\ IF currentRebalance /= NULL /\
(* this condition is to ensure executing onPartitionAssigned only after *)
(* onPartitionRevoked is executed. *)
\A p \in currentRebalance.revokingPartitions: partitionContexts[p].revoked
THEN /\ partitionContexts' = [p \in (assignedPartitions \cup currentRebalance.addingPartitions) |->
IF p \in (DOMAIN partitionContexts)
THEN [partitionContexts[p] EXCEPT!.revoked = FALSE]
ELSE [revoked |-> FALSE, pendingRecords |-> 0, paused |-> FALSE]]
/\ currentRebalance' = NULL
/\ assignedPartitions' = assignedPartitions \cup currentRebalance.addingPartitions
/\ UNCHANGED <<reloadRequested>>
ELSE UNCHANGED <<partitionContexts, reloadRequested, assignedPartitions, currentRebalance>>
AddRecords ==
/\ pc = "L2"
/\ pc' = "L3"
/\ LET fetchablePartitions == {p \in assignedPartitions: ~partitionContexts[p].paused}
IN IF fetchablePartitions /= {}
THEN \E p \in fetchablePartitions:
/\ partitionContexts' = [partitionContexts EXCEPT![p] = [@ EXCEPT!.pendingRecords = @ + 1]]
/\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
ELSE
/\ UNCHANGED <<assignedPartitions, currentRebalance, partitionContexts, reloadRequested>>
UpdateHighWatermark ==
/\ pc = "L3"
/\ pc' = "L4"
/\ \/ /\ \E p \in DOMAIN partitionContexts:
partitionContexts' = [partitionContexts EXCEPT![p] = [@ EXCEPT!.pendingRecords = Max(0, @ - 1)]]
/\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
\/ UNCHANGED <<assignedPartitions, currentRebalance, partitionContexts, reloadRequested>>
PausePartitions ==
/\ pc = "L4"
/\ pc' = "L5"
/\ LET partitionsNeedPause == {p \in DOMAIN partitionContexts:
\/ reloadRequested (* If reload is requested, should pause all partitions *)
\/ ~partitionContexts[p].paused /\ partitionContexts[p].pendingRecords >= MaxPendingRecords}
IN IF partitionsNeedPause /= {}
THEN
/\ partitionContexts' = [p \in DOMAIN partitionContexts |->
[partitionContexts[p] EXCEPT!.paused =
IF p \in partitionsNeedPause THEN TRUE ELSE @]]
/\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
ELSE UNCHANGED <<assignedPartitions, currentRebalance, partitionContexts, reloadRequested>>
ResumePartitions ==
/\ pc = "L5"
/\ pc' = "L6"
/\ LET partitionsNeedResume == {p \in DOMAIN partitionContexts:
/\ ~reloadRequested (* To resume, reload should not be requested *)
/\ partitionContexts[p].paused /\ partitionContexts[p].pendingRecords < MaxPendingRecords}
IN IF partitionsNeedResume /= {}
THEN
/\ partitionContexts' = [p \in DOMAIN partitionContexts |->
[partitionContexts[p] EXCEPT!.paused =
IF p \in partitionsNeedResume THEN FALSE ELSE @]]
/\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
ELSE UNCHANGED <<assignedPartitions, currentRebalance, partitionContexts, reloadRequested>>
HandlePropertyReload ==
/\ pc = "L6"
/\ pc' = "L7"
/\ IF /\ reloadRequested
/\ \A p \in DOMAIN partitionContexts: partitionContexts[p].pendingRecords = 0
THEN /\ reloadRequested' = FALSE
/\ UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance>>
ELSE /\ UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance, reloadRequested>>
CommitOffset ==
/\ pc = "L7"
/\ pc' = "L0"
/\ UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance, reloadRequested>>
Next ==
\/ RequestPropertyReload
\/ InitiateRebalance
\/ OnPartitionsRevoked
\/ OnPartitionsAssigned
\/ AddRecords
\/ UpdateHighWatermark
\/ PausePartitions
\/ ResumePartitions
\/ HandlePropertyReload
\/ CommitOffset
(* Invariants *)
(* This is an important requirement of Decaton. (back pressuring) *)
NoRecordsAddedMoreThanMaxPendingRecords ==
\A p \in DOMAIN partitionContexts:
partitionContexts[p].pendingRecords <= MaxPendingRecords
(* Since poll() could return any records from assigned partitions, *)
(* we alwasy should have corresponding context for assigned partitions *)
AssignedPartitionsAlwaysSubsetOfContexts ==
assignedPartitions \subseteq DOMAIN partitionContexts
(* We will do pause/resume/offset-commit for non revoked partition contexts. *)
(* This requires that we never perform such operations against non assigned partitions *)
NonRevokedContextAlwaysSubsetOfAssignedPartitions ==
{p \in DOMAIN partitionContexts: ~partitionContexts[p].revoked} \subseteq assignedPartitions
====
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment