@Vanlightly
Last active December 19, 2024 11:47
Throwaway Kafka txns for TLA web
\* Model parameters
CONSTANTS
p1 = p1
TxnLogPartitions = {p1}
tp1 = tp1
TopicPartitions = {tp1}
b1 = b1
b2 = b2
Brokers = {b1, b2}
c1 = c1
c2 = c2
Clients = {c1, c2}
tid1 = tid1
TransactionIds = {tid1}
MaxCoordinatorChanges = 1
\* Model values
CONSTANTS
InitPidRequest = InitPidRequest
InitPidResponse = InitPidResponse
AddPartitionsToTxnRequest = AddPartitionsToTxnRequest
AddPartitionsToTxnResponse = AddPartitionsToTxnResponse
Ready = Ready
SentInitPidRequest = SentInitPidRequest
HasPid = HasPid
BegunTxn = BegunTxn
Terminated = Terminated
Empty = Empty
Begin = Begin
PrepareCommit = PrepareCommit
PrepareAbort = PrepareAbort
CompleteAbort = CompleteAbort
CompleteCommit = CompleteCommit
Ongoing = Ongoing
PrepareEpochFence = PrepareEpochFence
Dead = Dead
Abort = Abort
Commit = Commit
IllegalState = IllegalState
OK = OK
NotCoordinator = NotCoordinator
ConcurrentTransactions = ConcurrentTransactions
ProducerFenced = ProducerFenced
InvalidTxnState = InvalidTxnState
InvalidTransition = InvalidTransition
NotSupportedYet = NotSupportedYet
InvalidProducerIdMapping = InvalidProducerIdMapping
None = None
INVARIANT
TypeOK
NoBadStateResponse
TestInv
SPECIFICATION Spec
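\* Hypothetical liveness variant of this cfg (an assumption, not part of the
\* original gist). To check liveness, one option is to swap the SPECIFICATION
\* line above for the fairness-constrained spec and add a PROPERTY line that
\* names one of the liveness properties defined in the module, e.g.:
\* SPECIFICATION LivenessSpec
\* PROPERTY EventuallyAllClientsGetPid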
------------------------- MODULE kafka_transactions_for_web -------------------------
EXTENDS FiniteSets, FiniteSetsExt, Sequences, Integers, TLC
(*
Stage 2 - The AddPartitionsToTxnRequest and response.
Also added:
1. The transaction coordinator of a partition can now move around.
2. The FindCoordinator request is now partially modeled, as a simpler
atomic lookup. It can still be stale as the TC can move from one
broker to another at any time.
3. Producer fencing, along with the associated txn abort. However,
the LSO and txn markers are not yet modeled.
Limitations:
1. Does not implement KIP-360, which allows a producer to send an InitPidRequest
with an existing pid and epoch.
2. Does not implement KIP-890.
Running:
1. Use the VS Code TLA+ extension.
2. Configure the model parameters in the cfg file.
3. Choose either liveness checking or not by commenting and uncommenting
sections in the cfg. See the cfg file.
4. You must use the -deadlock parameter because clients eventually stop doing
anything. For example, they stop once they have added every topic partition
to their txn, or once they have terminated. TLC would otherwise interpret
this as a deadlock.
Example: -workers 4 -deadlock -fpmem 0.75
This runs TLC with 4 worker threads and 75% of available memory, and a
"deadlock" will not trigger a counterexample.
*)
\* Model parameters
CONSTANTS TxnLogPartitions,
TopicPartitions,
Brokers,
Clients,
TransactionIds,
MaxCoordinatorChanges
\* Message types
CONSTANTS InitPidRequest,
InitPidResponse,
AddPartitionsToTxnRequest,
AddPartitionsToTxnResponse
\* Client states (not part of the protocol but used to control client behavior)
CONSTANTS Ready,
SentInitPidRequest,
HasPid,
BegunTxn,
Terminated \* See readme for discussion on this.
\* TxnStates (not all are used yet)
CONSTANT Empty, Begin, PrepareCommit, PrepareAbort,
CompleteAbort, CompleteCommit, Ongoing,
PrepareEpochFence, Dead
\* TxnResults
CONSTANTS Abort, Commit
\* Errors (not all are used yet)
CONSTANTS IllegalState, OK, NotCoordinator,
ConcurrentTransactions, ProducerFenced,
InvalidTransition, NotSupportedYet,
InvalidProducerIdMapping
\* Other constants
CONSTANTS None
VARIABLES client \* a map of client_id -> client state
\* Transaction coordinator state, each is a map of broker_id -> some state
VARIABLES tc_tid_metadata, \* per TID txn state
tc_tid_transition, \* per TID txn transition state
tc_log, \* txn log partition_id -> the partition data
tc_log_metadata, \* txn log partition_id -> the partition metadata
tc_log_hwm \* txn log partition_id -> high watermark
\* Regular topic partitions
VARIABLES topic_partitions
\* KRaft controller metadata state
VARIABLES txn_log_leader, \* a map of txn log partition_id -> the leader aka the TC
txn_log_epoch, \* a map of txn log partition_id -> the partition epoch,
\* also known as coordinator epoch.
pid_source, \* a unique source of PIDs
t_to_p_mapping \* a mapping of TID to txn log partition (static in this version)
\* Auxiliary variables
VARIABLES aux_coord_ctr \* counts the number of coordinator changes
VARIABLES messages,
messages_discard
NetworkView == << messages >>
net_vars == << messages, messages_discard >>
Messages == messages
ProcessedMessages == messages_discard
NetworkInit ==
/\ messages = {}
/\ messages_discard = {}
\* ======================================================================
\* ----- Message passing ------------------------------------------------
\* Add the message to the set of in-flight messages if deliver_count > 0.
\* Messages are modeled as a set, so sending a message identical to one
\* already in flight is a no-op; duplicate delivery is not modeled.
SendFunc(m, msgs, deliver_count) ==
IF deliver_count > 0
THEN msgs \union {m}
ELSE msgs
\* Remove a message from the set of in-flight messages. Used when a server
\* is done processing a message.
DiscardFunc(m, msgs) ==
msgs \ {m}
\* Send a message, without restriction
Send(m) ==
/\ messages' = SendFunc(m, messages, 1)
/\ UNCHANGED messages_discard
RECURSIVE SendAllFunc(_,_)
SendAllFunc(send_msgs, msgs) ==
IF send_msgs = {}
THEN msgs
ELSE LET m == CHOOSE m \in send_msgs : TRUE
new_msgs == SendFunc(m, msgs, 1)
remaining == send_msgs \ {m}
IN SendAllFunc(remaining, new_msgs)
SendAll(msgs) ==
/\ messages' = SendAllFunc(msgs, messages)
/\ UNCHANGED messages_discard
DiscardAndSendAll(d, msgs) ==
/\ messages' = SendAllFunc(msgs, DiscardFunc(d, messages))
/\ messages_discard' = messages_discard \union {d}
\* Remove the message from the in-flight set and record it as processed,
\* so it cannot be processed again.
Discard(d) ==
/\ messages' = DiscardFunc(d, messages)
/\ messages_discard' = messages_discard \union {d}
Drop(msgs) ==
/\ messages' = messages \ msgs
/\ messages_discard' = messages_discard \union msgs
\* Discard incoming message and reply with another
Reply(d, m) ==
/\ d \in messages
/\ messages' = SendFunc(m, DiscardFunc(d, messages), 1)
/\ messages_discard' = messages_discard \union {d}
tc_tid_vars == <<tc_tid_metadata, tc_tid_transition>>
tc_log_vars == << tc_log, tc_log_hwm, tc_log_metadata >>
txn_log_vars == << txn_log_leader, txn_log_epoch >>
aux_vars == <<pid_source, t_to_p_mapping, aux_coord_ctr>>
topic_vars == <<topic_partitions>>
vars == <<client, tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars, net_vars>>
View == <<client, tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, pid_source, t_to_p_mapping, NetworkView>>
SymmetryClients == Permutations(Clients)
SymmetryBrokers == Permutations(Brokers)
SymmetryPartitions == Permutations(TxnLogPartitions)
SymmetryTids == Permutations(TransactionIds)
Symmetry == SymmetryClients
\union SymmetryBrokers
\union SymmetryPartitions
\union SymmetryTids
\* ----------------------------------------------
\* Types and state machine transitions
\* ----------------------------------------------
\* Valid previous txn states
ValidPrevTxnStates(state) ==
CASE state = Empty -> {None, Empty, CompleteCommit, CompleteAbort}
[] state = Ongoing -> {Ongoing, Empty, CompleteCommit, CompleteAbort}
[] state = PrepareCommit -> {Ongoing}
[] state = PrepareAbort -> {Ongoing, PrepareEpochFence}
[] state = CompleteCommit -> {PrepareCommit}
[] state = CompleteAbort -> {PrepareAbort}
[] state = Dead -> {Empty, CompleteCommit, CompleteAbort}
[] state = PrepareEpochFence -> {Ongoing}
[] OTHER -> {InvalidTransition}
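\* Illustrative helper (hypothetical, not part of the original spec and not
\* used by Next). It is TRUE iff a txn may move from state from_state to state
\* to_state, which is the check made by CASE 2 of GetTransition.
\* For example, IsValidTxnTransition(Ongoing, PrepareAbort) is TRUE, while
\* IsValidTxnTransition(Empty, PrepareCommit) is FALSE.
IsValidTxnTransition(from_state, to_state) == from_state \in ValidPrevTxnStates(to_state)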
InitPidRequestType ==
[type: {InitPidRequest},
tid: TransactionIds,
dest: Brokers,
source: Clients]
InitPidResponseType ==
[type: {InitPidResponse},
code: {OK, ConcurrentTransactions, NotCoordinator},
pid: Nat \union {-1},
pepoch: Nat \union {-1},
dest: Clients,
source: Brokers]
AddPartitionsToTxnRequestType ==
[type: {AddPartitionsToTxnRequest},
tid: TransactionIds,
pid: Nat,
pepoch: Nat,
partitions: SUBSET TopicPartitions,
dest: Brokers,
source: Clients]
AddPartitionsToTxnResponseType ==
[type: {AddPartitionsToTxnResponse},
code: {OK, ConcurrentTransactions, NotCoordinator,
ProducerFenced, InvalidProducerIdMapping},
partitions: SUBSET TopicPartitions,
dest: Clients,
source: Brokers]
MessageType ==
InitPidRequestType \union InitPidResponseType
\union AddPartitionsToTxnRequestType
\union AddPartitionsToTxnResponseType
\* Common helpers start --------------------
SetIllegalState ==
Send([code |-> IllegalState])
CurrentTC(p) == txn_log_leader[p]
\* Common helpers end --------------------
\* ----------------------------------------------
\* The client
\* ----------------------------------------------
\* Client helpers start --------------------
\* This is an atomic FindCoordinatorRequest implementation.
FindCoordinator(tid) ==
LET p == t_to_p_mapping[tid]
IN CurrentTC(p)
\* Client helpers end --------------------
(* ---------------------------------------------------------------
ACTION: SendInitPidRequest
WHO: A client
A client sends an InitPidRequest to the broker it believes is the TC for
its TransactionId (tid). The FindCoordinator request is not modeled as a
request/response pair. Instead, the client performs an atomic lookup
(FindCoordinator) of the current TC. Because the TC can move to another
broker at any time, the answer can be stale by the time the request
arrives, and the client can then receive a NotCoordinator error. This
shortcut keeps the specification as small as possible.
If the client has no tid yet, one is non-deterministically chosen;
otherwise its existing one is reused.
*)
SendInitPidRequest(c) ==
/\ client[c].state = Ready
/\ \E tid \in TransactionIds :
\* If this is a retry, then reuse the same tid, else use whichever.
/\ IF client[c].tid # None THEN tid = client[c].tid ELSE TRUE
/\ Send([type |-> InitPidRequest,
tid |-> tid,
dest |-> FindCoordinator(tid),
source |-> c])
/\ client' = [client EXCEPT ![c].tid = tid,
![c].state = SentInitPidRequest]
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >>
(* ---------------------------------------------------------------
ACTION: ReceiveInitPidResponse
WHO: A client
A client receives an InitPidResponse. If it is an OK response,
then it updates its pid and epoch, and transitions to the HasPid state.
These states are not part of the protocol, but used for implementing
the client as a state machine.
If the response is an error, then the client reverts to Ready, from where
it can retry the InitPidRequest.
*)
ReceiveInitPidResponse(c, b) ==
/\ client[c].state = SentInitPidRequest
/\ \E msg \in messages :
/\ msg.dest = c
/\ msg.source = b
/\ msg.type = InitPidResponse
/\ IF msg.code = OK
THEN client' = [client EXCEPT ![c].state = HasPid,
![c].tc = msg.source,
![c].pid = msg.pid,
![c].epoch = msg.pepoch]
ELSE client' = [client EXCEPT ![c].state = Ready,
![c].last_state = client[c].state,
![c].last_error = msg.code]
/\ Discard(msg)
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >>
(* ---------------------------------------------------------------
ACTION: SendAddPartitionsToTxnRequest
WHO: A client
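A client that has a pid, and has no AddPartitionsToTxnRequest in flight,
sends an AddPartitionsToTxnRequest to its TC. The request adds one topic
partition that is not yet part of its txn. The client records the partition
as pending and moves to the BegunTxn state.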
*)
SendAddPartitionsToTxnRequest(c) ==
/\ client[c].state \in { HasPid, BegunTxn }
/\ client[c].pending_partitions = {}
/\ \E p \in TopicPartitions :
/\ p \notin client[c].partitions
/\ Send([type |-> AddPartitionsToTxnRequest,
tid |-> client[c].tid,
pid |-> client[c].pid,
pepoch |-> client[c].epoch,
partitions |-> {p}, \* one partition for now
dest |-> client[c].tc,
source |-> c])
/\ client' = [client EXCEPT ![c].pending_partitions = {p},
![c].state = BegunTxn]
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >>
(* ---------------------------------------------------------------
ACTION: ReceiveAddPartitionsToTxnResponse
WHO: A client
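A client receives an AddPartitionsToTxnResponse.
- OK: the pending partitions are added to the client's txn partitions.
- ConcurrentTransactions or NotCoordinator: retriable, so the pending
partitions are cleared and the request can be resent. On NotCoordinator
the client also looks up the current TC.
- Any other error (e.g. ProducerFenced): the client moves to Terminated.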
*)
ReceiveAddPartitionsToTxnResponse(c, b) ==
/\ client[c].state = BegunTxn
/\ \E msg \in messages :
/\ msg.dest = c
/\ msg.source = b
/\ msg.type = AddPartitionsToTxnResponse
/\ CASE msg.code = OK ->
client' = [client EXCEPT ![c].pending_partitions = @ \ msg.partitions,
![c].partitions = @ \union msg.partitions]
[] msg.code \in {ConcurrentTransactions, NotCoordinator} ->
\* Retriable error, so the client may try again (refreshing its TC on NotCoordinator)
client' = [client EXCEPT ![c].pending_partitions = {},
![c].last_state = client[c].state,
![c].last_error = msg.code,
![c].tc = IF msg.code = NotCoordinator
THEN FindCoordinator(client[c].tid)
ELSE client[c].tc]
[] OTHER ->
\* Some non-retriable error such as ProducerFenced. Go to the
\* Terminated state to prevent an infinite battle (and infinite
\* state space) with another client.
client' = [client EXCEPT ![c].state = Terminated,
![c].last_state = client[c].state,
![c].last_error = msg.code,
![c].pending_partitions = {},
![c].partitions = {}]
/\ Discard(msg)
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >>
\* ----------------------------------------------
\* The transaction coordinator actions
\* ----------------------------------------------
\* COMMON HELPERS START --------------------------------------
PartitionFor(tid) == t_to_p_mapping[tid]
PartitionMetadataOf(b, partition) == tc_log_metadata[b][partition]
PartitionMetadataOfTid(b, tid) == PartitionMetadataOf(b, PartitionFor(tid))
CurrentTransition(b, tid) ==
tc_tid_transition[b][tid]
IsPartitionLeader(b, p) ==
b = tc_log_metadata[b][p].leader
GetTxnMetadata(b, tid) ==
CASE ~IsPartitionLeader(b, PartitionFor(tid)) ->
[code |-> NotCoordinator]
[] tc_tid_metadata[b][tid] = None ->
[code |-> None]
[] OTHER->
[code |-> OK,
txn_metadata |-> tc_tid_metadata[b][tid],
cepoch |-> PartitionMetadataOfTid(b, tid).cepoch]
GetTransition(b, curr_transition, curr_state, new_state, pid,
new_pepoch, new_last_pepoch, new_partitions) ==
CASE
\* CASE 1 - Reject because an existing transition is currently being committed.
curr_transition # None ->
[code |-> ConcurrentTransactions]
\* CASE 2 - Accept as this is a valid transition
[] curr_state \in ValidPrevTxnStates(new_state) ->
\* Create a modified txn metadata transitioned to the new state
[code |-> OK,
transition |-> [pid |-> pid,
pepoch |-> new_pepoch,
last_pepoch |-> new_last_pepoch,
state |-> new_state,
partitions |-> new_partitions]]
\* CASE 3 - This shouldn't happen.
[] OTHER ->
\* Shouldn't get here
[code |-> InvalidTransition]
GetCompleteTransition(b, tid, txn_metadata) ==
LET next_state == IF txn_metadata.state = PrepareCommit
THEN CompleteCommit ELSE CompleteAbort
IN GetTransition(b, None, \* clear the current pending transition to avoid an error
txn_metadata.state, \* current state
next_state, \* transition to CompleteAbort or CompleteCommit
txn_metadata.pid, \* same pid (no exhaustion)
txn_metadata.pepoch, \* TODO: epoch bumping?
txn_metadata.last_pepoch,
txn_metadata.partitions) \* no partitions change
ClearTransition(b, tid) ==
tc_tid_transition' = [tc_tid_transition EXCEPT ![b][tid] = None]
AppendTxnLogEntry(b, p, tid, log, transition, callback) ==
/\ tc_log' = [log EXCEPT ![b][p] =
Append(@, [tid |-> tid,
transition |-> transition,
callback |-> callback])]
/\ tc_tid_transition' = [tc_tid_transition EXCEPT ![b][tid] = transition]
\* COMMON HELPERS END --------------------------------------
(* ---------------------------------------------------------------
ACTION: BecomeLeader
WHO: Transaction controller
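A broker becomes the leader of a txn log partition, i.e. the TC for the
tids that map to it. The controller bumps the partition (coordinator)
epoch. The new leader then updates its local partition metadata and
materializes the txn metadata of the affected tids. The number of
leadership changes is bounded by MaxCoordinatorChanges to bound the
state space.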
*)
\* A real TC would materialize the txn state by replaying the txn log, but
\* that is awkward to express here. The simpler equivalent used instead
\* is to copy the txn state of the original TC.
MaterializeState(tid) ==
LET p == PartitionFor(tid)
b == CurrentTC(p) \* TC in current state is the original
IN tc_tid_metadata[b][tid]
BecomeLeader(b, p) ==
/\ aux_coord_ctr < MaxCoordinatorChanges
/\ b # tc_log_metadata[b][p].leader
\* Bump the coordinator epoch of the txn log partition whose leadership is
\* moving to this broker.
/\ LET cepoch == txn_log_epoch[p] + 1
IN
\* Controller elects the new leader and bumps the txn log partition epoch
/\ txn_log_leader' = [txn_log_leader EXCEPT ![p] = b]
/\ txn_log_epoch' = [txn_log_epoch EXCEPT ![p] = cepoch]
\* The new leader updates its txn log partition metadata with the new epoch.
/\ tc_log_metadata' = [tc_log_metadata EXCEPT ![b][p] =
[cepoch |-> cepoch,
leader |-> b]]
\* Materialize the txn metadata stored by this txn log partition, keeping
\* this broker's metadata for tids that map to other partitions.
/\ tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b] =
[tid \in TransactionIds |->
IF PartitionFor(tid) = p
THEN MaterializeState(tid)
ELSE tc_tid_metadata[b][tid]]]
/\ aux_coord_ctr' = aux_coord_ctr + 1
/\ UNCHANGED <<tc_tid_transition, tc_log, tc_log_hwm, client,
topic_vars, pid_source, t_to_p_mapping, net_vars>>
(* ---------------------------------------------------------------
ACTION: CompletePartialTxn
WHO: Transaction controller
After becoming the TC for a partition, there may be tids that
had a committed PrepareAbort/Commit, but no CompleteAbort/Commit.
This action ensures that any initiated prepare phase gets completed.
*)
\*CompletePartialTxn(b, tid) ==
\* LET p == PartitionFor(tid)
\* txn_metadata == tc_tid_metadata[b][tid]
\* trans_result == GetCompleteTransition(b, tid, txn_metadata)
\* IN
\* /\ IsPartitionLeader(b, p)
\* /\ txn_metadata # None
\* /\ txn_metadata.state \in {PrepareAbort, PrepareCommit} \* Prepare in-progress
\* /\ tc_tid_transition[b][tid] = None \* but no current transition
\* /\ trans_result.code = OK
\* /\ AppendTxnLogEntry(b, p, tid, tc_log,
\* trans_result.transition,
\* [type |-> trans_result.transition.state,
\* response |-> None,
\* err |-> None])
\* /\ UNCHANGED <<tc_tid_metadata, tc_log_hwm, tc_log_metadata,
\* txn_log_vars, topic_vars, client, aux_vars, net_vars>>
\*
(* ---------------------------------------------------------------
ACTION: BecomeFollower
WHO: Transaction controller
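A broker that has fallen behind the controller's coordinator epoch for a
txn log partition, and is not its leader, becomes a follower. It adopts
the new leader and epoch, and replaces its log and high watermark with the
leader's. It also clears the txn metadata and pending transitions of the
tids that map to the partition. Finally, it sends the stored error
responses (NotCoordinator) for any of its uncommitted log entries.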
*)
RemoveNoneResponses(responses) ==
{ r \in responses : r # None }
BecomeFollower(b, p) ==
/\ tc_log_metadata[b][p].cepoch < txn_log_epoch[p]
/\ b # txn_log_leader[p]
/\ LET first_uncommitted_offset == tc_log_hwm[b][p]+1
uncommitted_offsets == first_uncommitted_offset..Len(tc_log[b][p])
uncommitted_entries == { tc_log[b][p][offset] : offset \in uncommitted_offsets }
err_responses == { entry.callback.err : entry \in uncommitted_entries }
IN
\* Update local log state (truncates the log to match the leader)
/\ tc_log_metadata' = [tc_log_metadata EXCEPT ![b][p] =
[cepoch |-> txn_log_epoch[p],
leader |-> txn_log_leader[p]]]
/\ tc_log' = [tc_log EXCEPT ![b][p] = tc_log[CurrentTC(p)][p]]
/\ tc_log_hwm' = [tc_log_hwm EXCEPT ![b][p] = tc_log_hwm[CurrentTC(p)][p]]
\* Clear out txn state of the affected tids
/\ tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b] =
[tid \in TransactionIds |->
IF PartitionFor(tid) = p
THEN None
ELSE tc_tid_metadata[b][tid]]]
\* Clear out txn pending transitions of the affected tids
/\ tc_tid_transition' = [tc_tid_transition EXCEPT ![b] =
[tid \in TransactionIds |->
IF PartitionFor(tid) = p
THEN None
ELSE tc_tid_transition[b][tid]]]
\* Send error responses for pending ops
/\ SendAll(RemoveNoneResponses(err_responses))
/\ UNCHANGED << txn_log_vars, topic_vars, client, aux_vars>>
(* ---------------------------------------------------------------
ACTION: ReceiveInitPidRequest
WHO: Transaction controller
A TC receives an InitPidRequest.
- If the txn log partition for this tid does not belong to this TC
then it sends an InitPidResponse with the error NotCoordinator.
- If there is no existing metadata for a txn with this tid,
- Then new, empty metadata is created. When creating new metadata,
a unique ProducerId (pid) is generated with a producer epoch of -1.
- Else the existing txn metadata is used.
- If there is an in-progress transition (a prior transition was appended
to the txn log but it hasn't committed yet), or the txn is in the
PrepareAbort or PrepareCommit state,
- Then the TC sends an InitPidResponse with the error ConcurrentTransactions.
- Else if the txn is Ongoing, the TC fences the current producer. It bumps
the producer epoch and appends a PrepareAbort transition to the txn log.
Once that commits, the client receives ConcurrentTransactions and must retry.
- Else, a new transition is generated with:
- the Empty state
- the pid and incremented epoch
The TC appends this transition metadata to the txn log partition.
- Once the transition is committed to the txn log, the TC sends the
InitPidResponse to the client with the pid and incremented epoch.
*)
GetOrCreateTxnMetadata(b, tid) ==
LET cached_md == GetTxnMetadata(b, tid)
IN
IF cached_md.code = None THEN
\* Generate new metadata.
[code |-> OK,
txn_metadata |-> [pid |-> pid_source + 1,
last_pid |-> -1,
pepoch |-> -1,
last_pepoch |-> -1,
state |-> Empty,
partitions |-> {}],
cepoch |-> PartitionMetadataOfTid(b, tid).cepoch]
ELSE cached_md
GetInitPidTransition(b, tid, txn_metadata) ==
\* This is simple now, but lots more logic will get added here.
CASE txn_metadata.state \in { CompleteAbort, CompleteCommit, Empty } ->
GetTransition(b, CurrentTransition(b, tid),
txn_metadata.state, \* current state
Empty, \* transition to Empty
txn_metadata.pid, \* the pid (no exhaustion modeled)
txn_metadata.pepoch + 1, \* new pepoch (incremented)
txn_metadata.pepoch, \* last pepoch
{}) \* no partitions yet
[] txn_metadata.state = Ongoing ->
\* Abort the ongoing transaction first, by fencing the producer
GetTransition(b, CurrentTransition(b, tid),
txn_metadata.state, \* current state
PrepareEpochFence, \* transition to PrepareEpochFence
txn_metadata.pid, \* same pid (no exhaustion)
txn_metadata.pepoch + 1, \* bump the pepoch
-1, \* don't know why yet
txn_metadata.partitions) \* no partitions change
[] txn_metadata.state \in { PrepareAbort, PrepareCommit } ->
[code |-> ConcurrentTransactions]
[] OTHER ->
\* Shouldn't get here
[code |-> IllegalState]
MakePidResponse(b, c, code, pid, pepoch) ==
[type |-> InitPidResponse,
code |-> code,
pid |-> pid,
pepoch |-> pepoch,
dest |-> c,
source |-> b]
MakeErrorPidResponse(b, c, code) ==
MakePidResponse(b, c, code, -1, -1)
GetPrepareAbortOrCommitTransition(b, tid, curr_metadata, new_metadata, next_state) ==
GetTransition(b, CurrentTransition(b, tid),
curr_metadata.state, \* current state
next_state, \* transition to PrepareAbort or PrepareCommit
new_metadata.pid, \* same pid (no exhaustion)
new_metadata.pepoch, \* TODO: client support for bump pepoch?
new_metadata.last_pepoch,
curr_metadata.partitions) \* no partitions change
\* Transition to PrepareCommit or PrepareAbort
EndTransaction(msg, b, c, curr_metadata, new_metadata,
partition, txn_result, is_from_client) ==
CASE \/ (is_from_client /\ new_metadata.pepoch # curr_metadata.pepoch)
\/ new_metadata.pepoch < curr_metadata.pepoch ->
/\ Reply(msg, MakeErrorPidResponse(b, c, ProducerFenced))
/\ UNCHANGED << tc_tid_transition, tc_log >>
[] curr_metadata.state = Ongoing ->
LET next_state == IF txn_result = Abort THEN PrepareAbort ELSE PrepareCommit
trans_result == IF next_state = PrepareAbort /\ new_metadata.state = PrepareEpochFence
THEN GetPrepareAbortOrCommitTransition(b, msg.tid, curr_metadata,
new_metadata, next_state)
ELSE None
IN /\ Discard(msg) \* We'll reply once the transition is complete
/\ AppendTxnLogEntry(b, partition, msg.tid, tc_log,
trans_result.transition,
[type |-> next_state,
response |-> MakePidResponse(b, c, ConcurrentTransactions, -1, -1),
err |-> MakeErrorPidResponse(b, c, NotCoordinator)])
[] OTHER ->
/\ Reply(msg, MakeErrorPidResponse(b, c, IllegalState))
/\ UNCHANGED << tc_tid_transition, tc_log >>
ReceiveInitPidRequest(b, c) ==
\E msg \in messages :
/\ msg.dest = b
/\ msg.source = c
/\ msg.type = InitPidRequest
/\ LET md_result == GetOrCreateTxnMetadata(b, msg.tid)
trans_result == GetInitPidTransition(b, msg.tid,
md_result.txn_metadata)
partition == PartitionFor(msg.tid)
IN
IF md_result.code # OK
THEN /\ Reply(msg, MakeErrorPidResponse(b, c, md_result.code))
/\ UNCHANGED <<pid_source, tc_tid_vars, tc_log_vars>>
ELSE /\ pid_source' = IF md_result.txn_metadata.pid > pid_source THEN md_result.txn_metadata.pid ELSE pid_source \* never move pid_source backwards
/\ tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b][msg.tid] = md_result.txn_metadata]
/\ CASE
\* CASE 1 - Can't make a transition right now.
trans_result.code # OK ->
/\ Reply(msg, MakeErrorPidResponse(b, c, trans_result.code))
/\ UNCHANGED << tc_tid_transition, tc_log >>
\* CASE 2 - Need to fence the current producer and abort its txn
[] trans_result.transition.state = PrepareEpochFence ->
EndTransaction(msg, b, c,
md_result.txn_metadata,
trans_result.transition,
partition,
Abort, FALSE)
\* CASE 3 - All ok, write the transition to the local txn log partition
[] OTHER ->
/\ Discard(msg) \* We'll reply once the transition is complete
/\ AppendTxnLogEntry(b, partition, msg.tid, tc_log,
trans_result.transition,
[type |-> InitPidRequest,
response |-> MakePidResponse(b, c, OK,
trans_result.transition.pid,
trans_result.transition.pepoch),
err |-> MakeErrorPidResponse(b, c, NotCoordinator)])
/\ UNCHANGED << client, tc_log_hwm, tc_log_metadata, txn_log_vars,
topic_vars, t_to_p_mapping, aux_coord_ctr >>
(* ---------------------------------------------------------------
ACTION: ReceiveAddPartitionsToTxnRequest
WHO: Transaction controller
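A TC receives an AddPartitionsToTxnRequest. It replies with an error if:
- it has no metadata for the tid, or the pid does not match
(InvalidProducerIdMapping),
- it is not the TC for the tid (NotCoordinator),
- the producer epoch does not match (ProducerFenced),
- a transition is in progress, or the txn is in PrepareCommit or
PrepareAbort (ConcurrentTransactions),
- the transition to Ongoing is invalid.
Otherwise it appends an Ongoing transition, with the new partitions added,
to the txn log and replies once that entry commits.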
*)
MakeAddPartsResponse(b, c, code, partitions) ==
[type |-> AddPartitionsToTxnResponse,
code |-> code,
partitions |-> partitions,
dest |-> c,
source |-> b]
ReplyWithError(msg, b, c, error) ==
/\ Reply(msg, MakeAddPartsResponse(b, c, error, {}))
/\ UNCHANGED << tc_tid_transition, tc_log >>
GetAddPartitionsTransition(b, tid, txn_metadata, add_partitions) ==
GetTransition(b, CurrentTransition(b, tid),
txn_metadata.state, \* the current state
Ongoing, \* transition to ONGOING
txn_metadata.pid,
txn_metadata.pepoch,
txn_metadata.last_pepoch,
txn_metadata.partitions \union add_partitions)
ReceiveAddPartitionsToTxnRequest(b, c) ==
\E msg \in messages :
/\ msg.dest = b
/\ msg.source = c
/\ msg.type = AddPartitionsToTxnRequest
/\ LET md_result == GetTxnMetadata(b, msg.tid)
trans_result == GetAddPartitionsTransition(b, msg.tid,
md_result.txn_metadata,
msg.partitions)
partition == PartitionFor(msg.tid)
IN
CASE md_result.code = None ->
ReplyWithError(msg, b, c, InvalidProducerIdMapping)
[] md_result.code = NotCoordinator ->
ReplyWithError(msg, b, c, NotCoordinator)
[] msg.pid # md_result.txn_metadata.pid ->
ReplyWithError(msg, b, c, InvalidProducerIdMapping)
[] msg.pepoch # md_result.txn_metadata.pepoch ->
ReplyWithError(msg, b, c, ProducerFenced)
[] tc_tid_transition[b][msg.tid] # None ->
ReplyWithError(msg, b, c, ConcurrentTransactions)
[] md_result.txn_metadata.state \in {PrepareCommit, PrepareAbort} ->
ReplyWithError(msg, b, c, ConcurrentTransactions)
[] trans_result.code # OK ->
ReplyWithError(msg, b, c, trans_result.code)
[] OTHER ->
/\ Discard(msg) \* We'll reply once the transition is complete
/\ AppendTxnLogEntry(b, partition, msg.tid, tc_log,
trans_result.transition,
[type |-> AddPartitionsToTxnRequest,
response |-> MakeAddPartsResponse(b, c, OK, trans_result.transition.partitions),
err |-> MakeAddPartsResponse(b, c, NotCoordinator, {})])
/\ UNCHANGED <<tc_tid_metadata, tc_log_hwm, tc_log_metadata, txn_log_vars,
topic_vars, client, aux_vars>>
(* ---------------------------------------------------------------
ACTION: CommitTxnLogAppend
WHO: Transaction controller
A pending write to the txn log commits.
Note that the NotCoordinator responses for pending (uncommitted) log
entries are sent in BecomeFollower.
*)
AdvanceLogHwm(b, p, offset) ==
tc_log_hwm' = [bb \in Brokers |->
[tc_log_hwm[bb] EXCEPT ![p] = offset]]
CanCommit(b, p) ==
/\ b = CurrentTC(p)
/\ \A bb \in Brokers :
tc_log_metadata[bb][p].cepoch = tc_log_metadata[b][p].cepoch
LogAfterReplication(b, p, entry) ==
[bb \in Brokers |->
IF bb # b
THEN [tc_log[bb] EXCEPT ![p] = Append(@, entry)]
ELSE tc_log[b]]
SetTxnMetadata(b, tid, transition) ==
tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b][tid] = transition]
HandlePrepareAbortOrCommit(b, p, log_after_rep, entry) ==
\* TODO: Check validations in TransactionMetadata.scala completeTransitionTo
\* Start a new transition to complete the commit/abort.
LET new_trans_result == GetCompleteTransition(b, entry.tid, entry.transition)
IN IF new_trans_result.code = OK
\* The *new* transition is ok so append it and send the response now.
\* Note we don't store a response for the new transition, as we already respond now.
THEN /\ AppendTxnLogEntry(b, p, entry.tid, log_after_rep,
new_trans_result.transition,
[type |-> new_trans_result.transition.state,
response |-> None,
err |-> None])
/\ Send(entry.callback.response)
\* The *new* transition is not ok so don't append it, and send the response with the updated error code
ELSE LET response == [entry.callback.response EXCEPT !.code = new_trans_result.code]
IN /\ tc_log' = log_after_rep
/\ ClearTransition(b, entry.tid)
/\ Send(response)
HandleCompleteAbortOrCommit(b, log_after_rep, entry) ==
\* TODO: Check validations in TransactionMetadata.scala completeTransitionTo
\* TODO: Advance the LSO (not modeled yet)
\* TODO: Write txn markers (not modeled yet)
/\ tc_log' = log_after_rep
/\ ClearTransition(b, entry.tid)
/\ UNCHANGED <<messages, messages_discard>>
HandleInitPid(b, log_after_rep, entry) ==
\* TODO: Check validations in TransactionMetadata.scala completeTransitionTo
/\ tc_log' = log_after_rep
/\ ClearTransition(b, entry.tid)
/\ Send(entry.callback.response)
HandleAddPartitions(b, p, log_after_rep, entry) ==
\* TODO: Check validations in TransactionMetadata.scala completeTransitionTo
/\ tc_log' = log_after_rep
/\ ClearTransition(b, entry.tid)
/\ Send(entry.callback.response)
CommitTxnLogAppend(b, p) ==
/\ tc_log_hwm[b][p] < Len(tc_log[b][p])
/\ LET next_offset == tc_log_hwm[b][p] + 1
entry == tc_log[b][p][next_offset]
log_after_rep == LogAfterReplication(b, p, entry)
IN /\ CanCommit(b, p)
/\ AdvanceLogHwm(b, p, next_offset)
/\ SetTxnMetadata(b, entry.tid, entry.transition)
/\ CASE entry.callback.type \in {PrepareAbort, PrepareCommit } ->
HandlePrepareAbortOrCommit(b, p, log_after_rep, entry)
[] entry.callback.type \in {CompleteAbort, CompleteCommit } ->
HandleCompleteAbortOrCommit(b, log_after_rep, entry)
[] entry.callback.type = InitPidRequest ->
HandleInitPid(b, log_after_rep, entry)
[] entry.callback.type = AddPartitionsToTxnRequest ->
HandleAddPartitions(b, p, log_after_rep, entry)
[] OTHER -> SetIllegalState
/\ UNCHANGED << client, txn_log_vars, tc_log_metadata, topic_vars, aux_vars >>
\* ----------------------------------------------
\* Invariants
\* ----------------------------------------------
\* Currently only checks that messages are valid.
TypeOK ==
/\ \A m \in messages :
m \in MessageType
\* Catch any IllegalState or InvalidTransition
NoBadStateResponse ==
~\E msg \in messages :
\/ /\ msg.type = InitPidResponse
/\ msg.code \in {IllegalState, InvalidTransition}
\* Used for debugging
TestInv ==
TRUE
\* ~\E m \in messages :
\* /\ m.type = AddPartitionsToTxnResponse
\* /\ m.code = ProducerFenced
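\* A possible additional safety invariant (hypothetical: not part of the
\* original spec, not listed in the cfg, and not model-checked here). It
\* expresses the expectation that two different tids are never assigned the
\* same pid by the TCs that currently lead their txn log partitions. It would
\* only be interesting with more than one TransactionId in the cfg.
UniquePidPerTid ==
    \A t1, t2 \in TransactionIds :
        LET md1 == tc_tid_metadata[CurrentTC(PartitionFor(t1))][t1]
            md2 == tc_tid_metadata[CurrentTC(PartitionFor(t2))][t2]
        IN (t1 # t2 /\ md1 # None /\ md2 # None) => md1.pid # md2.pid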
\* ----------------------------------------------
\* Liveness properties
\* ----------------------------------------------
\* Eventually all clients will receive a pid, even if there
\* are multiple clients and one tid. This is because:
\* 1. When a client receives an error InitPidResponse, it always sends
\* a new request. Fairness ensures that it eventually sends a request
\* to the right broker.
\* 2. When a client receives a success InitPidResponse, it sends no
\* further InitPidRequests.
\* Given multiple clients, multiple brokers, and one tid, in the end,
\* we expect the pepoch of that tid to reach the number of clients.
EventuallyAllClientsGetPid ==
<>[](\A c \in Clients : client[c].pid > -1)
EventuallyOneClientAddsAllPartitions ==
<>[](\E c \in Clients : /\ client[c].state = BegunTxn
/\ \A tp \in TopicPartitions :
tp \in client[c].partitions)
\* ----------------------------------------------
\* Init and Next
\* ----------------------------------------------
Next ==
\/ \E c \in Clients : SendInitPidRequest(c)
\/ \E c \in Clients, b \in Brokers : ReceiveInitPidRequest(b, c)
\/ \E c \in Clients, b \in Brokers : ReceiveInitPidResponse(c, b)
\/ \E c \in Clients : SendAddPartitionsToTxnRequest(c)
\/ \E c \in Clients, b \in Brokers : ReceiveAddPartitionsToTxnRequest(b, c)
\/ \E c \in Clients, b \in Brokers : ReceiveAddPartitionsToTxnResponse(c, b)
\/ \E b \in Brokers, p \in TxnLogPartitions: CommitTxnLogAppend(b, p)
\/ \E b \in Brokers, p \in TxnLogPartitions: BecomeLeader(b, p)
\/ \E b \in Brokers, p \in TxnLogPartitions: BecomeFollower(b, p)
\* \/ \E b \in Brokers, tid \in TransactionIds :
\* CompletePartialTxn(b, tid)
EmptyMap == [x \in {} |-> None]
EmptyTxnState == [tid \in TransactionIds |-> None]
BalancedTidToPartSpread(mapping) ==
\* Ensure the tid -> txn log partition mapping is evenly distributed.
\A p1, p2 \in TxnLogPartitions :
Quantify(DOMAIN mapping, LAMBDA tid : mapping[tid] = p1)
- Quantify(DOMAIN mapping, LAMBDA tid : mapping[tid] = p2) \in {-1, 0, 1}
BalancedPartitionLeadership(part_leader) ==
\A br1, br2 \in Brokers :
LET br1_parts == {p \in TxnLogPartitions : part_leader[p] = br1}
br2_parts == {p \in TxnLogPartitions : part_leader[p] = br2}
IN Cardinality(br1_parts) - Cardinality(br2_parts) \in {-1, 0, 1}
Init ==
LET tid_to_part_mapping == CHOOSE mapping \in [TransactionIds -> TxnLogPartitions] :
BalancedTidToPartSpread(mapping)
log_part_leader == CHOOSE mapping \in [TxnLogPartitions -> Brokers] :
BalancedPartitionLeadership(mapping)
IN
/\ client = [c \in Clients |->
[state |-> Ready,
tc |-> None,
tid |-> None,
pid |-> -1,
epoch |-> -1,
last_state |-> None,
last_error |-> None,
pending_partitions |-> {},
partitions |-> {}]]
/\ tc_tid_metadata = [b \in Brokers |-> [tid \in TransactionIds |-> None]]
/\ tc_tid_transition = [b \in Brokers |-> [tid \in TransactionIds |-> None]]
/\ tc_log = [b \in Brokers |->
[p \in TxnLogPartitions |-> <<>>]]
/\ tc_log_hwm = [b \in Brokers |->
[p \in TxnLogPartitions |-> 0]]
/\ tc_log_metadata = [b \in Brokers |->
[p \in TxnLogPartitions |->
[cepoch |-> 1,
leader |-> log_part_leader[p]]]]
/\ txn_log_epoch = [p \in TxnLogPartitions |-> 1]
/\ txn_log_leader = [p \in TxnLogPartitions |-> log_part_leader[p]]
/\ topic_partitions = [tp \in TopicPartitions |-> <<>>]
/\ t_to_p_mapping = tid_to_part_mapping
/\ pid_source = 0
/\ aux_coord_ctr = 0
/\ NetworkInit
\* Note that:
\* 1. SendInitPidRequest requires strong fairness because
\* sending to another broker will disable the action. So we need
\* fairness that applies to an action that is enabled infinitely often
\* (but not continuously).
\* 2. SendAddPartitionsToTxnRequest is strongly fair right now, but I
\* may change that. More of a reminder to self.
Fairness ==
/\ \A c \in Clients :
/\ SF_vars(SendInitPidRequest(c))
/\ \A b \in Brokers :
/\ WF_vars(ReceiveInitPidRequest(b, c))
/\ WF_vars(ReceiveInitPidResponse(c, b))
/\ SF_vars(SendAddPartitionsToTxnRequest(c))
/\ \A b \in Brokers :
/\ WF_vars(ReceiveAddPartitionsToTxnResponse(c, b))
/\ WF_vars(ReceiveAddPartitionsToTxnRequest(b, c))
/\ \A b \in Brokers, p \in TxnLogPartitions:
/\ WF_vars(CommitTxnLogAppend(b, p))
/\ WF_vars(BecomeFollower(b, p))
\* /\ \A b \in Brokers, tid \in TransactionIds :
\* WF_vars(CompletePartialTxn(b, tid))
Spec == Init /\ [][Next]_vars
LivenessSpec == Init /\ [][Next]_vars /\ Fairness
=============================================================================