Last active
November 9, 2022 13:44
-
-
Save ocadaruma/226c2f4e65cb97aba3610341903955b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
\* TLC model configuration for DecatonProcessorSpec.

\* Initial-state predicate and next-state relation to check.
INIT Init
NEXT Next

\* NULL is bound to a model value of the same name.
CONSTANT NULL = NULL
\* Three partitions, at most two pending records per partition.
CONSTANT Partitions = {1, 2, 3}
CONSTANT MaxPendingRecords = 2

\* State predicates that must hold in every reachable state.
INVARIANT NoRecordsAddedMoreThanMaxPendingRecords
INVARIANT AssignedPartitionsAlwaysSubsetOfContexts
INVARIANT NonRevokedContextAlwaysSubsetOfAssignedPartitions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
---- MODULE DecatonProcessorSpec ---- | |
EXTENDS FiniteSets, Sequences, Naturals | |
(* Value of currentRebalance while no rebalance is in progress. *)
CONSTANT NULL

(* Set of all partitions; every one of them is assigned in the initial state. *)
CONSTANT Partitions

(* Pending-record count at which PausePartitions pauses a partition. *)
CONSTANT MaxPendingRecords
VARIABLES
    (*
     * Program counter: a string label "L0" .. "L7" that selects which step
     * of the processing loop is allowed to run next.
     *)
    pc,

    (*
     * Partitions currently assigned to the consumer.
     * The set shrinks when `onPartitionsRevoked` is invoked and grows when
     * `onPartitionsAssigned` is invoked.
     *)
    assignedPartitions,

    (*
     * Decaton's partition contexts: a function from partition to a record
     * with the fields `revoked`, `pendingRecords` and `paused`.
     *)
    partitionContexts,

    (*
     * The rebalance currently in progress, or NULL when there is none.
     * When set, it is a record holding the to-be-revoked partitions
     * (`revokingPartitions`) and the to-be-assigned partitions
     * (`addingPartitions`).
     * This abstracts a single rebalance round and ignores details such as
     * sending JoinGroup after revocation or assigning new partitions after
     * the SyncGroup response arrives.
     * In an EAGER rebalance the to-be-revoked partitions always equal
     * assignedPartitions. In a COOPERATIVE rebalance the to-be-revoked set
     * may be empty, in which case the onPartitionsRevoked callback is not
     * invoked.
     *)
    currentRebalance,

    (* TRUE while a dynamic property reload has been requested but not yet handled. *)
    reloadRequested
(* Helpers *)

(* The larger of the two numbers a and b (either one when they are equal). *)
Max(a, b) ==
    IF a >= b THEN a ELSE b
(* Init state *)

(*
 * Every partition starts out assigned and owns a fresh context: not revoked,
 * not paused, no pending records. No rebalance is in progress, no property
 * reload is requested, and the processing loop is at its first step.
 *)
Init ==
    LET freshContext == [revoked |-> FALSE, pendingRecords |-> 0, paused |-> FALSE]
    IN  /\ assignedPartitions = Partitions
        /\ partitionContexts = [p \in Partitions |-> freshContext]
        /\ currentRebalance = NULL
        /\ reloadRequested = FALSE
        /\ pc = "L0"
(* Actions *)

(*
 * A property reload is requested (by the watcher thread).
 * Not tied to any pc label: enabled whenever no request is already pending.
 *)
RequestPropertyReload ==
    /\ reloadRequested = FALSE
    /\ reloadRequested' = TRUE
    /\ UNCHANGED <<pc, assignedPartitions, partitionContexts, currentRebalance>>
(*
 * A new rebalance round starts. Not tied to any pc label: enabled whenever
 * no rebalance is ongoing. The round revokes an arbitrary subset of the
 * currently assigned partitions and adds an arbitrary subset of Partitions.
 *)
InitiateRebalance ==
    /\ currentRebalance = NULL
    /\ currentRebalance' \in [revokingPartitions : SUBSET assignedPartitions,
                              addingPartitions   : SUBSET Partitions]
    /\ UNCHANGED <<pc, assignedPartitions, partitionContexts, reloadRequested>>
