Created
September 17, 2019 23:32
-
-
Save dasl-/65aa3d25c03133e500671202a9aa06df to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/go/cmd/mysqlctl/mysqlctl.go b/go/cmd/mysqlctl/mysqlctl.go | |
index da7eec12d..30aa20ecd 100644 | |
--- a/go/cmd/mysqlctl/mysqlctl.go | |
+++ b/go/cmd/mysqlctl/mysqlctl.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/mysql" | |
@@ -71,7 +72,7 @@ func initCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:74") | |
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:75\n"); cancel() })() | |
if err := mysqld.Init(ctx, cnf, *initDBSQLFile); err != nil { | |
return fmt.Errorf("failed init mysql: %v", err) | |
@@ -104,7 +105,7 @@ func shutdownCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:107") | |
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:108\n"); cancel() })() | |
if err := mysqld.Shutdown(ctx, cnf, true); err != nil { | |
return fmt.Errorf("failed shutdown mysql: %v", err) | |
@@ -125,7 +126,7 @@ func startCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:128") | |
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:129\n"); cancel() })() | |
if err := mysqld.Start(ctx, cnf, mysqldArgs...); err != nil { | |
return fmt.Errorf("failed start mysql: %v", err) | |
@@ -145,7 +146,7 @@ func teardownCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:148") | |
defer (func() { log.Infof("ctx cancel at go/cmd/mysqlctl/mysqlctl.go:149\n"); cancel() })() | |
if err := mysqld.Teardown(ctx, cnf, *force); err != nil { | |
return fmt.Errorf("failed teardown mysql (forced? %v): %v", *force, err) | |
diff --git a/go/cmd/mysqlctld/mysqlctld.go b/go/cmd/mysqlctld/mysqlctld.go | |
index 5c17ea813..8cafcb6fd 100644 | |
--- a/go/cmd/mysqlctld/mysqlctld.go | |
+++ b/go/cmd/mysqlctld/mysqlctld.go | |
@@ -25,6 +25,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
"vitess.io/vitess/go/vt/log" | |
@@ -68,7 +69,7 @@ func main() { | |
} | |
// Start or Init mysqld as needed. | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctld/mysqlctld.go:71") | |
mycnfFile := mysqlctl.MycnfFile(uint32(*tabletUID)) | |
if _, statErr := os.Stat(mycnfFile); os.IsNotExist(statErr) { | |
// Generate my.cnf from scratch and use it to find mysqld. | |
diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go | |
index 0caee687e..4da1f6089 100644 | |
--- a/go/cmd/vtbackup/vtbackup.go | |
+++ b/go/cmd/vtbackup/vtbackup.go | |
@@ -68,6 +68,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqlescape" | |
@@ -200,7 +201,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back | |
if err != nil { | |
return fmt.Errorf("failed to initialize mysql config: %v", err) | |
} | |
- initCtx, initCancel := context.WithTimeout(ctx, *mysqlTimeout) | |
+ initCtx, initCancel := context2.WithTimeout(ctx, *mysqlTimeout, "./go/cmd/vtbackup/vtbackup.go:203") | |
defer initCancel() | |
if err := mysqld.Init(initCtx, mycnf, *initDBSQLFile); err != nil { | |
return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err) | |
@@ -209,7 +210,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back | |
defer func() { | |
// Be careful not to use the original context, because we don't want to | |
// skip shutdown just because we timed out waiting for other things. | |
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/cmd/vtbackup/vtbackup.go:212") | |
defer (func() { log.Infof("ctx cancel at go/cmd/vtbackup/vtbackup.go:213\n"); cancel() })() | |
mysqld.Shutdown(ctx, mycnf, false) | |
}() | |
diff --git a/go/cmd/vtbench/vtbench.go b/go/cmd/vtbench/vtbench.go | |
index af6392d87..14e917723 100644 | |
--- a/go/cmd/vtbench/vtbench.go | |
+++ b/go/cmd/vtbench/vtbench.go | |
@@ -23,6 +23,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
"vitess.io/vitess/go/vt/log" | |
@@ -156,7 +157,7 @@ func main() { | |
b := vtbench.NewBench(*threads, *count, connParams, *sql) | |
- ctx, cancel := context.WithTimeout(context.Background(), *deadline) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *deadline, "./go/cmd/vtbench/vtbench.go:159") | |
defer (func() { log.Infof("ctx cancel at go/cmd/vtbench/vtbench.go:160\n"); cancel() })() | |
fmt.Printf("Initializing test with %s protocol / %d threads / %d iterations\n", | |
diff --git a/go/cmd/vtclient/vtclient.go b/go/cmd/vtclient/vtclient.go | |
index 3da6f8738..01ae2540e 100644 | |
--- a/go/cmd/vtclient/vtclient.go | |
+++ b/go/cmd/vtclient/vtclient.go | |
@@ -30,6 +30,7 @@ import ( | |
"time" | |
"github.com/olekukonko/tablewriter" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
@@ -183,7 +184,7 @@ func run() (*results, error) { | |
log.Infof("Sending the query...") | |
- ctx, cancel := context.WithTimeout(context.Background(), *timeout) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *timeout, "./go/cmd/vtclient/vtclient.go:186") | |
defer (func() { log.Infof("ctx cancel at go/cmd/vtclient/vtclient.go:187\n"); cancel() })() | |
return execMulti(ctx, db, args[0]) | |
} | |
diff --git a/go/cmd/vtctl/vtctl.go b/go/cmd/vtctl/vtctl.go | |
index 8365b9c0a..b8ebf5b4e 100644 | |
--- a/go/cmd/vtctl/vtctl.go | |
+++ b/go/cmd/vtctl/vtctl.go | |
@@ -27,6 +27,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/vt/log" | |
@@ -91,7 +92,7 @@ func main() { | |
vtctl.WorkflowManager = workflow.NewManager(ts) | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/vtctl/vtctl.go:94") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) | |
installSignalHandlers(cancel) | |
diff --git a/go/cmd/vtctlclient/main.go b/go/cmd/vtctlclient/main.go | |
index d3b867754..3569d791a 100644 | |
--- a/go/cmd/vtctlclient/main.go | |
+++ b/go/cmd/vtctlclient/main.go | |
@@ -23,6 +23,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/vt/log" | |
@@ -55,7 +56,7 @@ func main() { | |
os.Exit(1) | |
} | |
- ctx, cancel := context.WithTimeout(context.Background(), *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *actionTimeout, "./go/cmd/vtctlclient/main.go:58") | |
defer (func() { log.Infof("ctx cancel at go/cmd/vtctlclient/main.go:59\n"); cancel() })() | |
err := vtctlclient.RunCommandAndWait( | |
diff --git a/go/cmd/vtworkerclient/vtworkerclient.go b/go/cmd/vtworkerclient/vtworkerclient.go | |
index 63e10fc58..737133ca5 100644 | |
--- a/go/cmd/vtworkerclient/vtworkerclient.go | |
+++ b/go/cmd/vtworkerclient/vtworkerclient.go | |
@@ -23,6 +23,7 @@ import ( | |
"syscall" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/worker/vtworkerclient" | |
@@ -37,7 +38,7 @@ var ( | |
func main() { | |
flag.Parse() | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/vtworkerclient/vtworkerclient.go:40") | |
defer (func() { log.Infof("ctx cancel at go/cmd/vtworkerclient/vtworkerclient.go:41\n"); cancel() })() | |
sigChan := make(chan os.Signal, 1) | |
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) | |
diff --git a/go/cmd/zk/zkcmd.go b/go/cmd/zk/zkcmd.go | |
index b615a69b1..895b39de2 100644 | |
--- a/go/cmd/zk/zkcmd.go | |
+++ b/go/cmd/zk/zkcmd.go | |
@@ -36,6 +36,7 @@ import ( | |
"golang.org/x/crypto/ssh/terminal" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
@@ -157,7 +158,7 @@ func main() { | |
subFlags := flag.NewFlagSet(cmdName, flag.ExitOnError) | |
// Create a context for the command, cancel it if we get a signal. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/zk/zkcmd.go:160") | |
sigRecv := make(chan os.Signal, 1) | |
signal.Notify(sigRecv, os.Interrupt) | |
go func() { | |
diff --git a/go/context2/context.go b/go/context2/context.go | |
new file mode 100644 | |
index 000000000..b4a03a28d | |
--- /dev/null | |
+++ b/go/context2/context.go | |
@@ -0,0 +1,51 @@
+// Package context2 wraps the standard library context constructors so that
+// each derived context carries a caller-supplied reason (typically the
+// file:line of the call site). The reason is logged whenever Err reports
+// that the context has been canceled or has expired.
+package context2
+
+import (
+	"context"
+	"time"
+
+	"vitess.io/vitess/go/vt/log"
+)
+
+// Context2 is a context.Context tagged with the reason it was created.
+type Context2 struct {
+	context.Context
+	reason string
+}
+
+// WithCancel is context.WithCancel, with the returned context tagged with
+// reason. It must call the standard library directly: delegating to this
+// package's own WithCancel would recurse forever.
+func WithCancel(parent context.Context, reason string) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithCancel(parent)
+	return Context2{Context: ctx, reason: reason}, cancel
+}
+
+// WithDeadline is context.WithDeadline, with the returned context tagged
+// with reason.
+func WithDeadline(parent context.Context, d time.Time, reason string) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithDeadline(parent, d)
+	return Context2{Context: ctx, reason: reason}, cancel
+}
+
+// WithTimeout is context.WithTimeout, with the returned context tagged with
+// reason.
+func WithTimeout(parent context.Context, timeout time.Duration, reason string) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithTimeout(parent, timeout)
+	return Context2{Context: ctx, reason: reason}, cancel
+}
+
+// Err returns the embedded context's error. When that error is non-nil it
+// first logs the reason this context was created with, so the log shows
+// which call site's context was canceled or timed out.
+func (c Context2) Err() error {
+	err := c.Context.Err()
+	if err != nil {
+		log.Infof("Context errored due to: %s", c.reason)
+	}
+	return err
+}
diff --git a/go/mysql/client_test.go b/go/mysql/client_test.go | |
index 13d25073e..3fa8aea13 100644 | |
--- a/go/mysql/client_test.go | |
+++ b/go/mysql/client_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
) | |
// assertSQLError makes sure we get the right error. | |
@@ -71,7 +72,7 @@ func TestConnectTimeout(t *testing.T) { | |
defer listener.Close() | |
// Test that canceling the context really interrupts the Connect. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/client_test.go:74") | |
done := make(chan struct{}) | |
go func() { | |
_, err := Connect(ctx, params) | |
@@ -85,7 +86,7 @@ func TestConnectTimeout(t *testing.T) { | |
<-done | |
// Tests a connection timeout works. | |
- ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), 100*time.Millisecond, "./go/mysql/client_test.go:88") | |
_, err = Connect(ctx, params) | |
cancel() | |
if err != context.DeadlineExceeded { | |
diff --git a/go/mysql/server_test.go b/go/mysql/server_test.go | |
index e4680497e..b582f6cda 100644 | |
--- a/go/mysql/server_test.go | |
+++ b/go/mysql/server_test.go | |
@@ -29,6 +29,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
vtenv "vitess.io/vitess/go/vt/env" | |
"vitess.io/vitess/go/vt/log" | |
@@ -1247,7 +1248,7 @@ func TestListenerShutdown(t *testing.T) { | |
Pass: "password1", | |
} | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/server_test.go:1250") | |
defer (func() { log.Infof("ctx cancel at go/mysql/server_test.go:1250\n"); cancel() })() | |
conn, err := Connect(ctx, params) | |
diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go | |
index e154b7628..6e287d454 100644 | |
--- a/go/pools/resource_pool.go | |
+++ b/go/pools/resource_pool.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/timer" | |
"vitess.io/vitess/go/trace" | |
@@ -102,7 +103,7 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur | |
rp.resources <- resourceWrapper{} | |
} | |
- ctx, cancel := context.WithTimeout(context.TODO(), prefillTimeout) | |
+ ctx, cancel := context2.WithTimeout(context.TODO(), prefillTimeout, "./go/pools/resource_pool.go:105") | |
defer (func() { log.Infof("ctx cancel at go/pools/resource_pool.go:104\n"); cancel() })() | |
if prefillParallelism != 0 { | |
sem := sync2.NewSemaphore(prefillParallelism, 0 /* timeout */) | |
diff --git a/go/pools/resource_pool_test.go b/go/pools/resource_pool_test.go | |
index f9a7c9ac3..13ac38c0b 100644 | |
--- a/go/pools/resource_pool_test.go | |
+++ b/go/pools/resource_pool_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
) | |
@@ -618,7 +619,7 @@ func TestTimeout(t *testing.T) { | |
if err != nil { | |
t.Fatal(err) | |
} | |
- newctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond) | |
+ newctx, cancel := context2.WithTimeout(ctx, 1*time.Millisecond, "./go/pools/resource_pool_test.go:621") | |
_, err = p.Get(newctx) | |
cancel() | |
want := "resource pool timed out" | |
@@ -633,7 +634,7 @@ func TestExpired(t *testing.T) { | |
count.Set(0) | |
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0) | |
defer p.Close() | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(-1*time.Second), "./go/pools/resource_pool_test.go:636") | |
r, err := p.Get(ctx) | |
if err == nil { | |
p.Put(r) | |
diff --git a/go/vt/binlog/binlog_streamer_test.go b/go/vt/binlog/binlog_streamer_test.go | |
index 576e7b562..9a10fb38d 100644 | |
--- a/go/vt/binlog/binlog_streamer_test.go | |
+++ b/go/vt/binlog/binlog_streamer_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
@@ -193,7 +194,7 @@ func TestStreamerStop(t *testing.T) { | |
bls := NewStreamer(&mysql.ConnParams{DbName: "vt_test_keyspace"}, nil, nil, mysql.Position{}, 0, sendTransaction) | |
// Start parseEvents(), but don't send it anything, so it just waits. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlog_streamer_test.go:196") | |
done := make(chan error) | |
go func() { | |
_, err := bls.parseEvents(ctx, events) | |
diff --git a/go/vt/binlog/binlogplayer/binlog_player_test.go b/go/vt/binlog/binlogplayer/binlog_player_test.go | |
index dedcfcc08..f95aa2c0c 100644 | |
--- a/go/vt/binlog/binlogplayer/binlog_player_test.go | |
+++ b/go/vt/binlog/binlogplayer/binlog_player_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/throttler" | |
@@ -306,7 +307,7 @@ func TestRetryOnDeadlock(t *testing.T) { | |
// has finished. Otherwise, it may cause race with other tests. | |
func applyEvents(blp *BinlogPlayer) func() error { | |
errChan := make(chan error) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlogplayer/binlog_player_test.go:309") | |
go func() { | |
errChan <- blp.ApplyBinlogEvents(ctx) | |
diff --git a/go/vt/binlog/slave_connection.go b/go/vt/binlog/slave_connection.go | |
index 0e744d4cd..9d00ef03b 100644 | |
--- a/go/vt/binlog/slave_connection.go | |
+++ b/go/vt/binlog/slave_connection.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/pools" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
@@ -96,7 +97,7 @@ var slaveIDPool = pools.NewIDPool() | |
// StartBinlogDumpFromCurrent requests a replication binlog dump from | |
// the current position. | |
func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysql.Position, <-chan mysql.BinlogEvent, error) { | |
- ctx, sc.cancel = context.WithCancel(ctx) | |
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:99") | |
masterPosition, err := sc.Conn.MasterPosition() | |
if err != nil { | |
@@ -116,7 +117,7 @@ func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysq | |
// | |
// Note the context is valid and used until eventChan is closed. | |
func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, startPos mysql.Position) (<-chan mysql.BinlogEvent, error) { | |
- ctx, sc.cancel = context.WithCancel(ctx) | |
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:119") | |
log.Infof("sending binlog dump command: startPos=%v, slaveID=%v", startPos, sc.slaveID) | |
if err := sc.SendBinlogDumpCommand(sc.slaveID, startPos); err != nil { | |
@@ -193,7 +194,7 @@ func (sc *SlaveConnection) streamEvents(ctx context.Context) chan mysql.BinlogEv | |
// | |
// Note the context is valid and used until eventChan is closed. | |
func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan mysql.BinlogEvent, error) { | |
- ctx, sc.cancel = context.WithCancel(ctx) | |
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:196") | |
filename, err := sc.findFileBeforeTimestamp(ctx, timestamp) | |
if err != nil { | |
diff --git a/go/vt/binlog/updatestreamctl.go b/go/vt/binlog/updatestreamctl.go | |
index 3525c28a2..faea9bd5f 100644 | |
--- a/go/vt/binlog/updatestreamctl.go | |
+++ b/go/vt/binlog/updatestreamctl.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/sync2" | |
@@ -266,7 +267,7 @@ func (updateStream *UpdateStreamImpl) StreamKeyRange(ctx context.Context, positi | |
return fmt.Errorf("newKeyspaceIDResolverFactory failed: %v", err) | |
} | |
- streamCtx, cancel := context.WithCancel(ctx) | |
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:269") | |
i := updateStream.streams.Add(cancel) | |
defer updateStream.streams.Delete(i) | |
@@ -302,7 +303,7 @@ func (updateStream *UpdateStreamImpl) StreamTables(ctx context.Context, position | |
}) | |
bls := NewStreamer(updateStream.cp, updateStream.se, charset, pos, 0, f) | |
- streamCtx, cancel := context.WithCancel(ctx) | |
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:305") | |
i := updateStream.streams.Add(cancel) | |
defer updateStream.streams.Delete(i) | |
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go | |
index 65b7d2cb7..af7c994d5 100644 | |
--- a/go/vt/discovery/healthcheck.go | |
+++ b/go/vt/discovery/healthcheck.go | |
@@ -51,6 +51,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/netutil" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/sync2" | |
@@ -495,7 +496,7 @@ func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) { | |
if hcc.conn != nil { | |
// Don't use hcc.ctx because it's already closed. | |
// Use a separate context, and add a timeout to prevent unbounded waits. | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/healthcheck.go:498") | |
defer (func() { log.Infof("ctx cancel at go/vt/discovery/healthcheck.go:499\n"); cancel() })() | |
hcc.conn.Close(ctx) | |
hcc.conn = nil | |
@@ -513,7 +514,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { | |
retryDelay := hc.retryDelay | |
for { | |
- streamCtx, streamCancel := context.WithCancel(hcc.ctx) | |
+ streamCtx, streamCancel := context2.WithCancel(hcc.ctx, "./go/vt/discovery/healthcheck.go:516") | |
// Setup a watcher that restarts the timer every time an update is received. | |
// If a timeout occurs for a serving tablet, we make it non-serving and send | |
@@ -717,7 +718,7 @@ func (hc *HealthCheckImpl) SetListener(listener HealthCheckStatsListener, sendDo | |
// It does not block on making connection. | |
// name is an optional tag for the tablet, e.g. an alternative address. | |
func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { | |
- ctx, cancelFunc := context.WithCancel(context.Background()) | |
+ ctx, cancelFunc := context2.WithCancel(context.Background(), "./go/vt/discovery/healthcheck.go:720") | |
key := TabletToMapKey(tablet) | |
hcc := &healthCheckConn{ | |
ctx: ctx, | |
diff --git a/go/vt/discovery/tablet_stats_cache_wait_test.go b/go/vt/discovery/tablet_stats_cache_wait_test.go | |
index 19933494f..cffab6b4d 100644 | |
--- a/go/vt/discovery/tablet_stats_cache_wait_test.go | |
+++ b/go/vt/discovery/tablet_stats_cache_wait_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/topo" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
@@ -29,7 +30,7 @@ import ( | |
) | |
func TestWaitForTablets(t *testing.T) { | |
- shortCtx, shortCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ shortCtx, shortCancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/discovery/tablet_stats_cache_wait_test.go:32") | |
defer shortCancel() | |
waitAvailableTabletInterval = 20 * time.Millisecond | |
@@ -48,7 +49,7 @@ func TestWaitForTablets(t *testing.T) { | |
} | |
// this should fail, but return a non-timeout error | |
- cancelledCtx, cancel := context.WithCancel(context.Background()) | |
+ cancelledCtx, cancel := context2.WithCancel(context.Background(), "./go/vt/discovery/tablet_stats_cache_wait_test.go:51") | |
cancel() | |
if err := tsc.WaitForTablets(cancelledCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err == nil || err == context.DeadlineExceeded { | |
t.Errorf("want: non-timeout error, got: %v", err) | |
@@ -67,7 +68,7 @@ func TestWaitForTablets(t *testing.T) { | |
input <- shr | |
// and ask again, with longer time outs so it's not flaky | |
- longCtx, longCancel := context.WithTimeout(context.Background(), 10*time.Second) | |
+ longCtx, longCancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/tablet_stats_cache_wait_test.go:70") | |
defer longCancel() | |
waitAvailableTabletInterval = 10 * time.Millisecond | |
if err := tsc.WaitForTablets(longCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err != nil { | |
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go | |
index a5a0ca971..8ef10d24c 100644 | |
--- a/go/vt/discovery/topology_watcher.go | |
+++ b/go/vt/discovery/topology_watcher.go | |
@@ -26,6 +26,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/trace" | |
@@ -153,7 +154,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR | |
// We want the span from the context, but not the cancelation that comes with it | |
spanContext := trace.CopySpan(context.Background(), ctx) | |
- tw.ctx, tw.cancelFunc = context.WithCancel(spanContext) | |
+ tw.ctx, tw.cancelFunc = context2.WithCancel(spanContext, "./go/vt/discovery/topology_watcher.go:156") | |
tw.wg.Add(1) | |
go tw.watch() | |
return tw | |
diff --git a/go/vt/grpcclient/client_test.go b/go/vt/grpcclient/client_test.go | |
index 106a0937e..306040c04 100644 | |
--- a/go/vt/grpcclient/client_test.go | |
+++ b/go/vt/grpcclient/client_test.go | |
@@ -24,6 +24,7 @@ import ( | |
"google.golang.org/grpc" | |
+ "vitess.io/vitess/go/context2" | |
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" | |
vtgateservicepb "vitess.io/vitess/go/vt/proto/vtgateservice" | |
) | |
@@ -41,7 +42,7 @@ func TestDialErrors(t *testing.T) { | |
t.Fatal(err) | |
} | |
vtg := vtgateservicepb.NewVitessClient(gconn) | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:44") | |
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{}) | |
cancel() | |
gconn.Close() | |
@@ -57,7 +58,7 @@ func TestDialErrors(t *testing.T) { | |
t.Fatal(err) | |
} | |
vtg := vtgateservicepb.NewVitessClient(gconn) | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:60") | |
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{}) | |
cancel() | |
gconn.Close() | |
diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go | |
index e4c200abb..645996790 100644 | |
--- a/go/vt/mysqlctl/query.go | |
+++ b/go/vt/mysqlctl/query.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/dbconnpool" | |
@@ -162,7 +163,7 @@ func (mysqld *Mysqld) killConnection(connID int64) error { | |
// Get another connection with which to kill. | |
// Use background context because the caller's context is likely expired, | |
// which is the reason we're being asked to kill the connection. | |
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/mysqlctl/query.go:165") | |
defer (func() { log.Infof("ctx cancel at go/vt/mysqlctl/query.go:166\n"); cancel() })() | |
if poolConn, connErr := getPoolReconnect(ctx, mysqld.dbaPool); connErr == nil { | |
// We got a pool connection. | |
diff --git a/go/vt/schemamanager/schemaswap/schema_swap.go b/go/vt/schemamanager/schemaswap/schema_swap.go | |
index d7688832c..82e0bb6a4 100644 | |
--- a/go/vt/schemamanager/schemaswap/schema_swap.go | |
+++ b/go/vt/schemamanager/schemaswap/schema_swap.go | |
@@ -29,6 +29,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/discovery" | |
@@ -882,7 +883,7 @@ func (array orderTabletsForSwap) Less(i, j int) bool { | |
// executeAdminQuery executes a query on a given tablet as 'allprivs' user. The query is executed | |
// using timeout value from --schema_swap_admin_query_timeout flag. | |
func (shardSwap *shardSchemaSwap) executeAdminQuery(tablet *topodatapb.Tablet, query string, maxRows int) (*sqltypes.Result, error) { | |
- sqlCtx, cancelSQLCtx := context.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout) | |
+ sqlCtx, cancelSQLCtx := context2.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout, "./go/vt/schemamanager/schemaswap/schema_swap.go:885") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/schemamanager/schemaswap/schema_swap.go:886\n") | |
cancelSQLCtx() | |
diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go | |
index 6c7514223..d4f8f96ab 100644 | |
--- a/go/vt/schemamanager/tablet_executor.go | |
+++ b/go/vt/schemamanager/tablet_executor.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/sqlparser" | |
@@ -253,7 +254,7 @@ func (exec *TabletExecutor) executeOnAllTablets(ctx context.Context, execResult | |
// execute the schema change via replication. This is best-effort, meaning | |
// we still return overall success if the timeout expires. | |
concurrency := sync2.NewSemaphore(10, 0) | |
- reloadCtx, cancel := context.WithTimeout(ctx, exec.waitSlaveTimeout) | |
+ reloadCtx, cancel := context2.WithTimeout(ctx, exec.waitSlaveTimeout, "./go/vt/schemamanager/tablet_executor.go:256") | |
defer (func() { log.Infof("ctx cancel at go/vt/schemamanager/tablet_executor.go:256\n"); cancel() })() | |
for _, result := range execResult.SuccessShards { | |
wg.Add(1) | |
diff --git a/go/vt/srvtopo/resilient_server_flaky_test.go b/go/vt/srvtopo/resilient_server_flaky_test.go | |
index bad1800ce..2b197b4e4 100644 | |
--- a/go/vt/srvtopo/resilient_server_flaky_test.go | |
+++ b/go/vt/srvtopo/resilient_server_flaky_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/status" | |
"vitess.io/vitess/go/vt/topo" | |
@@ -309,7 +310,7 @@ func TestGetSrvKeyspace(t *testing.T) { | |
time.Sleep(*srvTopoCacheTTL) | |
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2) | |
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:312") | |
_, err = rs.GetSrvKeyspace(timeoutCtx, "test_cell", "test_ks") | |
wantErr := "timed out waiting for keyspace" | |
if err == nil || err.Error() != wantErr { | |
@@ -347,7 +348,7 @@ func TestSrvKeyspaceCachedError(t *testing.T) { | |
time.Sleep(*srvTopoCacheTTL + 10*time.Millisecond) | |
// Ask again with a different context, should get an error and | |
// save that context. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/srvtopo/resilient_server_flaky_test.go:350") | |
defer (func() { log.Infof("ctx cancel at go/vt/srvtopo/resilient_server_flaky_test.go:350\n"); cancel() })() | |
_, err2 := rs.GetSrvKeyspace(ctx, "test_cell", "unknown_ks") | |
if err2 == nil { | |
@@ -599,7 +600,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { | |
time.Sleep(*srvTopoCacheTTL) | |
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2) | |
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:602") | |
_, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell") | |
wantErr := "timed out waiting for keyspace names" | |
if err == nil || err.Error() != wantErr { | |
diff --git a/go/vt/topo/consultopo/election.go b/go/vt/topo/consultopo/election.go | |
index bacbbac8d..65455b1c8 100644 | |
--- a/go/vt/topo/consultopo/election.go | |
+++ b/go/vt/topo/consultopo/election.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/hashicorp/consul/api" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -90,7 +91,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error | |
} | |
// We have the lock, keep mastership until we lose it. | |
- lockCtx, lockCancel := context.WithCancel(context.Background()) | |
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/election.go:93") | |
go func() { | |
select { | |
case <-lost: | |
diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go | |
index 8021738ca..de4ca89b9 100644 | |
--- a/go/vt/topo/consultopo/watch.go | |
+++ b/go/vt/topo/consultopo/watch.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/hashicorp/consul/api" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -51,7 +52,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < | |
} | |
// Create a context, will be used to cancel the watch. | |
- watchCtx, watchCancel := context.WithCancel(context.Background()) | |
+ watchCtx, watchCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/watch.go:54") | |
// Create the notifications channel, send updates to it. | |
notifications := make(chan *topo.WatchData, 10) | |
diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go | |
index 354c5e499..ff3b66256 100644 | |
--- a/go/vt/topo/etcd2topo/election.go | |
+++ b/go/vt/topo/etcd2topo/election.go | |
@@ -22,6 +22,7 @@ import ( | |
"github.com/coreos/etcd/clientv3" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -73,7 +74,7 @@ func (mp *etcdMasterParticipation) WaitForMastership() (context.Context, error) | |
// We use a cancelable context here. If stop is closed, | |
// we just cancel that context. | |
- lockCtx, lockCancel := context.WithCancel(context.Background()) | |
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/election.go:76") | |
go func() { | |
<-mp.stop | |
if ld != nil { | |
diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go | |
index 471bac495..966e31f11 100644 | |
--- a/go/vt/topo/etcd2topo/lock.go | |
+++ b/go/vt/topo/etcd2topo/lock.go | |
@@ -24,6 +24,7 @@ import ( | |
"github.com/coreos/etcd/clientv3" | |
"github.com/coreos/etcd/mvcc/mvccpb" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -86,7 +87,7 @@ func (s *Server) waitOnLastRev(ctx context.Context, cli *clientv3.Client, nodePa | |
// Wait for release on blocking key. Cancel the watch when we | |
// exit this function. | |
key := string(lastKey.Kvs[0].Key) | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/topo/etcd2topo/lock.go:89") | |
defer (func() { log.Infof("ctx cancel at go/vt/topo/etcd2topo/lock.go:90\n"); cancel() })() | |
wc := cli.Watch(ctx, key, clientv3.WithRev(revision)) | |
if wc == nil { | |
diff --git a/go/vt/topo/etcd2topo/server_test.go b/go/vt/topo/etcd2topo/server_test.go | |
index 0254878c5..098295dc9 100644 | |
--- a/go/vt/topo/etcd2topo/server_test.go | |
+++ b/go/vt/topo/etcd2topo/server_test.go | |
@@ -28,6 +28,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/coreos/etcd/clientv3" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/testfiles" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
@@ -74,7 +75,7 @@ func startEtcd(t *testing.T) (*exec.Cmd, string, string) { | |
} | |
// Wait until we can list "/", or timeout. | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/topo/etcd2topo/server_test.go:77") | |
defer (func() { log.Infof("ctx cancel at go/vt/topo/etcd2topo/server_test.go:77\n"); cancel() })() | |
start := time.Now() | |
for { | |
diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go | |
index 706fd994e..cb024798c 100644 | |
--- a/go/vt/topo/etcd2topo/watch.go | |
+++ b/go/vt/topo/etcd2topo/watch.go | |
@@ -23,6 +23,7 @@ import ( | |
"github.com/coreos/etcd/clientv3" | |
"github.com/coreos/etcd/mvcc/mvccpb" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -50,10 +51,10 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < | |
} | |
// Create an outer context that will be canceled on return and will cancel all inner watches. | |
- outerCtx, outerCancel := context.WithCancel(context.Background()) | |
+ outerCtx, outerCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/watch.go:53") | |
// Create a context, will be used to cancel the watch on retry. | |
- watchCtx, watchCancel := context.WithCancel(outerCtx) | |
+ watchCtx, watchCancel := context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:56") | |
// Create the Watcher. We start watching from the response we | |
// got, not from the file original version, as the server may | |
@@ -87,7 +88,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < | |
watchRetries++ | |
// Cancel inner context on retry and create new one. | |
watchCancel() | |
- watchCtx, watchCancel = context.WithCancel(outerCtx) | |
+ watchCtx, watchCancel = context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:90") | |
newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion)) | |
if newWatcher == nil { | |
log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) | |
diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go | |
index 220dd1fcb..8cdc58d29 100644 | |
--- a/go/vt/topo/locks.go | |
+++ b/go/vt/topo/locks.go | |
@@ -26,6 +26,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -217,7 +218,7 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { | |
func (l *Lock) lockKeyspace(ctx context.Context, ts *Server, keyspace string) (LockDescriptor, error) { | |
log.Infof("Locking keyspace %v for action %v", keyspace, l.Action) | |
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:220") | |
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:221\n"); cancel() })() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.LockKeyspaceForAction") | |
@@ -241,7 +242,7 @@ func (l *Lock) unlockKeyspace(ctx context.Context, ts *Server, keyspace string, | |
// Note that we are not using the user provided RemoteOperationTimeout | |
// here because it is possible that that timeout is too short. | |
ctx = trace.CopySpan(context.TODO(), ctx) | |
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:244") | |
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:245\n"); cancel() })() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockKeyspaceForAction") | |
@@ -358,7 +359,7 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error { | |
func (l *Lock) lockShard(ctx context.Context, ts *Server, keyspace, shard string) (LockDescriptor, error) { | |
log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action) | |
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:361") | |
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:362\n"); cancel() })() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.LockShardForAction") | |
@@ -382,7 +383,7 @@ func (l *Lock) unlockShard(ctx context.Context, ts *Server, keyspace, shard stri | |
// Note that we are not using the user provided RemoteOperationTimeout | |
// here because it is possible that that timeout is too short. | |
ctx = trace.CopySpan(context.TODO(), ctx) | |
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:385") | |
defer (func() { log.Infof("ctx cancel at go/vt/topo/locks.go:386\n"); cancel() })() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockShardForAction") | |
diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go | |
index f66154a02..16db7ccb7 100644 | |
--- a/go/vt/topo/memorytopo/election.go | |
+++ b/go/vt/topo/memorytopo/election.go | |
@@ -21,6 +21,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -81,7 +82,7 @@ func (mp *cMasterParticipation) WaitForMastership() (context.Context, error) { | |
// We use a cancelable context here. If stop is closed, | |
// we just cancel that context. | |
- lockCtx, lockCancel := context.WithCancel(context.Background()) | |
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/memorytopo/election.go:84") | |
go func() { | |
<-mp.stop | |
if ld != nil { | |
diff --git a/go/vt/topo/test/lock.go b/go/vt/topo/test/lock.go | |
index 315653ff0..ab5e60043 100644 | |
--- a/go/vt/topo/test/lock.go | |
+++ b/go/vt/topo/test/lock.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/topo" | |
topodatapb "vitess.io/vitess/go/vt/proto/topodata" | |
@@ -92,14 +93,14 @@ func checkLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) { | |
} | |
// test we can't take the lock again | |
- fastCtx, cancel := context.WithTimeout(ctx, timeUntilLockIsTaken) | |
+ fastCtx, cancel := context2.WithTimeout(ctx, timeUntilLockIsTaken, "./go/vt/topo/test/lock.go:95") | |
if _, err := conn.Lock(fastCtx, keyspacePath, "again"); !topo.IsErrType(err, topo.Timeout) { | |
t.Fatalf("Lock(again): %v", err) | |
} | |
cancel() | |
// test we can interrupt taking the lock | |
- interruptCtx, cancel := context.WithCancel(ctx) | |
+ interruptCtx, cancel := context2.WithCancel(ctx, "./go/vt/topo/test/lock.go:102") | |
go func() { | |
time.Sleep(timeUntilLockIsTaken) | |
cancel() | |
diff --git a/go/vt/topo/zk2topo/election.go b/go/vt/topo/zk2topo/election.go | |
index 86639411f..92509aa60 100644 | |
--- a/go/vt/topo/zk2topo/election.go | |
+++ b/go/vt/topo/zk2topo/election.go | |
@@ -22,6 +22,7 @@ import ( | |
"github.com/z-division/go-zookeeper/zk" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"vitess.io/vitess/go/vt/log" | |
@@ -49,7 +50,7 @@ func (zs *Server) NewMasterParticipation(name, id string) (topo.MasterParticipat | |
id: []byte(id), | |
done: make(chan struct{}), | |
} | |
- result.stopCtx, result.stopCtxCancel = context.WithCancel(context.Background()) | |
+ result.stopCtx, result.stopCtxCancel = context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:52") | |
return result, nil | |
} | |
@@ -122,7 +123,7 @@ func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) { | |
} | |
// we got the lock, create our background context | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:125") | |
go mp.watchMastership(ctx, mp.zs.conn, proposal, cancel) | |
return ctx, nil | |
} | |
diff --git a/go/vt/vtctl/throttler.go b/go/vt/vtctl/throttler.go | |
index 1d5db1d96..2831235d2 100644 | |
--- a/go/vt/vtctl/throttler.go | |
+++ b/go/vt/vtctl/throttler.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/olekukonko/tablewriter" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/throttler" | |
@@ -87,7 +88,7 @@ func commandThrottlerMaxRates(ctx context.Context, wr *wrangler.Wrangler, subFla | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:90") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:90\n"); cancel() })() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -140,7 +141,7 @@ func commandThrottlerSetMaxRate(ctx context.Context, wr *wrangler.Wrangler, subF | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:143") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:143\n"); cancel() })() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -177,7 +178,7 @@ func commandGetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangler | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:180") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:180\n"); cancel() })() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -231,7 +232,7 @@ func commandUpdateThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrang | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:234") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:234\n"); cancel() })() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -269,7 +270,7 @@ func commandResetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangl | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:272") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/throttler.go:272\n"); cancel() })() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go | |
index 6d9adc726..af31e63b0 100644 | |
--- a/go/vt/vtctl/vtctl.go | |
+++ b/go/vt/vtctl/vtctl.go | |
@@ -107,6 +107,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/json2" | |
"vitess.io/vitess/go/mysql" | |
@@ -1011,7 +1012,7 @@ func commandWaitForDrain(ctx context.Context, wr *wrangler.Wrangler, subFlags *f | |
} | |
if *timeout != 0 { | |
var cancel context.CancelFunc | |
- ctx, cancel = context.WithTimeout(ctx, *timeout) | |
+ ctx, cancel = context2.WithTimeout(ctx, *timeout, "./go/vt/vtctl/vtctl.go:1014") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtctl/vtctl.go:1014\n"); cancel() })() | |
} | |
diff --git a/go/vt/vtctld/action_repository.go b/go/vt/vtctld/action_repository.go | |
index 7b484707e..c957b3280 100644 | |
--- a/go/vt/vtctld/action_repository.go | |
+++ b/go/vt/vtctld/action_repository.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
"vitess.io/vitess/go/acl" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/topo" | |
"vitess.io/vitess/go/vt/topo/topoproto" | |
@@ -115,7 +116,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName, | |
return result | |
} | |
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:118") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) | |
output, err := action(ctx, wr, keyspace, r) | |
cancel() | |
@@ -142,7 +143,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke | |
return result | |
} | |
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:145") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) | |
output, err := action(ctx, wr, keyspace, shard, r) | |
cancel() | |
@@ -176,7 +177,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st | |
} | |
// run the action | |
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:179") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) | |
output, err := action.method(ctx, wr, tabletAlias, r) | |
cancel() | |
diff --git a/go/vt/vtctld/tablet_data_test.go b/go/vt/vtctld/tablet_data_test.go | |
index edc69d985..e8b32d2d2 100644 | |
--- a/go/vt/vtctld/tablet_data_test.go | |
+++ b/go/vt/vtctld/tablet_data_test.go | |
@@ -24,6 +24,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/logutil" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
topodatapb "vitess.io/vitess/go/vt/proto/topodata" | |
@@ -141,7 +142,7 @@ func TestTabletData(t *testing.T) { | |
}() | |
// Start streaming and wait for the first result. | |
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Second, "./go/vt/vtctld/tablet_data_test.go:144") | |
result, err := thc.Get(ctx, tablet1.Tablet.Alias) | |
cancel() | |
close(stop) | |
diff --git a/go/vt/vtctld/workflow.go b/go/vt/vtctld/workflow.go | |
index 1982e9093..e95b4a5fc 100644 | |
--- a/go/vt/vtctld/workflow.go | |
+++ b/go/vt/vtctld/workflow.go | |
@@ -21,6 +21,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/flagutil" | |
@@ -85,7 +86,7 @@ func initWorkflowManager(ts *topo.Server) { | |
} | |
func runWorkflowManagerAlone() { | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtctld/workflow.go:88") | |
go vtctl.WorkflowManager.Run(ctx) | |
// Running cancel on OnTermSync will cancel the context of any | |
diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go | |
index 5951ebb65..f4d7074e2 100644 | |
--- a/go/vt/vtgate/buffer/buffer_test.go | |
+++ b/go/vt/vtgate/buffer/buffer_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/discovery" | |
"vitess.io/vitess/go/vt/topo/topoproto" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -574,7 +575,7 @@ func testRequestCanceled(t *testing.T, explicitEnd bool) { | |
if err := waitForRequestsInFlight(b, 1); err != nil { | |
t.Fatal(err) | |
} | |
- ctx2, cancel2 := context.WithCancel(context.Background()) | |
+ ctx2, cancel2 := context2.WithCancel(context.Background(), "./go/vt/vtgate/buffer/buffer_test.go:577") | |
stopped2 := issueRequest(ctx2, t, b, failoverErr) | |
if err := waitForRequestsInFlight(b, 2); err != nil { | |
t.Fatal(err) | |
diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go | |
index 05623c126..17df120db 100644 | |
--- a/go/vt/vtgate/buffer/shard_buffer.go | |
+++ b/go/vt/vtgate/buffer/shard_buffer.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
@@ -327,7 +328,7 @@ func (sb *shardBuffer) bufferRequestLocked(ctx context.Context) (*entry, error) | |
done: make(chan struct{}), | |
deadline: sb.now().Add(*window), | |
} | |
- e.bufferCtx, e.bufferCancel = context.WithCancel(ctx) | |
+ e.bufferCtx, e.bufferCancel = context2.WithCancel(ctx, "./go/vt/vtgate/buffer/shard_buffer.go:330") | |
sb.queue = append(sb.queue, e) | |
if max := lastRequestsInFlightMax.Counts()[sb.statsKeyJoined]; max < int64(len(sb.queue)) { | |
diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go | |
index fe5956ee0..1bdecfc06 100644 | |
--- a/go/vt/vtgate/endtoend/vstream_test.go | |
+++ b/go/vt/vtgate/endtoend/vstream_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"testing" | |
"github.com/golang/protobuf/proto" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/vt/log" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
@@ -32,7 +33,7 @@ import ( | |
) | |
func TestVStream(t *testing.T) { | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/endtoend/vstream_test.go:35") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/endtoend/vstream_test.go:35\n"); cancel() })() | |
gconn, err := vtgateconn.Dial(ctx, grpcAddress) | |
diff --git a/go/vt/vtgate/engine/merge_sort.go b/go/vt/vtgate/engine/merge_sort.go | |
index 9c6b67995..070a7fc3e 100644 | |
--- a/go/vt/vtgate/engine/merge_sort.go | |
+++ b/go/vt/vtgate/engine/merge_sort.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/log" | |
@@ -38,7 +39,7 @@ import ( | |
// was pulled out. Since the input streams are sorted the same way that the heap is | |
// sorted, this guarantees that the merged stream will also be sorted the same way. | |
func mergeSort(vcursor VCursor, query string, orderBy []OrderbyParams, rss []*srvtopo.ResolvedShard, bvs []map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { | |
- ctx, cancel := context.WithCancel(vcursor.Context()) | |
+ ctx, cancel := context2.WithCancel(vcursor.Context(), "./go/vt/vtgate/engine/merge_sort.go:41") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/engine/merge_sort.go:42\n"); cancel() })() | |
handles := make([]*streamHandle, len(rss)) | |
diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go | |
index b3e0159cd..bf5ec26db 100644 | |
--- a/go/vt/vtgate/executor_stream_test.go | |
+++ b/go/vt/vtgate/executor_stream_test.go | |
@@ -21,6 +21,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/discovery" | |
"vitess.io/vitess/go/vt/log" | |
@@ -136,7 +137,7 @@ func TestStreamSQLSet(t *testing.T) { | |
func executorStreamMessages(executor *Executor, sql string) (qr *sqltypes.Result, err error) { | |
results := make(chan *sqltypes.Result, 100) | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/vtgate/executor_stream_test.go:139") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/executor_stream_test.go:89\n"); cancel() })() | |
err = executor.StreamExecute( | |
ctx, | |
diff --git a/go/vt/vtgate/gateway/gateway.go b/go/vt/vtgate/gateway/gateway.go | |
index d7e0b57bb..60b6053fc 100644 | |
--- a/go/vt/vtgate/gateway/gateway.go | |
+++ b/go/vt/vtgate/gateway/gateway.go | |
@@ -23,6 +23,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/vt/log" | |
@@ -107,7 +108,7 @@ func GetCreator() Creator { | |
// just calls it. | |
func WaitForTablets(gw Gateway, tabletTypesToWait []topodatapb.TabletType) error { | |
log.Infof("Gateway waiting for serving tablets...") | |
- ctx, cancel := context.WithTimeout(context.Background(), *initialTabletTimeout) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *initialTabletTimeout, "./go/vt/vtgate/gateway/gateway.go:110") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/gateway/gateway.go:111\n"); cancel() })() | |
err := gw.WaitForTablets(ctx, tabletTypesToWait) | |
diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go | |
index fb97beb10..1a60e6745 100644 | |
--- a/go/vt/vtgate/plugin_mysql_server.go | |
+++ b/go/vt/vtgate/plugin_mysql_server.go | |
@@ -26,6 +26,7 @@ import ( | |
"syscall" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/sqlparser" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -89,7 +90,7 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) { | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:92") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:93\n"); cancel() })() | |
} else { | |
ctx = context.Background() | |
@@ -136,7 +137,7 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq | |
ctx := context.Background() | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(ctx, *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(ctx, *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:139") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:140\n"); cancel() })() | |
} | |
@@ -203,7 +204,7 @@ func (vh *vtgateHandler) ComPrepare(c *mysql.Conn, query string) ([]*querypb.Fie | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:206") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:207\n"); cancel() })() | |
} else { | |
ctx = context.Background() | |
@@ -262,7 +263,7 @@ func (vh *vtgateHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareDat | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:265") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/plugin_mysql_server.go:266\n"); cancel() })() | |
} else { | |
ctx = context.Background() | |
diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go | |
index b0f905c87..d1cea3e4c 100644 | |
--- a/go/vt/vtgate/resolver.go | |
+++ b/go/vt/vtgate/resolver.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/key" | |
"vitess.io/vitess/go/vt/log" | |
@@ -368,7 +369,7 @@ func (res *Resolver) UpdateStream(ctx context.Context, keyspace string, shard st | |
// VStream streams events from one target. This function ensures that events of each | |
// transaction are streamed together, along with the corresponding GTID. | |
func (res *Resolver) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func(events []*binlogdatapb.VEvent) error) error { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/resolver.go:371") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver.go:351\n"); cancel() })() | |
// mu protects sending on ch, err and positions. | |
diff --git a/go/vt/vtgate/resolver_test.go b/go/vt/vtgate/resolver_test.go | |
index ffbd93f90..d69461ed1 100644 | |
--- a/go/vt/vtgate/resolver_test.go | |
+++ b/go/vt/vtgate/resolver_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/discovery" | |
"vitess.io/vitess/go/vt/key" | |
@@ -637,7 +638,7 @@ func TestResolverVStream(t *testing.T) { | |
}} | |
sbc0.AddVStreamEvents(send2, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:640") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:574\n"); cancel() })() | |
count := 1 | |
@@ -686,7 +687,7 @@ func TestResolverVStreamChunks(t *testing.T) { | |
} | |
sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:689") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:623\n"); cancel() })() | |
rowEncountered := false | |
@@ -762,7 +763,7 @@ func TestResolverVStreamMulti(t *testing.T) { | |
} | |
sbc1.AddVStreamEvents(send1, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:765") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:699\n"); cancel() })() | |
var got *binlogdatapb.VGtid | |
@@ -827,7 +828,7 @@ func TestResolverVStreamRetry(t *testing.T) { | |
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) | |
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:830") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:764\n"); cancel() })() | |
count := 0 | |
@@ -886,7 +887,7 @@ func TestResolverVStreamHeartbeat(t *testing.T) { | |
}} | |
sbc0.AddVStreamEvents(send1, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:889") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/resolver_test.go:823\n"); cancel() })() | |
vgtid := &binlogdatapb.VGtid{ | |
diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go | |
index 612c49a0a..e502910b2 100644 | |
--- a/go/vt/vtgate/scatter_conn.go | |
+++ b/go/vt/vtgate/scatter_conn.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/vt/concurrency" | |
@@ -547,7 +548,7 @@ func (tt *timeTracker) Record(target *querypb.Target) time.Time { | |
func (stc *ScatterConn) MessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error { | |
// The cancelable context is used for handling errors | |
// from individual streams. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/scatter_conn.go:550") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtgate/scatter_conn.go:551\n"); cancel() })() | |
// mu is used to merge multiple callback calls into one. | |
diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go | |
index 8da8069b3..13dbec234 100644 | |
--- a/go/vt/vtgate/vcursor_impl.go | |
+++ b/go/vt/vtgate/vcursor_impl.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/key" | |
"vitess.io/vitess/go/vt/sqlparser" | |
@@ -82,7 +83,7 @@ func (vc *vcursorImpl) MaxMemoryRows() int { | |
// SetContextTimeout updates context and sets a timeout. | |
func (vc *vcursorImpl) SetContextTimeout(timeout time.Duration) context.CancelFunc { | |
- ctx, cancel := context.WithTimeout(vc.ctx, timeout) | |
+ ctx, cancel := context2.WithTimeout(vc.ctx, timeout, "./go/vt/vtgate/vcursor_impl.go:85") | |
vc.ctx = ctx | |
return cancel | |
} | |
diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go | |
index a2c24b2f7..dd326a989 100644 | |
--- a/go/vt/vtgate/vtgate_test.go | |
+++ b/go/vt/vtgate/vtgate_test.go | |
@@ -29,6 +29,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/golang/protobuf/proto" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/discovery" | |
"vitess.io/vitess/go/vt/key" | |
@@ -1225,7 +1226,7 @@ func TestVTGateMessageStreamSharded(t *testing.T) { | |
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil) | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1228") | |
go func() { | |
kr := &topodatapb.KeyRange{End: []byte{0x40}} | |
err := rpcVTGate.MessageStream(ctx, ks, "", kr, "msg", func(qr *sqltypes.Result) error { | |
@@ -1271,7 +1272,7 @@ func TestVTGateMessageStreamUnsharded(t *testing.T) { | |
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil) | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1274") | |
go func() { | |
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error { | |
select { | |
@@ -1307,7 +1308,7 @@ func TestVTGateMessageStreamRetry(t *testing.T) { | |
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil) | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1310") | |
go func() { | |
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error { | |
select { | |
@@ -1351,7 +1352,7 @@ func TestVTGateMessageStreamUnavailable(t *testing.T) { | |
tablet.MustFailCodes[vtrpcpb.Code_UNAVAILABLE] = 1 | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1354") | |
go func() { | |
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error { | |
select { | |
diff --git a/go/vt/vtqueryserver/plugin_mysql_server.go b/go/vt/vtqueryserver/plugin_mysql_server.go | |
index aba29684d..a984eaaf9 100644 | |
--- a/go/vt/vtqueryserver/plugin_mysql_server.go | |
+++ b/go/vt/vtqueryserver/plugin_mysql_server.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/callerid" | |
@@ -75,7 +76,7 @@ func (mh *proxyHandler) ConnectionClosed(c *mysql.Conn) { | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:78") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtqueryserver/plugin_mysql_server.go:79\n"); cancel() })() | |
} else { | |
ctx = context.Background() | |
@@ -90,7 +91,7 @@ func (mh *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:93") | |
defer (func() { log.Infof("ctx cancel at go/vt/vtqueryserver/plugin_mysql_server.go:94\n"); cancel() })() | |
} else { | |
ctx = context.Background() | |
diff --git a/go/vt/vttablet/agentrpctest/test_agent_rpc.go b/go/vt/vttablet/agentrpctest/test_agent_rpc.go | |
index 692029033..2fc66c4c5 100644 | |
--- a/go/vt/vttablet/agentrpctest/test_agent_rpc.go | |
+++ b/go/vt/vttablet/agentrpctest/test_agent_rpc.go | |
@@ -28,6 +28,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/golang/protobuf/proto" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/hook" | |
"vitess.io/vitess/go/vt/log" | |
@@ -202,7 +203,7 @@ func agentRPCTestPingPanic(ctx context.Context, t *testing.T, client tmclient.Ta | |
// RPCs failed due to an expired context before .Dial(). | |
func agentRPCTestDialExpiredContext(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { | |
// Using a timeout of 0 here such that .Dial() will fail immediately. | |
- expiredCtx, cancel := context.WithTimeout(ctx, 0) | |
+ expiredCtx, cancel := context2.WithTimeout(ctx, 0, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:205") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/agentrpctest/test_agent_rpc.go:205\n"); cancel() })() | |
err := client.Ping(expiredCtx, tablet) | |
if err == nil { | |
@@ -228,7 +229,7 @@ func agentRPCTestRPCTimeout(ctx context.Context, t *testing.T, client tmclient.T | |
// NOTE: This might still race e.g. when test execution takes too long the | |
// context will be expired in dial() already. In such cases coverage | |
// will be reduced but the test will not flake. | |
- shortCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, 10*time.Millisecond, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:231") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/agentrpctest/test_agent_rpc.go:231\n"); cancel() })() | |
fakeAgent.setSlow(true) | |
defer func() { fakeAgent.setSlow(false) }() | |
diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go | |
index 0b9c50d38..3c7a82fbf 100644 | |
--- a/go/vt/vttablet/endtoend/misc_test.go | |
+++ b/go/vt/vttablet/endtoend/misc_test.go | |
@@ -29,6 +29,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/callerid" | |
@@ -467,7 +468,7 @@ func TestStreamHealth_Expired(t *testing.T) { | |
var health *querypb.StreamHealthResponse | |
framework.Server.BroadcastHealth(0, nil, time.Millisecond) | |
time.Sleep(5 * time.Millisecond) | |
- ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), time.Millisecond*100, "./go/vt/vttablet/endtoend/misc_test.go:470") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/endtoend/misc_test.go:471\n"); cancel() })() | |
if err := framework.Server.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { | |
health = shr | |
diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go | |
index edd48b47f..e1ac83a08 100644 | |
--- a/go/vt/vttablet/grpctabletconn/conn.go | |
+++ b/go/vt/vttablet/grpctabletconn/conn.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
"google.golang.org/grpc" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/netutil" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/callerid" | |
@@ -149,7 +150,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb. | |
// no direct API to end a stream from the client side. If callback | |
// returns an error, we return from the function. The deferred | |
// cancel will then cause the stream to be terminated. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:152") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:153\n"); cancel() })() | |
stream, err := func() (queryservicepb.Query_StreamExecuteClient, error) { | |
@@ -494,7 +495,7 @@ func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *quer | |
// MessageStream streams messages. | |
func (conn *gRPCQueryClient) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error { | |
// Please see comments in StreamExecute to see how this works. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:497") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:498\n"); cancel() })() | |
stream, err := func() (queryservicepb.Query_MessageStreamClient, error) { | |
@@ -595,7 +596,7 @@ func (conn *gRPCQueryClient) SplitQuery( | |
// StreamHealth starts a streaming RPC for VTTablet health status updates. | |
func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { | |
// Please see comments in StreamExecute to see how this works. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:598") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:599\n"); cancel() })() | |
stream, err := func() (queryservicepb.Query_StreamHealthClient, error) { | |
@@ -631,7 +632,7 @@ func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*qu | |
// UpdateStream starts a streaming query to VTTablet. | |
func (conn *gRPCQueryClient) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, callback func(*querypb.StreamEvent) error) error { | |
// Please see comments in StreamExecute to see how this works. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:634") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/grpctabletconn/conn.go:635\n"); cancel() })() | |
stream, err := func() (queryservicepb.Query_UpdateStreamClient, error) { | |
diff --git a/go/vt/vttablet/heartbeat/reader.go b/go/vt/vttablet/heartbeat/reader.go | |
index 33e480c87..793c0220f 100644 | |
--- a/go/vt/vttablet/heartbeat/reader.go | |
+++ b/go/vt/vttablet/heartbeat/reader.go | |
@@ -21,6 +21,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -149,7 +150,7 @@ func (r *Reader) GetLatest() (time.Duration, error) { | |
func (r *Reader) readHeartbeat() { | |
defer tabletenv.LogError() | |
- ctx, cancel := context.WithDeadline(context.Background(), r.now().Add(r.interval)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), r.now().Add(r.interval), "./go/vt/vttablet/heartbeat/reader.go:152") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/heartbeat/reader.go:153\n"); cancel() })() | |
res, err := r.fetchMostRecentHeartbeat(ctx) | |
diff --git a/go/vt/vttablet/heartbeat/writer.go b/go/vt/vttablet/heartbeat/writer.go | |
index 31dced89b..b92df96df 100644 | |
--- a/go/vt/vttablet/heartbeat/writer.go | |
+++ b/go/vt/vttablet/heartbeat/writer.go | |
@@ -21,6 +21,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -203,7 +204,7 @@ func (w *Writer) bindHeartbeatVars(query string) (string, error) { | |
// writeHeartbeat updates the heartbeat row for this tablet with the current time in nanoseconds. | |
func (w *Writer) writeHeartbeat() { | |
defer tabletenv.LogError() | |
- ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), w.now().Add(w.interval), "./go/vt/vttablet/heartbeat/writer.go:206") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/heartbeat/writer.go:207\n"); cancel() })() | |
update, err := w.bindHeartbeatVars(sqlUpdateHeartbeat) | |
if err != nil { | |
diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go | |
index 0cbf2b87f..797bd3d92 100644 | |
--- a/go/vt/vttablet/tabletmanager/action_agent.go | |
+++ b/go/vt/vttablet/tabletmanager/action_agent.go | |
@@ -44,6 +44,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -746,7 +747,7 @@ func (agent *ActionAgent) checkTabletMysqlPort(ctx context.Context, tablet *topo | |
// Update the port in the topology. Use a shorter timeout, so if | |
// the topo server is busy / throttling us, we don't hang forever here. | |
// The healthcheck go routine will try again next time. | |
- ctx, cancel := context.WithTimeout(ctx, 10*time.Second) | |
+ ctx, cancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/vttablet/tabletmanager/action_agent.go:749") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/action_agent.go:750\n"); cancel() })() | |
if !agent.waitingForMysql { | |
log.Warningf("MySQL port has changed from %v to %v, updating it in tablet record", topoproto.MysqlPort(tablet), mport) | |
diff --git a/go/vt/vttablet/tabletmanager/init_tablet.go b/go/vt/vttablet/tabletmanager/init_tablet.go | |
index 91fe8f560..741dc382e 100644 | |
--- a/go/vt/vttablet/tabletmanager/init_tablet.go | |
+++ b/go/vt/vttablet/tabletmanager/init_tablet.go | |
@@ -24,8 +24,7 @@ import ( | |
"fmt" | |
"time" | |
- "golang.org/x/net/context" | |
- | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/netutil" | |
"vitess.io/vitess/go/vt/log" | |
@@ -84,7 +83,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { | |
// Create a context for this whole operation. Note we will | |
// retry some actions upon failure up to this context expires. | |
- ctx, cancel := context.WithTimeout(agent.batchCtx, *initTimeout) | |
+ ctx, cancel := context2.WithTimeout(agent.batchCtx, *initTimeout, "./go/vt/vttablet/tabletmanager/init_tablet.go:87") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/init_tablet.go:88\n"); cancel() })() | |
// Read the shard, create it if necessary. | |
diff --git a/go/vt/vttablet/tabletmanager/replication_reporter.go b/go/vt/vttablet/tabletmanager/replication_reporter.go | |
index 4ec3b1241..5f3ca9ecf 100644 | |
--- a/go/vt/vttablet/tabletmanager/replication_reporter.go | |
+++ b/go/vt/vttablet/tabletmanager/replication_reporter.go | |
@@ -22,6 +22,7 @@ import ( | |
"html/template" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -68,7 +69,7 @@ func (r *replicationReporter) Report(isSlaveType, shouldQueryServiceBeRunning bo | |
log.Infof("Slave is stopped. Running with --disable_active_reparents so will not try to reconnect to master...") | |
} else { | |
log.Infof("Slave is stopped. Trying to reconnect to master...") | |
- ctx, cancel := context.WithTimeout(r.agent.batchCtx, 5*time.Second) | |
+ ctx, cancel := context2.WithTimeout(r.agent.batchCtx, 5*time.Second, "./go/vt/vttablet/tabletmanager/replication_reporter.go:71") | |
if err := repairReplication(ctx, r.agent); err != nil { | |
log.Infof("Failed to reconnect to master: %v", err) | |
} | |
diff --git a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go | |
index cb6245b48..770b8f745 100644 | |
--- a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go | |
+++ b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go | |
@@ -23,6 +23,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/event" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/trace" | |
@@ -113,7 +114,7 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern | |
agent.updateState(ctx, newTablet, "fastTabletExternallyReparented") | |
} | |
// Start the finalize stage with a background context, but connect the trace. | |
- bgCtx, cancel := context.WithTimeout(agent.batchCtx, *finalizeReparentTimeout) | |
+ bgCtx, cancel := context2.WithTimeout(agent.batchCtx, *finalizeReparentTimeout, "./go/vt/vttablet/tabletmanager/rpc_external_reparent.go:116") | |
bgCtx = trace.CopySpan(bgCtx, ctx) | |
agent.finalizeReparentCtx = bgCtx | |
go func() { | |
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go | |
index 41a4501cb..a3d945d9b 100644 | |
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go | |
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go | |
@@ -21,6 +21,7 @@ import ( | |
"fmt" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -103,7 +104,7 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string, | |
if err != nil { | |
return "", err | |
} | |
- waitCtx, cancel := context.WithTimeout(ctx, waitTime) | |
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:106") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/rpc_replication.go:107\n"); cancel() })() | |
if err := agent.MysqlDaemon.WaitMasterPos(waitCtx, pos); err != nil { | |
return "", err | |
@@ -153,7 +154,7 @@ func (agent *ActionAgent) StartSlaveUntilAfter(ctx context.Context, position str | |
} | |
defer agent.unlock() | |
- waitCtx, cancel := context.WithTimeout(ctx, waitTime) | |
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:156") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/rpc_replication.go:157\n"); cancel() })() | |
pos, err := mysql.DecodePosition(position) | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go | |
index 93e334ce3..e1b151af0 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/controller.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go | |
@@ -22,6 +22,7 @@ import ( | |
"strconv" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"github.com/golang/protobuf/proto" | |
@@ -106,7 +107,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor | |
ct.tabletPicker = tp | |
// cancel | |
- ctx, ct.cancel = context.WithCancel(ctx) | |
+ ctx, ct.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/controller.go:109") | |
go ct.run(ctx) | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go | |
index f330985c4..acecde60b 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" | |
@@ -225,7 +226,7 @@ func TestControllerCanceledContext(t *testing.T) { | |
"source": fmt.Sprintf(`keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName), | |
} | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/controller_test.go:228") | |
cancel() | |
ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil) | |
if err != nil { | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go | |
index ec999ccc0..b75faf5a0 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/engine.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -110,7 +111,7 @@ func (vre *Engine) Open(ctx context.Context) error { | |
return nil | |
} | |
- vre.ctx, vre.cancel = context.WithCancel(ctx) | |
+ vre.ctx, vre.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/engine.go:113") | |
vre.isOpen = true | |
if err := vre.initAll(); err != nil { | |
go vre.Close() | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go | |
index d16ca35cc..11cb9b670 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go | |
@@ -23,6 +23,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -398,7 +399,7 @@ func TestWaitForPosCancel(t *testing.T) { | |
sqltypes.NewVarBinary("Running"), | |
sqltypes.NewVarBinary(""), | |
}}}, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/engine_test.go:401") | |
cancel() | |
err := vre.WaitForPos(ctx, 1, "MariaDB/0-1-1084") | |
if err == nil || err != context.Canceled { | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go | |
index 4688d7412..e44bcf19b 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -115,7 +116,7 @@ func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSetting | |
} | |
func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.Result) error { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:118") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/vreplication/vcopier.go:119\n") | |
cancel() | |
@@ -186,7 +187,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma | |
} | |
defer vsClient.Close(ctx) | |
- ctx, cancel := context.WithTimeout(ctx, copyTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, copyTimeout, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:189") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/vreplication/vcopier.go:187\n") | |
cancel() | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go | |
index 790143520..75d7298ff 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -113,7 +114,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) error { | |
return fmt.Errorf("error dialing tablet: %v", err) | |
} | |
defer vsClient.Close(ctx) | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vplayer.go:116") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletmanager/vreplication/vplayer.go:117\n") | |
cancel() | |
diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go | |
index ebde54578..e9cfbb62a 100644 | |
--- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go | |
+++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -63,7 +64,7 @@ func TestDBConnExec(t *testing.T) { | |
connPool := newPool() | |
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) | |
defer connPool.Close() | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:66") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:66\n"); cancel() })() | |
dbConn, err := NewDBConn(connPool, db.ConnParams()) | |
if dbConn != nil { | |
@@ -141,7 +142,7 @@ func TestDBConnDeadline(t *testing.T) { | |
defer connPool.Close() | |
db.SetConnDelay(100 * time.Millisecond) | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:144") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:144\n") | |
cancel() | |
@@ -167,7 +168,7 @@ func TestDBConnDeadline(t *testing.T) { | |
startCounts = tabletenv.MySQLStats.Counts() | |
- ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) | |
+ ctx, cancel = context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:170") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:167\n") | |
cancel() | |
@@ -307,7 +308,7 @@ func TestDBConnStream(t *testing.T) { | |
connPool := newPool() | |
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) | |
defer connPool.Close() | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:310") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/connpool/dbconn_test.go:304\n") | |
cancel() | |
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go | |
index 9c531d25b..4992c4939 100644 | |
--- a/go/vt/vttablet/tabletserver/messager/message_manager.go | |
+++ b/go/vt/vttablet/tabletserver/messager/message_manager.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/sync2" | |
@@ -56,7 +57,7 @@ type messageReceiver struct { | |
} | |
func newMessageReceiver(ctx context.Context, send func(*sqltypes.Result) error) (*messageReceiver, <-chan struct{}) { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/messager/message_manager.go:59") | |
rcv := &messageReceiver{ | |
ctx: ctx, | |
errChan: make(chan error, 1), | |
@@ -534,7 +535,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t | |
return | |
} | |
defer mm.postponeSema.Release() | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), ackWaitTime, "./go/vt/vttablet/tabletserver/messager/message_manager.go:537") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/messager/message_manager.go:539\n") | |
cancel() | |
@@ -546,7 +547,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t | |
} | |
func (mm *messageManager) runPoller() { | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval()) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval(), "./go/vt/vttablet/tabletserver/messager/message_manager.go:549") | |
defer func() { | |
tabletenv.LogError() | |
cancel() | |
@@ -616,7 +617,7 @@ func (mm *messageManager) runPurge() { | |
// purge is a non-member because it should be called asynchronously and should | |
// not rely on members of messageManager. | |
func purge(tsv TabletService, name string, purgeAfter, purgeInterval time.Duration) { | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), purgeInterval) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), purgeInterval, "./go/vt/vttablet/tabletserver/messager/message_manager.go:619") | |
defer func() { | |
tabletenv.LogError() | |
cancel() | |
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go | |
index aa43af69e..6fd142db7 100644 | |
--- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go | |
+++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/sync2" | |
@@ -106,7 +107,7 @@ func TestReceiverCancel(t *testing.T) { | |
mm.Open() | |
defer mm.Close() | |
r1 := newTestReceiver(0) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:109") | |
_ = mm.Subscribe(ctx, r1.rcv) | |
cancel() | |
// r1 should eventually be unsubscribed. | |
@@ -232,7 +233,7 @@ func TestMessageManagerSend(t *testing.T) { | |
// Test that mm stops sending to a canceled receiver. | |
r2 := newTestReceiver(1) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:235") | |
mm.Subscribe(ctx, r2.rcv) | |
<-r2.ch | |
mm.Add(&MessageRow{Row: []sqltypes.Value{sqltypes.NewVarBinary("2")}}) | |
@@ -343,7 +344,7 @@ func TestMessageManagerSendEOF(t *testing.T) { | |
mm.Open() | |
defer mm.Close() | |
r1 := newTestReceiver(0) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:346") | |
mm.Subscribe(ctx, r1.rcv) | |
// Pull field info. | |
<-r1.ch | |
@@ -514,7 +515,7 @@ func TestMessageManagerPoller(t *testing.T) { | |
mm.Open() | |
defer mm.Close() | |
r1 := newTestReceiver(1) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:517") | |
mm.Subscribe(ctx, r1.rcv) | |
<-r1.ch | |
mm.pollerTicks.Trigger() | |
diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go | |
index 27bcacc18..c71ed8b1f 100644 | |
--- a/go/vt/vttablet/tabletserver/query_engine.go | |
+++ b/go/vt/vttablet/tabletserver/query_engine.go | |
@@ -28,6 +28,7 @@ import ( | |
"vitess.io/vitess/go/acl" | |
"vitess.io/vitess/go/cache" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/streamlog" | |
@@ -395,7 +396,7 @@ func (qe *QueryEngine) getQueryConn(ctx context.Context) (*connpool.DBConn, erro | |
timeout := qe.connTimeout.Get() | |
if timeout != 0 { | |
- ctxTimeout, cancel := context.WithTimeout(ctx, timeout) | |
+ ctxTimeout, cancel := context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/query_engine.go:398") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/query_engine.go:387\n"); cancel() })() | |
conn, err := qe.conns.Get(ctxTimeout) | |
if err != nil { | |
diff --git a/go/vt/vttablet/tabletserver/replication_watcher.go b/go/vt/vttablet/tabletserver/replication_watcher.go | |
index 67f8b818c..84f018811 100644 | |
--- a/go/vt/vttablet/tabletserver/replication_watcher.go | |
+++ b/go/vt/vttablet/tabletserver/replication_watcher.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/vt/binlog" | |
@@ -91,7 +92,7 @@ func (rpw *ReplicationWatcher) Open() { | |
if rpw.isOpen || !rpw.watchReplication { | |
return | |
} | |
- ctx, cancel := context.WithCancel(tabletenv.LocalContext()) | |
+ ctx, cancel := context2.WithCancel(tabletenv.LocalContext(), "./go/vt/vttablet/tabletserver/replication_watcher.go:94") | |
rpw.cancel = cancel | |
rpw.wg.Add(1) | |
go rpw.Process(ctx, rpw.dbconfigs) | |
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go | |
index e3c7a540b..126fe00b6 100644 | |
--- a/go/vt/vttablet/tabletserver/tabletserver.go | |
+++ b/go/vt/vttablet/tabletserver/tabletserver.go | |
@@ -31,6 +31,7 @@ import ( | |
"golang.org/x/net/context" | |
"vitess.io/vitess/go/acl" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/history" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -2017,7 +2018,7 @@ func (tsv *TabletServer) UpdateStream(ctx context.Context, target *querypb.Targe | |
s := binlog.NewEventStreamer(tsv.dbconfigs.DbaWithDB(), tsv.se, p, timestamp, callback) | |
// Create a cancelable wrapping context. | |
- streamCtx, streamCancel := context.WithCancel(ctx) | |
+ streamCtx, streamCancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver.go:2020") | |
i := tsv.updateStreamList.Add(streamCancel) | |
defer tsv.updateStreamList.Delete(i) | |
@@ -2319,7 +2320,7 @@ func withTimeout(ctx context.Context, timeout time.Duration, options *querypb.Ex | |
if timeout == 0 || options.GetWorkload() == querypb.ExecuteOptions_DBA || tabletenv.IsLocalContext(ctx) { | |
return ctx, func() {} | |
} | |
- return context.WithTimeout(ctx, timeout) | |
+ return context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/tabletserver.go:2322") | |
} | |
// skipQueryPlanCache returns true if the query plan should be cached | |
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go | |
index dcbced312..c2a1fa46c 100644 | |
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go | |
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go | |
@@ -32,6 +32,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -958,7 +959,7 @@ func TestTabletServerBeginFail(t *testing.T) { | |
t.Fatalf("StartService failed: %v", err) | |
} | |
defer tsv.StopService() | |
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Nanosecond, "./go/vt/vttablet/tabletserver/tabletserver_test.go:961") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/tabletserver_test.go:962\n"); cancel() })() | |
tsv.Begin(ctx, &target, nil) | |
_, err = tsv.Begin(ctx, &target, nil) | |
@@ -2092,7 +2093,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { | |
}() | |
// tx2. | |
- ctxTx2, cancelTx2 := context.WithCancel(ctx) | |
+ ctxTx2, cancelTx2 := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver_test.go:2095") | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go | |
index 9a485f028..f4cf91829 100644 | |
--- a/go/vt/vttablet/tabletserver/tx_engine.go | |
+++ b/go/vt/vttablet/tabletserver/tx_engine.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/timer" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/vt/concurrency" | |
@@ -541,7 +542,7 @@ func (te *TxEngine) rollbackPrepared() { | |
// transactions and calls the notifier on them. | |
func (te *TxEngine) startWatchdog() { | |
te.ticks.Start(func() { | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4, "./go/vt/vttablet/tabletserver/tx_engine.go:544") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttablet/tabletserver/tx_engine.go:545\n"); cancel() })() | |
// Raise alerts on prepares that have been unresolved for too long. | |
diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go | |
index bc64e396a..2117fb957 100644 | |
--- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go | |
+++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/streamlog" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -306,7 +307,7 @@ func TestTxSerializerCancel(t *testing.T) { | |
} | |
// tx3 (gets queued and must wait). | |
- ctx3, cancel3 := context.WithCancel(context.Background()) | |
+ ctx3, cancel3 := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go:309") | |
wg := sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go | |
index 9db9ecc01..25ed2e192 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
) | |
@@ -54,7 +55,7 @@ func TestUpdateVSchema(t *testing.T) { | |
defer env.SetVSchema("{}") | |
// We have to start at least one stream to start the vschema watcher. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/engine_test.go:57") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/engine_test.go:57\n") | |
cancel() | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | |
index f19ea01f0..88978a380 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | |
@@ -20,6 +20,7 @@ import ( | |
"context" | |
"fmt" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
@@ -49,7 +50,7 @@ type rowStreamer struct { | |
} | |
func newRowStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, query string, lastpk []sqltypes.Value, kschema *vindexes.KeyspaceSchema, send func(*binlogdatapb.VStreamRowsResponse) error) *rowStreamer { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go:52") | |
return &rowStreamer{ | |
ctx: ctx, | |
cancel: cancel, | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go | |
index 8e237a9e8..678d587d1 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"testing" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/log" | |
@@ -254,7 +255,7 @@ func TestStreamRowsCancel(t *testing.T) { | |
}) | |
engine.se.Reload(context.Background()) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go:257") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go:257\n") | |
cancel() | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
index 0b0bc0eb7..6bbaba94f 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
@@ -23,6 +23,7 @@ import ( | |
"io" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog" | |
@@ -70,7 +71,7 @@ type streamerPlan struct { | |
} | |
func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, startPos string, filter *binlogdatapb.Filter, kschema *vindexes.KeyspaceSchema, send func([]*binlogdatapb.VEvent) error) *vstreamer { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/vstreamer.go:73") | |
return &vstreamer{ | |
ctx: ctx, | |
cancel: cancel, | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | |
index 2a8240e8f..e02114cb7 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/vt/log" | |
@@ -194,7 +195,7 @@ func TestREKeyRange(t *testing.T) { | |
} | |
defer env.SetVSchema("{}") | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:197") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:197\n") | |
cancel() | |
@@ -344,7 +345,7 @@ func TestDDLAddColumn(t *testing.T) { | |
}) | |
engine.se.Reload(context.Background()) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:347") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:344\n") | |
cancel() | |
@@ -413,7 +414,7 @@ func TestDDLDropColumn(t *testing.T) { | |
}) | |
engine.se.Reload(context.Background()) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:416") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:410\n") | |
cancel() | |
@@ -781,7 +782,7 @@ func TestMinimalMode(t *testing.T) { | |
"set @@session.binlog_row_image='full'", | |
}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:784") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:775\n") | |
cancel() | |
@@ -823,7 +824,7 @@ func TestStatementMode(t *testing.T) { | |
"set @@session.binlog_format='row'", | |
}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:826") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:814\n") | |
cancel() | |
@@ -845,7 +846,7 @@ func TestStatementMode(t *testing.T) { | |
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase) { | |
t.Helper() | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:848") | |
defer (func() { | |
log.Infof("ctx cancel at go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:833\n") | |
cancel() | |
diff --git a/go/vt/vttest/mysqlctl.go b/go/vt/vttest/mysqlctl.go | |
index 54c243f62..568a28eb5 100644 | |
--- a/go/vt/vttest/mysqlctl.go | |
+++ b/go/vt/vttest/mysqlctl.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/vt/log" | |
) | |
@@ -53,7 +54,7 @@ type Mysqlctl struct { | |
// Setup spawns a new mysqld service and initializes it with the defaults. | |
// The service is kept running in the background until TearDown() is called. | |
func (ctl *Mysqlctl) Setup() error { | |
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:56") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttest/mysqlctl.go:56\n"); cancel() })() | |
cmd := exec.CommandContext(ctx, | |
@@ -77,7 +78,7 @@ func (ctl *Mysqlctl) Setup() error { | |
// TearDown shutdowns the running mysqld service | |
func (ctl *Mysqlctl) TearDown() error { | |
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:80") | |
defer (func() { log.Infof("ctx cancel at go/vt/vttest/mysqlctl.go:80\n"); cancel() })() | |
cmd := exec.CommandContext(ctx, | |
diff --git a/go/vt/worker/chunk.go b/go/vt/worker/chunk.go | |
index 4b775b470..1b9b9e80b 100644 | |
--- a/go/vt/worker/chunk.go | |
+++ b/go/vt/worker/chunk.go | |
@@ -19,6 +19,7 @@ package worker | |
import ( | |
"fmt" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -94,7 +95,7 @@ func generateChunks(ctx context.Context, wr *wrangler.Wrangler, tablet *topodata | |
// Get the MIN and MAX of the leading column of the primary key. | |
query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(topoproto.TabletDbName(tablet)), sqlescape.EscapeID(td.Name)) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/chunk.go:97") | |
qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(shortCtx, tablet, true, []byte(query), 1) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/diff_utils.go b/go/vt/worker/diff_utils.go | |
index 19d2dcbca..895267e0f 100644 | |
--- a/go/vt/worker/diff_utils.go | |
+++ b/go/vt/worker/diff_utils.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
"vitess.io/vitess/go/vt/vttablet/tmclient" | |
@@ -63,7 +64,7 @@ type QueryResultReader struct { | |
// NewQueryResultReaderForTablet creates a new QueryResultReader for | |
// the provided tablet / sql query | |
func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string) (*QueryResultReader, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:66") | |
tablet, err := ts.GetTablet(shortCtx, tabletAlias) | |
cancel() | |
if err != nil { | |
@@ -97,7 +98,7 @@ func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletA | |
// NewTransactionalQueryResultReaderForTablet creates a new QueryResultReader for | |
// the provided tablet / sql query, and runs it in an existing transaction | |
func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string, txID int64) (*QueryResultReader, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:100") | |
tablet, err := ts.GetTablet(shortCtx, tabletAlias) | |
cancel() | |
if err != nil { | |
@@ -130,7 +131,7 @@ func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Se | |
// RollbackTransaction rolls back the transaction | |
func RollbackTransaction(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, txID int64) error { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:133") | |
tablet, err := ts.GetTablet(shortCtx, tabletAlias) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/executor.go b/go/vt/worker/executor.go | |
index e953bc4f0..6a0980987 100644 | |
--- a/go/vt/worker/executor.go | |
+++ b/go/vt/worker/executor.go | |
@@ -20,6 +20,7 @@ import ( | |
"fmt" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -119,7 +120,7 @@ func (e *executor) refreshState(ctx context.Context) error { | |
func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context.Context, tablet *topodatapb.Tablet) error) error { | |
retryDuration := *retryDuration | |
// We should keep retrying up until the retryCtx runs out. | |
- retryCtx, retryCancel := context.WithTimeout(ctx, retryDuration) | |
+ retryCtx, retryCancel := context2.WithTimeout(ctx, retryDuration, "./go/vt/worker/executor.go:122") | |
defer retryCancel() | |
// Is this current attempt a retry of a previous attempt? | |
isRetry := false | |
@@ -152,7 +153,7 @@ func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context | |
// Run the command (in a block since goto above does not allow to introduce | |
// new variables until the label is reached.) | |
{ | |
- tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute) | |
+ tryCtx, cancel := context2.WithTimeout(retryCtx, 2*time.Minute, "./go/vt/worker/executor.go:155") | |
err = action(tryCtx, master.Tablet) | |
cancel() | |
diff --git a/go/vt/worker/instance.go b/go/vt/worker/instance.go | |
index c14c2df9b..c46554873 100644 | |
--- a/go/vt/worker/instance.go | |
+++ b/go/vt/worker/instance.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/tb" | |
@@ -115,7 +116,7 @@ func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrang | |
wi.currentWorker = wrk | |
wi.currentMemoryLogger = logutil.NewMemoryLogger() | |
- wi.currentContext, wi.currentCancelFunc = context.WithCancel(ctx) | |
+ wi.currentContext, wi.currentCancelFunc = context2.WithCancel(ctx, "./go/vt/worker/instance.go:118") | |
wi.lastRunError = nil | |
wi.lastRunStopTime = time.Unix(0, 0) | |
done := make(chan struct{}) | |
diff --git a/go/vt/worker/legacy_split_clone.go b/go/vt/worker/legacy_split_clone.go | |
index a191a1b79..890a71207 100644 | |
--- a/go/vt/worker/legacy_split_clone.go | |
+++ b/go/vt/worker/legacy_split_clone.go | |
@@ -27,6 +27,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -266,7 +267,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error { | |
var err error | |
// read the keyspace and validate it | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:269") | |
scw.keyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.keyspace) | |
cancel() | |
if err != nil { | |
@@ -274,7 +275,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error { | |
} | |
// find the OverlappingShards in the keyspace | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:277") | |
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.keyspace) | |
cancel() | |
if err != nil { | |
@@ -367,7 +368,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
// get the tablet info for them, and stop their replication | |
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases)) | |
for i, alias := range scw.sourceAliases { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:370") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias) | |
cancel() | |
if err != nil { | |
@@ -375,7 +376,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
} | |
scw.sourceTablets[i] = ti.Tablet | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:378") | |
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i]) | |
cancel() | |
if err != nil { | |
@@ -398,7 +399,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
// Make sure we find a master for each destination shard and log it. | |
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") | |
for _, si := range scw.destinationShards { | |
- waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second) | |
+ waitCtx, waitCancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/worker/legacy_split_clone.go:401") | |
defer waitCancel() | |
if err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER); err != nil { | |
return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v", si.Keyspace(), si.ShardName()) | |
@@ -410,7 +411,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
master := masters[0] | |
// Get the MySQL database name of the tablet. | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:413") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, master.Tablet.Alias) | |
cancel() | |
if err != nil { | |
@@ -453,7 +454,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { | |
// on all source shards. Furthermore, we estimate the number of rows | |
// in each source shard for each table to be about the same | |
// (rowCount is used to estimate an ETA) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:456") | |
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, scw.sourceAliases[0], nil, scw.excludeTables, false /* includeViews */) | |
cancel() | |
if err != nil { | |
@@ -476,7 +477,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { | |
mu := sync.Mutex{} | |
var firstError error | |
- ctx, cancelCopy := context.WithCancel(ctx) | |
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/legacy_split_clone.go:479") | |
processError := func(format string, args ...interface{}) { | |
scw.wr.Logger().Errorf(format, args...) | |
mu.Lock() | |
@@ -605,7 +606,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { | |
sourcePositions := make([]string, len(scw.sourceShards)) | |
// get the current position from the sources | |
for shardIndex := range scw.sourceShards { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:608") | |
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex]) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/legacy_split_clone_test.go b/go/vt/worker/legacy_split_clone_test.go | |
index 1cc578d56..b46dcce97 100644 | |
--- a/go/vt/worker/legacy_split_clone_test.go | |
+++ b/go/vt/worker/legacy_split_clone_test.go | |
@@ -28,6 +28,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -469,7 +470,7 @@ func TestLegacySplitCloneV2_NoMasterAvailable(t *testing.T) { | |
statsRetryCounters.ResetAll() | |
errs := make(chan error, 1) | |
go func() { | |
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/legacy_split_clone_test.go:472") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/legacy_split_clone_test.go:472\n"); cancel() })() | |
for { | |
diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go | |
index 9a93ca1e1..c43d6a3d4 100644 | |
--- a/go/vt/worker/multi_split_diff.go | |
+++ b/go/vt/worker/multi_split_diff.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vttablet/queryservice" | |
"vitess.io/vitess/go/vt/vttablet/tabletconn" | |
@@ -200,13 +201,13 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { | |
} | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:203") | |
msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace) | |
cancel() | |
if err != nil { | |
return vterrors.Wrapf(err, "cannot read keyspace %v", msdw.keyspace) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:209") | |
msdw.shardInfo, err = msdw.wr.TopoServer().GetShard(shortCtx, msdw.keyspace, msdw.shard) | |
cancel() | |
if err != nil { | |
@@ -228,7 +229,7 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { | |
// findDestinationShards finds all the shards that have filtered replication from the source shard | |
func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([]*topo.ShardInfo, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:231") | |
keyspaces, err := msdw.wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -252,7 +253,7 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([] | |
} | |
func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keyspace string) ([]*topo.ShardInfo, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:255") | |
shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -284,7 +285,7 @@ func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keys | |
} | |
func (msdw *MultiSplitDiffWorker) getShardInfo(ctx context.Context, keyspace string, shard string) (*topo.ShardInfo, uint32, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:287") | |
si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
@@ -349,14 +350,14 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab | |
tablet := tabletInfo[i].Tablet | |
msdw.wr.Logger().Infof("stopping master binlog replication on %v", shardInfo.MasterAlias) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:352") | |
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff")) | |
cancel() | |
if err != nil { | |
return nil, vterrors.Wrapf(err, "VReplicationExec(stop) for %v failed", shardInfo.MasterAlias) | |
} | |
wrangler.RecordVReplicationAction(msdw.cleaner, tablet, binlogplayer.StartVReplication(msdw.sourceUID)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:359") | |
p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID)) | |
cancel() | |
if err != nil { | |
@@ -375,7 +376,7 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab | |
} | |
func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Context, shardInfo *topo.ShardInfo) (*topo.TabletInfo, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:378") | |
masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias) | |
cancel() | |
if err != nil { | |
@@ -389,7 +390,7 @@ func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Contex | |
// (add a cleanup task to restart binlog replication on the source tablet, and | |
// change the existing ChangeSlaveType cleanup action to 'spare' type) | |
func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Context, destVreplicationPos []string) (string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:392") | |
sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) | |
cancel() | |
if err != nil { | |
@@ -406,14 +407,14 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co | |
msdw.wr.Logger().Infof("stopping slave %v at a minimum of %v", msdw.sourceAlias, vreplicationPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:409") | |
msdw.wr.TabletManagerClient().StartSlave(shortCtx, sourceTablet.Tablet) | |
cancel() | |
if err != nil { | |
return "", err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:416") | |
mysqlPos, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout) | |
cancel() | |
if err != nil { | |
@@ -431,21 +432,21 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co | |
// up to the specified source position, and return the destination position. | |
func (msdw *MultiSplitDiffWorker) stopVreplicationAt(ctx context.Context, shardInfo *topo.ShardInfo, sourcePosition string, masterInfo *topo.TabletInfo) (string, error) { | |
msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, sourcePosition) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:434") | |
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, sourcePosition)) | |
cancel() | |
if err != nil { | |
return "", vterrors.Wrapf(err, "VReplication(start until) for %v until %v failed", shardInfo.MasterAlias, sourcePosition) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:441") | |
err = msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), sourcePosition) | |
cancel() | |
if err != nil { | |
return "", vterrors.Wrapf(err, "VReplicationWaitForPos for %v until %v failed", shardInfo.MasterAlias, sourcePosition) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:448") | |
masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet) | |
cancel() | |
if err != nil { | |
@@ -464,7 +465,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina | |
} else { | |
msdw.wr.Logger().Infof("waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos) | |
} | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:467") | |
destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) | |
cancel() | |
if err != nil { | |
@@ -475,7 +476,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina | |
time.Sleep(1 * time.Minute) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:478") | |
if msdw.waitForFixedTimeRatherThanGtidSet { | |
err = msdw.wr.TabletManagerClient().StopSlave(shortCtx, destinationTablet.Tablet) | |
} else { | |
@@ -493,7 +494,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina | |
// (remove the cleanup task that does the same) | |
func (msdw *MultiSplitDiffWorker) startVreplication(ctx context.Context, shardInfo *topo.ShardInfo, masterInfo *topo.TabletInfo) error { | |
msdw.wr.Logger().Infof("restarting filtered replication on master %v", shardInfo.MasterAlias) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:496") | |
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)) | |
if err != nil { | |
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", shardInfo.MasterAlias) | |
@@ -560,7 +561,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte | |
} | |
} | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:563") | |
source, _ := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) | |
cancel() | |
@@ -604,7 +605,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte | |
return err | |
} | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:607") | |
destTabletInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) | |
cancel() | |
if err != nil { | |
@@ -652,7 +653,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte | |
func (msdw *MultiSplitDiffWorker) waitForDestinationTabletToReach(ctx context.Context, tablet *topodatapb.Tablet, mysqlPos string) error { | |
for i := 0; i < 20; i++ { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:655") | |
pos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, tablet) | |
cancel() | |
if err != nil { | |
@@ -758,7 +759,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl | |
wg.Add(1) | |
go func(i int, destinationAlias *topodatapb.TabletAlias) { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:761") | |
destinationSchemaDefinition, err := msdw.wr.GetSchema( | |
shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) | |
cancel() | |
@@ -773,7 +774,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:776") | |
sourceSchemaDefinition, err = msdw.wr.GetSchema( | |
shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) | |
cancel() | |
@@ -811,7 +812,7 @@ func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, des | |
} | |
func (msdw *MultiSplitDiffWorker) loadVSchema(ctx context.Context) (*vindexes.KeyspaceSchema, error) { | |
- shortCtx, cancel := context.WithCancel(ctx) | |
+ shortCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/multi_split_diff.go:814") | |
kschema, err := msdw.wr.TopoServer().GetVSchema(shortCtx, msdw.keyspace) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/multi_split_diff_cmd.go b/go/vt/worker/multi_split_diff_cmd.go | |
index 5a6b880c5..1c6cb4e42 100644 | |
--- a/go/vt/worker/multi_split_diff_cmd.go | |
+++ b/go/vt/worker/multi_split_diff_cmd.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/topodata" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -123,7 +124,7 @@ func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.F | |
// shardSources returns all the shards that are SourceShards of at least one other shard. | |
func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:126") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -139,7 +140,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:142") | |
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -150,7 +151,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri | |
wg.Add(1) | |
go func(keyspace, shard string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:153") | |
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go | |
index 04f95695f..995096e96 100644 | |
--- a/go/vt/worker/restartable_result_reader.go | |
+++ b/go/vt/worker/restartable_result_reader.go | |
@@ -22,6 +22,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -225,7 +226,7 @@ func (r *RestartableResultReader) Next() (*sqltypes.Result, error) { | |
func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) { | |
// In case of errors we will keep retrying until retryCtx is done. | |
- retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration) | |
+ retryCtx, retryCancel := context2.WithTimeout(r.ctx, *retryDuration, "./go/vt/worker/restartable_result_reader.go:228") | |
defer retryCancel() | |
// The first retry is the second attempt because we already tried once in Next() | |
diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go | |
index 24254fc88..ca112cb48 100644 | |
--- a/go/vt/worker/split_clone.go | |
+++ b/go/vt/worker/split_clone.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/event" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -528,7 +529,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error { | |
scw.setState(WorkerStateInit) | |
// read the keyspace and validate it | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:531") | |
var err error | |
scw.destinationKeyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.destinationKeyspace) | |
cancel() | |
@@ -578,7 +579,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error { | |
func (scw *SplitCloneWorker) initShardsForHorizontalResharding(ctx context.Context) error { | |
// find the OverlappingShards in the keyspace | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:581") | |
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.destinationKeyspace) | |
cancel() | |
if err != nil { | |
@@ -635,7 +636,7 @@ func (scw *SplitCloneWorker) initShardsForVerticalSplit(ctx context.Context) err | |
} | |
sourceKeyspace := servedFrom | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:638") | |
shardMap, err := scw.wr.TopoServer().FindAllShardsInKeyspace(shortCtx, sourceKeyspace) | |
cancel() | |
if err != nil { | |
@@ -775,7 +776,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error | |
// get the tablet info for them, and stop their replication | |
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases)) | |
for i, alias := range scw.sourceAliases { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:778") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias) | |
cancel() | |
if err != nil { | |
@@ -783,7 +784,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error | |
} | |
scw.sourceTablets[i] = ti.Tablet | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:786") | |
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i]) | |
cancel() | |
if err != nil { | |
@@ -818,7 +819,7 @@ func (scw *SplitCloneWorker) findTransactionalSources(ctx context.Context) error | |
// get the tablet info | |
scw.sourceTablets = make([]*topodatapb.Tablet, 1) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:821") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, scw.sourceAliases[0]) | |
cancel() | |
if err != nil { | |
@@ -844,7 +845,7 @@ func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error { | |
// Make sure we find a master for each destination shard and log it. | |
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") | |
for _, si := range scw.destinationShards { | |
- waitCtx, waitCancel := context.WithTimeout(ctx, *waitForHealthyTabletsTimeout) | |
+ waitCtx, waitCancel := context2.WithTimeout(ctx, *waitForHealthyTabletsTimeout, "./go/vt/worker/split_clone.go:847") | |
err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) | |
waitCancel() | |
if err != nil { | |
@@ -1179,7 +1180,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) | |
var firstError error | |
- ctx, cancelCopy := context.WithCancel(ctx) | |
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1182") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_clone.go:1182\n"); cancelCopy() })() | |
processError := func(format string, args ...interface{}) { | |
// in theory we could have two threads see firstError as null and both write to the variable | |
@@ -1246,7 +1247,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { | |
sourcePositions[0] = scw.lastPos | |
} else { | |
for shardIndex := range scw.sourceShards { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1249") | |
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex]) | |
cancel() | |
if err != nil { | |
@@ -1255,7 +1256,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { | |
sourcePositions[shardIndex] = status.Position | |
} | |
} | |
- cancelableCtx, cancel := context.WithCancel(ctx) | |
+ cancelableCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1258") | |
rec := concurrency.AllErrorRecorder{} | |
handleError := func(e error) { | |
rec.RecordError(e) | |
@@ -1320,7 +1321,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda | |
// on all source shards. Furthermore, we estimate the number of rows | |
// in each source shard for each table to be about the same | |
// (rowCount is used to estimate an ETA) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1323") | |
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go | |
index 4cf6b8795..da7782709 100644 | |
--- a/go/vt/worker/split_clone_cmd.go | |
+++ b/go/vt/worker/split_clone_cmd.go | |
@@ -27,6 +27,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/proto/topodata" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
@@ -151,7 +152,7 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS | |
} | |
func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:154") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -166,7 +167,7 @@ func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler) | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:169") | |
osList, err := topotools.FindOverlappingShards(shortCtx, wr.TopoServer(), keyspace) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go | |
index 72335157b..c26612013 100644 | |
--- a/go/vt/worker/split_clone_test.go | |
+++ b/go/vt/worker/split_clone_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -1026,7 +1027,7 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { | |
// late because this Go routine looks at it and can run before the worker. | |
statsRetryCounters.ResetAll() | |
go func() { | |
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/split_clone_test.go:1029") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_clone_test.go:1029\n"); cancel() })() | |
for { | |
diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go | |
index 4718c731e..25b60dfd2 100644 | |
--- a/go/vt/worker/split_diff.go | |
+++ b/go/vt/worker/split_diff.go | |
@@ -21,6 +21,7 @@ import ( | |
"sort" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -188,13 +189,13 @@ func (sdw *SplitDiffWorker) init(ctx context.Context) error { | |
sdw.SetState(WorkerStateInit) | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:191") | |
sdw.keyspaceInfo, err = sdw.wr.TopoServer().GetKeyspace(shortCtx, sdw.keyspace) | |
cancel() | |
if err != nil { | |
return vterrors.Wrapf(err, "cannot read keyspace %v", sdw.keyspace) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:197") | |
sdw.shardInfo, err = sdw.wr.TopoServer().GetShard(shortCtx, sdw.keyspace, sdw.shard) | |
cancel() | |
if err != nil { | |
@@ -259,7 +260,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error { | |
// to FindWorkerTablet could attempt to set to DRAIN state the same tablet. Only | |
// one of these calls to FindWorkerTablet will succeed and the rest will fail. | |
// The following, makes sures we keep trying to find a worker tablet when this error occur. | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:262") | |
for { | |
select { | |
case <-shortCtx.Done(): | |
@@ -298,7 +299,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error { | |
func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
sdw.SetState(WorkerStateSyncReplication) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:301") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:301\n"); cancel() })() | |
masterInfo, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.shardInfo.MasterAlias) | |
if err != nil { | |
@@ -307,7 +308,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 1 - stop the master binlog replication, get its current position | |
sdw.wr.Logger().Infof("Stopping master binlog replication on %v", sdw.shardInfo.MasterAlias) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:310") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:310\n"); cancel() })() | |
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(sdw.sourceShard.Uid, "for split diff")) | |
if err != nil { | |
@@ -332,7 +333,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
return err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:335") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:335\n"); cancel() })() | |
mysqlPos, err := sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout) | |
if err != nil { | |
@@ -346,7 +347,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 3 - ask the master of the destination shard to resume filtered | |
// replication up to the new list of positions | |
sdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", sdw.shardInfo.MasterAlias, mysqlPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:349") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:349\n"); cancel() })() | |
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(sdw.sourceShard.Uid, mysqlPos)) | |
if err != nil { | |
@@ -363,13 +364,13 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 4 - wait until the destination tablet is equal or passed | |
// that master binlog position, and stop its replication. | |
sdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", sdw.destinationAlias, masterPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:366") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:366\n"); cancel() })() | |
destinationTablet, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.destinationAlias) | |
if err != nil { | |
return err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:372") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:372\n"); cancel() })() | |
if _, err = sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout); err != nil { | |
return vterrors.Wrapf(err, "StopSlaveMinimum for %v at %v failed", sdw.destinationAlias, masterPos) | |
@@ -378,7 +379,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 5 - restart filtered replication on destination master | |
sdw.wr.Logger().Infof("Restarting filtered replication on master %v", sdw.shardInfo.MasterAlias) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:381") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/split_diff.go:381\n"); cancel() })() | |
if _, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(sdw.sourceShard.Uid)); err != nil { | |
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", sdw.shardInfo.MasterAlias) | |
@@ -401,7 +402,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:404") | |
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema( | |
shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) | |
cancel() | |
@@ -414,7 +415,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:417") | |
sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema( | |
shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) | |
cancel() | |
diff --git a/go/vt/worker/split_diff_cmd.go b/go/vt/worker/split_diff_cmd.go | |
index 5ece6f5dc..bd52efae6 100644 | |
--- a/go/vt/worker/split_diff_cmd.go | |
+++ b/go/vt/worker/split_diff_cmd.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -114,7 +115,7 @@ func commandSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSe | |
// shardsWithSources returns all the shards that have SourceShards set | |
// with no Tables list. | |
func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:117") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -129,7 +130,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:132") | |
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -140,7 +141,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string | |
wg.Add(1) | |
go func(keyspace, shard string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:143") | |
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/tablet_provider.go b/go/vt/worker/tablet_provider.go | |
index 2d42bcb7f..47a7b950b 100644 | |
--- a/go/vt/worker/tablet_provider.go | |
+++ b/go/vt/worker/tablet_provider.go | |
@@ -19,6 +19,7 @@ package worker | |
import ( | |
"fmt" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -57,7 +58,7 @@ func newSingleTabletProvider(ctx context.Context, ts *topo.Server, alias *topoda | |
} | |
func (p *singleTabletProvider) getTablet() (*topodatapb.Tablet, error) { | |
- shortCtx, cancel := context.WithTimeout(p.ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(p.ctx, *remoteActionsTimeout, "./go/vt/worker/tablet_provider.go:60") | |
tablet, err := p.ts.GetTablet(shortCtx, p.alias) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go | |
index 4dcc52a1d..aa7b4a182 100644 | |
--- a/go/vt/worker/topo_utils.go | |
+++ b/go/vt/worker/topo_utils.go | |
@@ -22,6 +22,7 @@ import ( | |
"math/rand" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -68,7 +69,7 @@ func FindHealthyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discover | |
} | |
func waitForHealthyTablets(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int, timeout time.Duration, tabletType topodatapb.TabletType) ([]discovery.TabletStats, error) { | |
- busywaitCtx, busywaitCancel := context.WithTimeout(ctx, timeout) | |
+ busywaitCtx, busywaitCancel := context2.WithTimeout(ctx, timeout, "./go/vt/worker/topo_utils.go:71") | |
defer busywaitCancel() | |
start := time.Now() | |
@@ -122,7 +123,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang | |
} | |
wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:125") | |
err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED) | |
cancel() | |
if err != nil { | |
@@ -131,7 +132,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang | |
ourURL := servenv.ListeningURL.String() | |
wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:134") | |
_, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error { | |
if tablet.Tags == nil { | |
tablet.Tags = make(map[string]string) | |
@@ -154,7 +155,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang | |
wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType) | |
// We refresh the destination vttablet reloads the worker URL when it reloads the tablet. | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:157") | |
wr.RefreshTabletState(shortCtx, tabletAlias) | |
if err != nil { | |
return nil, err | |
diff --git a/go/vt/worker/utils_test.go b/go/vt/worker/utils_test.go | |
index 2c0445299..a5fed7b27 100644 | |
--- a/go/vt/worker/utils_test.go | |
+++ b/go/vt/worker/utils_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/log" | |
@@ -40,7 +41,7 @@ import ( | |
func runCommand(t *testing.T, wi *Instance, wr *wrangler.Wrangler, args []string) error { | |
// Limit the scope of the context e.g. to implicitly terminate stray Go | |
// routines. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/utils_test.go:43") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/utils_test.go:43\n"); cancel() })() | |
worker, done, err := wi.RunCommand(ctx, args, wr, false /* runFromCli */) | |
diff --git a/go/vt/worker/vertical_split_clone_cmd.go b/go/vt/worker/vertical_split_clone_cmd.go | |
index 7045fb3e6..0298ca959 100644 | |
--- a/go/vt/worker/vertical_split_clone_cmd.go | |
+++ b/go/vt/worker/vertical_split_clone_cmd.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/topodata" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -152,7 +153,7 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl | |
// keyspacesWithServedFrom returns all the keyspaces that have ServedFrom set | |
// to one value. | |
func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:155") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -167,7 +168,7 @@ func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]stri | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:170") | |
ki, err := wr.TopoServer().GetKeyspace(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -288,7 +289,7 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl | |
} | |
// Figure out the shard | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:291") | |
shardMap, err := wr.TopoServer().FindAllShardsInKeyspace(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go | |
index 3d72c48b6..63868da73 100644 | |
--- a/go/vt/worker/vertical_split_diff.go | |
+++ b/go/vt/worker/vertical_split_diff.go | |
@@ -21,6 +21,7 @@ import ( | |
"html/template" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -262,7 +263,7 @@ func (vsdw *VerticalSplitDiffWorker) findTargets(ctx context.Context) error { | |
func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
vsdw.SetState(WorkerStateSyncReplication) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:265") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:265\n"); cancel() })() | |
masterInfo, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.shardInfo.MasterAlias) | |
if err != nil { | |
@@ -273,7 +274,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 1 - stop the master binlog replication, get its current position | |
vsdw.wr.Logger().Infof("Stopping master binlog replication on %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:276") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:276\n"); cancel() })() | |
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(ss.Uid, "for split diff")) | |
if err != nil { | |
@@ -292,7 +293,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// stop replication | |
vsdw.wr.Logger().Infof("Stopping slave %v at a minimum of %v", topoproto.TabletAliasString(vsdw.sourceAlias), vreplicationPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:295") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:295\n"); cancel() })() | |
sourceTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.sourceAlias) | |
if err != nil { | |
@@ -310,7 +311,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 3 - ask the master of the destination shard to resume filtered | |
// replication up to the new list of positions | |
vsdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias), mysqlPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:313") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:313\n"); cancel() })() | |
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(ss.Uid, mysqlPos)) | |
if err != nil { | |
@@ -327,13 +328,13 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 4 - wait until the destination tablet is equal or passed | |
// that master binlog position, and stop its replication. | |
vsdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", topoproto.TabletAliasString(vsdw.destinationAlias), masterPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:330") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:330\n"); cancel() })() | |
destinationTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.destinationAlias) | |
if err != nil { | |
return err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:336") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:336\n"); cancel() })() | |
_, err = vsdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout) | |
if err != nil { | |
@@ -343,7 +344,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 5 - restart filtered replication on destination master | |
vsdw.wr.Logger().Infof("Restarting filtered replication on master %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:346") | |
defer (func() { log.Infof("ctx cancel at go/vt/worker/vertical_split_diff.go:346\n"); cancel() })() | |
if _, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(ss.Uid)); err != nil { | |
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", vsdw.shardInfo.MasterAlias) | |
@@ -366,7 +367,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:369") | |
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema( | |
shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) | |
cancel() | |
@@ -379,7 +380,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:382") | |
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema( | |
shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) | |
cancel() | |
diff --git a/go/vt/worker/vertical_split_diff_cmd.go b/go/vt/worker/vertical_split_diff_cmd.go | |
index e4cfd4d34..cc997b9d1 100644 | |
--- a/go/vt/worker/vertical_split_diff_cmd.go | |
+++ b/go/vt/worker/vertical_split_diff_cmd.go | |
@@ -24,6 +24,7 @@ import ( | |
"strconv" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -101,7 +102,7 @@ func commandVerticalSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *fla | |
// shardsWithTablesSources returns all the shards that have SourceShards set | |
// to one value, with an array of Tables. | |
func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:104") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -116,7 +117,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[ | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:119") | |
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -127,7 +128,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[ | |
wg.Add(1) | |
go func(keyspace, shard string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:130") | |
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go | |
index b5ff551d5..1ac4b3bff 100644 | |
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go | |
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go | |
@@ -34,6 +34,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/topo/memorytopo" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -123,7 +124,7 @@ func runVtworkerCommand(client vtworkerclient.Client, args []string) error { | |
func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, serverSideCancelation bool) { | |
// Run the vtworker "Block" command which blocks until we cancel the context. | |
var wg sync.WaitGroup | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/vtworkerclienttest/client_testsuite.go:126") | |
// blockCommandStarted will be closed after we're sure that vtworker is | |
// running the "Block" command. | |
blockCommandStarted := make(chan struct{}) | |
diff --git a/go/vt/workflow/manager.go b/go/vt/workflow/manager.go | |
index a75d2aa1b..9cfc16be0 100644 | |
--- a/go/vt/workflow/manager.go | |
+++ b/go/vt/workflow/manager.go | |
@@ -26,6 +26,7 @@ import ( | |
gouuid "github.com/pborman/uuid" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
@@ -352,7 +353,7 @@ func (m *Manager) Start(ctx context.Context, uuid string) error { | |
func (m *Manager) runWorkflow(rw *runningWorkflow) { | |
// Create a context to run it. | |
var ctx context.Context | |
- ctx, rw.cancel = context.WithCancel(m.ctx) | |
+ ctx, rw.cancel = context2.WithCancel(m.ctx, "./go/vt/workflow/manager.go:355") | |
// And run it in the background. | |
go m.executeWorkflowRun(ctx, rw) | |
@@ -591,7 +592,7 @@ func AvailableFactories() map[string]bool { | |
// StartManager starts a manager. This function should only be used for tests purposes. | |
func StartManager(m *Manager) (*sync.WaitGroup, context.Context, context.CancelFunc) { | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/workflow/manager.go:594") | |
wg := &sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
diff --git a/go/vt/wrangler/cleaner.go b/go/vt/wrangler/cleaner.go | |
index e40b9c08d..837d20a88 100644 | |
--- a/go/vt/wrangler/cleaner.go | |
+++ b/go/vt/wrangler/cleaner.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/topo" | |
"vitess.io/vitess/go/vt/topo/topoproto" | |
@@ -96,7 +97,7 @@ type cleanUpHelper struct { | |
// basis. They are then serialized on each target. | |
func (cleaner *Cleaner) CleanUp(wr *Wrangler) error { | |
// we use a background context so we're not dependent on the original context timeout | |
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Minute, "./go/vt/wrangler/cleaner.go:99") | |
actionMap := make(map[string]*cleanUpHelper) | |
rec := concurrency.AllErrorRecorder{} | |
cleaner.mu.Lock() | |
diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go | |
index 85adabf6c..fc5ada634 100644 | |
--- a/go/vt/wrangler/fake_tablet_test.go | |
+++ b/go/vt/wrangler/fake_tablet_test.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
"google.golang.org/grpc" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
@@ -183,7 +184,7 @@ func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) { | |
step := 10 * time.Millisecond | |
c := tmclient.NewTabletManagerClient() | |
for timeout >= 0 { | |
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/fake_tablet_test.go:186") | |
err := c.Ping(ctx, ft.Agent.Tablet()) | |
cancel() | |
if err == nil { | |
diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go | |
index e94ee2f68..d94420637 100644 | |
--- a/go/vt/wrangler/keyspace.go | |
+++ b/go/vt/wrangler/keyspace.go | |
@@ -25,6 +25,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/event" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -496,7 +497,7 @@ func (wr *Wrangler) waitForFilteredReplication(ctx context.Context, sourcePositi | |
wg.Add(1) | |
go func(si *topo.ShardInfo) { | |
defer wg.Done() | |
- ctx, cancel := context.WithTimeout(ctx, waitTime) | |
+ ctx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/wrangler/keyspace.go:499") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/keyspace.go:499\n"); cancel() })() | |
var pos string | |
@@ -1233,7 +1234,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp | |
// replication and starts accepting writes | |
func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, tables []string, ev *events.MigrateServedFrom, filteredReplicationWaitTime time.Duration) error { | |
// Read the data we need | |
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) | |
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/keyspace.go:1236") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/keyspace.go:1236\n"); cancel() })() | |
sourceMasterTabletInfo, err := wr.ts.GetTablet(ctx, sourceShard.MasterAlias) | |
if err != nil { | |
@@ -1358,7 +1359,7 @@ func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInf | |
// Setting an upper bound timeout to fail faster in case of an error. | |
// Using 60 seconds because RefreshState should not take more than 30 seconds. | |
// (RefreshState will restart the tablet's QueryService and most time will be spent on the shutdown, i.e. waiting up to 30 seconds on transactions (see Config.TransactionTimeout)). | |
- ctx, cancel := context.WithTimeout(ctx, 60*time.Second) | |
+ ctx, cancel := context2.WithTimeout(ctx, 60*time.Second, "./go/vt/wrangler/keyspace.go:1361") | |
if err := wr.tmc.RefreshState(ctx, ti.Tablet); err != nil { | |
wr.Logger().Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err) | |
} | |
diff --git a/go/vt/wrangler/migrater.go b/go/vt/wrangler/migrater.go | |
index f0a9cde00..b25252824 100644 | |
--- a/go/vt/wrangler/migrater.go | |
+++ b/go/vt/wrangler/migrater.go | |
@@ -28,6 +28,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -506,7 +507,7 @@ func (mi *migrater) changeTableSourceWrites(ctx context.Context, access accessTy | |
} | |
func (mi *migrater) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error { | |
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) | |
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/migrater.go:509") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/migrater.go:506\n"); cancel() })() | |
var mu sync.Mutex | |
diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go | |
index 846e8ea62..70000d31c 100644 | |
--- a/go/vt/wrangler/reparent.go | |
+++ b/go/vt/wrangler/reparent.go | |
@@ -26,6 +26,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/event" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqlescape" | |
@@ -206,7 +207,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare | |
// at a wrong replication spot. | |
// Create a context for the following RPCs that respects waitSlaveTimeout | |
- resetCtx, resetCancel := context.WithTimeout(ctx, waitSlaveTimeout) | |
+ resetCtx, resetCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:209") | |
defer resetCancel() | |
event.DispatchUpdate(ev, "resetting replication on all tablets") | |
@@ -249,7 +250,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare | |
// Create a cancelable context for the following RPCs. | |
// If error conditions happen, we can cancel all outgoing RPCs. | |
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout) | |
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:252") | |
defer replCancel() | |
// Now tell the new master to insert the reparent_journal row, | |
@@ -414,7 +415,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R | |
ev.OldMaster = *oldMasterTabletInfo.Tablet | |
// create a new context for the short running remote operations | |
- remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) | |
+ remoteCtx, remoteCancel := context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:417") | |
defer remoteCancel() | |
// Demote the current master, get its replication position | |
@@ -425,7 +426,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R | |
return fmt.Errorf("old master tablet %v DemoteMaster failed: %v", topoproto.TabletAliasString(shardInfo.MasterAlias), err) | |
} | |
- remoteCtx, remoteCancel = context.WithTimeout(ctx, waitSlaveTimeout) | |
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:428") | |
defer remoteCancel() | |
// Wait on the master-elect tablet until it reaches that position, | |
@@ -436,7 +437,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R | |
if err != nil || (ctx.Err() != nil && ctx.Err() == context.DeadlineExceeded) { | |
remoteCancel() | |
// if this fails it is not enough to return an error. we should rollback all the changes made by DemoteMaster | |
- remoteCtx, remoteCancel = context.WithTimeout(ctx, *topo.RemoteOperationTimeout) | |
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:439") | |
defer remoteCancel() | |
if err1 := wr.tmc.UndoDemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet); err1 != nil { | |
log.Warningf("Encountered error %v while trying to undo DemoteMaster", err1) | |
@@ -451,7 +452,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R | |
// Create a cancelable context for the following RPCs. | |
// If error conditions happen, we can cancel all outgoing RPCs. | |
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout) | |
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:454") | |
defer replCancel() | |
// Go through all the tablets: | |
@@ -533,7 +534,7 @@ func (maxPosSearch *maxReplPosSearch) processTablet(tablet *topodatapb.Tablet) { | |
defer maxPosSearch.waitGroup.Done() | |
maxPosSearch.wrangler.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias)) | |
- slaveStatusCtx, cancelSlaveStatus := context.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout) | |
+ slaveStatusCtx, cancelSlaveStatus := context2.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout, "./go/vt/wrangler/reparent.go:536") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/reparent.go:537\n"); cancelSlaveStatus() })() | |
status, err := maxPosSearch.wrangler.tmc.SlaveStatus(slaveStatusCtx, tablet) | |
@@ -668,7 +669,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events | |
ev.OldMaster = *oldMasterTabletInfo.Tablet | |
wr.logger.Infof("deleting old master %v", shardInfoMasterAliasStr) | |
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:671") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/reparent.go:672\n"); cancel() })() | |
if err := topotools.DeleteTablet(ctx, wr.ts, oldMasterTabletInfo.Tablet); err != nil { | |
@@ -688,7 +689,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events | |
go func(alias string, tabletInfo *topo.TabletInfo) { | |
defer wg.Done() | |
wr.logger.Infof("getting replication position from %v", alias) | |
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:691") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/reparent.go:692\n"); cancel() })() | |
rp, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet) | |
if err != nil { | |
@@ -744,7 +745,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events | |
// Create a cancelable context for the following RPCs. | |
// If error conditions happen, we can cancel all outgoing RPCs. | |
- replCtx, replCancel := context.WithCancel(ctx) | |
+ replCtx, replCancel := context2.WithCancel(ctx, "./go/vt/wrangler/reparent.go:747") | |
defer replCancel() | |
// Reset replication on all slaves to point to the new master, and | |
diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go | |
index f209b1e50..261f40b0f 100644 | |
--- a/go/vt/wrangler/schema.go | |
+++ b/go/vt/wrangler/schema.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/vt/concurrency" | |
@@ -359,7 +360,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo | |
// Notify slaves to reload schema. This is best-effort. | |
concurrency := sync2.NewSemaphore(10, 0) | |
- reloadCtx, cancel := context.WithTimeout(ctx, waitSlaveTimeout) | |
+ reloadCtx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/schema.go:362") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/schema.go:363\n"); cancel() })() | |
wr.ReloadSchemaShard(reloadCtx, destKeyspace, destShard, destMasterPos, concurrency, true /* includeMaster */) | |
return nil | |
@@ -435,7 +436,7 @@ func (wr *Wrangler) applySQLShard(ctx context.Context, tabletInfo *topo.TabletIn | |
if err != nil { | |
return fmt.Errorf("fillStringTemplate failed: %v", err) | |
} | |
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second) | |
+ ctx, cancel := context2.WithTimeout(ctx, 30*time.Second, "./go/vt/wrangler/schema.go:438") | |
defer (func() { log.Infof("ctx cancel at go/vt/wrangler/schema.go:439\n"); cancel() })() | |
// Need to make sure that we enable binlog, since we're only applying the statement on masters. | |
_, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo.Tablet, false, []byte(filledChange), 0, false, reloadSchema) | |
diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go | |
index 75766e27d..557ee2ad0 100644 | |
--- a/go/vt/wrangler/testlib/fake_tablet.go | |
+++ b/go/vt/wrangler/testlib/fake_tablet.go | |
@@ -30,6 +30,7 @@ import ( | |
"golang.org/x/net/context" | |
"google.golang.org/grpc" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" | |
"vitess.io/vitess/go/vt/topo" | |
@@ -206,7 +207,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) { | |
step := 10 * time.Millisecond | |
c := tmclient.NewTabletManagerClient() | |
for timeout >= 0 { | |
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/testlib/fake_tablet.go:209") | |
err := c.Ping(ctx, ft.Agent.Tablet()) | |
cancel() | |
if err == nil { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment