Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created October 7, 2019 17:20
Show Gist options
  • Save dasl-/c885bd68acccfc6ec25f763db76b8900 to your computer and use it in GitHub Desktop.
Save dasl-/c885bd68acccfc6ec25f763db76b8900 to your computer and use it in GitHub Desktop.
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go
index 148569a02..37ae52b6b 100644
--- a/go/vt/vttablet/tabletserver/tx_engine.go
+++ b/go/vt/vttablet/tabletserver/tx_engine.go
@@ -118,6 +118,7 @@ func NewTxEngine(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *
config.FoundRowsPoolSize,
config.TxPoolPrefillParallelism,
time.Duration(config.TransactionTimeout*1e9),
+ time.Duration(config.TxPoolTimeout*1e9),
time.Duration(config.IdleTimeout*1e9),
config.TxPoolWaiterCap,
checker,
diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go
index aaa74dbca..1825eefcb 100644
--- a/go/vt/vttablet/tabletserver/tx_pool.go
+++ b/go/vt/vttablet/tabletserver/tx_pool.go
@@ -88,13 +88,14 @@ type TxPool struct {
// connections with CLIENT_FOUND_ROWS flag set. A separate
// pool is needed because this option can only be set at
// connection time.
- foundRowsPool *connpool.Pool
- activePool *pools.Numbered
- lastID sync2.AtomicInt64
- timeout sync2.AtomicDuration
- ticks *timer.Timer
- checker connpool.MySQLChecker
- limiter txlimiter.TxLimiter
+ foundRowsPool *connpool.Pool
+ activePool *pools.Numbered
+ lastID sync2.AtomicInt64
+ transactionTimeout sync2.AtomicDuration
+ transactionPoolTimeout sync2.AtomicDuration
+ ticks *timer.Timer
+ checker connpool.MySQLChecker
+ limiter txlimiter.TxLimiter
// Tracking culprits that cause tx pool full errors.
logMu sync.Mutex
lastLog time.Time
@@ -108,27 +109,30 @@ func NewTxPool(
capacity int,
foundRowsCapacity int,
prefillParallelism int,
- timeout time.Duration,
+ transactionTimeout time.Duration,
+ transactionPoolTimeout time.Duration,
idleTimeout time.Duration,
waiterCap int,
checker connpool.MySQLChecker,
limiter txlimiter.TxLimiter) *TxPool {
axp := &TxPool{
- conns: connpool.New(prefix+"TransactionPool", capacity, prefillParallelism, idleTimeout, checker),
- foundRowsPool: connpool.New(prefix+"FoundRowsPool", foundRowsCapacity, prefillParallelism, idleTimeout, checker),
- activePool: pools.NewNumbered(),
- lastID: sync2.NewAtomicInt64(time.Now().UnixNano()),
- timeout: sync2.NewAtomicDuration(timeout),
- waiterCap: sync2.NewAtomicInt64(int64(waiterCap)),
- waiters: sync2.NewAtomicInt64(0),
- ticks: timer.NewTimer(timeout / 10),
- checker: checker,
- limiter: limiter,
+ conns: connpool.New(prefix+"TransactionPool", capacity, prefillParallelism, idleTimeout, checker),
+ foundRowsPool: connpool.New(prefix+"FoundRowsPool", foundRowsCapacity, prefillParallelism, idleTimeout, checker),
+ activePool: pools.NewNumbered(),
+ lastID: sync2.NewAtomicInt64(time.Now().UnixNano()),
+ transactionTimeout: sync2.NewAtomicDuration(transactionTimeout),
+ transactionPoolTimeout: sync2.NewAtomicDuration(transactionPoolTimeout),
+ waiterCap: sync2.NewAtomicInt64(int64(waiterCap)),
+ waiters: sync2.NewAtomicInt64(0),
+ ticks: timer.NewTimer(transactionTimeout / 10),
+ checker: checker,
+ limiter: limiter,
}
txOnce.Do(func() {
// Careful: conns also exports name+"xxx" vars,
// but we know it doesn't export Timeout.
- stats.NewGaugeDurationFunc(prefix+"TransactionPoolTimeout", "Transaction pool timeout", axp.timeout.Get)
+ stats.NewGaugeDurationFunc(prefix+"TransactionTimeout", "Transaction timeout", axp.transactionTimeout.Get)
+ stats.NewGaugeDurationFunc(prefix+"TransactionPoolTimeout", "Timeout to get a connection from the transaction pool", axp.transactionPoolTimeout.Get)
stats.NewGaugeFunc(prefix+"TransactionPoolWaiters", "Transaction pool waiters", axp.waiters.Get)
})
return axp
@@ -230,10 +234,11 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
axp.limiter.Release(immediateCaller, effectiveCaller)
}()
+ poolCtx, _ := context.WithTimeout(ctx, axp.transactionPoolTimeout.Get())
if options.GetClientFoundRows() {
- conn, err = axp.foundRowsPool.Get(ctx)
+ conn, err = axp.foundRowsPool.Get(poolCtx)
} else {
- conn, err = axp.conns.Get(ctx)
+ conn, err = axp.conns.Get(poolCtx)
}
if err != nil {
switch err {
@@ -388,12 +393,12 @@ func (axp *TxPool) LogActive() {
// Timeout returns the transaction timeout.
func (axp *TxPool) Timeout() time.Duration {
- return axp.timeout.Get()
+ return axp.transactionTimeout.Get()
}
// SetTimeout sets the transaction timeout.
func (axp *TxPool) SetTimeout(timeout time.Duration) {
- axp.timeout.Set(timeout)
+ axp.transactionTimeout.Set(timeout)
axp.ticks.SetInterval(timeout / 10)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment