Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created September 17, 2019 23:32
Show Gist options
  • Save dasl-/65aa3d25c03133e500671202a9aa06df to your computer and use it in GitHub Desktop.
diff --git a/go/cmd/mysqlctl/mysqlctl.go b/go/cmd/mysqlctl/mysqlctl.go
index da7eec12d..30aa20ecd 100644
--- a/go/cmd/mysqlctl/mysqlctl.go
+++ b/go/cmd/mysqlctl/mysqlctl.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/mysql"
@@ -71,7 +72,7 @@ func initCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:74")
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:75\n"); cancel() })()
if err := mysqld.Init(ctx, cnf, *initDBSQLFile); err != nil {
return fmt.Errorf("failed init mysql: %v", err)
@@ -104,7 +105,7 @@ func shutdownCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:107")
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:108\n"); cancel() })()
if err := mysqld.Shutdown(ctx, cnf, true); err != nil {
return fmt.Errorf("failed shutdown mysql: %v", err)
@@ -125,7 +126,7 @@ func startCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:128")
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:129\n"); cancel() })()
if err := mysqld.Start(ctx, cnf, mysqldArgs...); err != nil {
return fmt.Errorf("failed start mysql: %v", err)
@@ -145,7 +146,7 @@ func teardownCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:148")
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:149\n"); cancel() })()
if err := mysqld.Teardown(ctx, cnf, *force); err != nil {
return fmt.Errorf("failed teardown mysql (forced? %v): %v", *force, err)
diff --git a/go/cmd/mysqlctld/mysqlctld.go b/go/cmd/mysqlctld/mysqlctld.go
index 5c17ea813..8cafcb6fd 100644
--- a/go/cmd/mysqlctld/mysqlctld.go
+++ b/go/cmd/mysqlctld/mysqlctld.go
@@ -25,6 +25,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
@@ -68,7 +69,7 @@ func main() {
}
// Start or Init mysqld as needed.
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctld/mysqlctld.go:71")
mycnfFile := mysqlctl.MycnfFile(uint32(*tabletUID))
if _, statErr := os.Stat(mycnfFile); os.IsNotExist(statErr) {
// Generate my.cnf from scratch and use it to find mysqld.
diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go
index 0caee687e..4da1f6089 100644
--- a/go/cmd/vtbackup/vtbackup.go
+++ b/go/cmd/vtbackup/vtbackup.go
@@ -68,6 +68,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
@@ -200,7 +201,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if err != nil {
return fmt.Errorf("failed to initialize mysql config: %v", err)
}
- initCtx, initCancel := context.WithTimeout(ctx, *mysqlTimeout)
+ initCtx, initCancel := context2.WithTimeout(ctx, *mysqlTimeout, "./go/cmd/vtbackup/vtbackup.go:203")
defer initCancel()
if err := mysqld.Init(initCtx, mycnf, *initDBSQLFile); err != nil {
return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err)
@@ -209,7 +210,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
defer func() {
// Be careful not to use the original context, because we don't want to
// skip shutdown just because we timed out waiting for other things.
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/cmd/vtbackup/vtbackup.go:212")
defer (func() { log.Infof("ctx cancel at go/cmd/vtbackup/vtbackup.go:213\n"); cancel() })()
mysqld.Shutdown(ctx, mycnf, false)
}()
diff --git a/go/cmd/vtbench/vtbench.go b/go/cmd/vtbench/vtbench.go
index af6392d87..14e917723 100644
--- a/go/cmd/vtbench/vtbench.go
+++ b/go/cmd/vtbench/vtbench.go
@@ -23,6 +23,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
@@ -156,7 +157,7 @@ func main() {
b := vtbench.NewBench(*threads, *count, connParams, *sql)
- ctx, cancel := context.WithTimeout(context.Background(), *deadline)
+ ctx, cancel := context2.WithTimeout(context.Background(), *deadline, "./go/cmd/vtbench/vtbench.go:159")
defer (func() { log.Infof("ctx cancel at go/cmd/vtbench/vtbench.go:160\n"); cancel() })()
fmt.Printf("Initializing test with %s protocol / %d threads / %d iterations\n",
diff --git a/go/cmd/vtclient/vtclient.go b/go/cmd/vtclient/vtclient.go
index 3da6f8738..01ae2540e 100644
--- a/go/cmd/vtclient/vtclient.go
+++ b/go/cmd/vtclient/vtclient.go
@@ -30,6 +30,7 @@ import (
"time"
"github.com/olekukonko/tablewriter"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
@@ -183,7 +184,7 @@ func run() (*results, error) {
log.Infof("Sending the query...")
- ctx, cancel := context.WithTimeout(context.Background(), *timeout)
+ ctx, cancel := context2.WithTimeout(context.Background(), *timeout, "./go/cmd/vtclient/vtclient.go:186")
defer (func() { log.Infof("ctx cancel at go/cmd/vtclient/vtclient.go:187\n"); cancel() })()
return execMulti(ctx, db, args[0])
}
diff --git a/go/cmd/vtctl/vtctl.go b/go/cmd/vtctl/vtctl.go
index 8365b9c0a..b8ebf5b4e 100644
--- a/go/cmd/vtctl/vtctl.go
+++ b/go/cmd/vtctl/vtctl.go
@@ -27,6 +27,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
@@ -91,7 +92,7 @@ func main() {
vtctl.WorkflowManager = workflow.NewManager(ts)
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/vtctl/vtctl.go:94")
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
installSignalHandlers(cancel)
diff --git a/go/cmd/vtctlclient/main.go b/go/cmd/vtctlclient/main.go
index d3b867754..3569d791a 100644
--- a/go/cmd/vtctlclient/main.go
+++ b/go/cmd/vtctlclient/main.go
@@ -23,6 +23,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
@@ -55,7 +56,7 @@ func main() {
os.Exit(1)
}
- ctx, cancel := context.WithTimeout(context.Background(), *actionTimeout)
+ ctx, cancel := context2.WithTimeout(context.Background(), *actionTimeout, "./go/cmd/vtctlclient/main.go:58")
defer (func() { log.Infof("ctx cancel at go/cmd/vtctlclient/main.go:59\n"); cancel() })()
err := vtctlclient.RunCommandAndWait(
diff --git a/go/cmd/vtworkerclient/vtworkerclient.go b/go/cmd/vtworkerclient/vtworkerclient.go
index 63e10fc58..737133ca5 100644
--- a/go/cmd/vtworkerclient/vtworkerclient.go
+++ b/go/cmd/vtworkerclient/vtworkerclient.go
@@ -23,6 +23,7 @@ import (
"syscall"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/worker/vtworkerclient"
@@ -37,7 +38,7 @@ var (
func main() {
flag.Parse()
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/vtworkerclient/vtworkerclient.go:40")
defer (func() { log.Infof("ctx cancel at go/cmd/vtworkerclient/vtworkerclient.go:41\n"); cancel() })()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
diff --git a/go/cmd/zk/zkcmd.go b/go/cmd/zk/zkcmd.go
index b615a69b1..895b39de2 100644
--- a/go/cmd/zk/zkcmd.go
+++ b/go/cmd/zk/zkcmd.go
@@ -36,6 +36,7 @@ import (
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
@@ -157,7 +158,7 @@ func main() {
subFlags := flag.NewFlagSet(cmdName, flag.ExitOnError)
// Create a context for the command, cancel it if we get a signal.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/zk/zkcmd.go:160")
sigRecv := make(chan os.Signal, 1)
signal.Notify(sigRecv, os.Interrupt)
go func() {
diff --git a/go/context2/context.go b/go/context2/context.go
new file mode 100644
index 000000000..b4a03a28d
--- /dev/null
+++ b/go/context2/context.go
@@ -0,0 +1,51 @@
+// Package context2 wraps the standard context constructors so that each derived
+// context carries a reason string (the generated call sites pass a file:line
+// location). The reason is logged whenever the context reports an error.
+package context2
+
+import (
+	"context"
+	"time"
+
+	"vitess.io/vitess/go/vt/log"
+)
+
+// Context2 is a context.Context tagged with the reason it was created.
+type Context2 struct {
+	context.Context
+	reason string
+}
+
+// WithCancel is context.WithCancel, with the returned context tagged with reason.
+//
+// It must call the standard library directly: delegating to this package's own
+// WithCancel would be a self-import and would recurse without end.
+func WithCancel(parent context.Context, reason string) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithCancel(parent)
+	return Context2{Context: ctx, reason: reason}, cancel
+}
+
+// WithDeadline is context.WithDeadline, with the returned context tagged with
+// reason.
+func WithDeadline(parent context.Context, d time.Time, reason string) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithDeadline(parent, d)
+	return Context2{Context: ctx, reason: reason}, cancel
+}
+
+// WithTimeout is context.WithTimeout, with the returned context tagged with
+// reason.
+func WithTimeout(parent context.Context, timeout time.Duration, reason string) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithTimeout(parent, timeout)
+	return Context2{Context: ctx, reason: reason}, cancel
+}
+
+// Err returns the wrapped context's error. When that error is non-nil it first
+// logs the reason this context was created with, so the log is written on every
+// such call rather than once per context.
+func (c Context2) Err() error {
+	err := c.Context.Err()
+	if err != nil {
+		log.Infof("Context errored due to: %s", c.reason)
+	}
+	return err
+}
diff --git a/go/mysql/client_test.go b/go/mysql/client_test.go
index 13d25073e..3fa8aea13 100644
--- a/go/mysql/client_test.go
+++ b/go/mysql/client_test.go
@@ -26,6 +26,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
)
// assertSQLError makes sure we get the right error.
@@ -71,7 +72,7 @@ func TestConnectTimeout(t *testing.T) {
defer listener.Close()
// Test that canceling the context really interrupts the Connect.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/client_test.go:74")
done := make(chan struct{})
go func() {
_, err := Connect(ctx, params)
@@ -85,7 +86,7 @@ func TestConnectTimeout(t *testing.T) {
<-done
// Tests a connection timeout works.
- ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
+ ctx, cancel = context2.WithTimeout(context.Background(), 100*time.Millisecond, "./go/mysql/client_test.go:88")
_, err = Connect(ctx, params)
cancel()
if err != context.DeadlineExceeded {
diff --git a/go/mysql/server_test.go b/go/mysql/server_test.go
index e4680497e..b582f6cda 100644
--- a/go/mysql/server_test.go
+++ b/go/mysql/server_test.go
@@ -29,6 +29,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
vtenv "vitess.io/vitess/go/vt/env"
"vitess.io/vitess/go/vt/log"
@@ -1247,7 +1248,7 @@ func TestListenerShutdown(t *testing.T) {
Pass: "password1",
}
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/server_test.go:1250")
defer (func() { log.Infof("ctx cancel at go/mysql/server_test.go:1250\n"); cancel() })()
conn, err := Connect(ctx, params)
diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go
index e154b7628..6e287d454 100644
--- a/go/pools/resource_pool.go
+++ b/go/pools/resource_pool.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
@@ -102,7 +103,7 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
rp.resources <- resourceWrapper{}
}
- ctx, cancel := context.WithTimeout(context.TODO(), prefillTimeout)
+ ctx, cancel := context2.WithTimeout(context.TODO(), prefillTimeout, "./go/pools/resource_pool.go:105")
defer (func() { log.Infof("ctx cancel at go/pools/resource_pool.go:104\n"); cancel() })()
if prefillParallelism != 0 {
sem := sync2.NewSemaphore(prefillParallelism, 0 /* timeout */)
diff --git a/go/pools/resource_pool_test.go b/go/pools/resource_pool_test.go
index f9a7c9ac3..13ac38c0b 100644
--- a/go/pools/resource_pool_test.go
+++ b/go/pools/resource_pool_test.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
)
@@ -618,7 +619,7 @@ func TestTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- newctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
+ newctx, cancel := context2.WithTimeout(ctx, 1*time.Millisecond, "./go/pools/resource_pool_test.go:621")
_, err = p.Get(newctx)
cancel()
want := "resource pool timed out"
@@ -633,7 +634,7 @@ func TestExpired(t *testing.T) {
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
defer p.Close()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(-1*time.Second), "./go/pools/resource_pool_test.go:636")
r, err := p.Get(ctx)
if err == nil {
p.Put(r)
diff --git a/go/vt/binlog/binlog_streamer_test.go b/go/vt/binlog/binlog_streamer_test.go
index 576e7b562..9a10fb38d 100644
--- a/go/vt/binlog/binlog_streamer_test.go
+++ b/go/vt/binlog/binlog_streamer_test.go
@@ -26,6 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -193,7 +194,7 @@ func TestStreamerStop(t *testing.T) {
bls := NewStreamer(&mysql.ConnParams{DbName: "vt_test_keyspace"}, nil, nil, mysql.Position{}, 0, sendTransaction)
// Start parseEvents(), but don't send it anything, so it just waits.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlog_streamer_test.go:196")
done := make(chan error)
go func() {
_, err := bls.parseEvents(ctx, events)
diff --git a/go/vt/binlog/binlogplayer/binlog_player_test.go b/go/vt/binlog/binlogplayer/binlog_player_test.go
index dedcfcc08..f95aa2c0c 100644
--- a/go/vt/binlog/binlogplayer/binlog_player_test.go
+++ b/go/vt/binlog/binlogplayer/binlog_player_test.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/throttler"
@@ -306,7 +307,7 @@ func TestRetryOnDeadlock(t *testing.T) {
// has finished. Otherwise, it may cause race with other tests.
func applyEvents(blp *BinlogPlayer) func() error {
errChan := make(chan error)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlogplayer/binlog_player_test.go:309")
go func() {
errChan <- blp.ApplyBinlogEvents(ctx)
diff --git a/go/vt/binlog/slave_connection.go b/go/vt/binlog/slave_connection.go
index 0e744d4cd..9d00ef03b 100644
--- a/go/vt/binlog/slave_connection.go
+++ b/go/vt/binlog/slave_connection.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/pools"
"vitess.io/vitess/go/vt/dbconfigs"
@@ -96,7 +97,7 @@ var slaveIDPool = pools.NewIDPool()
// StartBinlogDumpFromCurrent requests a replication binlog dump from
// the current position.
func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysql.Position, <-chan mysql.BinlogEvent, error) {
- ctx, sc.cancel = context.WithCancel(ctx)
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:99")
masterPosition, err := sc.Conn.MasterPosition()
if err != nil {
@@ -116,7 +117,7 @@ func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysq
//
// Note the context is valid and used until eventChan is closed.
func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, startPos mysql.Position) (<-chan mysql.BinlogEvent, error) {
- ctx, sc.cancel = context.WithCancel(ctx)
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:119")
log.Infof("sending binlog dump command: startPos=%v, slaveID=%v", startPos, sc.slaveID)
if err := sc.SendBinlogDumpCommand(sc.slaveID, startPos); err != nil {
@@ -193,7 +194,7 @@ func (sc *SlaveConnection) streamEvents(ctx context.Context) chan mysql.BinlogEv
//
// Note the context is valid and used until eventChan is closed.
func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan mysql.BinlogEvent, error) {
- ctx, sc.cancel = context.WithCancel(ctx)
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:196")
filename, err := sc.findFileBeforeTimestamp(ctx, timestamp)
if err != nil {
diff --git a/go/vt/binlog/updatestreamctl.go b/go/vt/binlog/updatestreamctl.go
index 3525c28a2..faea9bd5f 100644
--- a/go/vt/binlog/updatestreamctl.go
+++ b/go/vt/binlog/updatestreamctl.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
@@ -266,7 +267,7 @@ func (updateStream *UpdateStreamImpl) StreamKeyRange(ctx context.Context, positi
return fmt.Errorf("newKeyspaceIDResolverFactory failed: %v", err)
}
- streamCtx, cancel := context.WithCancel(ctx)
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:269")
i := updateStream.streams.Add(cancel)
defer updateStream.streams.Delete(i)
@@ -302,7 +303,7 @@ func (updateStream *UpdateStreamImpl) StreamTables(ctx context.Context, position
})
bls := NewStreamer(updateStream.cp, updateStream.se, charset, pos, 0, f)
- streamCtx, cancel := context.WithCancel(ctx)
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:305")
i := updateStream.streams.Add(cancel)
defer updateStream.streams.Delete(i)
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index 65b7d2cb7..af7c994d5 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -51,6 +51,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
@@ -495,7 +496,7 @@ func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) {
if hcc.conn != nil {
// Don't use hcc.ctx because it's already closed.
// Use a separate context, and add a timeout to prevent unbounded waits.
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/healthcheck.go:498")
defer (func() { log.Infof("ctx cancel at go/vt/discovery/healthcheck.go:499\n"); cancel() })()
hcc.conn.Close(ctx)
hcc.conn = nil
@@ -513,7 +514,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) {
retryDelay := hc.retryDelay
for {
- streamCtx, streamCancel := context.WithCancel(hcc.ctx)
+ streamCtx, streamCancel := context2.WithCancel(hcc.ctx, "./go/vt/discovery/healthcheck.go:516")
// Setup a watcher that restarts the timer every time an update is received.
// If a timeout occurs for a serving tablet, we make it non-serving and send
@@ -717,7 +718,7 @@ func (hc *HealthCheckImpl) SetListener(listener HealthCheckStatsListener, sendDo
// It does not block on making connection.
// name is an optional tag for the tablet, e.g. an alternative address.
func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) {
- ctx, cancelFunc := context.WithCancel(context.Background())
+ ctx, cancelFunc := context2.WithCancel(context.Background(), "./go/vt/discovery/healthcheck.go:720")
key := TabletToMapKey(tablet)
hcc := &healthCheckConn{
ctx: ctx,
diff --git a/go/vt/discovery/tablet_stats_cache_wait_test.go b/go/vt/discovery/tablet_stats_cache_wait_test.go
index 19933494f..cffab6b4d 100644
--- a/go/vt/discovery/tablet_stats_cache_wait_test.go
+++ b/go/vt/discovery/tablet_stats_cache_wait_test.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/topo"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -29,7 +30,7 @@ import (
)
func TestWaitForTablets(t *testing.T) {
- shortCtx, shortCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ shortCtx, shortCancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/discovery/tablet_stats_cache_wait_test.go:32")
defer shortCancel()
waitAvailableTabletInterval = 20 * time.Millisecond
@@ -48,7 +49,7 @@ func TestWaitForTablets(t *testing.T) {
}
// this should fail, but return a non-timeout error
- cancelledCtx, cancel := context.WithCancel(context.Background())
+ cancelledCtx, cancel := context2.WithCancel(context.Background(), "./go/vt/discovery/tablet_stats_cache_wait_test.go:51")
cancel()
if err := tsc.WaitForTablets(cancelledCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err == nil || err == context.DeadlineExceeded {
t.Errorf("want: non-timeout error, got: %v", err)
@@ -67,7 +68,7 @@ func TestWaitForTablets(t *testing.T) {
input <- shr
// and ask again, with longer time outs so it's not flaky
- longCtx, longCancel := context.WithTimeout(context.Background(), 10*time.Second)
+ longCtx, longCancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/tablet_stats_cache_wait_test.go:70")
defer longCancel()
waitAvailableTabletInterval = 10 * time.Millisecond
if err := tsc.WaitForTablets(longCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err != nil {
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go
index a5a0ca971..8ef10d24c 100644
--- a/go/vt/discovery/topology_watcher.go
+++ b/go/vt/discovery/topology_watcher.go
@@ -26,6 +26,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"
@@ -153,7 +154,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR
// We want the span from the context, but not the cancelation that comes with it
spanContext := trace.CopySpan(context.Background(), ctx)
- tw.ctx, tw.cancelFunc = context.WithCancel(spanContext)
+ tw.ctx, tw.cancelFunc = context2.WithCancel(spanContext, "./go/vt/discovery/topology_watcher.go:156")
tw.wg.Add(1)
go tw.watch()
return tw
diff --git a/go/vt/grpcclient/client_test.go b/go/vt/grpcclient/client_test.go
index 106a0937e..306040c04 100644
--- a/go/vt/grpcclient/client_test.go
+++ b/go/vt/grpcclient/client_test.go
@@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtgateservicepb "vitess.io/vitess/go/vt/proto/vtgateservice"
)
@@ -41,7 +42,7 @@ func TestDialErrors(t *testing.T) {
t.Fatal(err)
}
vtg := vtgateservicepb.NewVitessClient(gconn)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:44")
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{})
cancel()
gconn.Close()
@@ -57,7 +58,7 @@ func TestDialErrors(t *testing.T) {
t.Fatal(err)
}
vtg := vtgateservicepb.NewVitessClient(gconn)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:60")
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{})
cancel()
gconn.Close()
diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go
index e4c200abb..645996790 100644
--- a/go/vt/mysqlctl/query.go
+++ b/go/vt/mysqlctl/query.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconnpool"
@@ -162,7 +163,7 @@ func (mysqld *Mysqld) killConnection(connID int64) error {
// Get another connection with which to kill.
// Use background context because the caller's context is likely expired,
// which is the reason we're being asked to kill the connection.
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/mysqlctl/query.go:165")
defer (func() { log.Infof("ctx cancel at go/vt/mysqlctl/query.go:166\n"); cancel() })()
if poolConn, connErr := getPoolReconnect(ctx, mysqld.dbaPool); connErr == nil {
// We got a pool connection.
diff --git a/go/vt/schemamanager/schemaswap/schema_swap.go b/go/vt/schemamanager/schemaswap/schema_swap.go
index d7688832c..82e0bb6a4 100644
--- a/go/vt/schemamanager/schemaswap/schema_swap.go
+++ b/go/vt/schemamanager/schemaswap/schema_swap.go
@@ -29,6 +29,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
@@ -882,7 +883,7 @@ func (array orderTabletsForSwap) Less(i, j int) bool {
// executeAdminQuery executes a query on a given tablet as 'allprivs' user. The query is executed
// using timeout value from --schema_swap_admin_query_timeout flag.
func (shardSwap *shardSchemaSwap) executeAdminQuery(tablet *topodatapb.Tablet, query string, maxRows int) (*sqltypes.Result, error) {
- sqlCtx, cancelSQLCtx := context.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout)
+ sqlCtx, cancelSQLCtx := context2.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout, "./go/vt/schemamanager/schemaswap/schema_swap.go:885")
defer (func() {
log.Infof("ctx cancel at go/vt/schemamanager/schemaswap/schema_swap.go:886\n")
cancelSQLCtx()
diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go
index 6c7514223..d4f8f96ab 100644
--- a/go/vt/schemamanager/tablet_executor.go
+++ b/go/vt/schemamanager/tablet_executor.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
@@ -253,7 +254,7 @@ func (exec *TabletExecutor) executeOnAllTablets(ctx context.Context, execResult
// execute the schema change via replication. This is best-effort, meaning
// we still return overall success if the timeout expires.
concurrency := sync2.NewSemaphore(10, 0)
- reloadCtx, cancel := context.WithTimeout(ctx, exec.waitSlaveTimeout)
+ reloadCtx, cancel := context2.WithTimeout(ctx, exec.waitSlaveTimeout, "./go/vt/schemamanager/tablet_executor.go:256")
defer (func() { log.Infof("ctx cancel at go/vt/schemamanager/tablet_executor.go:256\n"); cancel() })()
for _, result := range execResult.SuccessShards {
wg.Add(1)
diff --git a/go/vt/srvtopo/resilient_server_flaky_test.go b/go/vt/srvtopo/resilient_server_flaky_test.go
index bad1800ce..2b197b4e4 100644
--- a/go/vt/srvtopo/resilient_server_flaky_test.go
+++ b/go/vt/srvtopo/resilient_server_flaky_test.go
@@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/status"
"vitess.io/vitess/go/vt/topo"
@@ -309,7 +310,7 @@ func TestGetSrvKeyspace(t *testing.T) {
time.Sleep(*srvTopoCacheTTL)
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2)
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:312")
_, err = rs.GetSrvKeyspace(timeoutCtx, "test_cell", "test_ks")
wantErr := "timed out waiting for keyspace"
if err == nil || err.Error() != wantErr {
@@ -347,7 +348,7 @@ func TestSrvKeyspaceCachedError(t *testing.T) {
time.Sleep(*srvTopoCacheTTL + 10*time.Millisecond)
// Ask again with a different context, should get an error and
// save that context.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/srvtopo/resilient_server_flaky_test.go:350")
defer (func() { log.Infof("ctx cancel at go/vt/srvtopo/resilient_server_flaky_test.go:350\n"); cancel() })()
_, err2 := rs.GetSrvKeyspace(ctx, "test_cell", "unknown_ks")
if err2 == nil {
@@ -599,7 +600,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
time.Sleep(*srvTopoCacheTTL)
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2)
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:602")
_, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell")
wantErr := "timed out waiting for keyspace names"
if err == nil || err.Error() != wantErr {
diff --git a/go/vt/topo/consultopo/election.go b/go/vt/topo/consultopo/election.go
index bacbbac8d..65455b1c8 100644
--- a/go/vt/topo/consultopo/election.go
+++ b/go/vt/topo/consultopo/election.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
"github.com/hashicorp/consul/api"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
@@ -90,7 +91,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error
}
// We have the lock, keep mastership until we lose it.
- lockCtx, lockCancel := context.WithCancel(context.Background())
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/election.go:93")
go func() {
select {
case <-lost:
diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go
index 8021738ca..de4ca89b9 100644
--- a/go/vt/topo/consultopo/watch.go
+++ b/go/vt/topo/consultopo/watch.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
"github.com/hashicorp/consul/api"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/topo"
)
@@ -51,7 +52,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
}
// Create a context, will be used to cancel the watch.
- watchCtx, watchCancel := context.WithCancel(context.Background())
+ watchCtx, watchCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/watch.go:54")
// Create the notifications channel, send updates to it.
notifications := make(chan *topo.WatchData, 10)
diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go
index 354c5e499..ff3b66256 100644
--- a/go/vt/topo/etcd2topo/election.go
+++ b/go/vt/topo/etcd2topo/election.go
@@ -22,6 +22,7 @@ import (
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
@@ -73,7 +74,7 @@ func (mp *etcdMasterParticipation) WaitForMastership() (context.Context, error)
// We use a cancelable context here. If stop is closed,
// we just cancel that context.
- lockCtx, lockCancel := context.WithCancel(context.Background())
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/election.go:76")
go func() {
<-mp.stop
if ld != nil {
diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go
index 471bac495..966e31f11 100644
--- a/go/vt/topo/etcd2topo/lock.go
+++ b/go/vt/topo/etcd2topo/lock.go
@@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -86,7 +87,7 @@ func (s *Server) waitOnLastRev(ctx context.Context, cli *clientv3.Client, nodePa
// Wait for release on blocking key. Cancel the watch when we
// exit this function.
key := string(lastKey.Kvs[0].Key)
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/topo/etcd2topo/lock.go:89")
defer (func() { log.Infof("ctx cancel at go/vt/topo/etcd2topo/lock.go:90\n"); cancel() })()
wc := cli.Watch(ctx, key, clientv3.WithRev(revision))
if wc == nil {
diff --git a/go/vt/topo/etcd2topo/server_test.go b/go/vt/topo/etcd2topo/server_test.go
index 0254878c5..098295dc9 100644
--- a/go/vt/topo/etcd2topo/server_test.go
+++ b/go/vt/topo/etcd2topo/server_test.go
@@ -28,6 +28,7 @@ import (
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/testfiles"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
@@ -74,7 +75,7 @@ func startEtcd(t *testing.T) (*exec.Cmd, string, string) {
}
// Wait until we can list "/", or timeout.
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/topo/etcd2topo/server_test.go:77")
defer (func() { log.Infof("ctx cancel at go/vt/topo/etcd2topo/server_test.go:77\n"); cancel() })()
start := time.Now()
for {
diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go
index 706fd994e..cb024798c 100644
--- a/go/vt/topo/etcd2topo/watch.go
+++ b/go/vt/topo/etcd2topo/watch.go
@@ -23,6 +23,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -50,10 +51,10 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
}
// Create an outer context that will be canceled on return and will cancel all inner watches.
- outerCtx, outerCancel := context.WithCancel(context.Background())
+ outerCtx, outerCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/watch.go:53")
// Create a context, will be used to cancel the watch on retry.
- watchCtx, watchCancel := context.WithCancel(outerCtx)
+ watchCtx, watchCancel := context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:56")
// Create the Watcher. We start watching from the response we
// got, not from the file original version, as the server may
@@ -87,7 +88,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
watchRetries++
// Cancel inner context on retry and create new one.
watchCancel()
- watchCtx, watchCancel = context.WithCancel(outerCtx)
+ watchCtx, watchCancel = context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:90")
newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion))
if newWatcher == nil {
log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion)
diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go
index 220dd1fcb..8cdc58d29 100644
--- a/go/vt/topo/locks.go
+++ b/go/vt/topo/locks.go
@@ -26,6 +26,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -217,7 +218,7 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error {
func (l *Lock) lockKeyspace(ctx context.Context, ts *Server, keyspace string) (LockDescriptor, error) {
log.Infof("Locking keyspace %v for action %v", keyspace, l.Action)
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:220")
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:221\n"); cancel() })()
span, ctx := trace.NewSpan(ctx, "TopoServer.LockKeyspaceForAction")
@@ -241,7 +242,7 @@ func (l *Lock) unlockKeyspace(ctx context.Context, ts *Server, keyspace string,
// Note that we are not using the user provided RemoteOperationTimeout
// here because it is possible that that timeout is too short.
ctx = trace.CopySpan(context.TODO(), ctx)
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:244")
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:245\n"); cancel() })()
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockKeyspaceForAction")
@@ -358,7 +359,7 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error {
func (l *Lock) lockShard(ctx context.Context, ts *Server, keyspace, shard string) (LockDescriptor, error) {
log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action)
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:361")
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:362\n"); cancel() })()
span, ctx := trace.NewSpan(ctx, "TopoServer.LockShardForAction")
@@ -382,7 +383,7 @@ func (l *Lock) unlockShard(ctx context.Context, ts *Server, keyspace, shard stri
// Note that we are not using the user provided RemoteOperationTimeout
// here because it is possible that that timeout is too short.
ctx = trace.CopySpan(context.TODO(), ctx)
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:385")
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:386\n"); cancel() })()
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockShardForAction")
diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go
index f66154a02..16db7ccb7 100644
--- a/go/vt/topo/memorytopo/election.go
+++ b/go/vt/topo/memorytopo/election.go
@@ -21,6 +21,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
@@ -81,7 +82,7 @@ func (mp *cMasterParticipation) WaitForMastership() (context.Context, error) {
// We use a cancelable context here. If stop is closed,
// we just cancel that context.
- lockCtx, lockCancel := context.WithCancel(context.Background())
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/memorytopo/election.go:84")
go func() {
<-mp.stop
if ld != nil {
diff --git a/go/vt/topo/test/lock.go b/go/vt/topo/test/lock.go
index 315653ff0..ab5e60043 100644
--- a/go/vt/topo/test/lock.go
+++ b/go/vt/topo/test/lock.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/topo"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -92,14 +93,14 @@ func checkLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) {
}
// test we can't take the lock again
- fastCtx, cancel := context.WithTimeout(ctx, timeUntilLockIsTaken)
+ fastCtx, cancel := context2.WithTimeout(ctx, timeUntilLockIsTaken, "./go/vt/topo/test/lock.go:95")
if _, err := conn.Lock(fastCtx, keyspacePath, "again"); !topo.IsErrType(err, topo.Timeout) {
t.Fatalf("Lock(again): %v", err)
}
cancel()
// test we can interrupt taking the lock
- interruptCtx, cancel := context.WithCancel(ctx)
+ interruptCtx, cancel := context2.WithCancel(ctx, "./go/vt/topo/test/lock.go:102")
go func() {
time.Sleep(timeUntilLockIsTaken)
cancel()
diff --git a/go/vt/topo/zk2topo/election.go b/go/vt/topo/zk2topo/election.go
index 86639411f..92509aa60 100644
--- a/go/vt/topo/zk2topo/election.go
+++ b/go/vt/topo/zk2topo/election.go
@@ -22,6 +22,7 @@ import (
"github.com/z-division/go-zookeeper/zk"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/log"
@@ -49,7 +50,7 @@ func (zs *Server) NewMasterParticipation(name, id string) (topo.MasterParticipat
id: []byte(id),
done: make(chan struct{}),
}
- result.stopCtx, result.stopCtxCancel = context.WithCancel(context.Background())
+ result.stopCtx, result.stopCtxCancel = context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:52")
return result, nil
}
@@ -122,7 +123,7 @@ func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) {
}
// we got the lock, create our background context
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:125")
go mp.watchMastership(ctx, mp.zs.conn, proposal, cancel)
return ctx, nil
}
diff --git a/go/vt/vtctl/throttler.go b/go/vt/vtctl/throttler.go
index 1d5db1d96..2831235d2 100644
--- a/go/vt/vtctl/throttler.go
+++ b/go/vt/vtctl/throttler.go
@@ -27,6 +27,7 @@ import (
"github.com/olekukonko/tablewriter"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/throttler"
@@ -87,7 +88,7 @@ func commandThrottlerMaxRates(ctx context.Context, wr *wrangler.Wrangler, subFla
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:90")
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:90\n"); cancel() })()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -140,7 +141,7 @@ func commandThrottlerSetMaxRate(ctx context.Context, wr *wrangler.Wrangler, subF
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:143")
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:143\n"); cancel() })()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -177,7 +178,7 @@ func commandGetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangler
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:180")
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:180\n"); cancel() })()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -231,7 +232,7 @@ func commandUpdateThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrang
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:234")
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:234\n"); cancel() })()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -269,7 +270,7 @@ func commandResetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangl
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:272")
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:272\n"); cancel() })()
client, err := throttlerclient.New(*server)
if err != nil {
diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go
index 6d9adc726..af31e63b0 100644
--- a/go/vt/vtctl/vtctl.go
+++ b/go/vt/vtctl/vtctl.go
@@ -107,6 +107,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
@@ -1011,7 +1012,7 @@ func commandWaitForDrain(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
}
if *timeout != 0 {
var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, *timeout)
+ ctx, cancel = context2.WithTimeout(ctx, *timeout, "./go/vt/vtctl/vtctl.go:1014")
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/vtctl.go:1014\n"); cancel() })()
}
diff --git a/go/vt/vtctld/action_repository.go b/go/vt/vtctld/action_repository.go
index 7b484707e..c957b3280 100644
--- a/go/vt/vtctld/action_repository.go
+++ b/go/vt/vtctld/action_repository.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
"vitess.io/vitess/go/acl"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
@@ -115,7 +116,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName,
return result
}
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:118")
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action(ctx, wr, keyspace, r)
cancel()
@@ -142,7 +143,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke
return result
}
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:145")
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action(ctx, wr, keyspace, shard, r)
cancel()
@@ -176,7 +177,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st
}
// run the action
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:179")
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action.method(ctx, wr, tabletAlias, r)
cancel()
diff --git a/go/vt/vtctld/tablet_data_test.go b/go/vt/vtctld/tablet_data_test.go
index edc69d985..e8b32d2d2 100644
--- a/go/vt/vtctld/tablet_data_test.go
+++ b/go/vt/vtctld/tablet_data_test.go
@@ -24,6 +24,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/logutil"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -141,7 +142,7 @@ func TestTabletData(t *testing.T) {
}()
// Start streaming and wait for the first result.
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Second, "./go/vt/vtctld/tablet_data_test.go:144")
result, err := thc.Get(ctx, tablet1.Tablet.Alias)
cancel()
close(stop)
diff --git a/go/vt/vtctld/workflow.go b/go/vt/vtctld/workflow.go
index 1982e9093..e95b4a5fc 100644
--- a/go/vt/vtctld/workflow.go
+++ b/go/vt/vtctld/workflow.go
@@ -21,6 +21,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/flagutil"
@@ -85,7 +86,7 @@ func initWorkflowManager(ts *topo.Server) {
}
func runWorkflowManagerAlone() {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtctld/workflow.go:88")
go vtctl.WorkflowManager.Run(ctx)
// Running cancel on OnTermSync will cancel the context of any
diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go
index 5951ebb65..f4d7074e2 100644
--- a/go/vt/vtgate/buffer/buffer_test.go
+++ b/go/vt/vtgate/buffer/buffer_test.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
@@ -574,7 +575,7 @@ func testRequestCanceled(t *testing.T, explicitEnd bool) {
if err := waitForRequestsInFlight(b, 1); err != nil {
t.Fatal(err)
}
- ctx2, cancel2 := context.WithCancel(context.Background())
+ ctx2, cancel2 := context2.WithCancel(context.Background(), "./go/vt/vtgate/buffer/buffer_test.go:577")
stopped2 := issueRequest(ctx2, t, b, failoverErr)
if err := waitForRequestsInFlight(b, 2); err != nil {
t.Fatal(err)
diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go
index 05623c126..17df120db 100644
--- a/go/vt/vtgate/buffer/shard_buffer.go
+++ b/go/vt/vtgate/buffer/shard_buffer.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
@@ -327,7 +328,7 @@ func (sb *shardBuffer) bufferRequestLocked(ctx context.Context) (*entry, error)
done: make(chan struct{}),
deadline: sb.now().Add(*window),
}
- e.bufferCtx, e.bufferCancel = context.WithCancel(ctx)
+ e.bufferCtx, e.bufferCancel = context2.WithCancel(ctx, "./go/vt/vtgate/buffer/shard_buffer.go:330")
sb.queue = append(sb.queue, e)
if max := lastRequestsInFlightMax.Counts()[sb.statsKeyJoined]; max < int64(len(sb.queue)) {
diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go
index fe5956ee0..1bdecfc06 100644
--- a/go/vt/vtgate/endtoend/vstream_test.go
+++ b/go/vt/vtgate/endtoend/vstream_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"github.com/golang/protobuf/proto"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -32,7 +33,7 @@ import (
)
func TestVStream(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/endtoend/vstream_test.go:35")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/endtoend/vstream_test.go:35\n"); cancel() })()
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
diff --git a/go/vt/vtgate/engine/merge_sort.go b/go/vt/vtgate/engine/merge_sort.go
index 9c6b67995..070a7fc3e 100644
--- a/go/vt/vtgate/engine/merge_sort.go
+++ b/go/vt/vtgate/engine/merge_sort.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
@@ -38,7 +39,7 @@ import (
// was pulled out. Since the input streams are sorted the same way that the heap is
// sorted, this guarantees that the merged stream will also be sorted the same way.
func mergeSort(vcursor VCursor, query string, orderBy []OrderbyParams, rss []*srvtopo.ResolvedShard, bvs []map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
- ctx, cancel := context.WithCancel(vcursor.Context())
+ ctx, cancel := context2.WithCancel(vcursor.Context(), "./go/vt/vtgate/engine/merge_sort.go:41")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/engine/merge_sort.go:42\n"); cancel() })()
handles := make([]*streamHandle, len(rss))
diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go
index b3e0159cd..bf5ec26db 100644
--- a/go/vt/vtgate/executor_stream_test.go
+++ b/go/vt/vtgate/executor_stream_test.go
@@ -21,6 +21,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
@@ -136,7 +137,7 @@ func TestStreamSQLSet(t *testing.T) {
func executorStreamMessages(executor *Executor, sql string) (qr *sqltypes.Result, err error) {
results := make(chan *sqltypes.Result, 100)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/vtgate/executor_stream_test.go:139")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/executor_stream_test.go:89\n"); cancel() })()
err = executor.StreamExecute(
ctx,
diff --git a/go/vt/vtgate/gateway/gateway.go b/go/vt/vtgate/gateway/gateway.go
index d7e0b57bb..60b6053fc 100644
--- a/go/vt/vtgate/gateway/gateway.go
+++ b/go/vt/vtgate/gateway/gateway.go
@@ -23,6 +23,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/vt/log"
@@ -107,7 +108,7 @@ func GetCreator() Creator {
// just calls it.
func WaitForTablets(gw Gateway, tabletTypesToWait []topodatapb.TabletType) error {
log.Infof("Gateway waiting for serving tablets...")
- ctx, cancel := context.WithTimeout(context.Background(), *initialTabletTimeout)
+ ctx, cancel := context2.WithTimeout(context.Background(), *initialTabletTimeout, "./go/vt/vtgate/gateway/gateway.go:110")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/gateway/gateway.go:111\n"); cancel() })()
err := gw.WaitForTablets(ctx, tabletTypesToWait)
diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go
index fb97beb10..1a60e6745 100644
--- a/go/vt/vtgate/plugin_mysql_server.go
+++ b/go/vt/vtgate/plugin_mysql_server.go
@@ -26,6 +26,7 @@ import (
"syscall"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
@@ -89,7 +90,7 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:92")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:93\n"); cancel() })()
} else {
ctx = context.Background()
@@ -136,7 +137,7 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq
ctx := context.Background()
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(ctx, *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(ctx, *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:139")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:140\n"); cancel() })()
}
@@ -203,7 +204,7 @@ func (vh *vtgateHandler) ComPrepare(c *mysql.Conn, query string) ([]*querypb.Fie
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:206")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:207\n"); cancel() })()
} else {
ctx = context.Background()
@@ -262,7 +263,7 @@ func (vh *vtgateHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareDat
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:265")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:266\n"); cancel() })()
} else {
ctx = context.Background()
diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go
index b0f905c87..d1cea3e4c 100644
--- a/go/vt/vtgate/resolver.go
+++ b/go/vt/vtgate/resolver.go
@@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
@@ -368,7 +369,7 @@ func (res *Resolver) UpdateStream(ctx context.Context, keyspace string, shard st
// VStream streams events from one target. This function ensures that events of each
// transaction are streamed together, along with the corresponding GTID.
func (res *Resolver) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func(events []*binlogdatapb.VEvent) error) error {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/resolver.go:371")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver.go:351\n"); cancel() })()
// mu protects sending on ch, err and positions.
diff --git a/go/vt/vtgate/resolver_test.go b/go/vt/vtgate/resolver_test.go
index ffbd93f90..d69461ed1 100644
--- a/go/vt/vtgate/resolver_test.go
+++ b/go/vt/vtgate/resolver_test.go
@@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
@@ -637,7 +638,7 @@ func TestResolverVStream(t *testing.T) {
}}
sbc0.AddVStreamEvents(send2, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:640")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:574\n"); cancel() })()
count := 1
@@ -686,7 +687,7 @@ func TestResolverVStreamChunks(t *testing.T) {
}
sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:689")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:623\n"); cancel() })()
rowEncountered := false
@@ -762,7 +763,7 @@ func TestResolverVStreamMulti(t *testing.T) {
}
sbc1.AddVStreamEvents(send1, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:765")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:699\n"); cancel() })()
var got *binlogdatapb.VGtid
@@ -827,7 +828,7 @@ func TestResolverVStreamRetry(t *testing.T) {
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:830")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:764\n"); cancel() })()
count := 0
@@ -886,7 +887,7 @@ func TestResolverVStreamHeartbeat(t *testing.T) {
}}
sbc0.AddVStreamEvents(send1, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:889")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:823\n"); cancel() })()
vgtid := &binlogdatapb.VGtid{
diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go
index 612c49a0a..e502910b2 100644
--- a/go/vt/vtgate/scatter_conn.go
+++ b/go/vt/vtgate/scatter_conn.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/concurrency"
@@ -547,7 +548,7 @@ func (tt *timeTracker) Record(target *querypb.Target) time.Time {
func (stc *ScatterConn) MessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error {
// The cancelable context is used for handling errors
// from individual streams.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/scatter_conn.go:550")
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/scatter_conn.go:551\n"); cancel() })()
// mu is used to merge multiple callback calls into one.
diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go
index 8da8069b3..13dbec234 100644
--- a/go/vt/vtgate/vcursor_impl.go
+++ b/go/vt/vtgate/vcursor_impl.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/sqlparser"
@@ -82,7 +83,7 @@ func (vc *vcursorImpl) MaxMemoryRows() int {
// SetContextTimeout updates context and sets a timeout.
func (vc *vcursorImpl) SetContextTimeout(timeout time.Duration) context.CancelFunc {
- ctx, cancel := context.WithTimeout(vc.ctx, timeout)
+ ctx, cancel := context2.WithTimeout(vc.ctx, timeout, "./go/vt/vtgate/vcursor_impl.go:85")
vc.ctx = ctx
return cancel
}
diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go
index a2c24b2f7..dd326a989 100644
--- a/go/vt/vtgate/vtgate_test.go
+++ b/go/vt/vtgate/vtgate_test.go
@@ -29,6 +29,7 @@ import (
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
@@ -1225,7 +1226,7 @@ func TestVTGateMessageStreamSharded(t *testing.T) {
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1228")
go func() {
kr := &topodatapb.KeyRange{End: []byte{0x40}}
err := rpcVTGate.MessageStream(ctx, ks, "", kr, "msg", func(qr *sqltypes.Result) error {
@@ -1271,7 +1272,7 @@ func TestVTGateMessageStreamUnsharded(t *testing.T) {
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil)
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1274")
go func() {
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error {
select {
@@ -1307,7 +1308,7 @@ func TestVTGateMessageStreamRetry(t *testing.T) {
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil)
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1310")
go func() {
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error {
select {
@@ -1351,7 +1352,7 @@ func TestVTGateMessageStreamUnavailable(t *testing.T) {
tablet.MustFailCodes[vtrpcpb.Code_UNAVAILABLE] = 1
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1354")
go func() {
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error {
select {
diff --git a/go/vt/vtqueryserver/plugin_mysql_server.go b/go/vt/vtqueryserver/plugin_mysql_server.go
index aba29684d..a984eaaf9 100644
--- a/go/vt/vtqueryserver/plugin_mysql_server.go
+++ b/go/vt/vtqueryserver/plugin_mysql_server.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
@@ -75,7 +76,7 @@ func (mh *proxyHandler) ConnectionClosed(c *mysql.Conn) {
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:78")
defer (func() { log.Infof("ctx cancel at go/vt/vtqueryserver/plugin_mysql_server.go:79\n"); cancel() })()
} else {
ctx = context.Background()
@@ -90,7 +91,7 @@ func (mh *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:93")
defer (func() { log.Infof("ctx cancel at go/vt/vtqueryserver/plugin_mysql_server.go:94\n"); cancel() })()
} else {
ctx = context.Background()
diff --git a/go/vt/vttablet/agentrpctest/test_agent_rpc.go b/go/vt/vttablet/agentrpctest/test_agent_rpc.go
index 692029033..2fc66c4c5 100644
--- a/go/vt/vttablet/agentrpctest/test_agent_rpc.go
+++ b/go/vt/vttablet/agentrpctest/test_agent_rpc.go
@@ -28,6 +28,7 @@ import (
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/log"
@@ -202,7 +203,7 @@ func agentRPCTestPingPanic(ctx context.Context, t *testing.T, client tmclient.Ta
// RPCs failed due to an expired context before .Dial().
func agentRPCTestDialExpiredContext(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
// Using a timeout of 0 here such that .Dial() will fail immediately.
- expiredCtx, cancel := context.WithTimeout(ctx, 0)
+ expiredCtx, cancel := context2.WithTimeout(ctx, 0, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:205")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/agentrpctest/test_agent_rpc.go:205\n"); cancel() })()
err := client.Ping(expiredCtx, tablet)
if err == nil {
@@ -228,7 +229,7 @@ func agentRPCTestRPCTimeout(ctx context.Context, t *testing.T, client tmclient.T
// NOTE: This might still race e.g. when test execution takes too long the
// context will be expired in dial() already. In such cases coverage
// will be reduced but the test will not flake.
- shortCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+ shortCtx, cancel := context2.WithTimeout(ctx, 10*time.Millisecond, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:231")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/agentrpctest/test_agent_rpc.go:231\n"); cancel() })()
fakeAgent.setSlow(true)
defer func() { fakeAgent.setSlow(false) }()
diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go
index 0b9c50d38..3c7a82fbf 100644
--- a/go/vt/vttablet/endtoend/misc_test.go
+++ b/go/vt/vttablet/endtoend/misc_test.go
@@ -29,6 +29,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
@@ -467,7 +468,7 @@ func TestStreamHealth_Expired(t *testing.T) {
var health *querypb.StreamHealthResponse
framework.Server.BroadcastHealth(0, nil, time.Millisecond)
time.Sleep(5 * time.Millisecond)
- ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
+ ctx, cancel := context2.WithTimeout(context.Background(), time.Millisecond*100, "./go/vt/vttablet/endtoend/misc_test.go:470")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/endtoend/misc_test.go:471\n"); cancel() })()
if err := framework.Server.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
health = shr
diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go
index edd48b47f..e1ac83a08 100644
--- a/go/vt/vttablet/grpctabletconn/conn.go
+++ b/go/vt/vttablet/grpctabletconn/conn.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
@@ -149,7 +150,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
// no direct API to end a stream from the client side. If callback
// returns an error, we return from the function. The deferred
// cancel will then cause the stream to be terminated.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:152")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:153\n"); cancel() })()
stream, err := func() (queryservicepb.Query_StreamExecuteClient, error) {
@@ -494,7 +495,7 @@ func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *quer
// MessageStream streams messages.
func (conn *gRPCQueryClient) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error {
// Please see comments in StreamExecute to see how this works.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:497")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:498\n"); cancel() })()
stream, err := func() (queryservicepb.Query_MessageStreamClient, error) {
@@ -595,7 +596,7 @@ func (conn *gRPCQueryClient) SplitQuery(
// StreamHealth starts a streaming RPC for VTTablet health status updates.
func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
// Please see comments in StreamExecute to see how this works.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:598")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:599\n"); cancel() })()
stream, err := func() (queryservicepb.Query_StreamHealthClient, error) {
@@ -631,7 +632,7 @@ func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*qu
// UpdateStream starts a streaming query to VTTablet.
func (conn *gRPCQueryClient) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, callback func(*querypb.StreamEvent) error) error {
// Please see comments in StreamExecute to see how this works.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:634")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:635\n"); cancel() })()
stream, err := func() (queryservicepb.Query_UpdateStreamClient, error) {
diff --git a/go/vt/vttablet/heartbeat/reader.go b/go/vt/vttablet/heartbeat/reader.go
index 33e480c87..793c0220f 100644
--- a/go/vt/vttablet/heartbeat/reader.go
+++ b/go/vt/vttablet/heartbeat/reader.go
@@ -21,6 +21,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -149,7 +150,7 @@ func (r *Reader) GetLatest() (time.Duration, error) {
func (r *Reader) readHeartbeat() {
defer tabletenv.LogError()
- ctx, cancel := context.WithDeadline(context.Background(), r.now().Add(r.interval))
+ ctx, cancel := context2.WithDeadline(context.Background(), r.now().Add(r.interval), "./go/vt/vttablet/heartbeat/reader.go:152")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/heartbeat/reader.go:153\n"); cancel() })()
res, err := r.fetchMostRecentHeartbeat(ctx)
diff --git a/go/vt/vttablet/heartbeat/writer.go b/go/vt/vttablet/heartbeat/writer.go
index 31dced89b..b92df96df 100644
--- a/go/vt/vttablet/heartbeat/writer.go
+++ b/go/vt/vttablet/heartbeat/writer.go
@@ -21,6 +21,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -203,7 +204,7 @@ func (w *Writer) bindHeartbeatVars(query string) (string, error) {
// writeHeartbeat updates the heartbeat row for this tablet with the current time in nanoseconds.
func (w *Writer) writeHeartbeat() {
defer tabletenv.LogError()
- ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
+ ctx, cancel := context2.WithDeadline(context.Background(), w.now().Add(w.interval), "./go/vt/vttablet/heartbeat/writer.go:206")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/heartbeat/writer.go:207\n"); cancel() })()
update, err := w.bindHeartbeatVars(sqlUpdateHeartbeat)
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go
index 0cbf2b87f..797bd3d92 100644
--- a/go/vt/vttablet/tabletmanager/action_agent.go
+++ b/go/vt/vttablet/tabletmanager/action_agent.go
@@ -44,6 +44,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -746,7 +747,7 @@ func (agent *ActionAgent) checkTabletMysqlPort(ctx context.Context, tablet *topo
// Update the port in the topology. Use a shorter timeout, so if
// the topo server is busy / throttling us, we don't hang forever here.
// The healthcheck go routine will try again next time.
- ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/vttablet/tabletmanager/action_agent.go:749")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/action_agent.go:750\n"); cancel() })()
if !agent.waitingForMysql {
log.Warningf("MySQL port has changed from %v to %v, updating it in tablet record", topoproto.MysqlPort(tablet), mport)
diff --git a/go/vt/vttablet/tabletmanager/init_tablet.go b/go/vt/vttablet/tabletmanager/init_tablet.go
index 91fe8f560..741dc382e 100644
--- a/go/vt/vttablet/tabletmanager/init_tablet.go
+++ b/go/vt/vttablet/tabletmanager/init_tablet.go
@@ -24,8 +24,7 @@ import (
"fmt"
"time"
- "golang.org/x/net/context"
-
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/log"
@@ -84,7 +83,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// Create a context for this whole operation. Note we will
// retry some actions upon failure up to this context expires.
- ctx, cancel := context.WithTimeout(agent.batchCtx, *initTimeout)
+ ctx, cancel := context2.WithTimeout(agent.batchCtx, *initTimeout, "./go/vt/vttablet/tabletmanager/init_tablet.go:87")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/init_tablet.go:88\n"); cancel() })()
// Read the shard, create it if necessary.
diff --git a/go/vt/vttablet/tabletmanager/replication_reporter.go b/go/vt/vttablet/tabletmanager/replication_reporter.go
index 4ec3b1241..5f3ca9ecf 100644
--- a/go/vt/vttablet/tabletmanager/replication_reporter.go
+++ b/go/vt/vttablet/tabletmanager/replication_reporter.go
@@ -22,6 +22,7 @@ import (
"html/template"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -68,7 +69,7 @@ func (r *replicationReporter) Report(isSlaveType, shouldQueryServiceBeRunning bo
log.Infof("Slave is stopped. Running with --disable_active_reparents so will not try to reconnect to master...")
} else {
log.Infof("Slave is stopped. Trying to reconnect to master...")
- ctx, cancel := context.WithTimeout(r.agent.batchCtx, 5*time.Second)
+ ctx, cancel := context2.WithTimeout(r.agent.batchCtx, 5*time.Second, "./go/vt/vttablet/tabletmanager/replication_reporter.go:71")
if err := repairReplication(ctx, r.agent); err != nil {
log.Infof("Failed to reconnect to master: %v", err)
}
diff --git a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go
index cb6245b48..770b8f745 100644
--- a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go
+++ b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go
@@ -23,6 +23,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"
@@ -113,7 +114,7 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
agent.updateState(ctx, newTablet, "fastTabletExternallyReparented")
}
// Start the finalize stage with a background context, but connect the trace.
- bgCtx, cancel := context.WithTimeout(agent.batchCtx, *finalizeReparentTimeout)
+ bgCtx, cancel := context2.WithTimeout(agent.batchCtx, *finalizeReparentTimeout, "./go/vt/vttablet/tabletmanager/rpc_external_reparent.go:116")
bgCtx = trace.CopySpan(bgCtx, ctx)
agent.finalizeReparentCtx = bgCtx
go func() {
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go
index 41a4501cb..a3d945d9b 100644
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -103,7 +104,7 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string,
if err != nil {
return "", err
}
- waitCtx, cancel := context.WithTimeout(ctx, waitTime)
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:106")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/rpc_replication.go:107\n"); cancel() })()
if err := agent.MysqlDaemon.WaitMasterPos(waitCtx, pos); err != nil {
return "", err
@@ -153,7 +154,7 @@ func (agent *ActionAgent) StartSlaveUntilAfter(ctx context.Context, position str
}
defer agent.unlock()
- waitCtx, cancel := context.WithTimeout(ctx, waitTime)
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:156")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/rpc_replication.go:157\n"); cancel() })()
pos, err := mysql.DecodePosition(position)
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go
index 93e334ce3..e1b151af0 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/controller.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go
@@ -22,6 +22,7 @@ import (
"strconv"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"github.com/golang/protobuf/proto"
@@ -106,7 +107,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
ct.tabletPicker = tp
// cancel
- ctx, ct.cancel = context.WithCancel(ctx)
+ ctx, ct.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/controller.go:109")
go ct.run(ctx)
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
index f330985c4..acecde60b 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
@@ -225,7 +226,7 @@ func TestControllerCanceledContext(t *testing.T) {
"source": fmt.Sprintf(`keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
}
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/controller_test.go:228")
cancel()
ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil)
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go
index ec999ccc0..b75faf5a0 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/engine.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -110,7 +111,7 @@ func (vre *Engine) Open(ctx context.Context) error {
return nil
}
- vre.ctx, vre.cancel = context.WithCancel(ctx)
+ vre.ctx, vre.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/engine.go:113")
vre.isOpen = true
if err := vre.initAll(); err != nil {
go vre.Close()
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
index d16ca35cc..11cb9b670 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
@@ -23,6 +23,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -398,7 +399,7 @@ func TestWaitForPosCancel(t *testing.T) {
sqltypes.NewVarBinary("Running"),
sqltypes.NewVarBinary(""),
}}}, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/engine_test.go:401")
cancel()
err := vre.WaitForPos(ctx, 1, "MariaDB/0-1-1084")
if err == nil || err != context.Canceled {
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go
index 4688d7412..e44bcf19b 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go
@@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -115,7 +116,7 @@ func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSetting
}
func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.Result) error {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:118")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/vreplication/vcopier.go:119\n")
cancel()
@@ -186,7 +187,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
}
defer vsClient.Close(ctx)
- ctx, cancel := context.WithTimeout(ctx, copyTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, copyTimeout, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:189")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/vreplication/vcopier.go:187\n")
cancel()
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
index 790143520..75d7298ff 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
@@ -113,7 +114,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) error {
return fmt.Errorf("error dialing tablet: %v", err)
}
defer vsClient.Close(ctx)
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vplayer.go:116")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/vreplication/vplayer.go:117\n")
cancel()
diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go
index ebde54578..e9cfbb62a 100644
--- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go
+++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -63,7 +64,7 @@ func TestDBConnExec(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:66")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:66\n"); cancel() })()
dbConn, err := NewDBConn(connPool, db.ConnParams())
if dbConn != nil {
@@ -141,7 +142,7 @@ func TestDBConnDeadline(t *testing.T) {
defer connPool.Close()
db.SetConnDelay(100 * time.Millisecond)
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:144")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:144\n")
cancel()
@@ -167,7 +168,7 @@ func TestDBConnDeadline(t *testing.T) {
startCounts = tabletenv.MySQLStats.Counts()
- ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ ctx, cancel = context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:170")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:167\n")
cancel()
@@ -307,7 +308,7 @@ func TestDBConnStream(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:310")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:304\n")
cancel()
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go
index 9c531d25b..4992c4939 100644
--- a/go/vt/vttablet/tabletserver/messager/message_manager.go
+++ b/go/vt/vttablet/tabletserver/messager/message_manager.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
@@ -56,7 +57,7 @@ type messageReceiver struct {
}
func newMessageReceiver(ctx context.Context, send func(*sqltypes.Result) error) (*messageReceiver, <-chan struct{}) {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/messager/message_manager.go:59")
rcv := &messageReceiver{
ctx: ctx,
errChan: make(chan error, 1),
@@ -534,7 +535,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t
return
}
defer mm.postponeSema.Release()
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime)
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), ackWaitTime, "./go/vt/vttablet/tabletserver/messager/message_manager.go:537")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/messager/message_manager.go:539\n")
cancel()
@@ -546,7 +547,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t
}
func (mm *messageManager) runPoller() {
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval())
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval(), "./go/vt/vttablet/tabletserver/messager/message_manager.go:549")
defer func() {
tabletenv.LogError()
cancel()
@@ -616,7 +617,7 @@ func (mm *messageManager) runPurge() {
// purge is a non-member because it should be called asynchronously and should
// not rely on members of messageManager.
func purge(tsv TabletService, name string, purgeAfter, purgeInterval time.Duration) {
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), purgeInterval)
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), purgeInterval, "./go/vt/vttablet/tabletserver/messager/message_manager.go:619")
defer func() {
tabletenv.LogError()
cancel()
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go
index aa43af69e..6fd142db7 100644
--- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go
+++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
@@ -106,7 +107,7 @@ func TestReceiverCancel(t *testing.T) {
mm.Open()
defer mm.Close()
r1 := newTestReceiver(0)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:109")
_ = mm.Subscribe(ctx, r1.rcv)
cancel()
// r1 should eventually be unsubscribed.
@@ -232,7 +233,7 @@ func TestMessageManagerSend(t *testing.T) {
// Test that mm stops sending to a canceled receiver.
r2 := newTestReceiver(1)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:235")
mm.Subscribe(ctx, r2.rcv)
<-r2.ch
mm.Add(&MessageRow{Row: []sqltypes.Value{sqltypes.NewVarBinary("2")}})
@@ -343,7 +344,7 @@ func TestMessageManagerSendEOF(t *testing.T) {
mm.Open()
defer mm.Close()
r1 := newTestReceiver(0)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:346")
mm.Subscribe(ctx, r1.rcv)
// Pull field info.
<-r1.ch
@@ -514,7 +515,7 @@ func TestMessageManagerPoller(t *testing.T) {
mm.Open()
defer mm.Close()
r1 := newTestReceiver(1)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:517")
mm.Subscribe(ctx, r1.rcv)
<-r1.ch
mm.pollerTicks.Trigger()
diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go
index 27bcacc18..c71ed8b1f 100644
--- a/go/vt/vttablet/tabletserver/query_engine.go
+++ b/go/vt/vttablet/tabletserver/query_engine.go
@@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cache"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/streamlog"
@@ -395,7 +396,7 @@ func (qe *QueryEngine) getQueryConn(ctx context.Context) (*connpool.DBConn, erro
timeout := qe.connTimeout.Get()
if timeout != 0 {
- ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
+ ctxTimeout, cancel := context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/query_engine.go:398")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/query_engine.go:387\n"); cancel() })()
conn, err := qe.conns.Get(ctxTimeout)
if err != nil {
diff --git a/go/vt/vttablet/tabletserver/replication_watcher.go b/go/vt/vttablet/tabletserver/replication_watcher.go
index 67f8b818c..84f018811 100644
--- a/go/vt/vttablet/tabletserver/replication_watcher.go
+++ b/go/vt/vttablet/tabletserver/replication_watcher.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog"
@@ -91,7 +92,7 @@ func (rpw *ReplicationWatcher) Open() {
if rpw.isOpen || !rpw.watchReplication {
return
}
- ctx, cancel := context.WithCancel(tabletenv.LocalContext())
+ ctx, cancel := context2.WithCancel(tabletenv.LocalContext(), "./go/vt/vttablet/tabletserver/replication_watcher.go:94")
rpw.cancel = cancel
rpw.wg.Add(1)
go rpw.Process(ctx, rpw.dbconfigs)
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index e3c7a540b..126fe00b6 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -31,6 +31,7 @@ import (
"golang.org/x/net/context"
"vitess.io/vitess/go/acl"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
@@ -2017,7 +2018,7 @@ func (tsv *TabletServer) UpdateStream(ctx context.Context, target *querypb.Targe
s := binlog.NewEventStreamer(tsv.dbconfigs.DbaWithDB(), tsv.se, p, timestamp, callback)
// Create a cancelable wrapping context.
- streamCtx, streamCancel := context.WithCancel(ctx)
+ streamCtx, streamCancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver.go:2020")
i := tsv.updateStreamList.Add(streamCancel)
defer tsv.updateStreamList.Delete(i)
@@ -2319,7 +2320,7 @@ func withTimeout(ctx context.Context, timeout time.Duration, options *querypb.Ex
if timeout == 0 || options.GetWorkload() == querypb.ExecuteOptions_DBA || tabletenv.IsLocalContext(ctx) {
return ctx, func() {}
}
- return context.WithTimeout(ctx, timeout)
+ return context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/tabletserver.go:2322")
}
// skipQueryPlanCache returns true if the query plan should be cached
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index dcbced312..c2a1fa46c 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -32,6 +32,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -958,7 +959,7 @@ func TestTabletServerBeginFail(t *testing.T) {
t.Fatalf("StartService failed: %v", err)
}
defer tsv.StopService()
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Nanosecond, "./go/vt/vttablet/tabletserver/tabletserver_test.go:961")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/tabletserver_test.go:962\n"); cancel() })()
tsv.Begin(ctx, &target, nil)
_, err = tsv.Begin(ctx, &target, nil)
@@ -2092,7 +2093,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
}()
// tx2.
- ctxTx2, cancelTx2 := context.WithCancel(ctx)
+ ctxTx2, cancelTx2 := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver_test.go:2095")
wg.Add(1)
go func() {
defer wg.Done()
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go
index 9a485f028..f4cf91829 100644
--- a/go/vt/vttablet/tabletserver/tx_engine.go
+++ b/go/vt/vttablet/tabletserver/tx_engine.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
@@ -541,7 +542,7 @@ func (te *TxEngine) rollbackPrepared() {
// transactions and calls the notifier on them.
func (te *TxEngine) startWatchdog() {
te.ticks.Start(func() {
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4)
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4, "./go/vt/vttablet/tabletserver/tx_engine.go:544")
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/tx_engine.go:545\n"); cancel() })()
// Raise alerts on prepares that have been unresolved for too long.
diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go
index bc64e396a..2117fb957 100644
--- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go
+++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/vterrors"
@@ -306,7 +307,7 @@ func TestTxSerializerCancel(t *testing.T) {
}
// tx3 (gets queued and must wait).
- ctx3, cancel3 := context.WithCancel(context.Background())
+ ctx3, cancel3 := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go:309")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go
index 9db9ecc01..25ed2e192 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
@@ -54,7 +55,7 @@ func TestUpdateVSchema(t *testing.T) {
defer env.SetVSchema("{}")
// We have to start at least one stream to start the vschema watcher.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/engine_test.go:57")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/engine_test.go:57\n")
cancel()
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
index f19ea01f0..88978a380 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
@@ -49,7 +50,7 @@ type rowStreamer struct {
}
func newRowStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, query string, lastpk []sqltypes.Value, kschema *vindexes.KeyspaceSchema, send func(*binlogdatapb.VStreamRowsResponse) error) *rowStreamer {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go:52")
return &rowStreamer{
ctx: ctx,
cancel: cancel,
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
index 8e237a9e8..678d587d1 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
@@ -254,7 +255,7 @@ func TestStreamRowsCancel(t *testing.T) {
})
engine.se.Reload(context.Background())
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go:257")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go:257\n")
cancel()
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
index 0b0bc0eb7..6bbaba94f 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -23,6 +23,7 @@ import (
"io"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog"
@@ -70,7 +71,7 @@ type streamerPlan struct {
}
func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, startPos string, filter *binlogdatapb.Filter, kschema *vindexes.KeyspaceSchema, send func([]*binlogdatapb.VEvent) error) *vstreamer {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/vstreamer.go:73")
return &vstreamer{
ctx: ctx,
cancel: cancel,
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
index 2a8240e8f..e02114cb7 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
@@ -194,7 +195,7 @@ func TestREKeyRange(t *testing.T) {
}
defer env.SetVSchema("{}")
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:197")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:197\n")
cancel()
@@ -344,7 +345,7 @@ func TestDDLAddColumn(t *testing.T) {
})
engine.se.Reload(context.Background())
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:347")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:344\n")
cancel()
@@ -413,7 +414,7 @@ func TestDDLDropColumn(t *testing.T) {
})
engine.se.Reload(context.Background())
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:416")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:410\n")
cancel()
@@ -781,7 +782,7 @@ func TestMinimalMode(t *testing.T) {
"set @@session.binlog_row_image='full'",
})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:784")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:775\n")
cancel()
@@ -823,7 +824,7 @@ func TestStatementMode(t *testing.T) {
"set @@session.binlog_format='row'",
})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:826")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:814\n")
cancel()
@@ -845,7 +846,7 @@ func TestStatementMode(t *testing.T) {
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase) {
t.Helper()
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:848")
defer (func() {
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:833\n")
cancel()
diff --git a/go/vt/vttest/mysqlctl.go b/go/vt/vttest/mysqlctl.go
index 54c243f62..568a28eb5 100644
--- a/go/vt/vttest/mysqlctl.go
+++ b/go/vt/vttest/mysqlctl.go
@@ -25,6 +25,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)
@@ -53,7 +54,7 @@ type Mysqlctl struct {
// Setup spawns a new mysqld service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (ctl *Mysqlctl) Setup() error {
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:56")
defer (func() { log.Infof("ctx cancel at go/vt/vttest/mysqlctl.go:56\n"); cancel() })()
cmd := exec.CommandContext(ctx,
@@ -77,7 +78,7 @@ func (ctl *Mysqlctl) Setup() error {
// TearDown shutdowns the running mysqld service
func (ctl *Mysqlctl) TearDown() error {
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:80")
defer (func() { log.Infof("ctx cancel at go/vt/vttest/mysqlctl.go:80\n"); cancel() })()
cmd := exec.CommandContext(ctx,
diff --git a/go/vt/worker/chunk.go b/go/vt/worker/chunk.go
index 4b775b470..1b9b9e80b 100644
--- a/go/vt/worker/chunk.go
+++ b/go/vt/worker/chunk.go
@@ -19,6 +19,7 @@ package worker
import (
"fmt"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -94,7 +95,7 @@ func generateChunks(ctx context.Context, wr *wrangler.Wrangler, tablet *topodata
// Get the MIN and MAX of the leading column of the primary key.
query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(topoproto.TabletDbName(tablet)), sqlescape.EscapeID(td.Name))
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/chunk.go:97")
qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(shortCtx, tablet, true, []byte(query), 1)
cancel()
if err != nil {
diff --git a/go/vt/worker/diff_utils.go b/go/vt/worker/diff_utils.go
index 19d2dcbca..895267e0f 100644
--- a/go/vt/worker/diff_utils.go
+++ b/go/vt/worker/diff_utils.go
@@ -25,6 +25,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@@ -63,7 +64,7 @@ type QueryResultReader struct {
// NewQueryResultReaderForTablet creates a new QueryResultReader for
// the provided tablet / sql query
func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string) (*QueryResultReader, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:66")
tablet, err := ts.GetTablet(shortCtx, tabletAlias)
cancel()
if err != nil {
@@ -97,7 +98,7 @@ func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletA
// NewTransactionalQueryResultReaderForTablet creates a new QueryResultReader for
// the provided tablet / sql query, and runs it in an existing transaction
func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string, txID int64) (*QueryResultReader, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:100")
tablet, err := ts.GetTablet(shortCtx, tabletAlias)
cancel()
if err != nil {
@@ -130,7 +131,7 @@ func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Se
// RollbackTransaction rolls back the transaction
func RollbackTransaction(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, txID int64) error {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:133")
tablet, err := ts.GetTablet(shortCtx, tabletAlias)
cancel()
if err != nil {
diff --git a/go/vt/worker/executor.go b/go/vt/worker/executor.go
index e953bc4f0..6a0980987 100644
--- a/go/vt/worker/executor.go
+++ b/go/vt/worker/executor.go
@@ -20,6 +20,7 @@ import (
"fmt"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
@@ -119,7 +120,7 @@ func (e *executor) refreshState(ctx context.Context) error {
func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context.Context, tablet *topodatapb.Tablet) error) error {
retryDuration := *retryDuration
// We should keep retrying up until the retryCtx runs out.
- retryCtx, retryCancel := context.WithTimeout(ctx, retryDuration)
+ retryCtx, retryCancel := context2.WithTimeout(ctx, retryDuration, "./go/vt/worker/executor.go:122")
defer retryCancel()
// Is this current attempt a retry of a previous attempt?
isRetry := false
@@ -152,7 +153,7 @@ func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context
// Run the command (in a block since goto above does not allow to introduce
// new variables until the label is reached.)
{
- tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute)
+ tryCtx, cancel := context2.WithTimeout(retryCtx, 2*time.Minute, "./go/vt/worker/executor.go:155")
err = action(tryCtx, master.Tablet)
cancel()
diff --git a/go/vt/worker/instance.go b/go/vt/worker/instance.go
index c14c2df9b..c46554873 100644
--- a/go/vt/worker/instance.go
+++ b/go/vt/worker/instance.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/tb"
@@ -115,7 +116,7 @@ func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrang
wi.currentWorker = wrk
wi.currentMemoryLogger = logutil.NewMemoryLogger()
- wi.currentContext, wi.currentCancelFunc = context.WithCancel(ctx)
+ wi.currentContext, wi.currentCancelFunc = context2.WithCancel(ctx, "./go/vt/worker/instance.go:118")
wi.lastRunError = nil
wi.lastRunStopTime = time.Unix(0, 0)
done := make(chan struct{})
diff --git a/go/vt/worker/legacy_split_clone.go b/go/vt/worker/legacy_split_clone.go
index a191a1b79..890a71207 100644
--- a/go/vt/worker/legacy_split_clone.go
+++ b/go/vt/worker/legacy_split_clone.go
@@ -27,6 +27,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -266,7 +267,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error {
var err error
// read the keyspace and validate it
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:269")
scw.keyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.keyspace)
cancel()
if err != nil {
@@ -274,7 +275,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error {
}
// find the OverlappingShards in the keyspace
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:277")
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.keyspace)
cancel()
if err != nil {
@@ -367,7 +368,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
// get the tablet info for them, and stop their replication
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases))
for i, alias := range scw.sourceAliases {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:370")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias)
cancel()
if err != nil {
@@ -375,7 +376,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
}
scw.sourceTablets[i] = ti.Tablet
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:378")
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i])
cancel()
if err != nil {
@@ -398,7 +399,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
// Make sure we find a master for each destination shard and log it.
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...")
for _, si := range scw.destinationShards {
- waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second)
+ waitCtx, waitCancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/worker/legacy_split_clone.go:401")
defer waitCancel()
if err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER); err != nil {
return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v", si.Keyspace(), si.ShardName())
@@ -410,7 +411,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
master := masters[0]
// Get the MySQL database name of the tablet.
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:413")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, master.Tablet.Alias)
cancel()
if err != nil {
@@ -453,7 +454,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error {
// on all source shards. Furthermore, we estimate the number of rows
// in each source shard for each table to be about the same
// (rowCount is used to estimate an ETA)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:456")
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, scw.sourceAliases[0], nil, scw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
@@ -476,7 +477,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error {
mu := sync.Mutex{}
var firstError error
- ctx, cancelCopy := context.WithCancel(ctx)
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/legacy_split_clone.go:479")
processError := func(format string, args ...interface{}) {
scw.wr.Logger().Errorf(format, args...)
mu.Lock()
@@ -605,7 +606,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error {
sourcePositions := make([]string, len(scw.sourceShards))
// get the current position from the sources
for shardIndex := range scw.sourceShards {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:608")
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex])
cancel()
if err != nil {
diff --git a/go/vt/worker/legacy_split_clone_test.go b/go/vt/worker/legacy_split_clone_test.go
index 1cc578d56..b46dcce97 100644
--- a/go/vt/worker/legacy_split_clone_test.go
+++ b/go/vt/worker/legacy_split_clone_test.go
@@ -28,6 +28,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -469,7 +470,7 @@ func TestLegacySplitCloneV2_NoMasterAvailable(t *testing.T) {
statsRetryCounters.ResetAll()
errs := make(chan error, 1)
go func() {
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/legacy_split_clone_test.go:472")
defer (func() { log.Infof("ctx cancel at go/vt/worker/legacy_split_clone_test.go:472\n"); cancel() })()
for {
diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go
index 9a93ca1e1..c43d6a3d4 100644
--- a/go/vt/worker/multi_split_diff.go
+++ b/go/vt/worker/multi_split_diff.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
@@ -200,13 +201,13 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error {
}
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:203")
msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace)
cancel()
if err != nil {
return vterrors.Wrapf(err, "cannot read keyspace %v", msdw.keyspace)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:209")
msdw.shardInfo, err = msdw.wr.TopoServer().GetShard(shortCtx, msdw.keyspace, msdw.shard)
cancel()
if err != nil {
@@ -228,7 +229,7 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error {
// findDestinationShards finds all the shards that have filtered replication from the source shard
func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([]*topo.ShardInfo, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:231")
keyspaces, err := msdw.wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -252,7 +253,7 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([]
}
func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keyspace string) ([]*topo.ShardInfo, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:255")
shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -284,7 +285,7 @@ func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keys
}
func (msdw *MultiSplitDiffWorker) getShardInfo(ctx context.Context, keyspace string, shard string) (*topo.ShardInfo, uint32, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:287")
si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
@@ -349,14 +350,14 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab
tablet := tabletInfo[i].Tablet
msdw.wr.Logger().Infof("stopping master binlog replication on %v", shardInfo.MasterAlias)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:352")
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff"))
cancel()
if err != nil {
return nil, vterrors.Wrapf(err, "VReplicationExec(stop) for %v failed", shardInfo.MasterAlias)
}
wrangler.RecordVReplicationAction(msdw.cleaner, tablet, binlogplayer.StartVReplication(msdw.sourceUID))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:359")
p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID))
cancel()
if err != nil {
@@ -375,7 +376,7 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab
}
func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Context, shardInfo *topo.ShardInfo) (*topo.TabletInfo, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:378")
masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias)
cancel()
if err != nil {
@@ -389,7 +390,7 @@ func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Contex
// (add a cleanup task to restart binlog replication on the source tablet, and
// change the existing ChangeSlaveType cleanup action to 'spare' type)
func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Context, destVreplicationPos []string) (string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:392")
sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias)
cancel()
if err != nil {
@@ -406,14 +407,14 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co
msdw.wr.Logger().Infof("stopping slave %v at a minimum of %v", msdw.sourceAlias, vreplicationPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:409")
msdw.wr.TabletManagerClient().StartSlave(shortCtx, sourceTablet.Tablet)
cancel()
if err != nil {
return "", err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:416")
mysqlPos, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout)
cancel()
if err != nil {
@@ -431,21 +432,21 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co
// up to the specified source position, and return the destination position.
func (msdw *MultiSplitDiffWorker) stopVreplicationAt(ctx context.Context, shardInfo *topo.ShardInfo, sourcePosition string, masterInfo *topo.TabletInfo) (string, error) {
msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, sourcePosition)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:434")
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, sourcePosition))
cancel()
if err != nil {
return "", vterrors.Wrapf(err, "VReplication(start until) for %v until %v failed", shardInfo.MasterAlias, sourcePosition)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:441")
err = msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), sourcePosition)
cancel()
if err != nil {
return "", vterrors.Wrapf(err, "VReplicationWaitForPos for %v until %v failed", shardInfo.MasterAlias, sourcePosition)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:448")
masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet)
cancel()
if err != nil {
@@ -464,7 +465,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina
} else {
msdw.wr.Logger().Infof("waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos)
}
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:467")
destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias)
cancel()
if err != nil {
@@ -475,7 +476,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina
time.Sleep(1 * time.Minute)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:478")
if msdw.waitForFixedTimeRatherThanGtidSet {
err = msdw.wr.TabletManagerClient().StopSlave(shortCtx, destinationTablet.Tablet)
} else {
@@ -493,7 +494,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina
// (remove the cleanup task that does the same)
func (msdw *MultiSplitDiffWorker) startVreplication(ctx context.Context, shardInfo *topo.ShardInfo, masterInfo *topo.TabletInfo) error {
msdw.wr.Logger().Infof("restarting filtered replication on master %v", shardInfo.MasterAlias)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:496")
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID))
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", shardInfo.MasterAlias)
@@ -560,7 +561,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte
}
}
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:563")
source, _ := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias)
cancel()
@@ -604,7 +605,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte
return err
}
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:607")
destTabletInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias)
cancel()
if err != nil {
@@ -652,7 +653,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte
func (msdw *MultiSplitDiffWorker) waitForDestinationTabletToReach(ctx context.Context, tablet *topodatapb.Tablet, mysqlPos string) error {
for i := 0; i < 20; i++ {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:655")
pos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, tablet)
cancel()
if err != nil {
@@ -758,7 +759,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
wg.Add(1)
go func(i int, destinationAlias *topodatapb.TabletAlias) {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:761")
destinationSchemaDefinition, err := msdw.wr.GetSchema(
shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
@@ -773,7 +774,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:776")
sourceSchemaDefinition, err = msdw.wr.GetSchema(
shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
@@ -811,7 +812,7 @@ func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, des
}
func (msdw *MultiSplitDiffWorker) loadVSchema(ctx context.Context) (*vindexes.KeyspaceSchema, error) {
- shortCtx, cancel := context.WithCancel(ctx)
+ shortCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/multi_split_diff.go:814")
kschema, err := msdw.wr.TopoServer().GetVSchema(shortCtx, msdw.keyspace)
cancel()
if err != nil {
diff --git a/go/vt/worker/multi_split_diff_cmd.go b/go/vt/worker/multi_split_diff_cmd.go
index 5a6b880c5..1c6cb4e42 100644
--- a/go/vt/worker/multi_split_diff_cmd.go
+++ b/go/vt/worker/multi_split_diff_cmd.go
@@ -25,6 +25,7 @@ import (
"strings"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -123,7 +124,7 @@ func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.F
// shardSources returns all the shards that are SourceShards of at least one other shard.
func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:126")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -139,7 +140,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:142")
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -150,7 +151,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:153")
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go
index 04f95695f..995096e96 100644
--- a/go/vt/worker/restartable_result_reader.go
+++ b/go/vt/worker/restartable_result_reader.go
@@ -22,6 +22,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -225,7 +226,7 @@ func (r *RestartableResultReader) Next() (*sqltypes.Result, error) {
func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) {
// In case of errors we will keep retrying until retryCtx is done.
- retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration)
+ retryCtx, retryCancel := context2.WithTimeout(r.ctx, *retryDuration, "./go/vt/worker/restartable_result_reader.go:228")
defer retryCancel()
// The first retry is the second attempt because we already tried once in Next()
diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go
index 24254fc88..ca112cb48 100644
--- a/go/vt/worker/split_clone.go
+++ b/go/vt/worker/split_clone.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -528,7 +529,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error {
scw.setState(WorkerStateInit)
// read the keyspace and validate it
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:531")
var err error
scw.destinationKeyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.destinationKeyspace)
cancel()
@@ -578,7 +579,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error {
func (scw *SplitCloneWorker) initShardsForHorizontalResharding(ctx context.Context) error {
// find the OverlappingShards in the keyspace
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:581")
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.destinationKeyspace)
cancel()
if err != nil {
@@ -635,7 +636,7 @@ func (scw *SplitCloneWorker) initShardsForVerticalSplit(ctx context.Context) err
}
sourceKeyspace := servedFrom
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:638")
shardMap, err := scw.wr.TopoServer().FindAllShardsInKeyspace(shortCtx, sourceKeyspace)
cancel()
if err != nil {
@@ -775,7 +776,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error
// get the tablet info for them, and stop their replication
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases))
for i, alias := range scw.sourceAliases {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:778")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias)
cancel()
if err != nil {
@@ -783,7 +784,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error
}
scw.sourceTablets[i] = ti.Tablet
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:786")
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i])
cancel()
if err != nil {
@@ -818,7 +819,7 @@ func (scw *SplitCloneWorker) findTransactionalSources(ctx context.Context) error
// get the tablet info
scw.sourceTablets = make([]*topodatapb.Tablet, 1)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:821")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, scw.sourceAliases[0])
cancel()
if err != nil {
@@ -844,7 +845,7 @@ func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error {
// Make sure we find a master for each destination shard and log it.
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...")
for _, si := range scw.destinationShards {
- waitCtx, waitCancel := context.WithTimeout(ctx, *waitForHealthyTabletsTimeout)
+ waitCtx, waitCancel := context2.WithTimeout(ctx, *waitForHealthyTabletsTimeout, "./go/vt/worker/split_clone.go:847")
err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
waitCancel()
if err != nil {
@@ -1179,7 +1180,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
var firstError error
- ctx, cancelCopy := context.WithCancel(ctx)
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1182")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_clone.go:1182\n"); cancelCopy() })()
processError := func(format string, args ...interface{}) {
// in theory we could have two threads see firstError as null and both write to the variable
@@ -1246,7 +1247,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error {
sourcePositions[0] = scw.lastPos
} else {
for shardIndex := range scw.sourceShards {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1249")
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex])
cancel()
if err != nil {
@@ -1255,7 +1256,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error {
sourcePositions[shardIndex] = status.Position
}
}
- cancelableCtx, cancel := context.WithCancel(ctx)
+ cancelableCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1258")
rec := concurrency.AllErrorRecorder{}
handleError := func(e error) {
rec.RecordError(e)
@@ -1320,7 +1321,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda
// on all source shards. Furthermore, we estimate the number of rows
// in each source shard for each table to be about the same
// (rowCount is used to estimate an ETA)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1323")
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go
index 4cf6b8795..da7782709 100644
--- a/go/vt/worker/split_clone_cmd.go
+++ b/go/vt/worker/split_clone_cmd.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
@@ -151,7 +152,7 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS
}
func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:154")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -166,7 +167,7 @@ func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler)
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:169")
osList, err := topotools.FindOverlappingShards(shortCtx, wr.TopoServer(), keyspace)
cancel()
if err != nil {
diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go
index 72335157b..c26612013 100644
--- a/go/vt/worker/split_clone_test.go
+++ b/go/vt/worker/split_clone_test.go
@@ -27,6 +27,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -1026,7 +1027,7 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) {
// late because this Go routine looks at it and can run before the worker.
statsRetryCounters.ResetAll()
go func() {
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/split_clone_test.go:1029")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_clone_test.go:1029\n"); cancel() })()
for {
diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go
index 4718c731e..25b60dfd2 100644
--- a/go/vt/worker/split_diff.go
+++ b/go/vt/worker/split_diff.go
@@ -21,6 +21,7 @@ import (
"sort"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -188,13 +189,13 @@ func (sdw *SplitDiffWorker) init(ctx context.Context) error {
sdw.SetState(WorkerStateInit)
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:191")
sdw.keyspaceInfo, err = sdw.wr.TopoServer().GetKeyspace(shortCtx, sdw.keyspace)
cancel()
if err != nil {
return vterrors.Wrapf(err, "cannot read keyspace %v", sdw.keyspace)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:197")
sdw.shardInfo, err = sdw.wr.TopoServer().GetShard(shortCtx, sdw.keyspace, sdw.shard)
cancel()
if err != nil {
@@ -259,7 +260,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
// to FindWorkerTablet could attempt to set to DRAIN state the same tablet. Only
// one of these calls to FindWorkerTablet will succeed and the rest will fail.
// The following, makes sures we keep trying to find a worker tablet when this error occur.
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:262")
for {
select {
case <-shortCtx.Done():
@@ -298,7 +299,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
sdw.SetState(WorkerStateSyncReplication)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:301")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:301\n"); cancel() })()
masterInfo, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.shardInfo.MasterAlias)
if err != nil {
@@ -307,7 +308,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 1 - stop the master binlog replication, get its current position
sdw.wr.Logger().Infof("Stopping master binlog replication on %v", sdw.shardInfo.MasterAlias)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:310")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:310\n"); cancel() })()
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(sdw.sourceShard.Uid, "for split diff"))
if err != nil {
@@ -332,7 +333,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
return err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:335")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:335\n"); cancel() })()
mysqlPos, err := sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout)
if err != nil {
@@ -346,7 +347,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 3 - ask the master of the destination shard to resume filtered
// replication up to the new list of positions
sdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", sdw.shardInfo.MasterAlias, mysqlPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:349")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:349\n"); cancel() })()
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(sdw.sourceShard.Uid, mysqlPos))
if err != nil {
@@ -363,13 +364,13 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 4 - wait until the destination tablet is equal or passed
// that master binlog position, and stop its replication.
sdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", sdw.destinationAlias, masterPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:366")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:366\n"); cancel() })()
destinationTablet, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.destinationAlias)
if err != nil {
return err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:372")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:372\n"); cancel() })()
if _, err = sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout); err != nil {
return vterrors.Wrapf(err, "StopSlaveMinimum for %v at %v failed", sdw.destinationAlias, masterPos)
@@ -378,7 +379,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 5 - restart filtered replication on destination master
sdw.wr.Logger().Infof("Restarting filtered replication on master %v", sdw.shardInfo.MasterAlias)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:381")
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:381\n"); cancel() })()
if _, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(sdw.sourceShard.Uid)); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", sdw.shardInfo.MasterAlias)
@@ -401,7 +402,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:404")
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
@@ -414,7 +415,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:417")
sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
diff --git a/go/vt/worker/split_diff_cmd.go b/go/vt/worker/split_diff_cmd.go
index 5ece6f5dc..bd52efae6 100644
--- a/go/vt/worker/split_diff_cmd.go
+++ b/go/vt/worker/split_diff_cmd.go
@@ -25,6 +25,7 @@ import (
"strings"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -114,7 +115,7 @@ func commandSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSe
// shardsWithSources returns all the shards that have SourceShards set
// with no Tables list.
func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:117")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -129,7 +130,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:132")
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -140,7 +141,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:143")
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
diff --git a/go/vt/worker/tablet_provider.go b/go/vt/worker/tablet_provider.go
index 2d42bcb7f..47a7b950b 100644
--- a/go/vt/worker/tablet_provider.go
+++ b/go/vt/worker/tablet_provider.go
@@ -19,6 +19,7 @@ package worker
import (
"fmt"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -57,7 +58,7 @@ func newSingleTabletProvider(ctx context.Context, ts *topo.Server, alias *topoda
}
func (p *singleTabletProvider) getTablet() (*topodatapb.Tablet, error) {
- shortCtx, cancel := context.WithTimeout(p.ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(p.ctx, *remoteActionsTimeout, "./go/vt/worker/tablet_provider.go:60")
tablet, err := p.ts.GetTablet(shortCtx, p.alias)
cancel()
if err != nil {
diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go
index 4dcc52a1d..aa7b4a182 100644
--- a/go/vt/worker/topo_utils.go
+++ b/go/vt/worker/topo_utils.go
@@ -22,6 +22,7 @@ import (
"math/rand"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -68,7 +69,7 @@ func FindHealthyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discover
}
func waitForHealthyTablets(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int, timeout time.Duration, tabletType topodatapb.TabletType) ([]discovery.TabletStats, error) {
- busywaitCtx, busywaitCancel := context.WithTimeout(ctx, timeout)
+ busywaitCtx, busywaitCancel := context2.WithTimeout(ctx, timeout, "./go/vt/worker/topo_utils.go:71")
defer busywaitCancel()
start := time.Now()
@@ -122,7 +123,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
}
wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:125")
err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED)
cancel()
if err != nil {
@@ -131,7 +132,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
ourURL := servenv.ListeningURL.String()
wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:134")
_, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error {
if tablet.Tags == nil {
tablet.Tags = make(map[string]string)
@@ -154,7 +155,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType)
// We refresh the destination vttablet reloads the worker URL when it reloads the tablet.
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:157")
wr.RefreshTabletState(shortCtx, tabletAlias)
if err != nil {
return nil, err
diff --git a/go/vt/worker/utils_test.go b/go/vt/worker/utils_test.go
index 2c0445299..a5fed7b27 100644
--- a/go/vt/worker/utils_test.go
+++ b/go/vt/worker/utils_test.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
@@ -40,7 +41,7 @@ import (
func runCommand(t *testing.T, wi *Instance, wr *wrangler.Wrangler, args []string) error {
// Limit the scope of the context e.g. to implicitly terminate stray Go
// routines.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/utils_test.go:43")
defer (func() { log.Infof("ctx cancel at go/vt/worker/utils_test.go:43\n"); cancel() })()
worker, done, err := wi.RunCommand(ctx, args, wr, false /* runFromCli */)
diff --git a/go/vt/worker/vertical_split_clone_cmd.go b/go/vt/worker/vertical_split_clone_cmd.go
index 7045fb3e6..0298ca959 100644
--- a/go/vt/worker/vertical_split_clone_cmd.go
+++ b/go/vt/worker/vertical_split_clone_cmd.go
@@ -25,6 +25,7 @@ import (
"strings"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
@@ -152,7 +153,7 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl
// keyspacesWithServedFrom returns all the keyspaces that have ServedFrom set
// to one value.
func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:155")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -167,7 +168,7 @@ func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]stri
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:170")
ki, err := wr.TopoServer().GetKeyspace(shortCtx, keyspace)
cancel()
if err != nil {
@@ -288,7 +289,7 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl
}
// Figure out the shard
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:291")
shardMap, err := wr.TopoServer().FindAllShardsInKeyspace(shortCtx, keyspace)
cancel()
if err != nil {
diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go
index 3d72c48b6..63868da73 100644
--- a/go/vt/worker/vertical_split_diff.go
+++ b/go/vt/worker/vertical_split_diff.go
@@ -21,6 +21,7 @@ import (
"html/template"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
@@ -262,7 +263,7 @@ func (vsdw *VerticalSplitDiffWorker) findTargets(ctx context.Context) error {
func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) error {
vsdw.SetState(WorkerStateSyncReplication)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:265")
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:265\n"); cancel() })()
masterInfo, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.shardInfo.MasterAlias)
if err != nil {
@@ -273,7 +274,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 1 - stop the master binlog replication, get its current position
vsdw.wr.Logger().Infof("Stopping master binlog replication on %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:276")
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:276\n"); cancel() })()
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(ss.Uid, "for split diff"))
if err != nil {
@@ -292,7 +293,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// stop replication
vsdw.wr.Logger().Infof("Stopping slave %v at a minimum of %v", topoproto.TabletAliasString(vsdw.sourceAlias), vreplicationPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:295")
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:295\n"); cancel() })()
sourceTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.sourceAlias)
if err != nil {
@@ -310,7 +311,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 3 - ask the master of the destination shard to resume filtered
// replication up to the new list of positions
vsdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias), mysqlPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:313")
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:313\n"); cancel() })()
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(ss.Uid, mysqlPos))
if err != nil {
@@ -327,13 +328,13 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 4 - wait until the destination tablet is equal or passed
// that master binlog position, and stop its replication.
vsdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", topoproto.TabletAliasString(vsdw.destinationAlias), masterPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:330")
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:330\n"); cancel() })()
destinationTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.destinationAlias)
if err != nil {
return err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:336")
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:336\n"); cancel() })()
_, err = vsdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout)
if err != nil {
@@ -343,7 +344,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 5 - restart filtered replication on destination master
vsdw.wr.Logger().Infof("Restarting filtered replication on master %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:346")
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:346\n"); cancel() })()
if _, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(ss.Uid)); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", vsdw.shardInfo.MasterAlias)
@@ -366,7 +367,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:369")
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
@@ -379,7 +380,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:382")
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
diff --git a/go/vt/worker/vertical_split_diff_cmd.go b/go/vt/worker/vertical_split_diff_cmd.go
index e4cfd4d34..cc997b9d1 100644
--- a/go/vt/worker/vertical_split_diff_cmd.go
+++ b/go/vt/worker/vertical_split_diff_cmd.go
@@ -24,6 +24,7 @@ import (
"strconv"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -101,7 +102,7 @@ func commandVerticalSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *fla
// shardsWithTablesSources returns all the shards that have SourceShards set
// to one value, with an array of Tables.
func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:104")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -116,7 +117,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:119")
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -127,7 +128,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:130")
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go
index b5ff551d5..1ac4b3bff 100644
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go
@@ -34,6 +34,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vterrors"
@@ -123,7 +124,7 @@ func runVtworkerCommand(client vtworkerclient.Client, args []string) error {
func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, serverSideCancelation bool) {
// Run the vtworker "Block" command which blocks until we cancel the context.
var wg sync.WaitGroup
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/vtworkerclienttest/client_testsuite.go:126")
// blockCommandStarted will be closed after we're sure that vtworker is
// running the "Block" command.
blockCommandStarted := make(chan struct{})
diff --git a/go/vt/workflow/manager.go b/go/vt/workflow/manager.go
index a75d2aa1b..9cfc16be0 100644
--- a/go/vt/workflow/manager.go
+++ b/go/vt/workflow/manager.go
@@ -26,6 +26,7 @@ import (
gouuid "github.com/pborman/uuid"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
@@ -352,7 +353,7 @@ func (m *Manager) Start(ctx context.Context, uuid string) error {
func (m *Manager) runWorkflow(rw *runningWorkflow) {
// Create a context to run it.
var ctx context.Context
- ctx, rw.cancel = context.WithCancel(m.ctx)
+ ctx, rw.cancel = context2.WithCancel(m.ctx, "./go/vt/workflow/manager.go:355")
// And run it in the background.
go m.executeWorkflowRun(ctx, rw)
@@ -591,7 +592,7 @@ func AvailableFactories() map[string]bool {
// StartManager starts a manager. This function should only be used for tests purposes.
func StartManager(m *Manager) (*sync.WaitGroup, context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/workflow/manager.go:594")
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
diff --git a/go/vt/wrangler/cleaner.go b/go/vt/wrangler/cleaner.go
index e40b9c08d..837d20a88 100644
--- a/go/vt/wrangler/cleaner.go
+++ b/go/vt/wrangler/cleaner.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
@@ -96,7 +97,7 @@ type cleanUpHelper struct {
// basis. They are then serialized on each target.
func (cleaner *Cleaner) CleanUp(wr *Wrangler) error {
// we use a background context so we're not dependent on the original context timeout
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Minute, "./go/vt/wrangler/cleaner.go:99")
actionMap := make(map[string]*cleanUpHelper)
rec := concurrency.AllErrorRecorder{}
cleaner.mu.Lock()
diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go
index 85adabf6c..fc5ada634 100644
--- a/go/vt/wrangler/fake_tablet_test.go
+++ b/go/vt/wrangler/fake_tablet_test.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -183,7 +184,7 @@ func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) {
step := 10 * time.Millisecond
c := tmclient.NewTabletManagerClient()
for timeout >= 0 {
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/fake_tablet_test.go:186")
err := c.Ping(ctx, ft.Agent.Tablet())
cancel()
if err == nil {
diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go
index e94ee2f68..d94420637 100644
--- a/go/vt/wrangler/keyspace.go
+++ b/go/vt/wrangler/keyspace.go
@@ -25,6 +25,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -496,7 +497,7 @@ func (wr *Wrangler) waitForFilteredReplication(ctx context.Context, sourcePositi
wg.Add(1)
go func(si *topo.ShardInfo) {
defer wg.Done()
- ctx, cancel := context.WithTimeout(ctx, waitTime)
+ ctx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/wrangler/keyspace.go:499")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/keyspace.go:499\n"); cancel() })()
var pos string
@@ -1233,7 +1234,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp
// replication and starts accepting writes
func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, tables []string, ev *events.MigrateServedFrom, filteredReplicationWaitTime time.Duration) error {
// Read the data we need
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/keyspace.go:1236")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/keyspace.go:1236\n"); cancel() })()
sourceMasterTabletInfo, err := wr.ts.GetTablet(ctx, sourceShard.MasterAlias)
if err != nil {
@@ -1358,7 +1359,7 @@ func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInf
// Setting an upper bound timeout to fail faster in case of an error.
// Using 60 seconds because RefreshState should not take more than 30 seconds.
// (RefreshState will restart the tablet's QueryService and most time will be spent on the shutdown, i.e. waiting up to 30 seconds on transactions (see Config.TransactionTimeout)).
- ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 60*time.Second, "./go/vt/wrangler/keyspace.go:1361")
if err := wr.tmc.RefreshState(ctx, ti.Tablet); err != nil {
wr.Logger().Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
}
diff --git a/go/vt/wrangler/migrater.go b/go/vt/wrangler/migrater.go
index f0a9cde00..b25252824 100644
--- a/go/vt/wrangler/migrater.go
+++ b/go/vt/wrangler/migrater.go
@@ -28,6 +28,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -506,7 +507,7 @@ func (mi *migrater) changeTableSourceWrites(ctx context.Context, access accessTy
}
func (mi *migrater) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/migrater.go:509")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/migrater.go:506\n"); cancel() })()
var mu sync.Mutex
diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go
index 846e8ea62..70000d31c 100644
--- a/go/vt/wrangler/reparent.go
+++ b/go/vt/wrangler/reparent.go
@@ -26,6 +26,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
@@ -206,7 +207,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare
// at a wrong replication spot.
// Create a context for the following RPCs that respects waitSlaveTimeout
- resetCtx, resetCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ resetCtx, resetCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:209")
defer resetCancel()
event.DispatchUpdate(ev, "resetting replication on all tablets")
@@ -249,7 +250,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:252")
defer replCancel()
// Now tell the new master to insert the reparent_journal row,
@@ -414,7 +415,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
ev.OldMaster = *oldMasterTabletInfo.Tablet
// create a new context for the short running remote operations
- remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
+ remoteCtx, remoteCancel := context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:417")
defer remoteCancel()
// Demote the current master, get its replication position
@@ -425,7 +426,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
return fmt.Errorf("old master tablet %v DemoteMaster failed: %v", topoproto.TabletAliasString(shardInfo.MasterAlias), err)
}
- remoteCtx, remoteCancel = context.WithTimeout(ctx, waitSlaveTimeout)
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:428")
defer remoteCancel()
// Wait on the master-elect tablet until it reaches that position,
@@ -436,7 +437,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
if err != nil || (ctx.Err() != nil && ctx.Err() == context.DeadlineExceeded) {
remoteCancel()
// if this fails it is not enough to return an error. we should rollback all the changes made by DemoteMaster
- remoteCtx, remoteCancel = context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:439")
defer remoteCancel()
if err1 := wr.tmc.UndoDemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet); err1 != nil {
log.Warningf("Encountered error %v while trying to undo DemoteMaster", err1)
@@ -451,7 +452,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:454")
defer replCancel()
// Go through all the tablets:
@@ -533,7 +534,7 @@ func (maxPosSearch *maxReplPosSearch) processTablet(tablet *topodatapb.Tablet) {
defer maxPosSearch.waitGroup.Done()
maxPosSearch.wrangler.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias))
- slaveStatusCtx, cancelSlaveStatus := context.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout)
+ slaveStatusCtx, cancelSlaveStatus := context2.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout, "./go/vt/wrangler/reparent.go:536")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/reparent.go:537\n"); cancelSlaveStatus() })()
status, err := maxPosSearch.wrangler.tmc.SlaveStatus(slaveStatusCtx, tablet)
@@ -668,7 +669,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
ev.OldMaster = *oldMasterTabletInfo.Tablet
wr.logger.Infof("deleting old master %v", shardInfoMasterAliasStr)
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:671")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/reparent.go:672\n"); cancel() })()
if err := topotools.DeleteTablet(ctx, wr.ts, oldMasterTabletInfo.Tablet); err != nil {
@@ -688,7 +689,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
go func(alias string, tabletInfo *topo.TabletInfo) {
defer wg.Done()
wr.logger.Infof("getting replication position from %v", alias)
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:691")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/reparent.go:692\n"); cancel() })()
rp, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet)
if err != nil {
@@ -744,7 +745,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithCancel(ctx)
+ replCtx, replCancel := context2.WithCancel(ctx, "./go/vt/wrangler/reparent.go:747")
defer replCancel()
// Reset replication on all slaves to point to the new master, and
diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go
index f209b1e50..261f40b0f 100644
--- a/go/vt/wrangler/schema.go
+++ b/go/vt/wrangler/schema.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/concurrency"
@@ -359,7 +360,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo
// Notify slaves to reload schema. This is best-effort.
concurrency := sync2.NewSemaphore(10, 0)
- reloadCtx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ reloadCtx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/schema.go:362")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/schema.go:363\n"); cancel() })()
wr.ReloadSchemaShard(reloadCtx, destKeyspace, destShard, destMasterPos, concurrency, true /* includeMaster */)
return nil
@@ -435,7 +436,7 @@ func (wr *Wrangler) applySQLShard(ctx context.Context, tabletInfo *topo.TabletIn
if err != nil {
return fmt.Errorf("fillStringTemplate failed: %v", err)
}
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 30*time.Second, "./go/vt/wrangler/schema.go:438")
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/schema.go:439\n"); cancel() })()
// Need to make sure that we enable binlog, since we're only applying the statement on masters.
_, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo.Tablet, false, []byte(filledChange), 0, false, reloadSchema)
diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go
index 75766e27d..557ee2ad0 100644
--- a/go/vt/wrangler/testlib/fake_tablet.go
+++ b/go/vt/wrangler/testlib/fake_tablet.go
@@ -30,6 +30,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/topo"
@@ -206,7 +207,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) {
step := 10 * time.Millisecond
c := tmclient.NewTabletManagerClient()
for timeout >= 0 {
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/testlib/fake_tablet.go:209")
err := c.Ping(ctx, ft.Agent.Tablet())
cancel()
if err == nil {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment