Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created September 11, 2019 22:36
Show Gist options
  • Save dasl-/7848acc57d765beccbbbfaea1cda9b3c to your computer and use it in GitHub Desktop.
Save dasl-/7848acc57d765beccbbbfaea1cda9b3c to your computer and use it in GitHub Desktop.
diff --git a/go/cmd/mysqlctl/mysqlctl.go b/go/cmd/mysqlctl/mysqlctl.go
index 2f39206a0..bf051d7e4 100644
--- a/go/cmd/mysqlctl/mysqlctl.go
+++ b/go/cmd/mysqlctl/mysqlctl.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/mysql"
@@ -71,7 +72,7 @@ func initCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:74")
defer cancel()
if err := mysqld.Init(ctx, cnf, *initDBSQLFile); err != nil {
return fmt.Errorf("failed init mysql: %v", err)
@@ -104,7 +105,7 @@ func shutdownCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:107")
defer cancel()
if err := mysqld.Shutdown(ctx, cnf, true); err != nil {
return fmt.Errorf("failed shutdown mysql: %v", err)
@@ -125,7 +126,7 @@ func startCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:128")
defer cancel()
if err := mysqld.Start(ctx, cnf, mysqldArgs...); err != nil {
return fmt.Errorf("failed start mysql: %v", err)
@@ -145,7 +146,7 @@ func teardownCmd(subFlags *flag.FlagSet, args []string) error {
}
defer mysqld.Close()
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:148")
defer cancel()
if err := mysqld.Teardown(ctx, cnf, *force); err != nil {
return fmt.Errorf("failed teardown mysql (forced? %v): %v", *force, err)
diff --git a/go/cmd/mysqlctld/mysqlctld.go b/go/cmd/mysqlctld/mysqlctld.go
index 5c17ea813..8cafcb6fd 100644
--- a/go/cmd/mysqlctld/mysqlctld.go
+++ b/go/cmd/mysqlctld/mysqlctld.go
@@ -25,6 +25,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
@@ -68,7 +69,7 @@ func main() {
}
// Start or Init mysqld as needed.
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctld/mysqlctld.go:71")
mycnfFile := mysqlctl.MycnfFile(uint32(*tabletUID))
if _, statErr := os.Stat(mycnfFile); os.IsNotExist(statErr) {
// Generate my.cnf from scratch and use it to find mysqld.
diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go
index 64ef47a96..7c8089105 100644
--- a/go/cmd/vtbackup/vtbackup.go
+++ b/go/cmd/vtbackup/vtbackup.go
@@ -68,6 +68,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
@@ -200,7 +201,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if err != nil {
return fmt.Errorf("failed to initialize mysql config: %v", err)
}
- initCtx, initCancel := context.WithTimeout(ctx, *mysqlTimeout)
+ initCtx, initCancel := context2.WithTimeout(ctx, *mysqlTimeout, "./go/cmd/vtbackup/vtbackup.go:203")
defer initCancel()
if err := mysqld.Init(initCtx, mycnf, *initDBSQLFile); err != nil {
return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err)
@@ -209,7 +210,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
defer func() {
// Be careful not to use the original context, because we don't want to
// skip shutdown just because we timed out waiting for other things.
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/cmd/vtbackup/vtbackup.go:212")
defer cancel()
mysqld.Shutdown(ctx, mycnf, false)
}()
diff --git a/go/cmd/vtbench/vtbench.go b/go/cmd/vtbench/vtbench.go
index 348299542..a7a548bb9 100644
--- a/go/cmd/vtbench/vtbench.go
+++ b/go/cmd/vtbench/vtbench.go
@@ -23,6 +23,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
@@ -156,7 +157,7 @@ func main() {
b := vtbench.NewBench(*threads, *count, connParams, *sql)
- ctx, cancel := context.WithTimeout(context.Background(), *deadline)
+ ctx, cancel := context2.WithTimeout(context.Background(), *deadline, "./go/cmd/vtbench/vtbench.go:159")
defer cancel()
fmt.Printf("Initializing test with %s protocol / %d threads / %d iterations\n",
diff --git a/go/cmd/vtclient/vtclient.go b/go/cmd/vtclient/vtclient.go
index 87c6c4dc4..d89c3e3c1 100644
--- a/go/cmd/vtclient/vtclient.go
+++ b/go/cmd/vtclient/vtclient.go
@@ -30,6 +30,7 @@ import (
"time"
"github.com/olekukonko/tablewriter"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
@@ -183,7 +184,7 @@ func run() (*results, error) {
log.Infof("Sending the query...")
- ctx, cancel := context.WithTimeout(context.Background(), *timeout)
+ ctx, cancel := context2.WithTimeout(context.Background(), *timeout, "./go/cmd/vtclient/vtclient.go:186")
defer cancel()
return execMulti(ctx, db, args[0])
}
diff --git a/go/cmd/vtctl/vtctl.go b/go/cmd/vtctl/vtctl.go
index 8365b9c0a..b8ebf5b4e 100644
--- a/go/cmd/vtctl/vtctl.go
+++ b/go/cmd/vtctl/vtctl.go
@@ -27,6 +27,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
@@ -91,7 +92,7 @@ func main() {
vtctl.WorkflowManager = workflow.NewManager(ts)
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/vtctl/vtctl.go:94")
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
installSignalHandlers(cancel)
diff --git a/go/cmd/vtctlclient/main.go b/go/cmd/vtctlclient/main.go
index f93696326..2279561b8 100644
--- a/go/cmd/vtctlclient/main.go
+++ b/go/cmd/vtctlclient/main.go
@@ -23,6 +23,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
@@ -55,7 +56,7 @@ func main() {
os.Exit(1)
}
- ctx, cancel := context.WithTimeout(context.Background(), *actionTimeout)
+ ctx, cancel := context2.WithTimeout(context.Background(), *actionTimeout, "./go/cmd/vtctlclient/main.go:58")
defer cancel()
err := vtctlclient.RunCommandAndWait(
diff --git a/go/cmd/vtgateclienttest/services/echo.go b/go/cmd/vtgateclienttest/services/echo.go
index ce7604124..6435324ea 100644
--- a/go/cmd/vtgateclienttest/services/echo.go
+++ b/go/cmd/vtgateclienttest/services/echo.go
@@ -401,23 +401,23 @@ func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletTy
if strings.HasPrefix(vgtid.ShardGtids[0].Shard, EchoPrefix) {
_ = callback([]*binlogdatapb.VEvent{
{
- Type: 1,
+ Type: 1,
Timestamp: 1234,
- Gtid: "echo-gtid-1",
- Ddl: "echo-ddl-1",
- Vgtid: vgtid,
+ Gtid: "echo-gtid-1",
+ Ddl: "echo-ddl-1",
+ Vgtid: vgtid,
RowEvent: &binlogdatapb.RowEvent{
- TableName:"echo-table-1",
+ TableName: "echo-table-1",
},
},
{
- Type: 2,
+ Type: 2,
Timestamp: 4321,
- Gtid: "echo-gtid-2",
- Ddl: "echo-ddl-2",
- Vgtid: vgtid,
+ Gtid: "echo-gtid-2",
+ Ddl: "echo-ddl-2",
+ Vgtid: vgtid,
FieldEvent: &binlogdatapb.FieldEvent{
- TableName:"echo-table-2",
+ TableName: "echo-table-2",
},
},
})
diff --git a/go/cmd/vtqueryserver/plugin_grpcqueryservice.go b/go/cmd/vtqueryserver/plugin_grpcqueryservice.go
index 60478f80a..b16cb0420 100644
--- a/go/cmd/vtqueryserver/plugin_grpcqueryservice.go
+++ b/go/cmd/vtqueryserver/plugin_grpcqueryservice.go
@@ -24,7 +24,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver"
)
-
func init() {
tabletserver.RegisterFunctions = append(tabletserver.RegisterFunctions, func(qsc tabletserver.Controller) {
if servenv.GRPCCheckServiceMap("queryservice") {
diff --git a/go/cmd/vtworkerclient/vtworkerclient.go b/go/cmd/vtworkerclient/vtworkerclient.go
index 0e1874453..59f434c9a 100644
--- a/go/cmd/vtworkerclient/vtworkerclient.go
+++ b/go/cmd/vtworkerclient/vtworkerclient.go
@@ -23,6 +23,7 @@ import (
"syscall"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/worker/vtworkerclient"
@@ -37,7 +38,7 @@ var (
func main() {
flag.Parse()
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/vtworkerclient/vtworkerclient.go:40")
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
diff --git a/go/cmd/zk/zkcmd.go b/go/cmd/zk/zkcmd.go
index c6cf45e6b..31663c0c0 100644
--- a/go/cmd/zk/zkcmd.go
+++ b/go/cmd/zk/zkcmd.go
@@ -36,6 +36,7 @@ import (
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
@@ -157,7 +158,7 @@ func main() {
subFlags := flag.NewFlagSet(cmdName, flag.ExitOnError)
// Create a context for the command, cancel it if we get a signal.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/zk/zkcmd.go:160")
sigRecv := make(chan os.Signal, 1)
signal.Notify(sigRecv, os.Interrupt)
go func() {
diff --git a/go/context2/context.go b/go/context2/context.go
new file mode 100644
index 000000000..b4a03a28d
--- /dev/null
+++ b/go/context2/context.go
@@ -0,0 +1,40 @@
+package context2
+
+import (
+ "context"
+ "time"
+
+ "vitess.io/vitess/go/context2"
+ "vitess.io/vitess/go/vt/log"
+)
+
+type Context2 struct {
+ context.Context
+ reason string
+}
+
+func WithCancel(parent context.Context, reason string) (context.Context, context.CancelFunc) {
+ ctx, cancel := context2.WithCancel(parent, "./go/context2/context.go:15")
+ ctx2 := Context2{ctx, reason}
+ return ctx2, cancel
+}
+
+func WithDeadline(parent context.Context, d time.Time, reason string) (context.Context, context.CancelFunc) {
+ ctx, cancel := context2.WithDeadline(parent, d, "./go/context2/context.go:21")
+ ctx2 := Context2{ctx, reason}
+ return ctx2, cancel
+}
+
+func WithTimeout(parent context.Context, timeout time.Duration, reason string) (context.Context, context.CancelFunc) {
+ ctx, cancel := context2.WithTimeout(parent, timeout, "./go/context2/context.go:27")
+ ctx2 := Context2{ctx, reason}
+ return ctx2, cancel
+}
+
+func (c Context2) Err() error {
+ err := c.Context.Err()
+ if err != nil {
+ log.Infof("Context errored due to: %s", c.reason)
+ }
+ return err
+}
diff --git a/go/context2/context.go-E b/go/context2/context.go-E
new file mode 100644
index 000000000..63e15d80e
--- /dev/null
+++ b/go/context2/context.go-E
@@ -0,0 +1,38 @@
+package context2
+
+import (
+ "context"
+ "time"
+ "vitess.io/vitess/go/vt/log"
+)
+
+type Context2 struct {
+ context.Context
+ reason string
+}
+
+func WithCancel(parent context.Context, reason string) (context.Context, context.CancelFunc) {
+ ctx, cancel := context.WithCancel(parent)
+ ctx2 := Context2{ctx, reason}
+ return ctx2, cancel
+}
+
+func WithDeadline(parent context.Context, d time.Time, reason string) (context.Context, context.CancelFunc) {
+ ctx, cancel := context.WithDeadline(parent, d)
+ ctx2 := Context2{ctx, reason}
+ return ctx2, cancel
+}
+
+func WithTimeout(parent context.Context, timeout time.Duration, reason string) (context.Context, context.CancelFunc) {
+ ctx, cancel := context.WithTimeout(parent, timeout)
+ ctx2 := Context2{ctx, reason}
+ return ctx2, cancel
+}
+
+func (c Context2) Err() error {
+ err := c.Context.Err()
+ if err != nil {
+ log.Infof("Context errored due to: %s", c.reason)
+ }
+ return err
+}
diff --git a/go/mysql/client_test.go b/go/mysql/client_test.go
index 13d25073e..3fa8aea13 100644
--- a/go/mysql/client_test.go
+++ b/go/mysql/client_test.go
@@ -26,6 +26,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
)
// assertSQLError makes sure we get the right error.
@@ -71,7 +72,7 @@ func TestConnectTimeout(t *testing.T) {
defer listener.Close()
// Test that canceling the context really interrupts the Connect.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/client_test.go:74")
done := make(chan struct{})
go func() {
_, err := Connect(ctx, params)
@@ -85,7 +86,7 @@ func TestConnectTimeout(t *testing.T) {
<-done
// Tests a connection timeout works.
- ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
+ ctx, cancel = context2.WithTimeout(context.Background(), 100*time.Millisecond, "./go/mysql/client_test.go:88")
_, err = Connect(ctx, params)
cancel()
if err != context.DeadlineExceeded {
diff --git a/go/mysql/server_test.go b/go/mysql/server_test.go
index 49456128c..71d4e1199 100644
--- a/go/mysql/server_test.go
+++ b/go/mysql/server_test.go
@@ -29,6 +29,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
vtenv "vitess.io/vitess/go/vt/env"
"vitess.io/vitess/go/vt/tlstest"
@@ -1246,7 +1247,7 @@ func TestListenerShutdown(t *testing.T) {
Pass: "password1",
}
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/server_test.go:1249")
defer cancel()
conn, err := Connect(ctx, params)
diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go
index 8e92f0bf5..87c9caf09 100644
--- a/go/pools/resource_pool.go
+++ b/go/pools/resource_pool.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
@@ -100,7 +101,7 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
rp.resources <- resourceWrapper{}
}
- ctx, cancel := context.WithTimeout(context.TODO(), prefillTimeout)
+ ctx, cancel := context2.WithTimeout(context.TODO(), prefillTimeout, "./go/pools/resource_pool.go:103")
defer cancel()
if prefillParallelism != 0 {
sem := sync2.NewSemaphore(prefillParallelism, 0 /* timeout */)
diff --git a/go/pools/resource_pool_test.go b/go/pools/resource_pool_test.go
index 15c21c920..af8c80f9d 100644
--- a/go/pools/resource_pool_test.go
+++ b/go/pools/resource_pool_test.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
)
@@ -618,7 +619,7 @@ func TestTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- newctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
+ newctx, cancel := context2.WithTimeout(ctx, 1*time.Millisecond, "./go/pools/resource_pool_test.go:621")
_, err = p.Get(newctx)
cancel()
want := "resource pool timed out"
@@ -633,7 +634,7 @@ func TestExpired(t *testing.T) {
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
defer p.Close()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(-1*time.Second), "./go/pools/resource_pool_test.go:636")
r, err := p.Get(ctx)
if err == nil {
p.Put(r)
diff --git a/go/trace/opentracing.go b/go/trace/opentracing.go
index 4413d808a..77f16c530 100644
--- a/go/trace/opentracing.go
+++ b/go/trace/opentracing.go
@@ -17,7 +17,6 @@ package trace
import (
otgrpc "github.com/opentracing-contrib/go-grpc"
- "github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"vitess.io/vitess/go/vt/vterrors"
diff --git a/go/trace/plugin_jaeger.go b/go/trace/plugin_jaeger.go
index b8053a4e2..97ee7794a 100644
--- a/go/trace/plugin_jaeger.go
+++ b/go/trace/plugin_jaeger.go
@@ -19,8 +19,6 @@ import (
"flag"
"io"
- "github.com/opentracing/opentracing-go"
- "github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"vitess.io/vitess/go/vt/log"
)
diff --git a/go/vt/binlog/binlog_streamer_test.go b/go/vt/binlog/binlog_streamer_test.go
index 576e7b562..9a10fb38d 100644
--- a/go/vt/binlog/binlog_streamer_test.go
+++ b/go/vt/binlog/binlog_streamer_test.go
@@ -26,6 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -193,7 +194,7 @@ func TestStreamerStop(t *testing.T) {
bls := NewStreamer(&mysql.ConnParams{DbName: "vt_test_keyspace"}, nil, nil, mysql.Position{}, 0, sendTransaction)
// Start parseEvents(), but don't send it anything, so it just waits.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlog_streamer_test.go:196")
done := make(chan error)
go func() {
_, err := bls.parseEvents(ctx, events)
diff --git a/go/vt/binlog/binlogplayer/binlog_player_test.go b/go/vt/binlog/binlogplayer/binlog_player_test.go
index dedcfcc08..f95aa2c0c 100644
--- a/go/vt/binlog/binlogplayer/binlog_player_test.go
+++ b/go/vt/binlog/binlogplayer/binlog_player_test.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/throttler"
@@ -306,7 +307,7 @@ func TestRetryOnDeadlock(t *testing.T) {
// has finished. Otherwise, it may cause race with other tests.
func applyEvents(blp *BinlogPlayer) func() error {
errChan := make(chan error)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlogplayer/binlog_player_test.go:309")
go func() {
errChan <- blp.ApplyBinlogEvents(ctx)
diff --git a/go/vt/binlog/slave_connection.go b/go/vt/binlog/slave_connection.go
index c44d6e87f..a8d815aed 100644
--- a/go/vt/binlog/slave_connection.go
+++ b/go/vt/binlog/slave_connection.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/pools"
"vitess.io/vitess/go/vt/dbconfigs"
@@ -96,7 +97,7 @@ var slaveIDPool = pools.NewIDPool()
// StartBinlogDumpFromCurrent requests a replication binlog dump from
// the current position.
func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysql.Position, <-chan mysql.BinlogEvent, error) {
- ctx, sc.cancel = context.WithCancel(ctx)
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:99")
masterPosition, err := sc.Conn.MasterPosition()
if err != nil {
@@ -116,7 +117,7 @@ func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysq
//
// Note the context is valid and used until eventChan is closed.
func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, startPos mysql.Position) (<-chan mysql.BinlogEvent, error) {
- ctx, sc.cancel = context.WithCancel(ctx)
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:119")
log.Infof("sending binlog dump command: startPos=%v, slaveID=%v", startPos, sc.slaveID)
if err := sc.SendBinlogDumpCommand(sc.slaveID, startPos); err != nil {
@@ -192,7 +193,7 @@ func (sc *SlaveConnection) streamEvents(ctx context.Context) chan mysql.BinlogEv
//
// Note the context is valid and used until eventChan is closed.
func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan mysql.BinlogEvent, error) {
- ctx, sc.cancel = context.WithCancel(ctx)
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:195")
filename, err := sc.findFileBeforeTimestamp(ctx, timestamp)
if err != nil {
diff --git a/go/vt/binlog/updatestreamctl.go b/go/vt/binlog/updatestreamctl.go
index 3525c28a2..faea9bd5f 100644
--- a/go/vt/binlog/updatestreamctl.go
+++ b/go/vt/binlog/updatestreamctl.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
@@ -266,7 +267,7 @@ func (updateStream *UpdateStreamImpl) StreamKeyRange(ctx context.Context, positi
return fmt.Errorf("newKeyspaceIDResolverFactory failed: %v", err)
}
- streamCtx, cancel := context.WithCancel(ctx)
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:269")
i := updateStream.streams.Add(cancel)
defer updateStream.streams.Delete(i)
@@ -302,7 +303,7 @@ func (updateStream *UpdateStreamImpl) StreamTables(ctx context.Context, position
})
bls := NewStreamer(updateStream.cp, updateStream.se, charset, pos, 0, f)
- streamCtx, cancel := context.WithCancel(ctx)
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:305")
i := updateStream.streams.Add(cancel)
defer updateStream.streams.Delete(i)
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index 2e5e527b9..40a5d4e79 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -51,6 +51,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
@@ -495,7 +496,7 @@ func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) {
if hcc.conn != nil {
// Don't use hcc.ctx because it's already closed.
// Use a separate context, and add a timeout to prevent unbounded waits.
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/healthcheck.go:498")
defer cancel()
hcc.conn.Close(ctx)
hcc.conn = nil
@@ -513,7 +514,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) {
retryDelay := hc.retryDelay
for {
- streamCtx, streamCancel := context.WithCancel(hcc.ctx)
+ streamCtx, streamCancel := context2.WithCancel(hcc.ctx, "./go/vt/discovery/healthcheck.go:516")
// Setup a watcher that restarts the timer every time an update is received.
// If a timeout occurs for a serving tablet, we make it non-serving and send
@@ -715,7 +716,7 @@ func (hc *HealthCheckImpl) SetListener(listener HealthCheckStatsListener, sendDo
// It does not block on making connection.
// name is an optional tag for the tablet, e.g. an alternative address.
func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) {
- ctx, cancelFunc := context.WithCancel(context.Background())
+ ctx, cancelFunc := context2.WithCancel(context.Background(), "./go/vt/discovery/healthcheck.go:718")
key := TabletToMapKey(tablet)
hcc := &healthCheckConn{
ctx: ctx,
diff --git a/go/vt/discovery/tablet_stats_cache_wait_test.go b/go/vt/discovery/tablet_stats_cache_wait_test.go
index 19933494f..cffab6b4d 100644
--- a/go/vt/discovery/tablet_stats_cache_wait_test.go
+++ b/go/vt/discovery/tablet_stats_cache_wait_test.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/topo"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -29,7 +30,7 @@ import (
)
func TestWaitForTablets(t *testing.T) {
- shortCtx, shortCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ shortCtx, shortCancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/discovery/tablet_stats_cache_wait_test.go:32")
defer shortCancel()
waitAvailableTabletInterval = 20 * time.Millisecond
@@ -48,7 +49,7 @@ func TestWaitForTablets(t *testing.T) {
}
// this should fail, but return a non-timeout error
- cancelledCtx, cancel := context.WithCancel(context.Background())
+ cancelledCtx, cancel := context2.WithCancel(context.Background(), "./go/vt/discovery/tablet_stats_cache_wait_test.go:51")
cancel()
if err := tsc.WaitForTablets(cancelledCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err == nil || err == context.DeadlineExceeded {
t.Errorf("want: non-timeout error, got: %v", err)
@@ -67,7 +68,7 @@ func TestWaitForTablets(t *testing.T) {
input <- shr
// and ask again, with longer time outs so it's not flaky
- longCtx, longCancel := context.WithTimeout(context.Background(), 10*time.Second)
+ longCtx, longCancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/tablet_stats_cache_wait_test.go:70")
defer longCancel()
waitAvailableTabletInterval = 10 * time.Millisecond
if err := tsc.WaitForTablets(longCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err != nil {
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go
index 429b72f54..00d25363a 100644
--- a/go/vt/discovery/topology_watcher.go
+++ b/go/vt/discovery/topology_watcher.go
@@ -26,6 +26,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"
@@ -153,7 +154,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR
// We want the span from the context, but not the cancelation that comes with it
spanContext := trace.CopySpan(context.Background(), ctx)
- tw.ctx, tw.cancelFunc = context.WithCancel(spanContext)
+ tw.ctx, tw.cancelFunc = context2.WithCancel(spanContext, "./go/vt/discovery/topology_watcher.go:156")
tw.wg.Add(1)
go tw.watch()
return tw
diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go
index 2ab77b011..62e270ab8 100644
--- a/go/vt/grpcclient/client.go
+++ b/go/vt/grpcclient/client.go
@@ -21,8 +21,6 @@ package grpcclient
import (
"flag"
- "github.com/grpc-ecosystem/go-grpc-middleware"
- "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
diff --git a/go/vt/grpcclient/client_test.go b/go/vt/grpcclient/client_test.go
index 106a0937e..306040c04 100644
--- a/go/vt/grpcclient/client_test.go
+++ b/go/vt/grpcclient/client_test.go
@@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtgateservicepb "vitess.io/vitess/go/vt/proto/vtgateservice"
)
@@ -41,7 +42,7 @@ func TestDialErrors(t *testing.T) {
t.Fatal(err)
}
vtg := vtgateservicepb.NewVitessClient(gconn)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:44")
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{})
cancel()
gconn.Close()
@@ -57,7 +58,7 @@ func TestDialErrors(t *testing.T) {
t.Fatal(err)
}
vtg := vtgateservicepb.NewVitessClient(gconn)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:60")
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{})
cancel()
gconn.Close()
diff --git a/go/vt/log/log.go b/go/vt/log/log.go
index d2d0adf1d..9c61f1e7c 100644
--- a/go/vt/log/log.go
+++ b/go/vt/log/log.go
@@ -7,6 +7,7 @@ package log
import (
"flag"
+
"github.com/golang/glog"
)
diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go
index 0051f5bd6..6759a367f 100644
--- a/go/vt/mysqlctl/query.go
+++ b/go/vt/mysqlctl/query.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconnpool"
@@ -160,7 +161,7 @@ func (mysqld *Mysqld) killConnection(connID int64) error {
// Get another connection with which to kill.
// Use background context because the caller's context is likely expired,
// which is the reason we're being asked to kill the connection.
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/mysqlctl/query.go:163")
defer cancel()
if poolConn, connErr := getPoolReconnect(ctx, mysqld.dbaPool); connErr == nil {
// We got a pool connection.
diff --git a/go/vt/schemamanager/schemaswap/schema_swap.go b/go/vt/schemamanager/schemaswap/schema_swap.go
index 4aa4d3816..e485fa832 100644
--- a/go/vt/schemamanager/schemaswap/schema_swap.go
+++ b/go/vt/schemamanager/schemaswap/schema_swap.go
@@ -29,6 +29,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
@@ -879,7 +880,7 @@ func (array orderTabletsForSwap) Less(i, j int) bool {
// executeAdminQuery executes a query on a given tablet as 'allprivs' user. The query is executed
// using timeout value from --schema_swap_admin_query_timeout flag.
func (shardSwap *shardSchemaSwap) executeAdminQuery(tablet *topodatapb.Tablet, query string, maxRows int) (*sqltypes.Result, error) {
- sqlCtx, cancelSQLCtx := context.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout)
+ sqlCtx, cancelSQLCtx := context2.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout, "./go/vt/schemamanager/schemaswap/schema_swap.go:882")
defer cancelSQLCtx()
sqlResultProto, err := shardSwap.parent.tabletClient.ExecuteFetchAsAllPrivs(
diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go
index 6346e6f91..a9738cb1f 100644
--- a/go/vt/schemamanager/tablet_executor.go
+++ b/go/vt/schemamanager/tablet_executor.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/wrangler"
@@ -252,7 +253,7 @@ func (exec *TabletExecutor) executeOnAllTablets(ctx context.Context, execResult
// execute the schema change via replication. This is best-effort, meaning
// we still return overall success if the timeout expires.
concurrency := sync2.NewSemaphore(10, 0)
- reloadCtx, cancel := context.WithTimeout(ctx, exec.waitSlaveTimeout)
+ reloadCtx, cancel := context2.WithTimeout(ctx, exec.waitSlaveTimeout, "./go/vt/schemamanager/tablet_executor.go:255")
defer cancel()
for _, result := range execResult.SuccessShards {
wg.Add(1)
diff --git a/go/vt/servenv/grpc_server_test.go b/go/vt/servenv/grpc_server_test.go
index 8c611cf22..4a9cf034c 100644
--- a/go/vt/servenv/grpc_server_test.go
+++ b/go/vt/servenv/grpc_server_test.go
@@ -17,62 +17,62 @@ limitations under the License.
package servenv
import (
- "testing"
+ "testing"
- "golang.org/x/net/context"
+ "golang.org/x/net/context"
- "google.golang.org/grpc"
+ "google.golang.org/grpc"
)
func TestEmpty(t *testing.T) {
- interceptors := &serverInterceptorBuilder{}
- if len(interceptors.Build()) > 0 {
- t.Fatalf("expected empty builder to report as empty")
- }
+ interceptors := &serverInterceptorBuilder{}
+ if len(interceptors.Build()) > 0 {
+ t.Fatalf("expected empty builder to report as empty")
+ }
}
func TestSingleInterceptor(t *testing.T) {
- interceptors := &serverInterceptorBuilder{}
- fake := &FakeInterceptor{}
+ interceptors := &serverInterceptorBuilder{}
+ fake := &FakeInterceptor{}
- interceptors.Add(fake.StreamServerInterceptor, fake.UnaryServerInterceptor)
+ interceptors.Add(fake.StreamServerInterceptor, fake.UnaryServerInterceptor)
- if len(interceptors.streamInterceptors) != 1 {
- t.Fatalf("expected 1 server options to be available")
- }
- if len(interceptors.unaryInterceptors) != 1 {
- t.Fatalf("expected 1 server options to be available")
- }
+ if len(interceptors.streamInterceptors) != 1 {
+ t.Fatalf("expected 1 server options to be available")
+ }
+ if len(interceptors.unaryInterceptors) != 1 {
+ t.Fatalf("expected 1 server options to be available")
+ }
}
func TestDoubleInterceptor(t *testing.T) {
- interceptors := &serverInterceptorBuilder{}
- fake1 := &FakeInterceptor{name: "ettan"}
- fake2 := &FakeInterceptor{name: "tvaon"}
-
- interceptors.Add(fake1.StreamServerInterceptor, fake1.UnaryServerInterceptor)
- interceptors.Add(fake2.StreamServerInterceptor, fake2.UnaryServerInterceptor)
-
- if len(interceptors.streamInterceptors) != 2 {
- t.Fatalf("expected 1 server options to be available")
- }
- if len(interceptors.unaryInterceptors) != 2 {
- t.Fatalf("expected 1 server options to be available")
- }
+ interceptors := &serverInterceptorBuilder{}
+ fake1 := &FakeInterceptor{name: "ettan"}
+ fake2 := &FakeInterceptor{name: "tvaon"}
+
+ interceptors.Add(fake1.StreamServerInterceptor, fake1.UnaryServerInterceptor)
+ interceptors.Add(fake2.StreamServerInterceptor, fake2.UnaryServerInterceptor)
+
+ if len(interceptors.streamInterceptors) != 2 {
+ t.Fatalf("expected 1 server options to be available")
+ }
+ if len(interceptors.unaryInterceptors) != 2 {
+ t.Fatalf("expected 1 server options to be available")
+ }
}
type FakeInterceptor struct {
- name string
- streamSeen interface{}
- unarySeen interface{}
+ name string
+ streamSeen interface{}
+ unarySeen interface{}
}
func (fake *FakeInterceptor) StreamServerInterceptor(value interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- fake.streamSeen = value
- return handler(value, stream)
+ fake.streamSeen = value
+ return handler(value, stream)
}
func (fake *FakeInterceptor) UnaryServerInterceptor(ctx context.Context, value interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- fake.unarySeen = value
- return handler(ctx, value)
+ fake.unarySeen = value
+ return handler(ctx, value)
}
diff --git a/go/vt/sqlparser/like_filter.go b/go/vt/sqlparser/like_filter.go
index 598cb4911..411e906b3 100644
--- a/go/vt/sqlparser/like_filter.go
+++ b/go/vt/sqlparser/like_filter.go
@@ -15,7 +15,7 @@ func replacer(s string) string {
return s[2:]
}
- result := strings.Replace(s, "%" ,".*", -1)
+ result := strings.Replace(s, "%", ".*", -1)
result = strings.Replace(result, "_", ".", -1)
return result
diff --git a/go/vt/sqlparser/like_filter_test.go b/go/vt/sqlparser/like_filter_test.go
index c44d6ff65..59687287c 100644
--- a/go/vt/sqlparser/like_filter_test.go
+++ b/go/vt/sqlparser/like_filter_test.go
@@ -1,8 +1,9 @@
package sqlparser
import (
- "github.com/stretchr/testify/assert"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
func TestEmptyLike(t *testing.T) {
diff --git a/go/vt/srvtopo/resilient_server_flaky_test.go b/go/vt/srvtopo/resilient_server_flaky_test.go
index e40d9500c..b21ae1883 100644
--- a/go/vt/srvtopo/resilient_server_flaky_test.go
+++ b/go/vt/srvtopo/resilient_server_flaky_test.go
@@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/status"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
@@ -308,7 +309,7 @@ func TestGetSrvKeyspace(t *testing.T) {
time.Sleep(*srvTopoCacheTTL)
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2)
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:311")
_, err = rs.GetSrvKeyspace(timeoutCtx, "test_cell", "test_ks")
wantErr := "timed out waiting for keyspace"
if err == nil || err.Error() != wantErr {
@@ -346,7 +347,7 @@ func TestSrvKeyspaceCachedError(t *testing.T) {
time.Sleep(*srvTopoCacheTTL + 10*time.Millisecond)
// Ask again with a different context, should get an error and
// save that context.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/srvtopo/resilient_server_flaky_test.go:349")
defer cancel()
_, err2 := rs.GetSrvKeyspace(ctx, "test_cell", "unknown_ks")
if err2 == nil {
@@ -598,7 +599,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
time.Sleep(*srvTopoCacheTTL)
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2)
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:601")
_, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell")
wantErr := "timed out waiting for keyspace names"
if err == nil || err.Error() != wantErr {
diff --git a/go/vt/topo/consultopo/election.go b/go/vt/topo/consultopo/election.go
index bacbbac8d..65455b1c8 100644
--- a/go/vt/topo/consultopo/election.go
+++ b/go/vt/topo/consultopo/election.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
"github.com/hashicorp/consul/api"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
@@ -90,7 +91,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error
}
// We have the lock, keep mastership until we lose it.
- lockCtx, lockCancel := context.WithCancel(context.Background())
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/election.go:93")
go func() {
select {
case <-lost:
diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go
index 8021738ca..de4ca89b9 100644
--- a/go/vt/topo/consultopo/watch.go
+++ b/go/vt/topo/consultopo/watch.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
"github.com/hashicorp/consul/api"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/topo"
)
@@ -51,7 +52,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
}
// Create a context, will be used to cancel the watch.
- watchCtx, watchCancel := context.WithCancel(context.Background())
+ watchCtx, watchCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/watch.go:54")
// Create the notifications channel, send updates to it.
notifications := make(chan *topo.WatchData, 10)
diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go
index 354c5e499..ff3b66256 100644
--- a/go/vt/topo/etcd2topo/election.go
+++ b/go/vt/topo/etcd2topo/election.go
@@ -22,6 +22,7 @@ import (
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
@@ -73,7 +74,7 @@ func (mp *etcdMasterParticipation) WaitForMastership() (context.Context, error)
// We use a cancelable context here. If stop is closed,
// we just cancel that context.
- lockCtx, lockCancel := context.WithCancel(context.Background())
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/election.go:76")
go func() {
<-mp.stop
if ld != nil {
diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go
index 27928cf3e..7f916239e 100644
--- a/go/vt/topo/etcd2topo/lock.go
+++ b/go/vt/topo/etcd2topo/lock.go
@@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -86,7 +87,7 @@ func (s *Server) waitOnLastRev(ctx context.Context, cli *clientv3.Client, nodePa
// Wait for release on blocking key. Cancel the watch when we
// exit this function.
key := string(lastKey.Kvs[0].Key)
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/topo/etcd2topo/lock.go:89")
defer cancel()
wc := cli.Watch(ctx, key, clientv3.WithRev(revision))
if wc == nil {
diff --git a/go/vt/topo/etcd2topo/server_test.go b/go/vt/topo/etcd2topo/server_test.go
index beccd612b..920686bc2 100644
--- a/go/vt/topo/etcd2topo/server_test.go
+++ b/go/vt/topo/etcd2topo/server_test.go
@@ -28,6 +28,7 @@ import (
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/testfiles"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/test"
@@ -73,7 +74,7 @@ func startEtcd(t *testing.T) (*exec.Cmd, string, string) {
}
// Wait until we can list "/", or timeout.
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/topo/etcd2topo/server_test.go:76")
defer cancel()
start := time.Now()
for {
diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go
index 706fd994e..cb024798c 100644
--- a/go/vt/topo/etcd2topo/watch.go
+++ b/go/vt/topo/etcd2topo/watch.go
@@ -23,6 +23,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -50,10 +51,10 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
}
// Create an outer context that will be canceled on return and will cancel all inner watches.
- outerCtx, outerCancel := context.WithCancel(context.Background())
+ outerCtx, outerCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/watch.go:53")
// Create a context, will be used to cancel the watch on retry.
- watchCtx, watchCancel := context.WithCancel(outerCtx)
+ watchCtx, watchCancel := context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:56")
// Create the Watcher. We start watching from the response we
// got, not from the file original version, as the server may
@@ -87,7 +88,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
watchRetries++
// Cancel inner context on retry and create new one.
watchCancel()
- watchCtx, watchCancel = context.WithCancel(outerCtx)
+ watchCtx, watchCancel = context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:90")
newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion))
if newWatcher == nil {
log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion)
diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go
index f9f15a479..184e505e5 100644
--- a/go/vt/topo/locks.go
+++ b/go/vt/topo/locks.go
@@ -26,6 +26,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -217,7 +218,7 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error {
func (l *Lock) lockKeyspace(ctx context.Context, ts *Server, keyspace string) (LockDescriptor, error) {
log.Infof("Locking keyspace %v for action %v", keyspace, l.Action)
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:220")
defer cancel()
span, ctx := trace.NewSpan(ctx, "TopoServer.LockKeyspaceForAction")
@@ -241,7 +242,7 @@ func (l *Lock) unlockKeyspace(ctx context.Context, ts *Server, keyspace string,
// Note that we are not using the user provided RemoteOperationTimeout
// here because it is possible that that timeout is too short.
ctx = trace.CopySpan(context.TODO(), ctx)
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:244")
defer cancel()
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockKeyspaceForAction")
@@ -358,7 +359,7 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error {
func (l *Lock) lockShard(ctx context.Context, ts *Server, keyspace, shard string) (LockDescriptor, error) {
log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action)
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:361")
defer cancel()
span, ctx := trace.NewSpan(ctx, "TopoServer.LockShardForAction")
@@ -382,7 +383,7 @@ func (l *Lock) unlockShard(ctx context.Context, ts *Server, keyspace, shard stri
// Note that we are not using the user provided RemoteOperationTimeout
// here because it is possible that that timeout is too short.
ctx = trace.CopySpan(context.TODO(), ctx)
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:385")
defer cancel()
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockShardForAction")
diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go
index f66154a02..16db7ccb7 100644
--- a/go/vt/topo/memorytopo/election.go
+++ b/go/vt/topo/memorytopo/election.go
@@ -21,6 +21,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
@@ -81,7 +82,7 @@ func (mp *cMasterParticipation) WaitForMastership() (context.Context, error) {
// We use a cancelable context here. If stop is closed,
// we just cancel that context.
- lockCtx, lockCancel := context.WithCancel(context.Background())
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/memorytopo/election.go:84")
go func() {
<-mp.stop
if ld != nil {
diff --git a/go/vt/topo/test/lock.go b/go/vt/topo/test/lock.go
index 315653ff0..ab5e60043 100644
--- a/go/vt/topo/test/lock.go
+++ b/go/vt/topo/test/lock.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/topo"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -92,14 +93,14 @@ func checkLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) {
}
// test we can't take the lock again
- fastCtx, cancel := context.WithTimeout(ctx, timeUntilLockIsTaken)
+ fastCtx, cancel := context2.WithTimeout(ctx, timeUntilLockIsTaken, "./go/vt/topo/test/lock.go:95")
if _, err := conn.Lock(fastCtx, keyspacePath, "again"); !topo.IsErrType(err, topo.Timeout) {
t.Fatalf("Lock(again): %v", err)
}
cancel()
// test we can interrupt taking the lock
- interruptCtx, cancel := context.WithCancel(ctx)
+ interruptCtx, cancel := context2.WithCancel(ctx, "./go/vt/topo/test/lock.go:102")
go func() {
time.Sleep(timeUntilLockIsTaken)
cancel()
diff --git a/go/vt/topo/zk2topo/election.go b/go/vt/topo/zk2topo/election.go
index 15975a00f..df408f41e 100644
--- a/go/vt/topo/zk2topo/election.go
+++ b/go/vt/topo/zk2topo/election.go
@@ -22,6 +22,7 @@ import (
"github.com/z-division/go-zookeeper/zk"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/log"
@@ -49,7 +50,7 @@ func (zs *Server) NewMasterParticipation(name, id string) (topo.MasterParticipat
id: []byte(id),
done: make(chan struct{}),
}
- result.stopCtx, result.stopCtxCancel = context.WithCancel(context.Background())
+ result.stopCtx, result.stopCtxCancel = context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:52")
return result, nil
}
@@ -122,7 +123,7 @@ func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) {
}
// we got the lock, create our background context
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:125")
go mp.watchMastership(ctx, mp.zs.conn, proposal, cancel)
return ctx, nil
}
diff --git a/go/vt/vtctl/throttler.go b/go/vt/vtctl/throttler.go
index 1e8452e57..1822f42d5 100644
--- a/go/vt/vtctl/throttler.go
+++ b/go/vt/vtctl/throttler.go
@@ -27,6 +27,7 @@ import (
"github.com/olekukonko/tablewriter"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/throttler/throttlerclient"
@@ -86,7 +87,7 @@ func commandThrottlerMaxRates(ctx context.Context, wr *wrangler.Wrangler, subFla
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:89")
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -139,7 +140,7 @@ func commandThrottlerSetMaxRate(ctx context.Context, wr *wrangler.Wrangler, subF
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:142")
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -176,7 +177,7 @@ func commandGetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangler
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:179")
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -230,7 +231,7 @@ func commandUpdateThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrang
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:233")
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
@@ -268,7 +269,7 @@ func commandResetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangl
}
// Connect to the server.
- ctx, cancel := context.WithTimeout(ctx, shortTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:271")
defer cancel()
client, err := throttlerclient.New(*server)
if err != nil {
diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go
index 94cb8744a..f935fd7db 100644
--- a/go/vt/vtctl/vtctl.go
+++ b/go/vt/vtctl/vtctl.go
@@ -107,6 +107,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
@@ -1010,7 +1011,7 @@ func commandWaitForDrain(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
}
if *timeout != 0 {
var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, *timeout)
+ ctx, cancel = context2.WithTimeout(ctx, *timeout, "./go/vt/vtctl/vtctl.go:1013")
defer cancel()
}
diff --git a/go/vt/vtctld/action_repository.go b/go/vt/vtctld/action_repository.go
index 7b484707e..c957b3280 100644
--- a/go/vt/vtctld/action_repository.go
+++ b/go/vt/vtctld/action_repository.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
"vitess.io/vitess/go/acl"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
@@ -115,7 +116,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName,
return result
}
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:118")
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action(ctx, wr, keyspace, r)
cancel()
@@ -142,7 +143,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke
return result
}
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:145")
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action(ctx, wr, keyspace, shard, r)
cancel()
@@ -176,7 +177,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st
}
// run the action
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:179")
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action.method(ctx, wr, tabletAlias, r)
cancel()
diff --git a/go/vt/vtctld/tablet_data_test.go b/go/vt/vtctld/tablet_data_test.go
index edc69d985..e8b32d2d2 100644
--- a/go/vt/vtctld/tablet_data_test.go
+++ b/go/vt/vtctld/tablet_data_test.go
@@ -24,6 +24,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/logutil"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -141,7 +142,7 @@ func TestTabletData(t *testing.T) {
}()
// Start streaming and wait for the first result.
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Second, "./go/vt/vtctld/tablet_data_test.go:144")
result, err := thc.Get(ctx, tablet1.Tablet.Alias)
cancel()
close(stop)
diff --git a/go/vt/vtctld/workflow.go b/go/vt/vtctld/workflow.go
index 1982e9093..e95b4a5fc 100644
--- a/go/vt/vtctld/workflow.go
+++ b/go/vt/vtctld/workflow.go
@@ -21,6 +21,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/flagutil"
@@ -85,7 +86,7 @@ func initWorkflowManager(ts *topo.Server) {
}
func runWorkflowManagerAlone() {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtctld/workflow.go:88")
go vtctl.WorkflowManager.Run(ctx)
// Running cancel on OnTermSync will cancel the context of any
diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go
index 5951ebb65..f4d7074e2 100644
--- a/go/vt/vtgate/buffer/buffer_test.go
+++ b/go/vt/vtgate/buffer/buffer_test.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
@@ -574,7 +575,7 @@ func testRequestCanceled(t *testing.T, explicitEnd bool) {
if err := waitForRequestsInFlight(b, 1); err != nil {
t.Fatal(err)
}
- ctx2, cancel2 := context.WithCancel(context.Background())
+ ctx2, cancel2 := context2.WithCancel(context.Background(), "./go/vt/vtgate/buffer/buffer_test.go:577")
stopped2 := issueRequest(ctx2, t, b, failoverErr)
if err := waitForRequestsInFlight(b, 2); err != nil {
t.Fatal(err)
diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go
index 0916b225b..8a455b4f8 100644
--- a/go/vt/vtgate/buffer/shard_buffer.go
+++ b/go/vt/vtgate/buffer/shard_buffer.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
@@ -327,7 +328,7 @@ func (sb *shardBuffer) bufferRequestLocked(ctx context.Context) (*entry, error)
done: make(chan struct{}),
deadline: sb.now().Add(*window),
}
- e.bufferCtx, e.bufferCancel = context.WithCancel(ctx)
+ e.bufferCtx, e.bufferCancel = context2.WithCancel(ctx, "./go/vt/vtgate/buffer/shard_buffer.go:330")
sb.queue = append(sb.queue, e)
if max := lastRequestsInFlightMax.Counts()[sb.statsKeyJoined]; max < int64(len(sb.queue)) {
diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go
index 1d382bc0f..85e553346 100644
--- a/go/vt/vtgate/endtoend/vstream_test.go
+++ b/go/vt/vtgate/endtoend/vstream_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"github.com/golang/protobuf/proto"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/query"
@@ -31,7 +32,7 @@ import (
)
func TestVStream(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/endtoend/vstream_test.go:34")
defer cancel()
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
diff --git a/go/vt/vtgate/engine/merge_sort.go b/go/vt/vtgate/engine/merge_sort.go
index ee5c9cf1d..1c87cad97 100644
--- a/go/vt/vtgate/engine/merge_sort.go
+++ b/go/vt/vtgate/engine/merge_sort.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -37,7 +38,7 @@ import (
// was pulled out. Since the input streams are sorted the same way that the heap is
// sorted, this guarantees that the merged stream will also be sorted the same way.
func mergeSort(vcursor VCursor, query string, orderBy []OrderbyParams, rss []*srvtopo.ResolvedShard, bvs []map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
- ctx, cancel := context.WithCancel(vcursor.Context())
+ ctx, cancel := context2.WithCancel(vcursor.Context(), "./go/vt/vtgate/engine/merge_sort.go:40")
defer cancel()
handles := make([]*streamHandle, len(rss))
diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go
index ea41aa88c..aabd41723 100644
--- a/go/vt/vtgate/executor_stream_test.go
+++ b/go/vt/vtgate/executor_stream_test.go
@@ -21,6 +21,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -85,7 +86,7 @@ func TestStreamSQLSharded(t *testing.T) {
func executorStreamMessages(executor *Executor, sql string) (qr *sqltypes.Result, err error) {
results := make(chan *sqltypes.Result, 100)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/vtgate/executor_stream_test.go:88")
defer cancel()
err = executor.StreamExecute(
ctx,
diff --git a/go/vt/vtgate/gateway/gateway.go b/go/vt/vtgate/gateway/gateway.go
index 7b458332f..a9c304969 100644
--- a/go/vt/vtgate/gateway/gateway.go
+++ b/go/vt/vtgate/gateway/gateway.go
@@ -23,6 +23,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/vt/log"
@@ -107,7 +108,7 @@ func GetCreator() Creator {
// just calls it.
func WaitForTablets(gw Gateway, tabletTypesToWait []topodatapb.TabletType) error {
log.Infof("Gateway waiting for serving tablets...")
- ctx, cancel := context.WithTimeout(context.Background(), *initialTabletTimeout)
+ ctx, cancel := context2.WithTimeout(context.Background(), *initialTabletTimeout, "./go/vt/vtgate/gateway/gateway.go:110")
defer cancel()
err := gw.WaitForTablets(ctx, tabletTypesToWait)
diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go
index ea5e16abd..4cdd72c30 100644
--- a/go/vt/vtgate/plugin_mysql_server.go
+++ b/go/vt/vtgate/plugin_mysql_server.go
@@ -26,6 +26,7 @@ import (
"syscall"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
@@ -89,7 +90,7 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:92")
defer cancel()
} else {
ctx = context.Background()
@@ -136,7 +137,7 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq
ctx := context.Background()
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(ctx, *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(ctx, *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:139")
defer cancel()
}
@@ -203,7 +204,7 @@ func (vh *vtgateHandler) ComPrepare(c *mysql.Conn, query string) ([]*querypb.Fie
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:206")
defer cancel()
} else {
ctx = context.Background()
@@ -262,7 +263,7 @@ func (vh *vtgateHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareDat
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:265")
defer cancel()
} else {
ctx = context.Background()
diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go
index be535df38..813723054 100644
--- a/go/vt/vtgate/resolver.go
+++ b/go/vt/vtgate/resolver.go
@@ -26,6 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
@@ -347,7 +348,7 @@ func (res *Resolver) UpdateStream(ctx context.Context, keyspace string, shard st
// VStream streams events from one target. This function ensures that events of each
// transaction are streamed together, along with the corresponding GTID.
func (res *Resolver) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func(events []*binlogdatapb.VEvent) error) error {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/resolver.go:350")
defer cancel()
// mu protects sending on ch, err and positions.
diff --git a/go/vt/vtgate/resolver_test.go b/go/vt/vtgate/resolver_test.go
index e26a77e5e..67748fb38 100644
--- a/go/vt/vtgate/resolver_test.go
+++ b/go/vt/vtgate/resolver_test.go
@@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
@@ -570,7 +571,7 @@ func TestResolverVStream(t *testing.T) {
}}
sbc0.AddVStreamEvents(send2, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:573")
defer cancel()
count := 1
@@ -619,7 +620,7 @@ func TestResolverVStreamChunks(t *testing.T) {
}
sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:622")
defer cancel()
rowEncountered := false
@@ -695,7 +696,7 @@ func TestResolverVStreamMulti(t *testing.T) {
}
sbc1.AddVStreamEvents(send1, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:698")
defer cancel()
var got *binlogdatapb.VGtid
@@ -760,7 +761,7 @@ func TestResolverVStreamRetry(t *testing.T) {
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:763")
defer cancel()
count := 0
@@ -819,7 +820,7 @@ func TestResolverVStreamHeartbeat(t *testing.T) {
}}
sbc0.AddVStreamEvents(send1, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:822")
defer cancel()
vgtid := &binlogdatapb.VGtid{
diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go
index 3bc5cb796..fff03a706 100644
--- a/go/vt/vtgate/scatter_conn.go
+++ b/go/vt/vtgate/scatter_conn.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/concurrency"
@@ -546,7 +547,7 @@ func (tt *timeTracker) Record(target *querypb.Target) time.Time {
func (stc *ScatterConn) MessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error {
// The cancelable context is used for handling errors
// from individual streams.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/scatter_conn.go:549")
defer cancel()
// mu is used to merge multiple callback calls into one.
diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go
index 8da8069b3..13dbec234 100644
--- a/go/vt/vtgate/vcursor_impl.go
+++ b/go/vt/vtgate/vcursor_impl.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/sqlparser"
@@ -82,7 +83,7 @@ func (vc *vcursorImpl) MaxMemoryRows() int {
// SetContextTimeout updates context and sets a timeout.
func (vc *vcursorImpl) SetContextTimeout(timeout time.Duration) context.CancelFunc {
- ctx, cancel := context.WithTimeout(vc.ctx, timeout)
+ ctx, cancel := context2.WithTimeout(vc.ctx, timeout, "./go/vt/vtgate/vcursor_impl.go:85")
vc.ctx = ctx
return cancel
}
diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go
index 8a8093666..b90db46d0 100644
--- a/go/vt/vtgate/vtgate_test.go
+++ b/go/vt/vtgate/vtgate_test.go
@@ -29,6 +29,7 @@ import (
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
@@ -1224,7 +1225,7 @@ func TestVTGateMessageStreamSharded(t *testing.T) {
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1227")
go func() {
kr := &topodatapb.KeyRange{End: []byte{0x40}}
err := rpcVTGate.MessageStream(ctx, ks, "", kr, "msg", func(qr *sqltypes.Result) error {
@@ -1269,7 +1270,7 @@ func TestVTGateMessageStreamUnsharded(t *testing.T) {
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil)
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1272")
go func() {
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error {
select {
@@ -1304,7 +1305,7 @@ func TestVTGateMessageStreamRetry(t *testing.T) {
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil)
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1307")
go func() {
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error {
select {
@@ -1347,7 +1348,7 @@ func TestVTGateMessageStreamUnavailable(t *testing.T) {
tablet.MustFailCodes[vtrpcpb.Code_UNAVAILABLE] = 1
ch := make(chan *sqltypes.Result)
done := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1350")
go func() {
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error {
select {
diff --git a/go/vt/vtqueryserver/plugin_mysql_server.go b/go/vt/vtqueryserver/plugin_mysql_server.go
index a2593b95a..85fed96d8 100644
--- a/go/vt/vtqueryserver/plugin_mysql_server.go
+++ b/go/vt/vtqueryserver/plugin_mysql_server.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
@@ -75,7 +76,7 @@ func (mh *proxyHandler) ConnectionClosed(c *mysql.Conn) {
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:78")
defer cancel()
} else {
ctx = context.Background()
@@ -90,7 +91,7 @@ func (mh *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:93")
defer cancel()
} else {
ctx = context.Background()
diff --git a/go/vt/vttablet/agentrpctest/test_agent_rpc.go b/go/vt/vttablet/agentrpctest/test_agent_rpc.go
index 5632b66ae..44d6d5ac8 100644
--- a/go/vt/vttablet/agentrpctest/test_agent_rpc.go
+++ b/go/vt/vttablet/agentrpctest/test_agent_rpc.go
@@ -28,6 +28,7 @@ import (
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/logutil"
@@ -201,7 +202,7 @@ func agentRPCTestPingPanic(ctx context.Context, t *testing.T, client tmclient.Ta
// RPCs failed due to an expired context before .Dial().
func agentRPCTestDialExpiredContext(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
// Using a timeout of 0 here such that .Dial() will fail immediately.
- expiredCtx, cancel := context.WithTimeout(ctx, 0)
+ expiredCtx, cancel := context2.WithTimeout(ctx, 0, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:204")
defer cancel()
err := client.Ping(expiredCtx, tablet)
if err == nil {
@@ -227,7 +228,7 @@ func agentRPCTestRPCTimeout(ctx context.Context, t *testing.T, client tmclient.T
// NOTE: This might still race e.g. when test execution takes too long the
// context will be expired in dial() already. In such cases coverage
// will be reduced but the test will not flake.
- shortCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+ shortCtx, cancel := context2.WithTimeout(ctx, 10*time.Millisecond, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:230")
defer cancel()
fakeAgent.setSlow(true)
defer func() { fakeAgent.setSlow(false) }()
diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go
index 39af7ba16..eb2a8738d 100644
--- a/go/vt/vttablet/endtoend/misc_test.go
+++ b/go/vt/vttablet/endtoend/misc_test.go
@@ -29,6 +29,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
@@ -467,7 +468,7 @@ func TestStreamHealth_Expired(t *testing.T) {
var health *querypb.StreamHealthResponse
framework.Server.BroadcastHealth(0, nil, time.Millisecond)
time.Sleep(5 * time.Millisecond)
- ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
+ ctx, cancel := context2.WithTimeout(context.Background(), time.Millisecond*100, "./go/vt/vttablet/endtoend/misc_test.go:470")
defer cancel()
if err := framework.Server.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
health = shr
diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go
index aae4da11a..1ef34b9c1 100644
--- a/go/vt/vttablet/grpctabletconn/conn.go
+++ b/go/vt/vttablet/grpctabletconn/conn.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
@@ -148,7 +149,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
// no direct API to end a stream from the client side. If callback
// returns an error, we return from the function. The deferred
// cancel will then cause the stream to be terminated.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:151")
defer cancel()
stream, err := func() (queryservicepb.Query_StreamExecuteClient, error) {
@@ -493,7 +494,7 @@ func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *quer
// MessageStream streams messages.
func (conn *gRPCQueryClient) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error {
// Please see comments in StreamExecute to see how this works.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:496")
defer cancel()
stream, err := func() (queryservicepb.Query_MessageStreamClient, error) {
@@ -594,7 +595,7 @@ func (conn *gRPCQueryClient) SplitQuery(
// StreamHealth starts a streaming RPC for VTTablet health status updates.
func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
// Please see comments in StreamExecute to see how this works.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:597")
defer cancel()
stream, err := func() (queryservicepb.Query_StreamHealthClient, error) {
@@ -630,7 +631,7 @@ func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*qu
// UpdateStream starts a streaming query to VTTablet.
func (conn *gRPCQueryClient) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, callback func(*querypb.StreamEvent) error) error {
// Please see comments in StreamExecute to see how this works.
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:633")
defer cancel()
stream, err := func() (queryservicepb.Query_UpdateStreamClient, error) {
diff --git a/go/vt/vttablet/heartbeat/reader.go b/go/vt/vttablet/heartbeat/reader.go
index 54dbc72b8..b66774860 100644
--- a/go/vt/vttablet/heartbeat/reader.go
+++ b/go/vt/vttablet/heartbeat/reader.go
@@ -21,6 +21,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -149,7 +150,7 @@ func (r *Reader) GetLatest() (time.Duration, error) {
func (r *Reader) readHeartbeat() {
defer tabletenv.LogError()
- ctx, cancel := context.WithDeadline(context.Background(), r.now().Add(r.interval))
+ ctx, cancel := context2.WithDeadline(context.Background(), r.now().Add(r.interval), "./go/vt/vttablet/heartbeat/reader.go:152")
defer cancel()
res, err := r.fetchMostRecentHeartbeat(ctx)
diff --git a/go/vt/vttablet/heartbeat/writer.go b/go/vt/vttablet/heartbeat/writer.go
index e84f75920..da9ee67b7 100644
--- a/go/vt/vttablet/heartbeat/writer.go
+++ b/go/vt/vttablet/heartbeat/writer.go
@@ -21,6 +21,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -203,7 +204,7 @@ func (w *Writer) bindHeartbeatVars(query string) (string, error) {
// writeHeartbeat updates the heartbeat row for this tablet with the current time in nanoseconds.
func (w *Writer) writeHeartbeat() {
defer tabletenv.LogError()
- ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
+ ctx, cancel := context2.WithDeadline(context.Background(), w.now().Add(w.interval), "./go/vt/vttablet/heartbeat/writer.go:206")
defer cancel()
update, err := w.bindHeartbeatVars(sqlUpdateHeartbeat)
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go
index 7fe05a7b5..fe01c4d4a 100644
--- a/go/vt/vttablet/tabletmanager/action_agent.go
+++ b/go/vt/vttablet/tabletmanager/action_agent.go
@@ -44,6 +44,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -746,7 +747,7 @@ func (agent *ActionAgent) checkTabletMysqlPort(ctx context.Context, tablet *topo
// Update the port in the topology. Use a shorter timeout, so if
// the topo server is busy / throttling us, we don't hang forever here.
// The healthcheck go routine will try again next time.
- ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/vttablet/tabletmanager/action_agent.go:749")
defer cancel()
if !agent.waitingForMysql {
log.Warningf("MySQL port has changed from %v to %v, updating it in tablet record", topoproto.MysqlPort(tablet), mport)
diff --git a/go/vt/vttablet/tabletmanager/init_tablet.go b/go/vt/vttablet/tabletmanager/init_tablet.go
index 47e5c0091..a731a13e0 100644
--- a/go/vt/vttablet/tabletmanager/init_tablet.go
+++ b/go/vt/vttablet/tabletmanager/init_tablet.go
@@ -24,8 +24,7 @@ import (
"fmt"
"time"
- "golang.org/x/net/context"
-
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/log"
@@ -84,7 +83,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// Create a context for this whole operation. Note we will
// retry some actions upon failure up to this context expires.
- ctx, cancel := context.WithTimeout(agent.batchCtx, *initTimeout)
+ ctx, cancel := context2.WithTimeout(agent.batchCtx, *initTimeout, "./go/vt/vttablet/tabletmanager/init_tablet.go:87")
defer cancel()
// Read the shard, create it if necessary.
diff --git a/go/vt/vttablet/tabletmanager/replication_reporter.go b/go/vt/vttablet/tabletmanager/replication_reporter.go
index 4ec3b1241..5f3ca9ecf 100644
--- a/go/vt/vttablet/tabletmanager/replication_reporter.go
+++ b/go/vt/vttablet/tabletmanager/replication_reporter.go
@@ -22,6 +22,7 @@ import (
"html/template"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -68,7 +69,7 @@ func (r *replicationReporter) Report(isSlaveType, shouldQueryServiceBeRunning bo
log.Infof("Slave is stopped. Running with --disable_active_reparents so will not try to reconnect to master...")
} else {
log.Infof("Slave is stopped. Trying to reconnect to master...")
- ctx, cancel := context.WithTimeout(r.agent.batchCtx, 5*time.Second)
+ ctx, cancel := context2.WithTimeout(r.agent.batchCtx, 5*time.Second, "./go/vt/vttablet/tabletmanager/replication_reporter.go:71")
if err := repairReplication(ctx, r.agent); err != nil {
log.Infof("Failed to reconnect to master: %v", err)
}
diff --git a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go
index 7b066ed1e..c389dfe3b 100644
--- a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go
+++ b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go
@@ -23,6 +23,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"
@@ -116,7 +117,7 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
agent.updateState(ctx, newTablet, "fastTabletExternallyReparented")
// Start the finalize stage with a background context, but connect the trace.
- bgCtx, cancel := context.WithTimeout(agent.batchCtx, *finalizeReparentTimeout)
+ bgCtx, cancel := context2.WithTimeout(agent.batchCtx, *finalizeReparentTimeout, "./go/vt/vttablet/tabletmanager/rpc_external_reparent.go:119")
bgCtx = trace.CopySpan(bgCtx, ctx)
agent.finalizeReparentCtx = bgCtx
go func() {
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go
index 9ef143979..850fb57ee 100644
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -103,7 +104,7 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string,
if err != nil {
return "", err
}
- waitCtx, cancel := context.WithTimeout(ctx, waitTime)
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:106")
defer cancel()
if err := agent.MysqlDaemon.WaitMasterPos(waitCtx, pos); err != nil {
return "", err
@@ -153,7 +154,7 @@ func (agent *ActionAgent) StartSlaveUntilAfter(ctx context.Context, position str
}
defer agent.unlock()
- waitCtx, cancel := context.WithTimeout(ctx, waitTime)
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:156")
defer cancel()
pos, err := mysql.DecodePosition(position)
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go
index 41c333aa9..dc8ee78eb 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/controller.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go
@@ -22,6 +22,7 @@ import (
"strconv"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"github.com/golang/protobuf/proto"
@@ -106,7 +107,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
ct.tabletPicker = tp
// cancel
- ctx, ct.cancel = context.WithCancel(ctx)
+ ctx, ct.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/controller.go:109")
go ct.run(ctx)
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
index ba858cf04..630732948 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
@@ -24,6 +24,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
@@ -223,7 +224,7 @@ func TestControllerCanceledContext(t *testing.T) {
"source": fmt.Sprintf(`keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
}
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/controller_test.go:226")
cancel()
ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil)
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go
index 9a08a9746..618deca59 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/engine.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -109,7 +110,7 @@ func (vre *Engine) Open(ctx context.Context) error {
return nil
}
- vre.ctx, vre.cancel = context.WithCancel(ctx)
+ vre.ctx, vre.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/engine.go:112")
vre.isOpen = true
if err := vre.initAll(); err != nil {
go vre.Close()
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
index 27f2d31d6..f3de8a862 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
@@ -23,6 +23,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -364,7 +365,7 @@ func TestWaitForPosCancel(t *testing.T) {
sqltypes.NewVarBinary("Running"),
sqltypes.NewVarBinary(""),
}}}, nil)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/engine_test.go:367")
cancel()
err := vre.WaitForPos(ctx, 1, "MariaDB/0-1-1084")
if err == nil || err != context.Canceled {
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go
index 37152f006..83ebb5bd6 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go
@@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -115,7 +116,7 @@ func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSetting
}
func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.Result) error {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:118")
defer cancel()
settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id)
@@ -182,7 +183,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
}
defer vsClient.Close(ctx)
- ctx, cancel := context.WithTimeout(ctx, copyTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, copyTimeout, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:185")
defer cancel()
target := &querypb.Target{
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
index 3a4bfd1ff..8681eccf8 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
@@ -113,7 +114,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) error {
return fmt.Errorf("error dialing tablet: %v", err)
}
defer vsClient.Close(ctx)
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vplayer.go:116")
defer cancel()
relay := newRelayLog(ctx, relayLogMaxItems, relayLogMaxSize)
diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go
index aad986ee7..855a5622f 100644
--- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go
+++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -62,7 +63,7 @@ func TestDBConnExec(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:65")
defer cancel()
dbConn, err := NewDBConn(connPool, db.ConnParams())
if dbConn != nil {
@@ -140,7 +141,7 @@ func TestDBConnDeadline(t *testing.T) {
defer connPool.Close()
db.SetConnDelay(100 * time.Millisecond)
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:143")
defer cancel()
dbConn, err := NewDBConn(connPool, db.ConnParams())
@@ -163,7 +164,7 @@ func TestDBConnDeadline(t *testing.T) {
startCounts = tabletenv.MySQLStats.Counts()
- ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ ctx, cancel = context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:166")
defer cancel()
result, err := dbConn.Exec(ctx, sql, 1, false)
@@ -300,7 +301,7 @@ func TestDBConnStream(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:303")
defer cancel()
dbConn, err := NewDBConn(connPool, db.ConnParams())
if dbConn != nil {
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go
index 2b8b93e4d..a8aadf356 100644
--- a/go/vt/vttablet/tabletserver/messager/message_manager.go
+++ b/go/vt/vttablet/tabletserver/messager/message_manager.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
@@ -56,7 +57,7 @@ type messageReceiver struct {
}
func newMessageReceiver(ctx context.Context, send func(*sqltypes.Result) error) (*messageReceiver, <-chan struct{}) {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/messager/message_manager.go:59")
rcv := &messageReceiver{
ctx: ctx,
errChan: make(chan error, 1),
@@ -533,7 +534,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t
return
}
defer mm.postponeSema.Release()
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime)
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), ackWaitTime, "./go/vt/vttablet/tabletserver/messager/message_manager.go:536")
defer cancel()
if _, err := tsv.PostponeMessages(ctx, nil, name, ids); err != nil {
// This can happen during spikes. Record the incident for monitoring.
@@ -542,7 +543,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t
}
func (mm *messageManager) runPoller() {
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval())
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval(), "./go/vt/vttablet/tabletserver/messager/message_manager.go:545")
defer func() {
tabletenv.LogError()
cancel()
@@ -612,7 +613,7 @@ func (mm *messageManager) runPurge() {
// purge is a non-member because it should be called asynchronously and should
// not rely on members of messageManager.
func purge(tsv TabletService, name string, purgeAfter, purgeInterval time.Duration) {
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), purgeInterval)
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), purgeInterval, "./go/vt/vttablet/tabletserver/messager/message_manager.go:615")
defer func() {
tabletenv.LogError()
cancel()
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go
index aa43af69e..6fd142db7 100644
--- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go
+++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
@@ -106,7 +107,7 @@ func TestReceiverCancel(t *testing.T) {
mm.Open()
defer mm.Close()
r1 := newTestReceiver(0)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:109")
_ = mm.Subscribe(ctx, r1.rcv)
cancel()
// r1 should eventually be unsubscribed.
@@ -232,7 +233,7 @@ func TestMessageManagerSend(t *testing.T) {
// Test that mm stops sending to a canceled receiver.
r2 := newTestReceiver(1)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:235")
mm.Subscribe(ctx, r2.rcv)
<-r2.ch
mm.Add(&MessageRow{Row: []sqltypes.Value{sqltypes.NewVarBinary("2")}})
@@ -343,7 +344,7 @@ func TestMessageManagerSendEOF(t *testing.T) {
mm.Open()
defer mm.Close()
r1 := newTestReceiver(0)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:346")
mm.Subscribe(ctx, r1.rcv)
// Pull field info.
<-r1.ch
@@ -514,7 +515,7 @@ func TestMessageManagerPoller(t *testing.T) {
mm.Open()
defer mm.Close()
r1 := newTestReceiver(1)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:517")
mm.Subscribe(ctx, r1.rcv)
<-r1.ch
mm.pollerTicks.Trigger()
diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go
index f2994f784..c7eb0e64e 100644
--- a/go/vt/vttablet/tabletserver/query_engine.go
+++ b/go/vt/vttablet/tabletserver/query_engine.go
@@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cache"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/streamlog"
@@ -383,7 +384,7 @@ func (qe *QueryEngine) getQueryConn(ctx context.Context) (*connpool.DBConn, erro
timeout := qe.connTimeout.Get()
if timeout != 0 {
- ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
+ ctxTimeout, cancel := context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/query_engine.go:386")
defer cancel()
conn, err := qe.conns.Get(ctxTimeout)
if err != nil {
diff --git a/go/vt/vttablet/tabletserver/replication_watcher.go b/go/vt/vttablet/tabletserver/replication_watcher.go
index da3bfd9df..615ce96ee 100644
--- a/go/vt/vttablet/tabletserver/replication_watcher.go
+++ b/go/vt/vttablet/tabletserver/replication_watcher.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog"
@@ -91,7 +92,7 @@ func (rpw *ReplicationWatcher) Open() {
if rpw.isOpen || !rpw.watchReplication {
return
}
- ctx, cancel := context.WithCancel(tabletenv.LocalContext())
+ ctx, cancel := context2.WithCancel(tabletenv.LocalContext(), "./go/vt/vttablet/tabletserver/replication_watcher.go:94")
rpw.cancel = cancel
rpw.wg.Add(1)
go rpw.Process(ctx, rpw.dbconfigs)
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index 64711af9b..89393bd19 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -31,6 +31,7 @@ import (
"golang.org/x/net/context"
"vitess.io/vitess/go/acl"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
@@ -2016,7 +2017,7 @@ func (tsv *TabletServer) UpdateStream(ctx context.Context, target *querypb.Targe
s := binlog.NewEventStreamer(tsv.dbconfigs.DbaWithDB(), tsv.se, p, timestamp, callback)
// Create a cancelable wrapping context.
- streamCtx, streamCancel := context.WithCancel(ctx)
+ streamCtx, streamCancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver.go:2019")
i := tsv.updateStreamList.Add(streamCancel)
defer tsv.updateStreamList.Delete(i)
@@ -2318,7 +2319,7 @@ func withTimeout(ctx context.Context, timeout time.Duration, options *querypb.Ex
if timeout == 0 || options.GetWorkload() == querypb.ExecuteOptions_DBA || tabletenv.IsLocalContext(ctx) {
return ctx, func() {}
}
- return context.WithTimeout(ctx, timeout)
+ return context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/tabletserver.go:2321")
}
// skipQueryPlanCache returns true if the query plan should be cached
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index ca5d2c38d..71f8c4218 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -32,6 +32,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -958,7 +959,7 @@ func TestTabletServerBeginFail(t *testing.T) {
t.Fatalf("StartService failed: %v", err)
}
defer tsv.StopService()
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Nanosecond, "./go/vt/vttablet/tabletserver/tabletserver_test.go:961")
defer cancel()
tsv.Begin(ctx, &target, nil)
_, err = tsv.Begin(ctx, &target, nil)
@@ -2092,7 +2093,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
}()
// tx2.
- ctxTx2, cancelTx2 := context.WithCancel(ctx)
+ ctxTx2, cancelTx2 := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver_test.go:2095")
wg.Add(1)
go func() {
defer wg.Done()
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go
index 148569a02..e9c73da0f 100644
--- a/go/vt/vttablet/tabletserver/tx_engine.go
+++ b/go/vt/vttablet/tabletserver/tx_engine.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
@@ -541,7 +542,7 @@ func (te *TxEngine) rollbackPrepared() {
// transactions and calls the notifier on them.
func (te *TxEngine) startWatchdog() {
te.ticks.Start(func() {
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4)
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4, "./go/vt/vttablet/tabletserver/tx_engine.go:544")
defer cancel()
// Raise alerts on prepares that have been unresolved for too long.
diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go
index 184431cc4..6de13cb1b 100644
--- a/go/vt/vttablet/tabletserver/tx_prep_pool.go
+++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go
@@ -24,7 +24,7 @@ import (
var (
errPrepCommitting = errors.New("committing")
- errPrepFailed = errors.New("failed")
+ errPrepFailed = errors.New("failed")
)
// TxPreparedPool manages connections for prepared transactions.
diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go
index bc64e396a..2117fb957 100644
--- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go
+++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/vterrors"
@@ -306,7 +307,7 @@ func TestTxSerializerCancel(t *testing.T) {
}
// tx3 (gets queued and must wait).
- ctx3, cancel3 := context.WithCancel(context.Background())
+ ctx3, cancel3 := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go:309")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go
index c88a9aeef..1578ff723 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
@@ -53,7 +54,7 @@ func TestUpdateVSchema(t *testing.T) {
defer env.SetVSchema("{}")
// We have to start at least one stream to start the vschema watcher.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/engine_test.go:56")
defer cancel()
filter := &binlogdatapb.Filter{
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
index c4114d451..ce98a74dd 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
@@ -49,7 +50,7 @@ type rowStreamer struct {
}
func newRowStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, query string, lastpk []sqltypes.Value, kschema *vindexes.KeyspaceSchema, send func(*binlogdatapb.VStreamRowsResponse) error) *rowStreamer {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go:52")
return &rowStreamer{
ctx: ctx,
cancel: cancel,
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
index c57135f4d..918db73a7 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -253,7 +254,7 @@ func TestStreamRowsCancel(t *testing.T) {
})
engine.se.Reload(context.Background())
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go:256")
defer cancel()
err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error {
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
index 91e09e8f3..76ddd1cf8 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -23,6 +23,7 @@ import (
"io"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog"
@@ -70,7 +71,7 @@ type streamerPlan struct {
}
func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, startPos string, filter *binlogdatapb.Filter, kschema *vindexes.KeyspaceSchema, send func([]*binlogdatapb.VEvent) error) *vstreamer {
- ctx, cancel := context.WithCancel(ctx)
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/vstreamer.go:73")
return &vstreamer{
ctx: ctx,
cancel: cancel,
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
index baa49af74..eab8df500 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -193,7 +194,7 @@ func TestREKeyRange(t *testing.T) {
}
defer env.SetVSchema("{}")
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:196")
defer cancel()
filter := &binlogdatapb.Filter{
@@ -340,7 +341,7 @@ func TestDDLAddColumn(t *testing.T) {
})
engine.se.Reload(context.Background())
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:343")
defer cancel()
// Test RE as well as select-based filters.
@@ -406,7 +407,7 @@ func TestDDLDropColumn(t *testing.T) {
})
engine.se.Reload(context.Background())
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:409")
defer cancel()
ch := make(chan []*binlogdatapb.VEvent)
@@ -771,7 +772,7 @@ func TestMinimalMode(t *testing.T) {
"set @@session.binlog_row_image='full'",
})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:774")
defer cancel()
ch := make(chan []*binlogdatapb.VEvent)
@@ -810,7 +811,7 @@ func TestStatementMode(t *testing.T) {
"set @@session.binlog_format='row'",
})
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:813")
defer cancel()
ch := make(chan []*binlogdatapb.VEvent)
@@ -829,7 +830,7 @@ func TestStatementMode(t *testing.T) {
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase) {
t.Helper()
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:832")
defer cancel()
ch := startStream(ctx, t, filter)
diff --git a/go/vt/vttest/mysqlctl.go b/go/vt/vttest/mysqlctl.go
index d7e135dbf..8a7f6b4fa 100644
--- a/go/vt/vttest/mysqlctl.go
+++ b/go/vt/vttest/mysqlctl.go
@@ -25,6 +25,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
)
@@ -52,7 +53,7 @@ type Mysqlctl struct {
// Setup spawns a new mysqld service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (ctl *Mysqlctl) Setup() error {
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:55")
defer cancel()
cmd := exec.CommandContext(ctx,
@@ -76,7 +77,7 @@ func (ctl *Mysqlctl) Setup() error {
// TearDown shutdowns the running mysqld service
func (ctl *Mysqlctl) TearDown() error {
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:79")
defer cancel()
cmd := exec.CommandContext(ctx,
diff --git a/go/vt/worker/chunk.go b/go/vt/worker/chunk.go
index 4b775b470..1b9b9e80b 100644
--- a/go/vt/worker/chunk.go
+++ b/go/vt/worker/chunk.go
@@ -19,6 +19,7 @@ package worker
import (
"fmt"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -94,7 +95,7 @@ func generateChunks(ctx context.Context, wr *wrangler.Wrangler, tablet *topodata
// Get the MIN and MAX of the leading column of the primary key.
query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(topoproto.TabletDbName(tablet)), sqlescape.EscapeID(td.Name))
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/chunk.go:97")
qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(shortCtx, tablet, true, []byte(query), 1)
cancel()
if err != nil {
diff --git a/go/vt/worker/diff_utils.go b/go/vt/worker/diff_utils.go
index 19d2dcbca..895267e0f 100644
--- a/go/vt/worker/diff_utils.go
+++ b/go/vt/worker/diff_utils.go
@@ -25,6 +25,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@@ -63,7 +64,7 @@ type QueryResultReader struct {
// NewQueryResultReaderForTablet creates a new QueryResultReader for
// the provided tablet / sql query
func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string) (*QueryResultReader, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:66")
tablet, err := ts.GetTablet(shortCtx, tabletAlias)
cancel()
if err != nil {
@@ -97,7 +98,7 @@ func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletA
// NewTransactionalQueryResultReaderForTablet creates a new QueryResultReader for
// the provided tablet / sql query, and runs it in an existing transaction
func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string, txID int64) (*QueryResultReader, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:100")
tablet, err := ts.GetTablet(shortCtx, tabletAlias)
cancel()
if err != nil {
@@ -130,7 +131,7 @@ func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Se
// RollbackTransaction rolls back the transaction
func RollbackTransaction(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, txID int64) error {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:133")
tablet, err := ts.GetTablet(shortCtx, tabletAlias)
cancel()
if err != nil {
diff --git a/go/vt/worker/executor.go b/go/vt/worker/executor.go
index 07ec11d13..32a6beb2c 100644
--- a/go/vt/worker/executor.go
+++ b/go/vt/worker/executor.go
@@ -20,6 +20,7 @@ import (
"fmt"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -117,7 +118,7 @@ func (e *executor) refreshState(ctx context.Context) error {
func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context.Context, tablet *topodatapb.Tablet) error) error {
retryDuration := *retryDuration
// We should keep retrying up until the retryCtx runs out.
- retryCtx, retryCancel := context.WithTimeout(ctx, retryDuration)
+ retryCtx, retryCancel := context2.WithTimeout(ctx, retryDuration, "./go/vt/worker/executor.go:120")
defer retryCancel()
// Is this current attempt a retry of a previous attempt?
isRetry := false
@@ -150,7 +151,7 @@ func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context
// Run the command (in a block since goto above does not allow to introduce
// new variables until the label is reached.)
{
- tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute)
+ tryCtx, cancel := context2.WithTimeout(retryCtx, 2*time.Minute, "./go/vt/worker/executor.go:153")
err = action(tryCtx, master.Tablet)
cancel()
diff --git a/go/vt/worker/instance.go b/go/vt/worker/instance.go
index 371ff734e..0bba03eff 100644
--- a/go/vt/worker/instance.go
+++ b/go/vt/worker/instance.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/tb"
@@ -115,7 +116,7 @@ func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrang
wi.currentWorker = wrk
wi.currentMemoryLogger = logutil.NewMemoryLogger()
- wi.currentContext, wi.currentCancelFunc = context.WithCancel(ctx)
+ wi.currentContext, wi.currentCancelFunc = context2.WithCancel(ctx, "./go/vt/worker/instance.go:118")
wi.lastRunError = nil
wi.lastRunStopTime = time.Unix(0, 0)
done := make(chan struct{})
diff --git a/go/vt/worker/legacy_split_clone.go b/go/vt/worker/legacy_split_clone.go
index a191a1b79..890a71207 100644
--- a/go/vt/worker/legacy_split_clone.go
+++ b/go/vt/worker/legacy_split_clone.go
@@ -27,6 +27,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -266,7 +267,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error {
var err error
// read the keyspace and validate it
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:269")
scw.keyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.keyspace)
cancel()
if err != nil {
@@ -274,7 +275,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error {
}
// find the OverlappingShards in the keyspace
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:277")
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.keyspace)
cancel()
if err != nil {
@@ -367,7 +368,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
// get the tablet info for them, and stop their replication
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases))
for i, alias := range scw.sourceAliases {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:370")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias)
cancel()
if err != nil {
@@ -375,7 +376,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
}
scw.sourceTablets[i] = ti.Tablet
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:378")
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i])
cancel()
if err != nil {
@@ -398,7 +399,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
// Make sure we find a master for each destination shard and log it.
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...")
for _, si := range scw.destinationShards {
- waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second)
+ waitCtx, waitCancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/worker/legacy_split_clone.go:401")
defer waitCancel()
if err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER); err != nil {
return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v", si.Keyspace(), si.ShardName())
@@ -410,7 +411,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {
master := masters[0]
// Get the MySQL database name of the tablet.
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:413")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, master.Tablet.Alias)
cancel()
if err != nil {
@@ -453,7 +454,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error {
// on all source shards. Furthermore, we estimate the number of rows
// in each source shard for each table to be about the same
// (rowCount is used to estimate an ETA)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:456")
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, scw.sourceAliases[0], nil, scw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
@@ -476,7 +477,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error {
mu := sync.Mutex{}
var firstError error
- ctx, cancelCopy := context.WithCancel(ctx)
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/legacy_split_clone.go:479")
processError := func(format string, args ...interface{}) {
scw.wr.Logger().Errorf(format, args...)
mu.Lock()
@@ -605,7 +606,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error {
sourcePositions := make([]string, len(scw.sourceShards))
// get the current position from the sources
for shardIndex := range scw.sourceShards {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:608")
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex])
cancel()
if err != nil {
diff --git a/go/vt/worker/legacy_split_clone_test.go b/go/vt/worker/legacy_split_clone_test.go
index 65494ade2..8f721a2c9 100644
--- a/go/vt/worker/legacy_split_clone_test.go
+++ b/go/vt/worker/legacy_split_clone_test.go
@@ -28,6 +28,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -468,7 +469,7 @@ func TestLegacySplitCloneV2_NoMasterAvailable(t *testing.T) {
statsRetryCounters.ResetAll()
errs := make(chan error, 1)
go func() {
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/legacy_split_clone_test.go:471")
defer cancel()
for {
diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go
index 9a93ca1e1..c43d6a3d4 100644
--- a/go/vt/worker/multi_split_diff.go
+++ b/go/vt/worker/multi_split_diff.go
@@ -24,6 +24,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
@@ -200,13 +201,13 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error {
}
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:203")
msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace)
cancel()
if err != nil {
return vterrors.Wrapf(err, "cannot read keyspace %v", msdw.keyspace)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:209")
msdw.shardInfo, err = msdw.wr.TopoServer().GetShard(shortCtx, msdw.keyspace, msdw.shard)
cancel()
if err != nil {
@@ -228,7 +229,7 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error {
// findDestinationShards finds all the shards that have filtered replication from the source shard
func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([]*topo.ShardInfo, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:231")
keyspaces, err := msdw.wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -252,7 +253,7 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([]
}
func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keyspace string) ([]*topo.ShardInfo, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:255")
shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -284,7 +285,7 @@ func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keys
}
func (msdw *MultiSplitDiffWorker) getShardInfo(ctx context.Context, keyspace string, shard string) (*topo.ShardInfo, uint32, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:287")
si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
@@ -349,14 +350,14 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab
tablet := tabletInfo[i].Tablet
msdw.wr.Logger().Infof("stopping master binlog replication on %v", shardInfo.MasterAlias)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:352")
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff"))
cancel()
if err != nil {
return nil, vterrors.Wrapf(err, "VReplicationExec(stop) for %v failed", shardInfo.MasterAlias)
}
wrangler.RecordVReplicationAction(msdw.cleaner, tablet, binlogplayer.StartVReplication(msdw.sourceUID))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:359")
p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID))
cancel()
if err != nil {
@@ -375,7 +376,7 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab
}
func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Context, shardInfo *topo.ShardInfo) (*topo.TabletInfo, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:378")
masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias)
cancel()
if err != nil {
@@ -389,7 +390,7 @@ func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Contex
// (add a cleanup task to restart binlog replication on the source tablet, and
// change the existing ChangeSlaveType cleanup action to 'spare' type)
func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Context, destVreplicationPos []string) (string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:392")
sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias)
cancel()
if err != nil {
@@ -406,14 +407,14 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co
msdw.wr.Logger().Infof("stopping slave %v at a minimum of %v", msdw.sourceAlias, vreplicationPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:409")
msdw.wr.TabletManagerClient().StartSlave(shortCtx, sourceTablet.Tablet)
cancel()
if err != nil {
return "", err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:416")
mysqlPos, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout)
cancel()
if err != nil {
@@ -431,21 +432,21 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co
// up to the specified source position, and return the destination position.
func (msdw *MultiSplitDiffWorker) stopVreplicationAt(ctx context.Context, shardInfo *topo.ShardInfo, sourcePosition string, masterInfo *topo.TabletInfo) (string, error) {
msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, sourcePosition)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:434")
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, sourcePosition))
cancel()
if err != nil {
return "", vterrors.Wrapf(err, "VReplication(start until) for %v until %v failed", shardInfo.MasterAlias, sourcePosition)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:441")
err = msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), sourcePosition)
cancel()
if err != nil {
return "", vterrors.Wrapf(err, "VReplicationWaitForPos for %v until %v failed", shardInfo.MasterAlias, sourcePosition)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:448")
masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet)
cancel()
if err != nil {
@@ -464,7 +465,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina
} else {
msdw.wr.Logger().Infof("waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos)
}
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:467")
destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias)
cancel()
if err != nil {
@@ -475,7 +476,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina
time.Sleep(1 * time.Minute)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:478")
if msdw.waitForFixedTimeRatherThanGtidSet {
err = msdw.wr.TabletManagerClient().StopSlave(shortCtx, destinationTablet.Tablet)
} else {
@@ -493,7 +494,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina
// (remove the cleanup task that does the same)
func (msdw *MultiSplitDiffWorker) startVreplication(ctx context.Context, shardInfo *topo.ShardInfo, masterInfo *topo.TabletInfo) error {
msdw.wr.Logger().Infof("restarting filtered replication on master %v", shardInfo.MasterAlias)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:496")
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID))
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", shardInfo.MasterAlias)
@@ -560,7 +561,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte
}
}
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:563")
source, _ := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias)
cancel()
@@ -604,7 +605,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte
return err
}
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:607")
destTabletInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias)
cancel()
if err != nil {
@@ -652,7 +653,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte
func (msdw *MultiSplitDiffWorker) waitForDestinationTabletToReach(ctx context.Context, tablet *topodatapb.Tablet, mysqlPos string) error {
for i := 0; i < 20; i++ {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:655")
pos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, tablet)
cancel()
if err != nil {
@@ -758,7 +759,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
wg.Add(1)
go func(i int, destinationAlias *topodatapb.TabletAlias) {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:761")
destinationSchemaDefinition, err := msdw.wr.GetSchema(
shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
@@ -773,7 +774,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:776")
sourceSchemaDefinition, err = msdw.wr.GetSchema(
shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
@@ -811,7 +812,7 @@ func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, des
}
func (msdw *MultiSplitDiffWorker) loadVSchema(ctx context.Context) (*vindexes.KeyspaceSchema, error) {
- shortCtx, cancel := context.WithCancel(ctx)
+ shortCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/multi_split_diff.go:814")
kschema, err := msdw.wr.TopoServer().GetVSchema(shortCtx, msdw.keyspace)
cancel()
if err != nil {
diff --git a/go/vt/worker/multi_split_diff_cmd.go b/go/vt/worker/multi_split_diff_cmd.go
index 5a6b880c5..1c6cb4e42 100644
--- a/go/vt/worker/multi_split_diff_cmd.go
+++ b/go/vt/worker/multi_split_diff_cmd.go
@@ -25,6 +25,7 @@ import (
"strings"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -123,7 +124,7 @@ func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.F
// shardSources returns all the shards that are SourceShards of at least one other shard.
func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:126")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -139,7 +140,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:142")
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -150,7 +151,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:153")
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go
index 04f95695f..995096e96 100644
--- a/go/vt/worker/restartable_result_reader.go
+++ b/go/vt/worker/restartable_result_reader.go
@@ -22,6 +22,7 @@ import (
"strings"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -225,7 +226,7 @@ func (r *RestartableResultReader) Next() (*sqltypes.Result, error) {
func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) {
// In case of errors we will keep retrying until retryCtx is done.
- retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration)
+ retryCtx, retryCancel := context2.WithTimeout(r.ctx, *retryDuration, "./go/vt/worker/restartable_result_reader.go:228")
defer retryCancel()
// The first retry is the second attempt because we already tried once in Next()
diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go
index 8b8cd2d00..39fa401c7 100644
--- a/go/vt/worker/split_clone.go
+++ b/go/vt/worker/split_clone.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -527,7 +528,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error {
scw.setState(WorkerStateInit)
// read the keyspace and validate it
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:530")
var err error
scw.destinationKeyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.destinationKeyspace)
cancel()
@@ -577,7 +578,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error {
func (scw *SplitCloneWorker) initShardsForHorizontalResharding(ctx context.Context) error {
// find the OverlappingShards in the keyspace
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:580")
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.destinationKeyspace)
cancel()
if err != nil {
@@ -634,7 +635,7 @@ func (scw *SplitCloneWorker) initShardsForVerticalSplit(ctx context.Context) err
}
sourceKeyspace := servedFrom
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:637")
shardMap, err := scw.wr.TopoServer().FindAllShardsInKeyspace(shortCtx, sourceKeyspace)
cancel()
if err != nil {
@@ -774,7 +775,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error
// get the tablet info for them, and stop their replication
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases))
for i, alias := range scw.sourceAliases {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:777")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias)
cancel()
if err != nil {
@@ -782,7 +783,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error
}
scw.sourceTablets[i] = ti.Tablet
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:785")
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i])
cancel()
if err != nil {
@@ -817,7 +818,7 @@ func (scw *SplitCloneWorker) findTransactionalSources(ctx context.Context) error
// get the tablet info
scw.sourceTablets = make([]*topodatapb.Tablet, 1)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:820")
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, scw.sourceAliases[0])
cancel()
if err != nil {
@@ -843,7 +844,7 @@ func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error {
// Make sure we find a master for each destination shard and log it.
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...")
for _, si := range scw.destinationShards {
- waitCtx, waitCancel := context.WithTimeout(ctx, *waitForHealthyTabletsTimeout)
+ waitCtx, waitCancel := context2.WithTimeout(ctx, *waitForHealthyTabletsTimeout, "./go/vt/worker/split_clone.go:846")
err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
waitCancel()
if err != nil {
@@ -1178,7 +1179,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
var firstError error
- ctx, cancelCopy := context.WithCancel(ctx)
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1181")
defer cancelCopy()
processError := func(format string, args ...interface{}) {
// in theory we could have two threads see firstError as null and both write to the variable
@@ -1245,7 +1246,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error {
sourcePositions[0] = scw.lastPos
} else {
for shardIndex := range scw.sourceShards {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1248")
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex])
cancel()
if err != nil {
@@ -1254,7 +1255,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error {
sourcePositions[shardIndex] = status.Position
}
}
- cancelableCtx, cancel := context.WithCancel(ctx)
+ cancelableCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1257")
rec := concurrency.AllErrorRecorder{}
handleError := func(e error) {
rec.RecordError(e)
@@ -1319,7 +1320,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda
// on all source shards. Furthermore, we estimate the number of rows
// in each source shard for each table to be about the same
// (rowCount is used to estimate an ETA)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1322")
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go
index 4cf6b8795..da7782709 100644
--- a/go/vt/worker/split_clone_cmd.go
+++ b/go/vt/worker/split_clone_cmd.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
@@ -151,7 +152,7 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS
}
func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:154")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -166,7 +167,7 @@ func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler)
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:169")
osList, err := topotools.FindOverlappingShards(shortCtx, wr.TopoServer(), keyspace)
cancel()
if err != nil {
diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go
index 88a83ae6e..7cf717db9 100644
--- a/go/vt/worker/split_clone_test.go
+++ b/go/vt/worker/split_clone_test.go
@@ -27,6 +27,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
@@ -1025,7 +1026,7 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) {
// late because this Go routine looks at it and can run before the worker.
statsRetryCounters.ResetAll()
go func() {
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/split_clone_test.go:1028")
defer cancel()
for {
diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go
index 55b5c084f..0eeb44db6 100644
--- a/go/vt/worker/split_diff.go
+++ b/go/vt/worker/split_diff.go
@@ -21,6 +21,7 @@ import (
"sort"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -187,13 +188,13 @@ func (sdw *SplitDiffWorker) init(ctx context.Context) error {
sdw.SetState(WorkerStateInit)
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:190")
sdw.keyspaceInfo, err = sdw.wr.TopoServer().GetKeyspace(shortCtx, sdw.keyspace)
cancel()
if err != nil {
return vterrors.Wrapf(err, "cannot read keyspace %v", sdw.keyspace)
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:196")
sdw.shardInfo, err = sdw.wr.TopoServer().GetShard(shortCtx, sdw.keyspace, sdw.shard)
cancel()
if err != nil {
@@ -258,7 +259,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
// to FindWorkerTablet could attempt to set to DRAIN state the same tablet. Only
// one of these calls to FindWorkerTablet will succeed and the rest will fail.
// The following, makes sures we keep trying to find a worker tablet when this error occur.
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:261")
for {
select {
case <-shortCtx.Done():
@@ -297,7 +298,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
sdw.SetState(WorkerStateSyncReplication)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:300")
defer cancel()
masterInfo, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.shardInfo.MasterAlias)
if err != nil {
@@ -306,7 +307,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 1 - stop the master binlog replication, get its current position
sdw.wr.Logger().Infof("Stopping master binlog replication on %v", sdw.shardInfo.MasterAlias)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:309")
defer cancel()
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(sdw.sourceShard.Uid, "for split diff"))
if err != nil {
@@ -331,7 +332,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
return err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:334")
defer cancel()
mysqlPos, err := sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout)
if err != nil {
@@ -345,7 +346,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 3 - ask the master of the destination shard to resume filtered
// replication up to the new list of positions
sdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", sdw.shardInfo.MasterAlias, mysqlPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:348")
defer cancel()
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(sdw.sourceShard.Uid, mysqlPos))
if err != nil {
@@ -362,13 +363,13 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 4 - wait until the destination tablet is equal or passed
// that master binlog position, and stop its replication.
sdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", sdw.destinationAlias, masterPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:365")
defer cancel()
destinationTablet, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.destinationAlias)
if err != nil {
return err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:371")
defer cancel()
if _, err = sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout); err != nil {
return vterrors.Wrapf(err, "StopSlaveMinimum for %v at %v failed", sdw.destinationAlias, masterPos)
@@ -377,7 +378,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
// 5 - restart filtered replication on destination master
sdw.wr.Logger().Infof("Restarting filtered replication on master %v", sdw.shardInfo.MasterAlias)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:380")
defer cancel()
if _, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(sdw.sourceShard.Uid)); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", sdw.shardInfo.MasterAlias)
@@ -400,7 +401,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:403")
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
@@ -413,7 +414,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:416")
sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
diff --git a/go/vt/worker/split_diff_cmd.go b/go/vt/worker/split_diff_cmd.go
index 5ece6f5dc..bd52efae6 100644
--- a/go/vt/worker/split_diff_cmd.go
+++ b/go/vt/worker/split_diff_cmd.go
@@ -25,6 +25,7 @@ import (
"strings"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -114,7 +115,7 @@ func commandSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSe
// shardsWithSources returns all the shards that have SourceShards set
// with no Tables list.
func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:117")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -129,7 +130,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:132")
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -140,7 +141,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:143")
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
diff --git a/go/vt/worker/tablet_provider.go b/go/vt/worker/tablet_provider.go
index 2d42bcb7f..47a7b950b 100644
--- a/go/vt/worker/tablet_provider.go
+++ b/go/vt/worker/tablet_provider.go
@@ -19,6 +19,7 @@ package worker
import (
"fmt"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -57,7 +58,7 @@ func newSingleTabletProvider(ctx context.Context, ts *topo.Server, alias *topoda
}
func (p *singleTabletProvider) getTablet() (*topodatapb.Tablet, error) {
- shortCtx, cancel := context.WithTimeout(p.ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(p.ctx, *remoteActionsTimeout, "./go/vt/worker/tablet_provider.go:60")
tablet, err := p.ts.GetTablet(shortCtx, p.alias)
cancel()
if err != nil {
diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go
index 4dcc52a1d..aa7b4a182 100644
--- a/go/vt/worker/topo_utils.go
+++ b/go/vt/worker/topo_utils.go
@@ -22,6 +22,7 @@ import (
"math/rand"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -68,7 +69,7 @@ func FindHealthyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discover
}
func waitForHealthyTablets(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int, timeout time.Duration, tabletType topodatapb.TabletType) ([]discovery.TabletStats, error) {
- busywaitCtx, busywaitCancel := context.WithTimeout(ctx, timeout)
+ busywaitCtx, busywaitCancel := context2.WithTimeout(ctx, timeout, "./go/vt/worker/topo_utils.go:71")
defer busywaitCancel()
start := time.Now()
@@ -122,7 +123,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
}
wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:125")
err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED)
cancel()
if err != nil {
@@ -131,7 +132,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
ourURL := servenv.ListeningURL.String()
wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:134")
_, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error {
if tablet.Tags == nil {
tablet.Tags = make(map[string]string)
@@ -154,7 +155,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType)
// We refresh the destination vttablet reloads the worker URL when it reloads the tablet.
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:157")
wr.RefreshTabletState(shortCtx, tabletAlias)
if err != nil {
return nil, err
diff --git a/go/vt/worker/utils_test.go b/go/vt/worker/utils_test.go
index 98bdd416e..6dbe8a9dc 100644
--- a/go/vt/worker/utils_test.go
+++ b/go/vt/worker/utils_test.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/topo"
@@ -39,7 +40,7 @@ import (
func runCommand(t *testing.T, wi *Instance, wr *wrangler.Wrangler, args []string) error {
// Limit the scope of the context e.g. to implicitly terminate stray Go
// routines.
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/utils_test.go:42")
defer cancel()
worker, done, err := wi.RunCommand(ctx, args, wr, false /* runFromCli */)
diff --git a/go/vt/worker/vertical_split_clone_cmd.go b/go/vt/worker/vertical_split_clone_cmd.go
index 7045fb3e6..0298ca959 100644
--- a/go/vt/worker/vertical_split_clone_cmd.go
+++ b/go/vt/worker/vertical_split_clone_cmd.go
@@ -25,6 +25,7 @@ import (
"strings"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
@@ -152,7 +153,7 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl
// keyspacesWithServedFrom returns all the keyspaces that have ServedFrom set
// to one value.
func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:155")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -167,7 +168,7 @@ func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]stri
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:170")
ki, err := wr.TopoServer().GetKeyspace(shortCtx, keyspace)
cancel()
if err != nil {
@@ -288,7 +289,7 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl
}
// Figure out the shard
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:291")
shardMap, err := wr.TopoServer().FindAllShardsInKeyspace(shortCtx, keyspace)
cancel()
if err != nil {
diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go
index fb5a8e3a0..a99c8ae95 100644
--- a/go/vt/worker/vertical_split_diff.go
+++ b/go/vt/worker/vertical_split_diff.go
@@ -21,6 +21,7 @@ import (
"html/template"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -261,7 +262,7 @@ func (vsdw *VerticalSplitDiffWorker) findTargets(ctx context.Context) error {
func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) error {
vsdw.SetState(WorkerStateSyncReplication)
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:264")
defer cancel()
masterInfo, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.shardInfo.MasterAlias)
if err != nil {
@@ -272,7 +273,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 1 - stop the master binlog replication, get its current position
vsdw.wr.Logger().Infof("Stopping master binlog replication on %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:275")
defer cancel()
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(ss.Uid, "for split diff"))
if err != nil {
@@ -291,7 +292,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// stop replication
vsdw.wr.Logger().Infof("Stopping slave %v at a minimum of %v", topoproto.TabletAliasString(vsdw.sourceAlias), vreplicationPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:294")
defer cancel()
sourceTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.sourceAlias)
if err != nil {
@@ -309,7 +310,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 3 - ask the master of the destination shard to resume filtered
// replication up to the new list of positions
vsdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias), mysqlPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:312")
defer cancel()
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(ss.Uid, mysqlPos))
if err != nil {
@@ -326,13 +327,13 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 4 - wait until the destination tablet is equal or passed
// that master binlog position, and stop its replication.
vsdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", topoproto.TabletAliasString(vsdw.destinationAlias), masterPos)
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:329")
defer cancel()
destinationTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.destinationAlias)
if err != nil {
return err
}
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:335")
defer cancel()
_, err = vsdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout)
if err != nil {
@@ -342,7 +343,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
// 5 - restart filtered replication on destination master
vsdw.wr.Logger().Infof("Restarting filtered replication on master %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias))
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:345")
defer cancel()
if _, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(ss.Uid)); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", vsdw.shardInfo.MasterAlias)
@@ -365,7 +366,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:368")
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
@@ -378,7 +379,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
wg.Add(1)
go func() {
var err error
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:381")
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
diff --git a/go/vt/worker/vertical_split_diff_cmd.go b/go/vt/worker/vertical_split_diff_cmd.go
index e4cfd4d34..cc997b9d1 100644
--- a/go/vt/worker/vertical_split_diff_cmd.go
+++ b/go/vt/worker/vertical_split_diff_cmd.go
@@ -24,6 +24,7 @@ import (
"strconv"
"sync"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
@@ -101,7 +102,7 @@ func commandVerticalSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *fla
// shardsWithTablesSources returns all the shards that have SourceShards set
// to one value, with an array of Tables.
func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) {
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:104")
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx)
cancel()
if err != nil {
@@ -116,7 +117,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:119")
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace)
cancel()
if err != nil {
@@ -127,7 +128,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:130")
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go
index b5ff551d5..1ac4b3bff 100644
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go
@@ -34,6 +34,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vterrors"
@@ -123,7 +124,7 @@ func runVtworkerCommand(client vtworkerclient.Client, args []string) error {
func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, serverSideCancelation bool) {
// Run the vtworker "Block" command which blocks until we cancel the context.
var wg sync.WaitGroup
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/vtworkerclienttest/client_testsuite.go:126")
// blockCommandStarted will be closed after we're sure that vtworker is
// running the "Block" command.
blockCommandStarted := make(chan struct{})
diff --git a/go/vt/workflow/manager.go b/go/vt/workflow/manager.go
index f3f94d39f..c4529a8bd 100644
--- a/go/vt/workflow/manager.go
+++ b/go/vt/workflow/manager.go
@@ -26,6 +26,7 @@ import (
gouuid "github.com/pborman/uuid"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
@@ -352,7 +353,7 @@ func (m *Manager) Start(ctx context.Context, uuid string) error {
func (m *Manager) runWorkflow(rw *runningWorkflow) {
// Create a context to run it.
var ctx context.Context
- ctx, rw.cancel = context.WithCancel(m.ctx)
+ ctx, rw.cancel = context2.WithCancel(m.ctx, "./go/vt/workflow/manager.go:355")
// And run it in the background.
go m.executeWorkflowRun(ctx, rw)
@@ -589,7 +590,7 @@ func AvailableFactories() map[string]bool {
// StartManager starts a manager. This function should only be used for tests purposes.
func StartManager(m *Manager) (*sync.WaitGroup, context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/workflow/manager.go:592")
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
diff --git a/go/vt/wrangler/cleaner.go b/go/vt/wrangler/cleaner.go
index e40b9c08d..837d20a88 100644
--- a/go/vt/wrangler/cleaner.go
+++ b/go/vt/wrangler/cleaner.go
@@ -22,6 +22,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
@@ -96,7 +97,7 @@ type cleanUpHelper struct {
// basis. They are then serialized on each target.
func (cleaner *Cleaner) CleanUp(wr *Wrangler) error {
// we use a background context so we're not dependent on the original context timeout
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Minute, "./go/vt/wrangler/cleaner.go:99")
actionMap := make(map[string]*cleanUpHelper)
rec := concurrency.AllErrorRecorder{}
cleaner.mu.Lock()
diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go
index 85adabf6c..fc5ada634 100644
--- a/go/vt/wrangler/fake_tablet_test.go
+++ b/go/vt/wrangler/fake_tablet_test.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -183,7 +184,7 @@ func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) {
step := 10 * time.Millisecond
c := tmclient.NewTabletManagerClient()
for timeout >= 0 {
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/fake_tablet_test.go:186")
err := c.Ping(ctx, ft.Agent.Tablet())
cancel()
if err == nil {
diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go
index b31f20d83..6d64cb55b 100644
--- a/go/vt/wrangler/keyspace.go
+++ b/go/vt/wrangler/keyspace.go
@@ -25,6 +25,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -495,7 +496,7 @@ func (wr *Wrangler) waitForFilteredReplication(ctx context.Context, sourcePositi
wg.Add(1)
go func(si *topo.ShardInfo) {
defer wg.Done()
- ctx, cancel := context.WithTimeout(ctx, waitTime)
+ ctx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/wrangler/keyspace.go:498")
defer cancel()
var pos string
@@ -1231,7 +1232,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp
// replication and starts accepting writes
func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, tables []string, ev *events.MigrateServedFrom, filteredReplicationWaitTime time.Duration) error {
// Read the data we need
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/keyspace.go:1234")
defer cancel()
sourceMasterTabletInfo, err := wr.ts.GetTablet(ctx, sourceShard.MasterAlias)
if err != nil {
@@ -1356,7 +1357,7 @@ func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInf
// Setting an upper bound timeout to fail faster in case of an error.
// Using 60 seconds because RefreshState should not take more than 30 seconds.
// (RefreshState will restart the tablet's QueryService and most time will be spent on the shutdown, i.e. waiting up to 30 seconds on transactions (see Config.TransactionTimeout)).
- ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 60*time.Second, "./go/vt/wrangler/keyspace.go:1359")
if err := wr.tmc.RefreshState(ctx, ti.Tablet); err != nil {
wr.Logger().Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
}
diff --git a/go/vt/wrangler/migrater.go b/go/vt/wrangler/migrater.go
index 0f5c368a2..91c7ed462 100644
--- a/go/vt/wrangler/migrater.go
+++ b/go/vt/wrangler/migrater.go
@@ -28,6 +28,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -505,7 +506,7 @@ func (mi *migrater) changeTableSourceWrites(ctx context.Context, access accessTy
}
func (mi *migrater) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/migrater.go:508")
defer cancel()
var mu sync.Mutex
diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go
index 06b99e21f..9e752e1f1 100644
--- a/go/vt/wrangler/reparent.go
+++ b/go/vt/wrangler/reparent.go
@@ -26,6 +26,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
@@ -206,7 +207,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare
// at a wrong replication spot.
// Create a context for the following RPCs that respects waitSlaveTimeout
- resetCtx, resetCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ resetCtx, resetCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:209")
defer resetCancel()
event.DispatchUpdate(ev, "resetting replication on all tablets")
@@ -249,7 +250,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:252")
defer replCancel()
// Now tell the new master to insert the reparent_journal row,
@@ -414,7 +415,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
ev.OldMaster = *oldMasterTabletInfo.Tablet
// create a new context for the short running remote operations
- remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
+ remoteCtx, remoteCancel := context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:417")
defer remoteCancel()
// Demote the current master, get its replication position
@@ -425,7 +426,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
return fmt.Errorf("old master tablet %v DemoteMaster failed: %v", topoproto.TabletAliasString(shardInfo.MasterAlias), err)
}
- remoteCtx, remoteCancel = context.WithTimeout(ctx, waitSlaveTimeout)
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:428")
defer remoteCancel()
// Wait on the master-elect tablet until it reaches that position,
@@ -436,7 +437,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
if err != nil || (ctx.Err() != nil && ctx.Err() == context.DeadlineExceeded) {
remoteCancel()
// if this fails it is not enough to return an error. we should rollback all the changes made by DemoteMaster
- remoteCtx, remoteCancel = context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:439")
defer remoteCancel()
if err1 := wr.tmc.UndoDemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet); err1 != nil {
log.Warningf("Encountered error %v while trying to undo DemoteMaster", err1)
@@ -451,7 +452,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:454")
defer replCancel()
// Go through all the tablets:
@@ -533,7 +534,7 @@ func (maxPosSearch *maxReplPosSearch) processTablet(tablet *topodatapb.Tablet) {
defer maxPosSearch.waitGroup.Done()
maxPosSearch.wrangler.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias))
- slaveStatusCtx, cancelSlaveStatus := context.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout)
+ slaveStatusCtx, cancelSlaveStatus := context2.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout, "./go/vt/wrangler/reparent.go:536")
defer cancelSlaveStatus()
status, err := maxPosSearch.wrangler.tmc.SlaveStatus(slaveStatusCtx, tablet)
@@ -668,7 +669,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
ev.OldMaster = *oldMasterTabletInfo.Tablet
wr.logger.Infof("deleting old master %v", shardInfoMasterAliasStr)
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:671")
defer cancel()
if err := topotools.DeleteTablet(ctx, wr.ts, oldMasterTabletInfo.Tablet); err != nil {
@@ -688,7 +689,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
go func(alias string, tabletInfo *topo.TabletInfo) {
defer wg.Done()
wr.logger.Infof("getting replication position from %v", alias)
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:691")
defer cancel()
rp, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet)
if err != nil {
@@ -744,7 +745,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithCancel(ctx)
+ replCtx, replCancel := context2.WithCancel(ctx, "./go/vt/wrangler/reparent.go:747")
defer replCancel()
// Reset replication on all slaves to point to the new master, and
diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go
index fd0411727..ec9201196 100644
--- a/go/vt/wrangler/schema.go
+++ b/go/vt/wrangler/schema.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/concurrency"
@@ -359,7 +360,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo
// Notify slaves to reload schema. This is best-effort.
concurrency := sync2.NewSemaphore(10, 0)
- reloadCtx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ reloadCtx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/schema.go:362")
defer cancel()
wr.ReloadSchemaShard(reloadCtx, destKeyspace, destShard, destMasterPos, concurrency, true /* includeMaster */)
return nil
@@ -435,7 +436,7 @@ func (wr *Wrangler) applySQLShard(ctx context.Context, tabletInfo *topo.TabletIn
if err != nil {
return fmt.Errorf("fillStringTemplate failed: %v", err)
}
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 30*time.Second, "./go/vt/wrangler/schema.go:438")
defer cancel()
// Need to make sure that we enable binlog, since we're only applying the statement on masters.
_, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo.Tablet, false, []byte(filledChange), 0, false, reloadSchema)
diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go
index 75766e27d..557ee2ad0 100644
--- a/go/vt/wrangler/testlib/fake_tablet.go
+++ b/go/vt/wrangler/testlib/fake_tablet.go
@@ -30,6 +30,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/topo"
@@ -206,7 +207,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) {
step := 10 * time.Millisecond
c := tmclient.NewTabletManagerClient()
for timeout >= 0 {
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/testlib/fake_tablet.go:209")
err := c.Ping(ctx, ft.Agent.Tablet())
cancel()
if err == nil {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment