Last active
December 19, 2024 11:47
Gist: Vanlightly/3683f6419b8504996a0adfba3959db70
Throwaway Kafka txns for TLA web
\* Model parameters | |
CONSTANTS | |
p1 = p1 | |
TxnLogPartitions = {p1} | |
tp1 = tp1 | |
TopicPartitions = {tp1}
b1 = b1 | |
b2 = b2 | |
Brokers = {b1, b2} | |
c1 = c1 | |
c2 = c2 | |
Clients = {c1, c2} | |
tid1 = tid1 | |
TransactionIds = {tid1} | |
MaxCoordinatorChanges = 1 | |
\* Model values | |
CONSTANTS | |
InitPidRequest = InitPidRequest | |
InitPidResponse = InitPidResponse | |
AddPartitionsToTxnRequest = AddPartitionsToTxnRequest | |
AddPartitionsToTxnResponse = AddPartitionsToTxnResponse | |
Ready = Ready | |
SentInitPidRequest = SentInitPidRequest | |
HasPid = HasPid | |
BegunTxn = BegunTxn | |
Terminated = Terminated | |
Empty = Empty | |
Begin = Begin | |
PrepareCommit = PrepareCommit | |
PrepareAbort = PrepareAbort | |
CompleteAbort = CompleteAbort | |
CompleteCommit = CompleteCommit | |
Ongoing = Ongoing | |
PrepareEpochFence = PrepareEpochFence | |
Dead = Dead | |
Abort = Abort | |
Commit = Commit | |
IllegalState = IllegalState | |
OK = OK | |
NotCoordinator = NotCoordinator | |
ConcurrentTransactions = ConcurrentTransactions | |
ProducerFenced = ProducerFenced | |
InvalidTxnState = InvalidTxnState | |
InvalidTransition = InvalidTransition | |
NotSupportedYet = NotSupportedYet | |
InvalidProducerIdMapping = InvalidProducerIdMapping | |
None = None | |
INVARIANT | |
TypeOK | |
NoBadStateResponse | |
TestInv | |
SPECIFICATION Spec |
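\* Optional (an assumption, not part of the original cfg): the spec defines
\* the operators Symmetry and View, which TLC can use to shrink the state
\* space. Symmetry reduction is not safe to combine with liveness checking,
\* so keep these commented out when checking temporal properties.
\* SYMMETRY Symmetry
\* VIEW View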
------------------------- MODULE kafka_transactions_for_web ------------------------- | |
EXTENDS FiniteSets, FiniteSetsExt, Sequences, Integers, TLC | |
(* | |
Stage 2 - The AddPartitionsToTxnRequest and response. | |
Also added: | |
1. The transaction coordinator of a partition can now move around. | |
2. The FindCoordinator request is now partially modeled, as a simpler | |
atomic lookup. It can still be stale as the TC can move from one | |
broker to another at any time. | |
3. Producer fencing, along with the associated txn abort. However, | |
the LSO and txn markers are not yet modeled. | |
Limitations: | |
1. Does not implement KIP-360 that allows a producer to send an InitPidRequest | |
with an existing pid and epoch. | |
2. Does not implement KIP-890.
Running: | |
1. Use the VS Code TLA+ extension. | |
2. Configure the model parameters in the cfg file. | |
3. Choose either liveness checking or not by commenting and uncommenting | |
sections in the cfg. See the cfg file. | |
4. You must use the -deadlock parameter as clients stop doing anything once | |
they have a PID, which TLC will interpret as a deadlock. | |
Example: -workers 4 -deadlock -fpmem 0.75 | |
This runs 4 worker threads, lets the fingerprint set use 75% of available
memory, and stops TLC from reporting a "deadlock" as a counterexample.
*) | |
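(*
A command-line equivalent of the example above, as a sketch for running
without VS Code. The jar locations and the cfg file name are assumptions.
The module must live in kafka_transactions_for_web.tla, and FiniteSetsExt
comes from the TLA+ Community Modules, so that jar has to be on the classpath.

java -cp tla2tools.jar:CommunityModules-deps.jar tlc2.TLC \
-config kafka_transactions_for_web.cfg \
-workers 4 -deadlock -fpmem 0.75 \
kafka_transactions_for_web.tla
*)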
\* Model parameters | |
CONSTANTS TxnLogPartitions, | |
TopicPartitions, | |
Brokers, | |
Clients, | |
TransactionIds, | |
MaxCoordinatorChanges | |
\* Message types | |
CONSTANTS InitPidRequest, | |
InitPidResponse, | |
AddPartitionsToTxnRequest, | |
AddPartitionsToTxnResponse | |
\* Client states (not part of the protocol but used to control client behavior) | |
CONSTANTS Ready, | |
SentInitPidRequest, | |
HasPid, | |
BegunTxn, | |
Terminated \* See readme for discussion on this. | |
\* TxnStates (not all are used yet)
CONSTANT Empty, Begin, PrepareCommit, PrepareAbort, | |
CompleteAbort, CompleteCommit, Ongoing, | |
PrepareEpochFence, Dead | |
\* TxnResults | |
CONSTANTS Abort, Commit | |
\* Errors (not all are used yet) | |
CONSTANTS IllegalState, OK, NotCoordinator,
ConcurrentTransactions, ProducerFenced,
InvalidTxnState, \* assigned in the cfg, so it must be declared here
InvalidTransition, NotSupportedYet,
InvalidProducerIdMapping
\* Other constants | |
CONSTANTS None | |
VARIABLES client \* a map of client_id -> client state | |
\* Transaction coordinator state, each is a map of broker_id -> some state | |
VARIABLES tc_tid_metadata, \* per TID txn state | |
tc_tid_transition, \* per TID txn transition state | |
tc_log, \* txn log partition_id -> the partition data | |
tc_log_metadata, \* txn log partition_id -> the partition metadata | |
tc_log_hwm \* txn log partition_id -> high watermark | |
\* Regular topic partitions | |
VARIABLES topic_partitions | |
\* KRaft controller metadata state | |
VARIABLES txn_log_leader, \* a map of txn log partition_id -> the leader aka the TC | |
txn_log_epoch, \* a map of txn log partition_id -> the partition epoch, | |
\* also known as coordinator epoch. | |
pid_source, \* a unique source of PIDs | |
t_to_p_mapping \* a mapping of TID to partition (static in this version),
\* i.e. a TID never moves to another txn log partition.
\* Auxiliary variables
VARIABLES aux_coord_ctr \* counts the number of coordinator changes | |
VARIABLES messages, | |
messages_discard | |
NetworkView == << messages >> | |
net_vars == << messages, messages_discard >> | |
Messages == messages | |
ProcessedMessages == messages_discard | |
NetworkInit == | |
/\ messages = {} | |
/\ messages_discard = {} | |
\* ====================================================================== | |
\* ----- Message passing ------------------------------------------------ | |
\* Add the message to the set of in-flight messages when deliver_count > 0.
\* messages is a set, not a bag, so sending a message that is already
\* in flight leaves the set unchanged.
SendFunc(m, msgs, deliver_count) == | |
IF deliver_count > 0 | |
THEN msgs \union {m} | |
ELSE msgs | |
\* Remove a message from the set of in-flight messages. Used when a server
\* is done processing a message.
DiscardFunc(m, msgs) == | |
msgs \ {m} | |
\* Send a message, without restriction | |
Send(m) == | |
/\ messages' = SendFunc(m, messages, 1) | |
/\ UNCHANGED messages_discard | |
RECURSIVE SendAllFunc(_,_) | |
SendAllFunc(send_msgs, msgs) == | |
IF send_msgs = {} | |
THEN msgs | |
ELSE LET m == CHOOSE m \in send_msgs : TRUE | |
new_msgs == SendFunc(m, msgs, 1) | |
remaining == send_msgs \ {m} | |
IN SendAllFunc(remaining, new_msgs) | |
SendAll(msgs) == | |
/\ messages' = SendAllFunc(msgs, messages) | |
/\ UNCHANGED messages_discard | |
DiscardAndSendAll(d, msgs) == | |
/\ messages' = SendAllFunc(msgs, DiscardFunc(d, messages)) | |
/\ messages_discard' = messages_discard \union {d} | |
\* Remove the message from the network and record it in messages_discard
\* so it cannot be processed again.
Discard(d) == | |
/\ messages' = DiscardFunc(d, messages) | |
/\ messages_discard' = messages_discard \union {d} | |
Drop(msgs) == | |
/\ messages' = messages \ msgs | |
/\ messages_discard' = messages_discard \union msgs | |
\* Discard incoming message and reply with another | |
Reply(d, m) == | |
/\ d \in messages | |
/\ messages' = SendFunc(m, DiscardFunc(d, messages), 1) | |
/\ messages_discard' = messages_discard \union {d} | |
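\* Worked example of the helpers above (m1 and m2 are hypothetical messages).
\* Because messages is a plain set, duplicates collapse:
\*   messages = {m1}      Send(m1)       => messages' = {m1}
\*   messages = {m1}      Send(m2)       => messages' = {m1, m2}
\*   messages = {m1}      Reply(m1, m2)  => messages' = {m2},
\*                                          messages_discard' = messages_discard \union {m1}
\*   messages = {m1, m2}  Drop({m1, m2}) => messages' = {},
\*                                          messages_discard' = messages_discard \union {m1, m2}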
tc_tid_vars == <<tc_tid_metadata, tc_tid_transition>> | |
tc_log_vars == << tc_log, tc_log_hwm, tc_log_metadata >> | |
txn_log_vars == << txn_log_leader, txn_log_epoch >> | |
aux_vars == <<pid_source, t_to_p_mapping, aux_coord_ctr>> | |
topic_vars == <<topic_partitions>> | |
vars == <<client, tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars, net_vars>> | |
View == <<client, tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, pid_source, t_to_p_mapping, NetworkView>> | |
SymmetryClients == Permutations(Clients) | |
SymmetryBrokers == Permutations(Brokers) | |
SymmetryPartitions == Permutations(TxnLogPartitions) | |
SymmetryTids == Permutations(TransactionIds) | |
Symmetry == SymmetryClients | |
\union SymmetryBrokers | |
\union SymmetryPartitions | |
\union SymmetryTids | |
\* ---------------------------------------------- | |
\* Types and state machine transitions | |
\* ---------------------------------------------- | |
\* Valid previous txn states | |
ValidPrevTxnStates(state) == | |
CASE state = Empty -> {None, Empty, CompleteCommit, CompleteAbort} | |
[] state = Ongoing -> {Ongoing, Empty, CompleteCommit, CompleteAbort} | |
[] state = PrepareCommit -> {Ongoing} | |
[] state = PrepareAbort -> {Ongoing, PrepareEpochFence} | |
[] state = CompleteCommit -> {PrepareCommit} | |
[] state = CompleteAbort -> {PrepareAbort} | |
[] state = Dead -> {Empty, CompleteCommit, CompleteAbort} | |
[] state = PrepareEpochFence -> {Ongoing} | |
[] OTHER -> {InvalidTransition} | |
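\* The table above reads "target state -> states it may be entered from".
\* A small helper makes that direction explicit. IsValidTxnTransition is a
\* hypothetical name that the rest of the spec does not use; it is a sketch only.
IsValidTxnTransition(from_state, to_state) ==
    from_state \in ValidPrevTxnStates(to_state)
\* With the distinct model values from the cfg, for example:
\*   IsValidTxnTransition(Ongoing, PrepareCommit) = TRUE
\*   IsValidTxnTransition(Empty, PrepareCommit)   = FALSE
\*   IsValidTxnTransition(Ongoing, Ongoing)       = TRUE  (adding more partitions)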
InitPidRequestType == | |
[type: {InitPidRequest}, | |
tid: TransactionIds, | |
dest: Brokers, | |
source: Clients] | |
InitPidResponseType == | |
[type: {InitPidResponse}, | |
code: {OK, ConcurrentTransactions, NotCoordinator}, | |
pid: Nat \union {-1}, | |
pepoch: Nat \union {-1}, | |
dest: Clients, | |
source: Brokers] | |
AddPartitionsToTxnRequestType == | |
[type: {AddPartitionsToTxnRequest}, | |
tid: TransactionIds, | |
pid: Nat, | |
pepoch: Nat, | |
partitions: SUBSET TopicPartitions, | |
dest: Brokers, | |
source: Clients] | |
AddPartitionsToTxnResponseType == | |
[type: {AddPartitionsToTxnResponse}, | |
code: {OK, ConcurrentTransactions, NotCoordinator, | |
ProducerFenced, InvalidProducerIdMapping}, | |
partitions: SUBSET TopicPartitions, | |
dest: Clients, | |
source: Brokers] | |
MessageType == | |
InitPidRequestType \union InitPidResponseType | |
\union AddPartitionsToTxnRequestType | |
\union AddPartitionsToTxnResponseType | |
\* Common helpers start -------------------- | |
SetIllegalState == | |
Send([code |-> IllegalState]) | |
CurrentTC(p) == txn_log_leader[p] | |
\* Common helpers end -------------------- | |
\* ---------------------------------------------- | |
\* The client | |
\* ---------------------------------------------- | |
\* Client helpers start -------------------- | |
\* This is an atomic FindCoordinatorRequest implementation. | |
FindCoordinator(tid) == | |
LET p == t_to_p_mapping[tid] | |
IN CurrentTC(p) | |
\* Client helpers end --------------------
(* --------------------------------------------------------------- | |
ACTION: SendInitPidRequest | |
WHO: A client | |
A client sends an InitPidRequest to the broker returned by
FindCoordinator. The FindCoordinator request itself is not modeled as
a message exchange; it is an atomic lookup of the current TC. The answer
can still be stale by the time the request arrives, because the TC can
move to another broker at any time.
If the client has no TransactionId (tid) then one is non-deterministically
chosen, else its existing one is reused.
*) | |
SendInitPidRequest(c) == | |
/\ client[c].state = Ready | |
/\ \E tid \in TransactionIds : | |
\* If this is a retry, then reuse the same tid, else use whichever. | |
/\ IF client[c].tid # None THEN tid = client[c].tid ELSE TRUE | |
/\ Send([type |-> InitPidRequest, | |
tid |-> tid, | |
dest |-> FindCoordinator(tid), | |
source |-> c]) | |
/\ client' = [client EXCEPT ![c].tid = tid, | |
![c].state = SentInitPidRequest] | |
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >> | |
(* --------------------------------------------------------------- | |
ACTION: ReceiveInitPidResponse | |
WHO: A client | |
A client receives an InitPidResponse. If it is an OK response, | |
then it updates its pid and epoch, and transitions to the HasPid state. | |
These states are not part of the protocol, but used for implementing | |
the client as a state machine. | |
If the response is an error, then the client reverts back to Ready, where | |
it can retry the InitPidRequest. | |
*) | |
ReceiveInitPidResponse(c, b) == | |
/\ client[c].state = SentInitPidRequest | |
/\ \E msg \in messages : | |
/\ msg.dest = c | |
/\ msg.source = b | |
/\ msg.type = InitPidResponse | |
/\ IF msg.code = OK | |
THEN client' = [client EXCEPT ![c].state = HasPid, | |
![c].tc = msg.source, | |
![c].pid = msg.pid, | |
![c].epoch = msg.pepoch] | |
ELSE client' = [client EXCEPT ![c].state = Ready, | |
![c].last_state = client[c].state, | |
![c].last_error = msg.code] | |
/\ Discard(msg) | |
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >> | |
(* --------------------------------------------------------------- | |
ACTION: SendAddPartitionsToTxnRequest | |
WHO: A client | |
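A client that has a pid (HasPid) or has already begun a txn (BegunTxn),
and has no request outstanding, picks one topic partition that is not yet
part of its transaction and sends an AddPartitionsToTxnRequest for it to
the broker it believes is the TC. The partition is recorded as pending
and the client moves to BegunTxn.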
*) | |
SendAddPartitionsToTxnRequest(c) == | |
/\ client[c].state \in { HasPid, BegunTxn } | |
/\ client[c].pending_partitions = {} | |
/\ \E p \in TopicPartitions : | |
/\ p \notin client[c].partitions | |
/\ Send([type |-> AddPartitionsToTxnRequest, | |
tid |-> client[c].tid, | |
pid |-> client[c].pid, | |
pepoch |-> client[c].epoch, | |
partitions |-> {p}, \* one partition for now | |
dest |-> client[c].tc, | |
source |-> c]) | |
/\ client' = [client EXCEPT ![c].pending_partitions = {p}, | |
![c].state = BegunTxn] | |
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >> | |
(* --------------------------------------------------------------- | |
ACTION: ReceiveAddPartitionsToTxnResponse | |
WHO: A client | |
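A client in the BegunTxn state receives an AddPartitionsToTxnResponse.
- OK: the partitions in the response are added to the client's
partitions and removed from its pending set.
- ConcurrentTransactions or NotCoordinator: retriable. The pending set is
cleared so the request can be sent again, and on NotCoordinator the
client looks up the coordinator again.
- Any other error (such as ProducerFenced): the client moves to Terminated.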
*) | |
ReceiveAddPartitionsToTxnResponse(c, b) == | |
/\ client[c].state = BegunTxn | |
/\ \E msg \in messages : | |
/\ msg.dest = c | |
/\ msg.source = b | |
/\ msg.type = AddPartitionsToTxnResponse | |
/\ CASE msg.code = OK -> | |
client' = [client EXCEPT ![c].pending_partitions = @ \ msg.partitions, | |
![c].partitions = @ \union msg.partitions] | |
[] msg.code \in {ConcurrentTransactions, NotCoordinator} -> | |
\* Retriable error, so the client may try again
client' = [client EXCEPT ![c].pending_partitions = {}, | |
![c].last_state = client[c].state, | |
![c].last_error = msg.code, | |
![c].tc = IF msg.code = NotCoordinator | |
THEN FindCoordinator(client[c].tid) | |
ELSE client[c].tc] | |
[] OTHER -> | |
\* Some non-retriable error such as ProducerFenced. Go to the | |
\* Terminated state to prevent an infinite battle (and infinite | |
\* state space) with another client. | |
client' = [client EXCEPT ![c].state = Terminated, | |
![c].last_state = client[c].state, | |
![c].last_error = msg.code, | |
![c].pending_partitions = {}, | |
![c].partitions = {}] | |
/\ Discard(msg) | |
/\ UNCHANGED << tc_tid_vars, tc_log_vars, txn_log_vars, topic_vars, aux_vars >> | |
\* ---------------------------------------------- | |
\* The transaction coordinator actions | |
\* ---------------------------------------------- | |
\* COMMON HELPERS START -------------------------------------- | |
PartitionFor(tid) == t_to_p_mapping[tid] | |
PartitionMetadataOf(b, partition) == tc_log_metadata[b][partition] | |
PartitionMetadataOfTid(b, tid) == PartitionMetadataOf(b, PartitionFor(tid)) | |
CurrentTransition(b, tid) == | |
tc_tid_transition[b][tid] | |
IsPartitionLeader(b, p) == | |
b = tc_log_metadata[b][p].leader | |
GetTxnMetadata(b, tid) == | |
CASE ~IsPartitionLeader(b, PartitionFor(tid)) -> | |
[code |-> NotCoordinator] | |
[] tc_tid_metadata[b][tid] = None -> | |
[code |-> None] | |
[] OTHER-> | |
[code |-> OK, | |
txn_metadata |-> tc_tid_metadata[b][tid], | |
cepoch |-> PartitionMetadataOfTid(b, tid).cepoch] | |
GetTransition(b, curr_transition, curr_state, new_state, pid, | |
new_pepoch, new_last_pepoch, new_partitions) == | |
CASE | |
\* CASE 1 - Reject because an existing transition is currently being committed. | |
curr_transition # None -> | |
[code |-> ConcurrentTransactions] | |
\* CASE 2 - Accept as this is a valid transition | |
[] curr_state \in ValidPrevTxnStates(new_state) -> | |
\* Create a modified txn metadata transitioned to the new state | |
[code |-> OK, | |
transition |-> [pid |-> pid, | |
pepoch |-> new_pepoch, | |
last_pepoch |-> new_last_pepoch, | |
state |-> new_state, | |
partitions |-> new_partitions]] | |
\* CASE 3 - This shouldn't happen. | |
[] OTHER -> | |
\* Shouldn't get here | |
[code |-> InvalidTransition] | |
GetCompleteTransition(b, tid, txn_metadata) == | |
LET next_state == IF txn_metadata.state = PrepareCommit | |
THEN CompleteCommit ELSE CompleteAbort | |
IN GetTransition(b, None, \* clear the current pending transition to avoid an error | |
txn_metadata.state, \* current state | |
next_state, \* transition to CompleteAbort or CompleteCommit | |
txn_metadata.pid, \* same pid (no exhaustion) | |
txn_metadata.pepoch, \* TODO: epoch bumping? | |
txn_metadata.last_pepoch, | |
txn_metadata.partitions) \* no partitions change | |
ClearTransition(b, tid) == | |
tc_tid_transition' = [tc_tid_transition EXCEPT ![b][tid] = None] | |
AppendTxnLogEntry(b, p, tid, log, transition, callback) == | |
/\ tc_log' = [log EXCEPT ![b][p] = | |
Append(@, [tid |-> tid, | |
transition |-> transition, | |
callback |-> callback])] | |
/\ tc_tid_transition' = [tc_tid_transition EXCEPT ![b][tid] = transition] | |
\* COMMON HELPERS END -------------------------------------- | |
(* --------------------------------------------------------------- | |
ACTION: BecomeLeader | |
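WHO: Transaction coordinator
The controller moves leadership of txn log partition p to broker b,
which becomes the TC for that partition. The partition (coordinator)
epoch is bumped, b records the new epoch and leadership in its local
partition metadata, and b materializes the txn metadata of every tid
mapped to p. The number of moves is bounded by MaxCoordinatorChanges.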
*) | |
\* You would materialize txn state based on the log, but | |
\* that is quite ugly code, instead the simpler equivalent | |
\* is to copy the state of the original TC. | |
MaterializeState(tid) == | |
LET p == PartitionFor(tid) | |
b == CurrentTC(p) \* TC in current state is the original | |
IN tc_tid_metadata[b][tid] | |
BecomeLeader(b, p) == | |
/\ aux_coord_ctr < MaxCoordinatorChanges | |
/\ b # tc_log_metadata[b][p].leader | |
\* Bump the coordinator epoch of the partition whose leadership is moving
\* to this broker.
/\ LET cepoch == txn_log_epoch[p] + 1 | |
IN | |
\* Controller elects the new leader and bumps the txn log partition epoch
/\ txn_log_leader' = [txn_log_leader EXCEPT ![p] = b] | |
/\ txn_log_epoch' = [txn_log_epoch EXCEPT ![p] = cepoch] | |
\* The new leader updates its txn log partition metadata with the new epoch. | |
/\ tc_log_metadata' = [tc_log_metadata EXCEPT ![b][p] = | |
[cepoch |-> cepoch, | |
leader |-> b]] | |
\* Materialize the txn metadata stored by this txn log partition | |
/\ tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b] = | |
[tid \in TransactionIds |-> | |
IF PartitionFor(tid) = p | |
THEN MaterializeState(tid) | |
ELSE None]] | |
/\ aux_coord_ctr' = aux_coord_ctr + 1 | |
/\ UNCHANGED <<tc_tid_transition, tc_log, tc_log_hwm, client, | |
topic_vars, pid_source, t_to_p_mapping, net_vars>> | |
(* --------------------------------------------------------------- | |
ACTION: CompletePartialTxn | |
WHO: Transaction coordinator
After becoming the TC for a partition, there may be tids that
had a committed PrepareAbort/Commit, but no CompleteAbort/Commit.
This action ensures that an initiated prepare phase gets completed.
*) | |
\*CompletePartialTxn(b, tid) == | |
\* LET p == PartitionFor(tid) | |
\* txn_metadata == tc_tid_metadata[b][tid] | |
\* trans_result == GetCompleteTransition(b, tid, txn_metadata) | |
\* IN | |
\* /\ IsPartitionLeader(b, p) | |
\* /\ txn_metadata # None | |
\* /\ txn_metadata.state \in {PrepareAbort, PrepareCommit} \* Prepare in-progress | |
\* /\ tc_tid_transition[b][tid] = None \* but no current transition | |
\* /\ trans_result.code = OK | |
\* /\ AppendTxnLogEntry(b, p, tid, tc_log, | |
\* trans_result.transition, | |
\* [type |-> trans_result.transition.state, | |
\* response |-> None, | |
\* err |-> None]) | |
\* /\ UNCHANGED <<tc_tid_metadata, tc_log_hwm, tc_log_metadata, | |
\* txn_log_vars, topic_vars, client, aux_vars, net_vars>> | |
\* | |
(* --------------------------------------------------------------- | |
ACTION: BecomeFollower | |
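WHO: Transaction coordinator
A broker whose local coordinator epoch for partition p is stale, and
which is not the current leader, learns of the new leadership. It updates
its local partition metadata, replaces its local log and high watermark
with the current leader's, clears the txn metadata and pending transitions
of every tid mapped to p, and sends the stored error responses for its
uncommitted log entries.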
*) | |
RemoveNoneResponses(responses) == | |
{ r \in responses : r # None } | |
BecomeFollower(b, p) == | |
/\ tc_log_metadata[b][p].cepoch < txn_log_epoch[p] | |
/\ b # txn_log_leader[p] | |
/\ LET first_uncommitted_offset == tc_log_hwm[b][p]+1 | |
uncommitted_offsets == first_uncommitted_offset..Len(tc_log[b][p]) | |
uncommitted_entries == { tc_log[b][p][offset] : offset \in uncommitted_offsets } | |
err_responses == { entry.callback.err : entry \in uncommitted_entries } | |
IN | |
\* Update local log state (truncates the log to match the leader) | |
/\ tc_log_metadata' = [tc_log_metadata EXCEPT ![b][p] = | |
[cepoch |-> txn_log_epoch[p], | |
leader |-> txn_log_leader[p]]] | |
/\ tc_log' = [tc_log EXCEPT ![b][p] = tc_log[CurrentTC(p)][p]] | |
/\ tc_log_hwm' = [tc_log_hwm EXCEPT ![b][p] = tc_log_hwm[CurrentTC(p)][p]] | |
\* Clear out txn state of the affected tids | |
/\ tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b] = | |
[tid \in TransactionIds |-> | |
IF PartitionFor(tid) = p | |
THEN None | |
ELSE tc_tid_metadata[b][tid]]] | |
\* Clear out txn pending transitions of the affected tids | |
/\ tc_tid_transition' = [tc_tid_transition EXCEPT ![b] = | |
[tid \in TransactionIds |-> | |
IF PartitionFor(tid) = p | |
THEN None | |
ELSE tc_tid_transition[b][tid]]] | |
\* Send error responses for pending ops | |
/\ SendAll(RemoveNoneResponses(err_responses)) | |
/\ UNCHANGED << txn_log_vars, topic_vars, client, aux_vars >>
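\* Worked example of the uncommitted range in BecomeFollower (the numbers
\* are illustrative): with tc_log_hwm[b][p] = 2 and Len(tc_log[b][p]) = 4,
\*   first_uncommitted_offset = 3
\*   uncommitted_offsets      = 3..4
\* so the err callbacks of entries 3 and 4 are sent, minus any that are None.
\* When tc_log_hwm[b][p] = Len(tc_log[b][p]) the range is empty and no
\* error responses are sent.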
(* --------------------------------------------------------------- | |
ACTION: ReceiveInitPidRequest | |
WHO: Transaction coordinator
A TC receives an InitPidRequest. | |
- If the txn log partition for this tid does not belong to this TC | |
then it sends an InitPidResponse with the error NotCoordinator.
- If there is no existing metadata for a txn with this tid, | |
- Then new, empty metadata is created. When creating new metadata, | |
a unique ProducerId (pid) is generated with a producer epoch of -1. | |
- Else the existing txn metadata is used. | |
- If there is an in-progress transition (a prior transition was appended | |
to the txn log but it hasn't committed yet) | |
- Then the TC sends an InitPidResponse with the error ConcurrentTransactions. | |
- Else, a new transition is generated with: | |
- the Empty state | |
- the pid and incremented epoch | |
The TC appends this transition metadata to the txn log partition. | |
- Once the transition is committed to the txn log, the TC sends the | |
InitPidResponse to the client with the pid and incremented epoch. | |
*) | |
GetOrCreateTxnMetadata(b, tid) == | |
LET cached_md == GetTxnMetadata(b, tid) | |
IN | |
IF cached_md.code = None THEN | |
\* Generate new metadata. | |
[code |-> OK, | |
txn_metadata |-> [pid |-> pid_source + 1, | |
last_pid |-> -1, | |
pepoch |-> -1, | |
last_pepoch |-> -1, | |
state |-> Empty, | |
partitions |-> {}], | |
cepoch |-> PartitionMetadataOfTid(b, tid).cepoch] | |
ELSE cached_md | |
GetInitPidTransition(b, tid, txn_metadata) == | |
\* This is simple now, but lots more logic will get added here. | |
CASE txn_metadata.state \in { CompleteAbort, CompleteCommit, Empty } -> | |
GetTransition(b, CurrentTransition(b, tid), | |
txn_metadata.state, \* current state | |
Empty, \* transition to Empty | |
txn_metadata.pid, \* the pid (no exhaustion modeled) | |
txn_metadata.pepoch + 1, \* new pepoch (incremented) | |
txn_metadata.pepoch, \* last pepoch | |
{}) \* no partitions yet | |
[] txn_metadata.state = Ongoing -> | |
\* Abort the ongoing transaction first, by fencing the producer | |
GetTransition(b, CurrentTransition(b, tid), | |
txn_metadata.state, \* current state | |
PrepareEpochFence, \* transition to PrepareEpochFence | |
txn_metadata.pid, \* same pid (no exhaustion) | |
txn_metadata.pepoch + 1, \* bump the pepoch | |
-1, \* don't know why yet | |
txn_metadata.partitions) \* no partitions change | |
[] txn_metadata.state \in { PrepareAbort, PrepareCommit } -> | |
[code |-> ConcurrentTransactions] | |
[] OTHER -> | |
\* Shouldn't get here | |
[code |-> IllegalState] | |
MakePidResponse(b, c, code, pid, pepoch) == | |
[type |-> InitPidResponse, | |
code |-> code, | |
pid |-> pid, | |
pepoch |-> pepoch, | |
dest |-> c, | |
source |-> b] | |
MakeErrorPidResponse(b, c, code) == | |
MakePidResponse(b, c, code, -1, -1) | |
GetPrepareAbortOrCommitTransition(b, tid, curr_metadata, new_metadata, next_state) == | |
GetTransition(b, CurrentTransition(b, tid), | |
curr_metadata.state, \* current state | |
next_state, \* transition to PrepareAbort or PrepareCommit | |
new_metadata.pid, \* same pid (no exhaustion) | |
new_metadata.pepoch, \* TODO: client support for bump pepoch? | |
new_metadata.last_pepoch, | |
curr_metadata.partitions) \* no partitions change | |
\* Transition to PrepareCommit or PrepareAbort | |
EndTransaction(msg, b, c, curr_metadata, new_metadata, | |
partition, txn_result, is_from_client) == | |
CASE \/ (is_from_client /\ new_metadata.pepoch # curr_metadata.pepoch) | |
\/ new_metadata.pepoch < curr_metadata.pepoch -> | |
/\ Reply(msg, MakeErrorPidResponse(b, c, ProducerFenced)) | |
/\ UNCHANGED << tc_tid_transition, tc_log >>
[] curr_metadata.state = Ongoing -> | |
LET next_state == IF txn_result = Abort THEN PrepareAbort ELSE PrepareCommit | |
trans_result == IF next_state = PrepareAbort /\ new_metadata.state = PrepareEpochFence | |
THEN GetPrepareAbortOrCommitTransition(b, msg.tid, curr_metadata, | |
new_metadata, next_state) | |
ELSE None | |
IN /\ Discard(msg) \* We'll reply once the transition is complete | |
/\ AppendTxnLogEntry(b, partition, msg.tid, tc_log, | |
trans_result.transition, | |
[type |-> next_state, | |
response |-> MakePidResponse(b, c, ConcurrentTransactions, -1, -1), | |
err |-> MakeErrorPidResponse(b, c, NotCoordinator)]) | |
[] OTHER -> | |
/\ Reply(msg, MakeErrorPidResponse(b, c, IllegalState)) | |
/\ UNCHANGED << tc_tid_transition, tc_log >> | |
ReceiveInitPidRequest(b, c) == | |
\E msg \in messages : | |
/\ msg.dest = b | |
/\ msg.source = c | |
/\ msg.type = InitPidRequest | |
/\ LET md_result == GetOrCreateTxnMetadata(b, msg.tid) | |
trans_result == GetInitPidTransition(b, msg.tid, | |
md_result.txn_metadata) | |
partition == PartitionFor(msg.tid) | |
IN | |
IF md_result.code # OK | |
THEN /\ Reply(msg, MakeErrorPidResponse(b, c, md_result.code)) | |
/\ UNCHANGED <<pid_source, tc_tid_vars, tc_log_vars>> | |
ELSE /\ pid_source' = md_result.txn_metadata.pid | |
/\ tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b][msg.tid] = md_result.txn_metadata] | |
/\ CASE | |
\* CASE 1 - Can't make a transition right now. | |
trans_result.code # OK -> | |
/\ Reply(msg, MakeErrorPidResponse(b, c, trans_result.code)) | |
/\ UNCHANGED << tc_tid_transition, tc_log >> | |
\* CASE 2 - Need to fence the current producer and abort its txn | |
[] trans_result.transition.state = PrepareEpochFence -> | |
EndTransaction(msg, b, c, | |
md_result.txn_metadata, | |
trans_result.transition, | |
partition, | |
Abort, FALSE) | |
\* CASE 3 - All ok, write the transition to the local txn log partition | |
[] OTHER -> | |
/\ Discard(msg) \* We'll reply once the transition is complete | |
/\ AppendTxnLogEntry(b, partition, msg.tid, tc_log, | |
trans_result.transition, | |
[type |-> InitPidRequest, | |
response |-> MakePidResponse(b, c, OK, | |
trans_result.transition.pid, | |
trans_result.transition.pepoch), | |
err |-> MakeErrorPidResponse(b, c, NotCoordinator)]) | |
/\ UNCHANGED << client, tc_log_hwm, tc_log_metadata, txn_log_vars, | |
topic_vars, t_to_p_mapping, aux_coord_ctr >> | |
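\* Worked example for a tid the TC has never seen, assuming b leads the
\* tid's txn log partition, pid_source = n, and no transition is pending
\* (the value n is illustrative):
\*   GetOrCreateTxnMetadata => [pid |-> n + 1, pepoch |-> -1, state |-> Empty, ...]
\*   GetInitPidTransition   => [pid |-> n + 1, pepoch |-> 0, last_pepoch |-> -1,
\*                              state |-> Empty, partitions |-> {}]
\*   pid_source'            =  n + 1
\* The OK response carrying pid n + 1 and pepoch 0 is only sent later, when
\* CommitTxnLogAppend commits the appended entry. A second InitPidRequest for
\* the same tid after that commit reuses pid n + 1 and yields pepoch 1.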
(* --------------------------------------------------------------- | |
ACTION: ReceiveAddPartitionsToTxnRequest | |
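WHO: Transaction coordinator
A TC receives an AddPartitionsToTxnRequest. It replies with an error if:
- it has no metadata for the tid, or the pid does not match
(InvalidProducerIdMapping)
- it is not the leader of the tid's txn log partition (NotCoordinator)
- the producer epoch does not match (ProducerFenced)
- a transition is pending, or the txn is in PrepareCommit or PrepareAbort
(ConcurrentTransactions)
- the transition to Ongoing is not valid.
Else it appends a transition to Ongoing, with the requested partitions
added, to the txn log and replies once that entry commits.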
*) | |
MakeAddPartsResponse(b, c, code, partitions) == | |
[type |-> AddPartitionsToTxnResponse, | |
code |-> code, | |
partitions |-> partitions, | |
dest |-> c, | |
source |-> b] | |
ReplyWithError(msg, b, c, error) == | |
/\ Reply(msg, MakeAddPartsResponse(b, c, error, {})) | |
/\ UNCHANGED << tc_tid_transition, tc_log >> | |
GetAddPartitionsTransition(b, tid, txn_metadata, add_partitions) == | |
GetTransition(b, CurrentTransition(b, tid), | |
txn_metadata.state, \* the current state | |
Ongoing, \* transition to ONGOING | |
txn_metadata.pid, | |
txn_metadata.pepoch, | |
txn_metadata.last_pepoch, | |
txn_metadata.partitions \union add_partitions) | |
ReceiveAddPartitionsToTxnRequest(b, c) == | |
\E msg \in messages : | |
/\ msg.dest = b | |
/\ msg.source = c | |
/\ msg.type = AddPartitionsToTxnRequest | |
/\ LET md_result == GetTxnMetadata(b, msg.tid) | |
trans_result == GetAddPartitionsTransition(b, msg.tid, | |
md_result.txn_metadata, | |
msg.partitions) | |
partition == PartitionFor(msg.tid) | |
IN | |
CASE md_result.code = None -> | |
ReplyWithError(msg, b, c, InvalidProducerIdMapping) | |
[] md_result.code = NotCoordinator -> | |
ReplyWithError(msg, b, c, NotCoordinator) | |
[] msg.pid # md_result.txn_metadata.pid -> | |
ReplyWithError(msg, b, c, InvalidProducerIdMapping) | |
[] msg.pepoch # md_result.txn_metadata.pepoch -> | |
ReplyWithError(msg, b, c, ProducerFenced) | |
[] tc_tid_transition[b][msg.tid] # None -> | |
ReplyWithError(msg, b, c, ConcurrentTransactions) | |
[] md_result.txn_metadata.state \in {PrepareCommit, PrepareAbort} -> | |
ReplyWithError(msg, b, c, ConcurrentTransactions) | |
[] trans_result.code # OK -> | |
ReplyWithError(msg, b, c, trans_result.code) | |
[] OTHER -> | |
/\ Discard(msg) \* We'll reply once the transition is complete | |
/\ AppendTxnLogEntry(b, partition, msg.tid, tc_log, | |
trans_result.transition, | |
[type |-> AddPartitionsToTxnRequest, | |
response |-> MakeAddPartsResponse(b, c, OK, trans_result.transition.partitions), | |
err |-> MakeAddPartsResponse(b, c, NotCoordinator, {})]) | |
/\ UNCHANGED <<tc_tid_metadata, tc_log_hwm, tc_log_metadata, txn_log_vars, | |
topic_vars, client, aux_vars>> | |
(* --------------------------------------------------------------- | |
ACTION: TxnLogAppendCommits | |
WHO: Transaction coordinator
A pending write to the txn log commits. | |
Note that NotCoordinator handling of pending log entries is handled | |
in BecomeFollower. | |
*) | |
AdvanceLogHwm(b, p, offset) == | |
tc_log_hwm' = [bb \in Brokers |-> | |
[tc_log_hwm[bb] EXCEPT ![p] = offset]] | |
CanCommit(b, p) == | |
/\ b = CurrentTC(p) | |
/\ \A bb \in Brokers : | |
tc_log_metadata[bb][p].cepoch = tc_log_metadata[b][p].cepoch | |
LogAfterReplication(b, p, entry) == | |
[bb \in Brokers |-> | |
IF bb # b | |
THEN [tc_log[bb] EXCEPT ![p] = Append(@, entry)] | |
ELSE tc_log[b]] | |
SetTxnMetadata(b, tid, transition) == | |
tc_tid_metadata' = [tc_tid_metadata EXCEPT ![b][tid] = transition] | |
HandlePrepareAbortOrCommit(b, p, log_after_rep, entry) == | |
\* TODO: Check validations in TransactionMetadata.scala completeTransitionTo | |
\* Start a new transition to complete the commit/abort. | |
LET new_trans_result == GetCompleteTransition(b, entry.tid, entry.transition) | |
IN IF new_trans_result.code = OK | |
\* The *new* transition is ok so append it and send the response now. | |
\* Note we don't store a response for the new transition, as we already respond now. | |
THEN /\ AppendTxnLogEntry(b, p, entry.tid, log_after_rep, | |
new_trans_result.transition, | |
[type |-> new_trans_result.transition.state, | |
response |-> None, | |
err |-> None]) | |
/\ Send(entry.callback.response) | |
\* The *new* transition is not ok so don't append it, and send the response with the updated error code | |
ELSE LET response == [entry.callback.response EXCEPT !.code = new_trans_result.code] | |
IN /\ tc_log' = log_after_rep | |
/\ ClearTransition(b, entry.tid) | |
/\ Send(response) | |

HandleCompleteAbortOrCommit(b, log_after_rep, entry) ==
    \* TODO: Check validations in TransactionMetadata.scala completeTransitionTo
    \* TODO: Advance the LSO (not modeled yet)
    \* TODO: Write txn markers (not modeled yet)
    /\ tc_log' = log_after_rep
    /\ ClearTransition(b, entry.tid)
    /\ UNCHANGED <<messages, messages_discard>>

HandleInitPid(b, log_after_rep, entry) ==
    \* TODO: Check validations in TransactionMetadata.scala completeTransitionTo
    /\ tc_log' = log_after_rep
    /\ ClearTransition(b, entry.tid)
    /\ Send(entry.callback.response)

HandleAddPartitions(b, p, log_after_rep, entry) ==
    \* TODO: Check validations in TransactionMetadata.scala completeTransitionTo
    /\ tc_log' = log_after_rep
    /\ ClearTransition(b, entry.tid)
    /\ Send(entry.callback.response)

CommitTxnLogAppend(b, p) ==
    /\ tc_log_hwm[b][p] < Len(tc_log[b][p])
    /\ LET next_offset   == tc_log_hwm[b][p] + 1
           entry         == tc_log[b][p][next_offset]
           log_after_rep == LogAfterReplication(b, p, entry)
       IN /\ CanCommit(b, p)
          /\ AdvanceLogHwm(b, p, next_offset)
          /\ SetTxnMetadata(b, entry.tid, entry.transition)
          /\ CASE entry.callback.type \in {PrepareAbort, PrepareCommit} ->
                    HandlePrepareAbortOrCommit(b, p, log_after_rep, entry)
               [] entry.callback.type \in {CompleteAbort, CompleteCommit} ->
                    HandleCompleteAbortOrCommit(b, log_after_rep, entry)
               [] entry.callback.type = InitPidRequest ->
                    HandleInitPid(b, log_after_rep, entry)
               [] entry.callback.type = AddPartitionsToTxnRequest ->
                    HandleAddPartitions(b, p, log_after_rep, entry)
               [] OTHER -> SetIllegalState
    /\ UNCHANGED << client, txn_log_vars, tc_log_metadata, topic_vars, aux_vars >>

\* ----------------------------------------------
\* Invariants
\* ----------------------------------------------

\* Currently only checks that messages are valid.
TypeOK ==
    /\ \A m \in messages :
        m \in MessageType

\* Catch any IllegalState or InvalidTransition error code
\* returned in an InitPidResponse.
NoBadStateResponse ==
    ~\E msg \in messages :
        \/ /\ msg.type = InitPidResponse
           /\ msg.code \in {IllegalState, InvalidTransition}
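
\* A possible broader variant (a sketch, not part of the original spec):
\* the same check extended to AddPartitionsToTxnResponse. It assumes that
\* AddPartitionsToTxnResponse messages carry a `code` field, as the
\* commented-out body of TestInv suggests. The name NoBadStateInAnyResponse
\* is hypothetical, and it would also need to be listed under INVARIANT
\* in the cfg before TLC checks it.
NoBadStateInAnyResponse ==
    ~\E msg \in messages :
        /\ msg.type \in {InitPidResponse, AddPartitionsToTxnResponse}
        /\ msg.code \in {IllegalState, InvalidTransition}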

\* Used for debugging
TestInv ==
    TRUE
    \* ~\E m \in messages :
    \*     /\ m.type = AddPartitionsToTxnResponse
    \*     /\ m.code = ProducerFenced

\* ----------------------------------------------
\* Liveness properties
\* ----------------------------------------------

\* Eventually all clients will receive a pid, even if there
\* are multiple clients and one tid. This is because:
\* 1. When a client receives an error InitPidResponse, it always sends
\*    a new request. Fairness ensures that it eventually sends
\*    a request to the right broker.
\* 2. When a client receives a success InitPidResponse, it does nothing
\*    further.
\* Given multiple clients, multiple brokers, and one tid, in the end,
\* we expect the pepoch of that tid to reach the number of clients.
EventuallyAllClientsGetPid ==
    <>[](\A c \in Clients : client[c].pid > -1)

EventuallyOneClientAddsAllPartitions ==
    <>[](\E c \in Clients : /\ client[c].state = BegunTxn
                            /\ \A tp \in TopicPartitions :
                                tp \in client[c].partitions)

\* ----------------------------------------------
\* Init and Next
\* ----------------------------------------------

Next ==
    \/ \E c \in Clients : SendInitPidRequest(c)
    \/ \E c \in Clients, b \in Brokers : ReceiveInitPidRequest(b, c)
    \/ \E c \in Clients, b \in Brokers : ReceiveInitPidResponse(c, b)
    \/ \E c \in Clients : SendAddPartitionsToTxnRequest(c)
    \/ \E c \in Clients, b \in Brokers : ReceiveAddPartitionsToTxnRequest(b, c)
    \/ \E c \in Clients, b \in Brokers : ReceiveAddPartitionsToTxnResponse(c, b)
    \/ \E b \in Brokers, p \in TxnLogPartitions : CommitTxnLogAppend(b, p)
    \/ \E b \in Brokers, p \in TxnLogPartitions : BecomeLeader(b, p)
    \/ \E b \in Brokers, p \in TxnLogPartitions : BecomeFollower(b, p)
    \* \/ \E b \in Brokers, tid \in TransactionIds :
    \*        CompletePartialTxn(b, tid)

EmptyMap == [x \in {} |-> None]
EmptyTxnState == [tid \in TransactionIds |-> None]

BalancedTidToPartSpread(mapping) ==
    \* Ensure the tid -> txn log partition mapping is evenly distributed.
    \A p1, p2 \in TxnLogPartitions :
        Quantify(DOMAIN mapping, LAMBDA tid : mapping[tid] = p1)
            - Quantify(DOMAIN mapping, LAMBDA tid : mapping[tid] = p2) \in {-1, 0, 1}

BalancedPartitionLeadership(part_leader) ==
    \A br1, br2 \in Brokers :
        LET br1_parts == {p \in TxnLogPartitions : part_leader[p] = br1}
            br2_parts == {p \in TxnLogPartitions : part_leader[p] = br2}
        IN Cardinality(br1_parts) - Cardinality(br2_parts) \in {-1, 0, 1}
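
\* Optional sanity checks (a sketch, not part of the original spec).
\* Init picks its initial mappings with CHOOSE, which is only meaningful
\* if at least one balanced mapping exists. These ASSUMEs would let TLC
\* reject a misconfigured model (for example, an empty Brokers set) at
\* startup. They assume TransactionIds, TxnLogPartitions and Brokers are
\* declared as CONSTANTS, as the cfg suggests, and that the sets are small
\* enough for TLC to enumerate the function sets.
\* Worked example with the cfg values TxnLogPartitions = {p1} and
\* Brokers = {b1, b2}: the mapping [p1 |-> b1] gives b1 one partition and
\* b2 none, so every pairwise difference is in {-1, 0, 1} and the
\* mapping counts as balanced.
ASSUME \E mapping \in [TransactionIds -> TxnLogPartitions] :
            BalancedTidToPartSpread(mapping)
ASSUME \E mapping \in [TxnLogPartitions -> Brokers] :
            BalancedPartitionLeadership(mapping)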

Init ==
    LET tid_to_part_mapping == CHOOSE mapping \in [TransactionIds -> TxnLogPartitions] :
                                    BalancedTidToPartSpread(mapping)
        log_part_leader     == CHOOSE mapping \in [TxnLogPartitions -> Brokers] :
                                    BalancedPartitionLeadership(mapping)
    IN
        /\ client = [c \in Clients |->
                        [state              |-> Ready,
                         tc                 |-> None,
                         tid                |-> None,
                         pid                |-> -1,
                         epoch              |-> -1,
                         last_state         |-> None,
                         last_error         |-> None,
                         pending_partitions |-> {},
                         partitions         |-> {}]]
        /\ tc_tid_metadata = [b \in Brokers |-> [tid \in TransactionIds |-> None]]
        /\ tc_tid_transition = [b \in Brokers |-> [tid \in TransactionIds |-> None]]
        /\ tc_log = [b \in Brokers |->
                        [p \in TxnLogPartitions |-> <<>>]]
        /\ tc_log_hwm = [b \in Brokers |->
                            [p \in TxnLogPartitions |-> 0]]
        /\ tc_log_metadata = [b \in Brokers |->
                                [p \in TxnLogPartitions |->
                                    [cepoch |-> 1,
                                     leader |-> log_part_leader[p]]]]
        /\ txn_log_epoch = [p \in TxnLogPartitions |-> 1]
        /\ txn_log_leader = [p \in TxnLogPartitions |-> log_part_leader[p]]
        /\ topic_partitions = [tp \in TopicPartitions |-> <<>>]
        /\ t_to_p_mapping = tid_to_part_mapping
        /\ pid_source = 0
        /\ aux_coord_ctr = 0
        /\ NetworkInit

\* Note that:
\* 1. SendInitPidRequest requires strong fairness because
\*    sending to another broker will disable the action. So we need
\*    fairness that applies to an action that is enabled infinitely
\*    often, rather than continuously.
\* 2. SendAddPartitionsToTxnRequest is strongly fair right now, but I
\*    may change that. More of a reminder to self.
Fairness ==
    /\ \A c \in Clients :
        /\ SF_vars(SendInitPidRequest(c))
        /\ \A b \in Brokers :
            /\ WF_vars(ReceiveInitPidRequest(b, c))
            /\ WF_vars(ReceiveInitPidResponse(c, b))
        /\ SF_vars(SendAddPartitionsToTxnRequest(c))
        /\ \A b \in Brokers :
            /\ WF_vars(ReceiveAddPartitionsToTxnResponse(c, b))
            /\ WF_vars(ReceiveAddPartitionsToTxnRequest(b, c))
    /\ \A b \in Brokers, p \in TxnLogPartitions :
        /\ WF_vars(CommitTxnLogAppend(b, p))
        /\ WF_vars(BecomeFollower(b, p))
    \* /\ \A b \in Brokers, tid \in TransactionIds :
    \*     WF_vars(CompletePartialTxn(b, tid))

Spec == Init /\ [][Next]_vars
LivenessSpec == Init /\ [][Next]_vars /\ Fairness
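
\* Checking the liveness properties would need TLC to run LivenessSpec
\* rather than Spec, since Spec carries no fairness conditions. A sketch
\* of the cfg lines this would take (an assumption; the cfg published
\* with this spec only checks invariants against Spec):
\*
\*   SPECIFICATION LivenessSpec
\*   PROPERTY
\*       EventuallyAllClientsGetPid
\*       EventuallyOneClientAddsAllPartitions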

=============================================================================