(*
 * Step L0 -> L1.
 * Without an ongoing rebalance this only advances pc. Otherwise the
 * to-be-revoked partitions are removed from assignedPartitions and the
 * `revoked` flag of every context is set to whether its partition is being
 * revoked. The rebalance itself stays pending.
 *)
OnPartitionsRevoked ==
    /\ pc = "L0"
    /\ pc' = "L1"
    /\ IF currentRebalance = NULL
         THEN UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance, reloadRequested>>
         ELSE LET leaving == currentRebalance.revokingPartitions
              IN  /\ assignedPartitions' = assignedPartitions \ leaving
                  /\ partitionContexts' =
                         [p \in DOMAIN partitionContexts |->
                             [partitionContexts[p] EXCEPT !.revoked = (p \in leaving)]]
                  /\ UNCHANGED <<currentRebalance, reloadRequested>>
(*
 * Step L1 -> L2.
 * Completes the ongoing rebalance, but only once every to-be-revoked
 * partition has its context marked revoked; this ensures onPartitionsAssigned
 * runs only after onPartitionsRevoked. On completion:
 *   - the added partitions join assignedPartitions,
 *   - partitionContexts is rebuilt over exactly the new assigned set:
 *     existing contexts are kept with `revoked` cleared, new partitions get
 *     a fresh context, and all other contexts are dropped,
 *   - currentRebalance is reset to NULL.
 * Otherwise only pc advances.
 *)
OnPartitionsAssigned ==
    LET revocationDone ==
            /\ currentRebalance /= NULL
            /\ \A p \in currentRebalance.revokingPartitions : partitionContexts[p].revoked
        freshContext == [revoked |-> FALSE, pendingRecords |-> 0, paused |-> FALSE]
    IN  /\ pc = "L1"
        /\ pc' = "L2"
        /\ IF revocationDone
             THEN LET owned == assignedPartitions \cup currentRebalance.addingPartitions
                  IN  /\ assignedPartitions' = owned
                      /\ partitionContexts' =
                             [p \in owned |->
                                 IF p \in DOMAIN partitionContexts
                                   THEN [partitionContexts[p] EXCEPT !.revoked = FALSE]
                                   ELSE freshContext]
                      /\ currentRebalance' = NULL
                      /\ UNCHANGED reloadRequested
             ELSE UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance, reloadRequested>>
(*
 * Step L2 -> L3.
 * One record is added to some assigned partition whose context is not
 * paused. If there is no such partition, only pc advances.
 *)
AddRecords ==
    /\ pc = "L2"
    /\ pc' = "L3"
    /\ LET fetchable == {p \in assignedPartitions : partitionContexts[p].paused = FALSE}
       IN  IF fetchable = {}
             THEN UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance, reloadRequested>>
             ELSE /\ \E p \in fetchable :
                       partitionContexts' =
                           [partitionContexts EXCEPT ![p].pendingRecords = @ + 1]
                  /\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
(*
 * Step L3 -> L4.
 * Either one pending record of some partition context completes (its
 * pendingRecords is decremented), or nothing completes and only pc advances.
 *
 * The choice is restricted to contexts that actually have pending records,
 * so the decrement never goes below zero. The Naturals module only specifies
 * a - b when the result is a natural number, so the spec must not evaluate
 * 0 - 1. Choosing a context with no pending records would leave
 * partitionContexts unchanged anyway; that case is already covered by the
 * second disjunct, so the set of successor states is the same.
 *)
UpdateHighWatermark ==
    /\ pc = "L3"
    /\ pc' = "L4"
    /\ \/ /\ \E p \in {q \in DOMAIN partitionContexts :
                          partitionContexts[q].pendingRecords > 0} :
                partitionContexts' =
                    [partitionContexts EXCEPT ![p].pendingRecords = @ - 1]
          /\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
       \/ UNCHANGED <<assignedPartitions, currentRebalance, partitionContexts, reloadRequested>>
(*
 * Step L4 -> L5.
 * Pauses every partition context that needs pausing:
 *   - all contexts, if a property reload is requested;
 *   - otherwise each context that is not yet paused and has reached
 *     MaxPendingRecords pending records.
 * When no context needs pausing, the pointwise update below yields
 * partitionContexts itself, so nothing but pc changes.
 *)
PausePartitions ==
    LET overLimit(p) ==
            /\ ~partitionContexts[p].paused
            /\ partitionContexts[p].pendingRecords >= MaxPendingRecords
        toPause == {p \in DOMAIN partitionContexts : reloadRequested \/ overLimit(p)}
    IN  /\ pc = "L4"
        /\ pc' = "L5"
        /\ partitionContexts' =
               [p \in DOMAIN partitionContexts |->
                   IF p \in toPause
                     THEN [partitionContexts[p] EXCEPT !.paused = TRUE]
                     ELSE partitionContexts[p]]
        /\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
(*
 * Step L5 -> L6.
 * Resumes every paused partition context whose pending records are below
 * MaxPendingRecords. Nothing is resumed while a property reload is requested.
 * When no context needs resuming, the pointwise update below yields
 * partitionContexts itself, so nothing but pc changes.
 *)
ResumePartitions ==
    LET toResume ==
            IF reloadRequested
              THEN {}
              ELSE {p \in DOMAIN partitionContexts :
                       /\ partitionContexts[p].paused
                       /\ partitionContexts[p].pendingRecords < MaxPendingRecords}
    IN  /\ pc = "L5"
        /\ pc' = "L6"
        /\ partitionContexts' =
               [p \in DOMAIN partitionContexts |->
                   IF p \in toResume
                     THEN [partitionContexts[p] EXCEPT !.paused = FALSE]
                     ELSE partitionContexts[p]]
        /\ UNCHANGED <<assignedPartitions, currentRebalance, reloadRequested>>
(*
 * Step L6 -> L7.
 * A requested property reload is carried out (the request flag is cleared)
 * once no partition context has pending records. Otherwise the flag keeps
 * its value and only pc advances.
 *)
HandlePropertyReload ==
    LET drained ==
            \A p \in DOMAIN partitionContexts : partitionContexts[p].pendingRecords = 0
    IN  /\ pc = "L6"
        /\ pc' = "L7"
        /\ reloadRequested' = IF reloadRequested /\ drained THEN FALSE ELSE reloadRequested
        /\ UNCHANGED <<assignedPartitions, partitionContexts, currentRebalance>>
(*
 * Step L7 -> L0.
 * Closes one iteration of the processing loop. Offset commit is not modelled
 * beyond the pc transition: every other variable stays unchanged.
 *)
CommitOffset ==
    /\ pc = "L7"
    /\ pc' = "L0"
    /\ UNCHANGED <<reloadRequested, currentRebalance, assignedPartitions, partitionContexts>>
(* Next-state relation: any single action of the spec. *)
Next ==
    (* Actions not tied to a pc label. *)
    \/ InitiateRebalance
    \/ RequestPropertyReload
    (* The processing loop, in pc order L0 .. L7. *)
    \/ OnPartitionsRevoked
    \/ OnPartitionsAssigned
    \/ AddRecords
    \/ UpdateHighWatermark
    \/ PausePartitions
    \/ ResumePartitions
    \/ HandlePropertyReload
    \/ CommitOffset
(* Invariants *)

(*
 * Back pressure, an important requirement of Decaton: no partition context
 * ever holds more than MaxPendingRecords pending records.
 *)
NoRecordsAddedMoreThanMaxPendingRecords ==
    ~ \E p \in DOMAIN partitionContexts :
          partitionContexts[p].pendingRecords > MaxPendingRecords
(*
 * poll() could return records from any assigned partition, so every
 * assigned partition must always have a corresponding context.
 *)
AssignedPartitionsAlwaysSubsetOfContexts ==
    \A p \in assignedPartitions : p \in DOMAIN partitionContexts
(*
 * Pause, resume and offset commit are performed for non-revoked partition
 * contexts, and must never be performed against a partition that is not
 * assigned. Hence every non-revoked context must belong to an assigned
 * partition.
 *)
NonRevokedContextAlwaysSubsetOfAssignedPartitions ==
    \A p \in DOMAIN partitionContexts :
        ~partitionContexts[p].revoked => p \in assignedPartitions
==== |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment