Created
November 3, 2020 22:01
-
-
Save dasl-/91b927ae15699b0b847cd2a8e9174ed3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go | |
index 872e3825e..a1080420e 100644 | |
--- a/go/vt/vttablet/tabletserver/tx_pool.go | |
+++ b/go/vt/vttablet/tabletserver/tx_pool.go | |
@@ -426,30 +426,31 @@ func (axp *TxPool) SetPoolTimeout(timeout time.Duration) { | |
// are failures. | |
type TxConnection struct { | |
*connpool.DBConn | |
TransactionID int64 | |
pool *TxPool | |
StartTime time.Time | |
EndTime time.Time | |
Queries []string | |
NewMessages map[string][]*messager.MessageRow | |
ChangedMessages map[string][]string | |
Conclusion string | |
LogToFile sync2.AtomicInt32 | |
ImmediateCallerID *querypb.VTGateCallerID | |
EffectiveCallerID *vtrpcpb.CallerID | |
Autocommit bool | |
+ concludeMu sync.Mutex | |
} | |
func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID, autocommit bool) *TxConnection { | |
return &TxConnection{ | |
DBConn: conn, | |
TransactionID: transactionID, | |
pool: pool, | |
StartTime: time.Now(), | |
NewMessages: make(map[string][]*messager.MessageRow), | |
ChangedMessages: make(map[string][]string), | |
ImmediateCallerID: immediate, | |
EffectiveCallerID: effective, | |
Autocommit: autocommit, | |
} | |
} | |
@@ -487,30 +488,37 @@ func (txc *TxConnection) BeginAgain(ctx context.Context) error { | |
// active. | |
func (txc *TxConnection) Recycle() { | |
if txc.IsClosed() { | |
txc.conclude(TxClose, "closed") | |
} else { | |
txc.pool.activePool.Put(txc.TransactionID) | |
} | |
} | |
// RecordQuery records the query against this transaction. | |
func (txc *TxConnection) RecordQuery(query string) { | |
txc.Queries = append(txc.Queries, query) | |
} | |
func (txc *TxConnection) conclude(conclusion, reason string) { | |
+ txc.concludeMu.Lock() | |
+ defer txc.concludeMu.Unlock() | |
+ if (txc.DBConn == nil) { | |
+ // Another process already concluded this connection. Bail to avoid a go panic. | |
+ return | |
+ } | |
+ | |
txc.pool.activePool.Unregister(txc.TransactionID, reason) | |
txc.DBConn.Recycle() | |
txc.DBConn = nil | |
txc.pool.limiter.Release(txc.ImmediateCallerID, txc.EffectiveCallerID) | |
txc.log(conclusion) | |
} | |
func (txc *TxConnection) log(conclusion string) { | |
txc.Conclusion = conclusion | |
txc.EndTime = time.Now() | |
username := callerid.GetPrincipal(txc.EffectiveCallerID) | |
if username == "" { | |
username = callerid.GetUsername(txc.ImmediateCallerID) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment