Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created November 3, 2020 22:01
Show Gist options
  • Save dasl-/91b927ae15699b0b847cd2a8e9174ed3 to your computer and use it in GitHub Desktop.
Save dasl-/91b927ae15699b0b847cd2a8e9174ed3 to your computer and use it in GitHub Desktop.
diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go
index 872e3825e..a1080420e 100644
--- a/go/vt/vttablet/tabletserver/tx_pool.go
+++ b/go/vt/vttablet/tabletserver/tx_pool.go
@@ -426,30 +426,31 @@ func (axp *TxPool) SetPoolTimeout(timeout time.Duration) {
// are failures.
type TxConnection struct {
*connpool.DBConn
TransactionID int64
pool *TxPool
StartTime time.Time
EndTime time.Time
Queries []string
NewMessages map[string][]*messager.MessageRow
ChangedMessages map[string][]string
Conclusion string
LogToFile sync2.AtomicInt32
ImmediateCallerID *querypb.VTGateCallerID
EffectiveCallerID *vtrpcpb.CallerID
Autocommit bool
+ concludeMu sync.Mutex // held by conclude while it checks, recycles and clears DBConn
}
func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID, autocommit bool) *TxConnection {
return &TxConnection{
DBConn: conn,
TransactionID: transactionID,
pool: pool,
StartTime: time.Now(),
NewMessages: make(map[string][]*messager.MessageRow),
ChangedMessages: make(map[string][]string),
ImmediateCallerID: immediate,
EffectiveCallerID: effective,
Autocommit: autocommit,
}
}
@@ -487,30 +488,37 @@ func (txc *TxConnection) BeginAgain(ctx context.Context) error {
// active.
func (txc *TxConnection) Recycle() {
if txc.IsClosed() {
txc.conclude(TxClose, "closed")
} else {
txc.pool.activePool.Put(txc.TransactionID)
}
}
// RecordQuery records the query against this transaction.
func (txc *TxConnection) RecordQuery(query string) {
txc.Queries = append(txc.Queries, query)
}
func (txc *TxConnection) conclude(conclusion, reason string) {
+ txc.concludeMu.Lock() // serialize concurrent conclude calls on this connection
+ defer txc.concludeMu.Unlock()
+ if txc.DBConn == nil {
+ // conclude already ran for this connection (it sets DBConn to nil below), possibly from another goroutine. Bail out so the nil DBConn is not recycled again.
+ return
+ }
+
txc.pool.activePool.Unregister(txc.TransactionID, reason)
txc.DBConn.Recycle()
txc.DBConn = nil
txc.pool.limiter.Release(txc.ImmediateCallerID, txc.EffectiveCallerID)
txc.log(conclusion)
}
func (txc *TxConnection) log(conclusion string) {
txc.Conclusion = conclusion
txc.EndTime = time.Now()
username := callerid.GetPrincipal(txc.EffectiveCallerID)
if username == "" {
username = callerid.GetUsername(txc.ImmediateCallerID)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment