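Patch against vitess.io/vitess: call sites of context.WithCancel, context.WithDeadline, and context.WithTimeout are rewritten to go through a new go/context2 package, passing the call site (file:line) as a human-readable reason. The package wraps the returned context and logs that reason whenever Err() reports a cancellation or timeout, making it easier to trace which deadline or cancellation killed an operation.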
diff --git a/go/cmd/mysqlctl/mysqlctl.go b/go/cmd/mysqlctl/mysqlctl.go | |
index 2f39206a0..bf051d7e4 100644 | |
--- a/go/cmd/mysqlctl/mysqlctl.go | |
+++ b/go/cmd/mysqlctl/mysqlctl.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/mysql" | |
@@ -71,7 +72,7 @@ func initCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:74") | |
defer cancel() | |
if err := mysqld.Init(ctx, cnf, *initDBSQLFile); err != nil { | |
return fmt.Errorf("failed init mysql: %v", err) | |
@@ -104,7 +105,7 @@ func shutdownCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:107") | |
defer cancel() | |
if err := mysqld.Shutdown(ctx, cnf, true); err != nil { | |
return fmt.Errorf("failed shutdown mysql: %v", err) | |
@@ -125,7 +126,7 @@ func startCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:128") | |
defer cancel() | |
if err := mysqld.Start(ctx, cnf, mysqldArgs...); err != nil { | |
return fmt.Errorf("failed start mysql: %v", err) | |
@@ -145,7 +146,7 @@ func teardownCmd(subFlags *flag.FlagSet, args []string) error { | |
} | |
defer mysqld.Close() | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctl/mysqlctl.go:148") | |
defer cancel() | |
if err := mysqld.Teardown(ctx, cnf, *force); err != nil { | |
return fmt.Errorf("failed teardown mysql (forced? %v): %v", *force, err) | |
diff --git a/go/cmd/mysqlctld/mysqlctld.go b/go/cmd/mysqlctld/mysqlctld.go | |
index 5c17ea813..8cafcb6fd 100644 | |
--- a/go/cmd/mysqlctld/mysqlctld.go | |
+++ b/go/cmd/mysqlctld/mysqlctld.go | |
@@ -25,6 +25,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
"vitess.io/vitess/go/vt/log" | |
@@ -68,7 +69,7 @@ func main() { | |
} | |
// Start or Init mysqld as needed. | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/mysqlctld/mysqlctld.go:71") | |
mycnfFile := mysqlctl.MycnfFile(uint32(*tabletUID)) | |
if _, statErr := os.Stat(mycnfFile); os.IsNotExist(statErr) { | |
// Generate my.cnf from scratch and use it to find mysqld. | |
diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go | |
index 64ef47a96..7c8089105 100644 | |
--- a/go/cmd/vtbackup/vtbackup.go | |
+++ b/go/cmd/vtbackup/vtbackup.go | |
@@ -68,6 +68,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqlescape" | |
@@ -200,7 +201,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back | |
if err != nil { | |
return fmt.Errorf("failed to initialize mysql config: %v", err) | |
} | |
- initCtx, initCancel := context.WithTimeout(ctx, *mysqlTimeout) | |
+ initCtx, initCancel := context2.WithTimeout(ctx, *mysqlTimeout, "./go/cmd/vtbackup/vtbackup.go:203") | |
defer initCancel() | |
if err := mysqld.Init(initCtx, mycnf, *initDBSQLFile); err != nil { | |
return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err) | |
@@ -209,7 +210,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back | |
defer func() { | |
// Be careful not to use the original context, because we don't want to | |
// skip shutdown just because we timed out waiting for other things. | |
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/cmd/vtbackup/vtbackup.go:212") | |
defer cancel() | |
mysqld.Shutdown(ctx, mycnf, false) | |
}() | |
diff --git a/go/cmd/vtbench/vtbench.go b/go/cmd/vtbench/vtbench.go | |
index 348299542..a7a548bb9 100644 | |
--- a/go/cmd/vtbench/vtbench.go | |
+++ b/go/cmd/vtbench/vtbench.go | |
@@ -23,6 +23,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
"vitess.io/vitess/go/vt/log" | |
@@ -156,7 +157,7 @@ func main() { | |
b := vtbench.NewBench(*threads, *count, connParams, *sql) | |
- ctx, cancel := context.WithTimeout(context.Background(), *deadline) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *deadline, "./go/cmd/vtbench/vtbench.go:159") | |
defer cancel() | |
fmt.Printf("Initializing test with %s protocol / %d threads / %d iterations\n", | |
diff --git a/go/cmd/vtclient/vtclient.go b/go/cmd/vtclient/vtclient.go | |
index 87c6c4dc4..d89c3e3c1 100644 | |
--- a/go/cmd/vtclient/vtclient.go | |
+++ b/go/cmd/vtclient/vtclient.go | |
@@ -30,6 +30,7 @@ import ( | |
"time" | |
"github.com/olekukonko/tablewriter" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
@@ -183,7 +184,7 @@ func run() (*results, error) { | |
log.Infof("Sending the query...") | |
- ctx, cancel := context.WithTimeout(context.Background(), *timeout) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *timeout, "./go/cmd/vtclient/vtclient.go:186") | |
defer cancel() | |
return execMulti(ctx, db, args[0]) | |
} | |
diff --git a/go/cmd/vtctl/vtctl.go b/go/cmd/vtctl/vtctl.go | |
index 8365b9c0a..b8ebf5b4e 100644 | |
--- a/go/cmd/vtctl/vtctl.go | |
+++ b/go/cmd/vtctl/vtctl.go | |
@@ -27,6 +27,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/vt/log" | |
@@ -91,7 +92,7 @@ func main() { | |
vtctl.WorkflowManager = workflow.NewManager(ts) | |
- ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *waitTime, "./go/cmd/vtctl/vtctl.go:94") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) | |
installSignalHandlers(cancel) | |
diff --git a/go/cmd/vtctlclient/main.go b/go/cmd/vtctlclient/main.go | |
index f93696326..2279561b8 100644 | |
--- a/go/cmd/vtctlclient/main.go | |
+++ b/go/cmd/vtctlclient/main.go | |
@@ -23,6 +23,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/vt/log" | |
@@ -55,7 +56,7 @@ func main() { | |
os.Exit(1) | |
} | |
- ctx, cancel := context.WithTimeout(context.Background(), *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *actionTimeout, "./go/cmd/vtctlclient/main.go:58") | |
defer cancel() | |
err := vtctlclient.RunCommandAndWait( | |
diff --git a/go/cmd/vtworkerclient/vtworkerclient.go b/go/cmd/vtworkerclient/vtworkerclient.go | |
index 0e1874453..59f434c9a 100644 | |
--- a/go/cmd/vtworkerclient/vtworkerclient.go | |
+++ b/go/cmd/vtworkerclient/vtworkerclient.go | |
@@ -23,6 +23,7 @@ import ( | |
"syscall" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/worker/vtworkerclient" | |
@@ -37,7 +38,7 @@ var ( | |
func main() { | |
flag.Parse() | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/vtworkerclient/vtworkerclient.go:40") | |
defer cancel() | |
sigChan := make(chan os.Signal, 1) | |
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) | |
diff --git a/go/cmd/zk/zkcmd.go b/go/cmd/zk/zkcmd.go | |
index c6cf45e6b..31663c0c0 100644 | |
--- a/go/cmd/zk/zkcmd.go | |
+++ b/go/cmd/zk/zkcmd.go | |
@@ -36,6 +36,7 @@ import ( | |
"golang.org/x/crypto/ssh/terminal" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/exit" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
@@ -157,7 +158,7 @@ func main() { | |
subFlags := flag.NewFlagSet(cmdName, flag.ExitOnError) | |
// Create a context for the command, cancel it if we get a signal. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/cmd/zk/zkcmd.go:160") | |
sigRecv := make(chan os.Signal, 1) | |
signal.Notify(sigRecv, os.Interrupt) | |
go func() { | |
diff --git a/go/context2/context.go b/go/context2/context.go | |
new file mode 100644 | |
index 000000000..b4a03a28d | |
--- /dev/null | |
+++ b/go/context2/context.go | |
@@ -0,0 +1,42 @@ | |
+package context2 | |
+ | |
+import ( | |
+ "context" | |
+ "time" | |
+ | |
+ "vitess.io/vitess/go/vt/log" | |
+) | |
+ | |
+// Context2 wraps a context and remembers the reason it was created | |
+// so the cause of a cancellation or timeout can be logged when it fires. | |
+type Context2 struct { | |
+ context.Context | |
+ reason string | |
+} | |
+ | |
+func WithCancel(parent context.Context, reason string) (context.Context, context.CancelFunc) { | |
+ ctx, cancel := context.WithCancel(parent) | |
+ ctx2 := Context2{ctx, reason} | |
+ return ctx2, cancel | |
+} | |
+ | |
+func WithDeadline(parent context.Context, d time.Time, reason string) (context.Context, context.CancelFunc) { | |
+ ctx, cancel := context.WithDeadline(parent, d) | |
+ ctx2 := Context2{ctx, reason} | |
+ return ctx2, cancel | |
+} | |
+ | |
+func WithTimeout(parent context.Context, timeout time.Duration, reason string) (context.Context, context.CancelFunc) { | |
+ ctx, cancel := context.WithTimeout(parent, timeout) | |
+ ctx2 := Context2{ctx, reason} | |
+ return ctx2, cancel | |
+} | |
+ | |
+// Err logs the recorded reason when the underlying context has errored. | |
+func (c Context2) Err() error { | |
+ err := c.Context.Err() | |
+ if err != nil { | |
+ log.Infof("Context errored due to: %s", c.reason) | |
+ } | |
+ return err | |
+} | |
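For illustration, a minimal sketch of how the wrapper is meant to be used (the reason string here is hypothetical, and the log line is only emitted when Err() is called on the context returned by this package):

package main

import (
	"context"
	"time"

	"vitess.io/vitess/go/context2"
)

func main() {
	// Tag the context with a reason describing who created it and why.
	ctx, cancel := context2.WithTimeout(context.Background(), 50*time.Millisecond, "demo: waiting for mysqld init")
	defer cancel()

	<-ctx.Done() // deadline expires
	// Logs "Context errored due to: demo: waiting for mysqld init"
	// and returns context.DeadlineExceeded.
	_ = ctx.Err()
}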
diff --git a/go/mysql/client_test.go b/go/mysql/client_test.go | |
index 13d25073e..3fa8aea13 100644 | |
--- a/go/mysql/client_test.go | |
+++ b/go/mysql/client_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
) | |
// assertSQLError makes sure we get the right error. | |
@@ -71,7 +72,7 @@ func TestConnectTimeout(t *testing.T) { | |
defer listener.Close() | |
// Test that canceling the context really interrupts the Connect. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/client_test.go:74") | |
done := make(chan struct{}) | |
go func() { | |
_, err := Connect(ctx, params) | |
@@ -85,7 +86,7 @@ func TestConnectTimeout(t *testing.T) { | |
<-done | |
// Tests a connection timeout works. | |
- ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), 100*time.Millisecond, "./go/mysql/client_test.go:88") | |
_, err = Connect(ctx, params) | |
cancel() | |
if err != context.DeadlineExceeded { | |
diff --git a/go/mysql/server_test.go b/go/mysql/server_test.go | |
index 49456128c..71d4e1199 100644 | |
--- a/go/mysql/server_test.go | |
+++ b/go/mysql/server_test.go | |
@@ -29,6 +29,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
vtenv "vitess.io/vitess/go/vt/env" | |
"vitess.io/vitess/go/vt/tlstest" | |
@@ -1246,7 +1247,7 @@ func TestListenerShutdown(t *testing.T) { | |
Pass: "password1", | |
} | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/mysql/server_test.go:1249") | |
defer cancel() | |
conn, err := Connect(ctx, params) | |
diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go | |
index 945393c79..ab90a2adb 100644 | |
--- a/go/pools/resource_pool.go | |
+++ b/go/pools/resource_pool.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/timer" | |
"vitess.io/vitess/go/trace" | |
@@ -101,7 +102,7 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur | |
rp.resources <- resourceWrapper{} | |
} | |
- ctx, cancel := context.WithTimeout(context.TODO(), prefillTimeout) | |
+ ctx, cancel := context2.WithTimeout(context.TODO(), prefillTimeout, "./go/pools/resource_pool.go:104") | |
defer cancel() | |
if prefillParallelism != 0 { | |
sem := sync2.NewSemaphore(prefillParallelism, 0 /* timeout */) | |
diff --git a/go/pools/resource_pool_test.go b/go/pools/resource_pool_test.go | |
index f9a7c9ac3..13ac38c0b 100644 | |
--- a/go/pools/resource_pool_test.go | |
+++ b/go/pools/resource_pool_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
) | |
@@ -618,7 +619,7 @@ func TestTimeout(t *testing.T) { | |
if err != nil { | |
t.Fatal(err) | |
} | |
- newctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond) | |
+ newctx, cancel := context2.WithTimeout(ctx, 1*time.Millisecond, "./go/pools/resource_pool_test.go:621") | |
_, err = p.Get(newctx) | |
cancel() | |
want := "resource pool timed out" | |
@@ -633,7 +634,7 @@ func TestExpired(t *testing.T) { | |
count.Set(0) | |
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0) | |
defer p.Close() | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(-1*time.Second), "./go/pools/resource_pool_test.go:636") | |
r, err := p.Get(ctx) | |
if err == nil { | |
p.Put(r) | |
diff --git a/go/vt/binlog/binlog_streamer_test.go b/go/vt/binlog/binlog_streamer_test.go | |
index 576e7b562..9a10fb38d 100644 | |
--- a/go/vt/binlog/binlog_streamer_test.go | |
+++ b/go/vt/binlog/binlog_streamer_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
@@ -193,7 +194,7 @@ func TestStreamerStop(t *testing.T) { | |
bls := NewStreamer(&mysql.ConnParams{DbName: "vt_test_keyspace"}, nil, nil, mysql.Position{}, 0, sendTransaction) | |
// Start parseEvents(), but don't send it anything, so it just waits. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlog_streamer_test.go:196") | |
done := make(chan error) | |
go func() { | |
_, err := bls.parseEvents(ctx, events) | |
diff --git a/go/vt/binlog/binlogplayer/binlog_player_test.go b/go/vt/binlog/binlogplayer/binlog_player_test.go | |
index dedcfcc08..f95aa2c0c 100644 | |
--- a/go/vt/binlog/binlogplayer/binlog_player_test.go | |
+++ b/go/vt/binlog/binlogplayer/binlog_player_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/throttler" | |
@@ -306,7 +307,7 @@ func TestRetryOnDeadlock(t *testing.T) { | |
// has finished. Otherwise, it may cause race with other tests. | |
func applyEvents(blp *BinlogPlayer) func() error { | |
errChan := make(chan error) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/binlog/binlogplayer/binlog_player_test.go:309") | |
go func() { | |
errChan <- blp.ApplyBinlogEvents(ctx) | |
diff --git a/go/vt/binlog/slave_connection.go b/go/vt/binlog/slave_connection.go | |
index c44d6e87f..a8d815aed 100644 | |
--- a/go/vt/binlog/slave_connection.go | |
+++ b/go/vt/binlog/slave_connection.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/pools" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
@@ -96,7 +97,7 @@ var slaveIDPool = pools.NewIDPool() | |
// StartBinlogDumpFromCurrent requests a replication binlog dump from | |
// the current position. | |
func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysql.Position, <-chan mysql.BinlogEvent, error) { | |
- ctx, sc.cancel = context.WithCancel(ctx) | |
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:99") | |
masterPosition, err := sc.Conn.MasterPosition() | |
if err != nil { | |
@@ -116,7 +117,7 @@ func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (mysq | |
// | |
// Note the context is valid and used until eventChan is closed. | |
func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, startPos mysql.Position) (<-chan mysql.BinlogEvent, error) { | |
- ctx, sc.cancel = context.WithCancel(ctx) | |
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:119") | |
log.Infof("sending binlog dump command: startPos=%v, slaveID=%v", startPos, sc.slaveID) | |
if err := sc.SendBinlogDumpCommand(sc.slaveID, startPos); err != nil { | |
@@ -192,7 +193,7 @@ func (sc *SlaveConnection) streamEvents(ctx context.Context) chan mysql.BinlogEv | |
// | |
// Note the context is valid and used until eventChan is closed. | |
func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan mysql.BinlogEvent, error) { | |
- ctx, sc.cancel = context.WithCancel(ctx) | |
+ ctx, sc.cancel = context2.WithCancel(ctx, "./go/vt/binlog/slave_connection.go:195") | |
filename, err := sc.findFileBeforeTimestamp(ctx, timestamp) | |
if err != nil { | |
diff --git a/go/vt/binlog/updatestreamctl.go b/go/vt/binlog/updatestreamctl.go | |
index 3525c28a2..faea9bd5f 100644 | |
--- a/go/vt/binlog/updatestreamctl.go | |
+++ b/go/vt/binlog/updatestreamctl.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/sync2" | |
@@ -266,7 +267,7 @@ func (updateStream *UpdateStreamImpl) StreamKeyRange(ctx context.Context, positi | |
return fmt.Errorf("newKeyspaceIDResolverFactory failed: %v", err) | |
} | |
- streamCtx, cancel := context.WithCancel(ctx) | |
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:269") | |
i := updateStream.streams.Add(cancel) | |
defer updateStream.streams.Delete(i) | |
@@ -302,7 +303,7 @@ func (updateStream *UpdateStreamImpl) StreamTables(ctx context.Context, position | |
}) | |
bls := NewStreamer(updateStream.cp, updateStream.se, charset, pos, 0, f) | |
- streamCtx, cancel := context.WithCancel(ctx) | |
+ streamCtx, cancel := context2.WithCancel(ctx, "./go/vt/binlog/updatestreamctl.go:305") | |
i := updateStream.streams.Add(cancel) | |
defer updateStream.streams.Delete(i) | |
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go | |
index 2e5e527b9..40a5d4e79 100644 | |
--- a/go/vt/discovery/healthcheck.go | |
+++ b/go/vt/discovery/healthcheck.go | |
@@ -51,6 +51,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/netutil" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/sync2" | |
@@ -495,7 +496,7 @@ func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) { | |
if hcc.conn != nil { | |
// Don't use hcc.ctx because it's already closed. | |
// Use a separate context, and add a timeout to prevent unbounded waits. | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/healthcheck.go:498") | |
defer cancel() | |
hcc.conn.Close(ctx) | |
hcc.conn = nil | |
@@ -513,7 +514,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { | |
retryDelay := hc.retryDelay | |
for { | |
- streamCtx, streamCancel := context.WithCancel(hcc.ctx) | |
+ streamCtx, streamCancel := context2.WithCancel(hcc.ctx, "./go/vt/discovery/healthcheck.go:516") | |
// Setup a watcher that restarts the timer every time an update is received. | |
// If a timeout occurs for a serving tablet, we make it non-serving and send | |
@@ -715,7 +716,7 @@ func (hc *HealthCheckImpl) SetListener(listener HealthCheckStatsListener, sendDo | |
// It does not block on making connection. | |
// name is an optional tag for the tablet, e.g. an alternative address. | |
func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { | |
- ctx, cancelFunc := context.WithCancel(context.Background()) | |
+ ctx, cancelFunc := context2.WithCancel(context.Background(), "./go/vt/discovery/healthcheck.go:718") | |
key := TabletToMapKey(tablet) | |
hcc := &healthCheckConn{ | |
ctx: ctx, | |
diff --git a/go/vt/discovery/tablet_stats_cache_wait_test.go b/go/vt/discovery/tablet_stats_cache_wait_test.go | |
index 19933494f..cffab6b4d 100644 | |
--- a/go/vt/discovery/tablet_stats_cache_wait_test.go | |
+++ b/go/vt/discovery/tablet_stats_cache_wait_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/topo" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
@@ -29,7 +30,7 @@ import ( | |
) | |
func TestWaitForTablets(t *testing.T) { | |
- shortCtx, shortCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ shortCtx, shortCancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/discovery/tablet_stats_cache_wait_test.go:32") | |
defer shortCancel() | |
waitAvailableTabletInterval = 20 * time.Millisecond | |
@@ -48,7 +49,7 @@ func TestWaitForTablets(t *testing.T) { | |
} | |
// this should fail, but return a non-timeout error | |
- cancelledCtx, cancel := context.WithCancel(context.Background()) | |
+ cancelledCtx, cancel := context2.WithCancel(context.Background(), "./go/vt/discovery/tablet_stats_cache_wait_test.go:51") | |
cancel() | |
if err := tsc.WaitForTablets(cancelledCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err == nil || err == context.DeadlineExceeded { | |
t.Errorf("want: non-timeout error, got: %v", err) | |
@@ -67,7 +68,7 @@ func TestWaitForTablets(t *testing.T) { | |
input <- shr | |
// and ask again, with longer time outs so it's not flaky | |
- longCtx, longCancel := context.WithTimeout(context.Background(), 10*time.Second) | |
+ longCtx, longCancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/discovery/tablet_stats_cache_wait_test.go:70") | |
defer longCancel() | |
waitAvailableTabletInterval = 10 * time.Millisecond | |
if err := tsc.WaitForTablets(longCtx, "cell", "keyspace", "shard", topodatapb.TabletType_REPLICA); err != nil { | |
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go | |
index 429b72f54..00d25363a 100644 | |
--- a/go/vt/discovery/topology_watcher.go | |
+++ b/go/vt/discovery/topology_watcher.go | |
@@ -26,6 +26,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/trace" | |
@@ -153,7 +154,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR | |
// We want the span from the context, but not the cancelation that comes with it | |
spanContext := trace.CopySpan(context.Background(), ctx) | |
- tw.ctx, tw.cancelFunc = context.WithCancel(spanContext) | |
+ tw.ctx, tw.cancelFunc = context2.WithCancel(spanContext, "./go/vt/discovery/topology_watcher.go:156") | |
tw.wg.Add(1) | |
go tw.watch() | |
return tw | |
diff --git a/go/vt/grpcclient/client_test.go b/go/vt/grpcclient/client_test.go | |
index 106a0937e..306040c04 100644 | |
--- a/go/vt/grpcclient/client_test.go | |
+++ b/go/vt/grpcclient/client_test.go | |
@@ -24,6 +24,7 @@ import ( | |
"google.golang.org/grpc" | |
+ "vitess.io/vitess/go/context2" | |
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" | |
vtgateservicepb "vitess.io/vitess/go/vt/proto/vtgateservice" | |
) | |
@@ -41,7 +42,7 @@ func TestDialErrors(t *testing.T) { | |
t.Fatal(err) | |
} | |
vtg := vtgateservicepb.NewVitessClient(gconn) | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:44") | |
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{}) | |
cancel() | |
gconn.Close() | |
@@ -57,7 +58,7 @@ func TestDialErrors(t *testing.T) { | |
t.Fatal(err) | |
} | |
vtg := vtgateservicepb.NewVitessClient(gconn) | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/grpcclient/client_test.go:60") | |
_, err = vtg.Execute(ctx, &vtgatepb.ExecuteRequest{}) | |
cancel() | |
gconn.Close() | |
diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go | |
index 0051f5bd6..6759a367f 100644 | |
--- a/go/vt/mysqlctl/query.go | |
+++ b/go/vt/mysqlctl/query.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/dbconnpool" | |
@@ -160,7 +161,7 @@ func (mysqld *Mysqld) killConnection(connID int64) error { | |
// Get another connection with which to kill. | |
// Use background context because the caller's context is likely expired, | |
// which is the reason we're being asked to kill the connection. | |
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/mysqlctl/query.go:163") | |
defer cancel() | |
if poolConn, connErr := getPoolReconnect(ctx, mysqld.dbaPool); connErr == nil { | |
// We got a pool connection. | |
diff --git a/go/vt/schemamanager/schemaswap/schema_swap.go b/go/vt/schemamanager/schemaswap/schema_swap.go | |
index 4aa4d3816..e485fa832 100644 | |
--- a/go/vt/schemamanager/schemaswap/schema_swap.go | |
+++ b/go/vt/schemamanager/schemaswap/schema_swap.go | |
@@ -29,6 +29,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/discovery" | |
@@ -879,7 +880,7 @@ func (array orderTabletsForSwap) Less(i, j int) bool { | |
// executeAdminQuery executes a query on a given tablet as 'allprivs' user. The query is executed | |
// using timeout value from --schema_swap_admin_query_timeout flag. | |
func (shardSwap *shardSchemaSwap) executeAdminQuery(tablet *topodatapb.Tablet, query string, maxRows int) (*sqltypes.Result, error) { | |
- sqlCtx, cancelSQLCtx := context.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout) | |
+ sqlCtx, cancelSQLCtx := context2.WithTimeout(shardSwap.parent.ctx, *adminQueryTimeout, "./go/vt/schemamanager/schemaswap/schema_swap.go:882") | |
defer cancelSQLCtx() | |
sqlResultProto, err := shardSwap.parent.tabletClient.ExecuteFetchAsAllPrivs( | |
diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go | |
index 6346e6f91..a9738cb1f 100644 | |
--- a/go/vt/schemamanager/tablet_executor.go | |
+++ b/go/vt/schemamanager/tablet_executor.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/vt/sqlparser" | |
"vitess.io/vitess/go/vt/wrangler" | |
@@ -252,7 +253,7 @@ func (exec *TabletExecutor) executeOnAllTablets(ctx context.Context, execResult | |
// execute the schema change via replication. This is best-effort, meaning | |
// we still return overall success if the timeout expires. | |
concurrency := sync2.NewSemaphore(10, 0) | |
- reloadCtx, cancel := context.WithTimeout(ctx, exec.waitSlaveTimeout) | |
+ reloadCtx, cancel := context2.WithTimeout(ctx, exec.waitSlaveTimeout, "./go/vt/schemamanager/tablet_executor.go:255") | |
defer cancel() | |
for _, result := range execResult.SuccessShards { | |
wg.Add(1) | |
diff --git a/go/vt/srvtopo/resilient_server_flaky_test.go b/go/vt/srvtopo/resilient_server_flaky_test.go | |
index e40d9500c..b21ae1883 100644 | |
--- a/go/vt/srvtopo/resilient_server_flaky_test.go | |
+++ b/go/vt/srvtopo/resilient_server_flaky_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/status" | |
"vitess.io/vitess/go/vt/topo" | |
"vitess.io/vitess/go/vt/topo/memorytopo" | |
@@ -308,7 +309,7 @@ func TestGetSrvKeyspace(t *testing.T) { | |
time.Sleep(*srvTopoCacheTTL) | |
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2) | |
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:311") | |
_, err = rs.GetSrvKeyspace(timeoutCtx, "test_cell", "test_ks") | |
wantErr := "timed out waiting for keyspace" | |
if err == nil || err.Error() != wantErr { | |
@@ -346,7 +347,7 @@ func TestSrvKeyspaceCachedError(t *testing.T) { | |
time.Sleep(*srvTopoCacheTTL + 10*time.Millisecond) | |
// Ask again with a different context, should get an error and | |
// save that context. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/srvtopo/resilient_server_flaky_test.go:349") | |
defer cancel() | |
_, err2 := rs.GetSrvKeyspace(ctx, "test_cell", "unknown_ks") | |
if err2 == nil { | |
@@ -598,7 +599,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { | |
time.Sleep(*srvTopoCacheTTL) | |
- timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2) | |
+ timeoutCtx, _ := context2.WithTimeout(context.Background(), *srvTopoCacheRefresh*2, "./go/vt/srvtopo/resilient_server_flaky_test.go:601") | |
_, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell") | |
wantErr := "timed out waiting for keyspace names" | |
if err == nil || err.Error() != wantErr { | |
diff --git a/go/vt/topo/consultopo/election.go b/go/vt/topo/consultopo/election.go | |
index bacbbac8d..65455b1c8 100644 | |
--- a/go/vt/topo/consultopo/election.go | |
+++ b/go/vt/topo/consultopo/election.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/hashicorp/consul/api" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -90,7 +91,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error | |
} | |
// We have the lock, keep mastership until we lose it. | |
- lockCtx, lockCancel := context.WithCancel(context.Background()) | |
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/election.go:93") | |
go func() { | |
select { | |
case <-lost: | |
diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go | |
index 8021738ca..de4ca89b9 100644 | |
--- a/go/vt/topo/consultopo/watch.go | |
+++ b/go/vt/topo/consultopo/watch.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/hashicorp/consul/api" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -51,7 +52,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < | |
} | |
// Create a context, will be used to cancel the watch. | |
- watchCtx, watchCancel := context.WithCancel(context.Background()) | |
+ watchCtx, watchCancel := context2.WithCancel(context.Background(), "./go/vt/topo/consultopo/watch.go:54") | |
// Create the notifications channel, send updates to it. | |
notifications := make(chan *topo.WatchData, 10) | |
diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go | |
index 354c5e499..ff3b66256 100644 | |
--- a/go/vt/topo/etcd2topo/election.go | |
+++ b/go/vt/topo/etcd2topo/election.go | |
@@ -22,6 +22,7 @@ import ( | |
"github.com/coreos/etcd/clientv3" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -73,7 +74,7 @@ func (mp *etcdMasterParticipation) WaitForMastership() (context.Context, error) | |
// We use a cancelable context here. If stop is closed, | |
// we just cancel that context. | |
- lockCtx, lockCancel := context.WithCancel(context.Background()) | |
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/election.go:76") | |
go func() { | |
<-mp.stop | |
if ld != nil { | |
diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go | |
index 27928cf3e..7f916239e 100644 | |
--- a/go/vt/topo/etcd2topo/lock.go | |
+++ b/go/vt/topo/etcd2topo/lock.go | |
@@ -24,6 +24,7 @@ import ( | |
"github.com/coreos/etcd/clientv3" | |
"github.com/coreos/etcd/mvcc/mvccpb" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -86,7 +87,7 @@ func (s *Server) waitOnLastRev(ctx context.Context, cli *clientv3.Client, nodePa | |
// Wait for release on blocking key. Cancel the watch when we | |
// exit this function. | |
key := string(lastKey.Kvs[0].Key) | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/topo/etcd2topo/lock.go:89") | |
defer cancel() | |
wc := cli.Watch(ctx, key, clientv3.WithRev(revision)) | |
if wc == nil { | |
diff --git a/go/vt/topo/etcd2topo/server_test.go b/go/vt/topo/etcd2topo/server_test.go | |
index beccd612b..920686bc2 100644 | |
--- a/go/vt/topo/etcd2topo/server_test.go | |
+++ b/go/vt/topo/etcd2topo/server_test.go | |
@@ -28,6 +28,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/coreos/etcd/clientv3" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/testfiles" | |
"vitess.io/vitess/go/vt/topo" | |
"vitess.io/vitess/go/vt/topo/test" | |
@@ -73,7 +74,7 @@ func startEtcd(t *testing.T) (*exec.Cmd, string, string) { | |
} | |
// Wait until we can list "/", or timeout. | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Second, "./go/vt/topo/etcd2topo/server_test.go:76") | |
defer cancel() | |
start := time.Now() | |
for { | |
diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go | |
index 706fd994e..cb024798c 100644 | |
--- a/go/vt/topo/etcd2topo/watch.go | |
+++ b/go/vt/topo/etcd2topo/watch.go | |
@@ -23,6 +23,7 @@ import ( | |
"github.com/coreos/etcd/clientv3" | |
"github.com/coreos/etcd/mvcc/mvccpb" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -50,10 +51,10 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < | |
} | |
// Create an outer context that will be canceled on return and will cancel all inner watches. | |
- outerCtx, outerCancel := context.WithCancel(context.Background()) | |
+ outerCtx, outerCancel := context2.WithCancel(context.Background(), "./go/vt/topo/etcd2topo/watch.go:53") | |
// Create a context, will be used to cancel the watch on retry. | |
- watchCtx, watchCancel := context.WithCancel(outerCtx) | |
+ watchCtx, watchCancel := context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:56") | |
// Create the Watcher. We start watching from the response we | |
// got, not from the file original version, as the server may | |
@@ -87,7 +88,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < | |
watchRetries++ | |
// Cancel inner context on retry and create new one. | |
watchCancel() | |
- watchCtx, watchCancel = context.WithCancel(outerCtx) | |
+ watchCtx, watchCancel = context2.WithCancel(outerCtx, "./go/vt/topo/etcd2topo/watch.go:90") | |
newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion)) | |
if newWatcher == nil { | |
log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) | |
diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go | |
index f9f15a479..184e505e5 100644 | |
--- a/go/vt/topo/locks.go | |
+++ b/go/vt/topo/locks.go | |
@@ -26,6 +26,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -217,7 +218,7 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { | |
func (l *Lock) lockKeyspace(ctx context.Context, ts *Server, keyspace string) (LockDescriptor, error) { | |
log.Infof("Locking keyspace %v for action %v", keyspace, l.Action) | |
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:220") | |
defer cancel() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.LockKeyspaceForAction") | |
@@ -241,7 +242,7 @@ func (l *Lock) unlockKeyspace(ctx context.Context, ts *Server, keyspace string, | |
// Note that we are not using the user provided RemoteOperationTimeout | |
// here because it is possible that that timeout is too short. | |
ctx = trace.CopySpan(context.TODO(), ctx) | |
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:244") | |
defer cancel() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockKeyspaceForAction") | |
@@ -358,7 +359,7 @@ func CheckShardLocked(ctx context.Context, keyspace, shard string) error { | |
func (l *Lock) lockShard(ctx context.Context, ts *Server, keyspace, shard string) (LockDescriptor, error) { | |
log.Infof("Locking shard %v/%v for action %v", keyspace, shard, l.Action) | |
- ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *RemoteOperationTimeout, "./go/vt/topo/locks.go:361") | |
defer cancel() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.LockShardForAction") | |
@@ -382,7 +383,7 @@ func (l *Lock) unlockShard(ctx context.Context, ts *Server, keyspace, shard stri | |
// Note that we are not using the user provided RemoteOperationTimeout | |
// here because it is possible that that timeout is too short. | |
ctx = trace.CopySpan(context.TODO(), ctx) | |
- ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, defaultLockTimeout, "./go/vt/topo/locks.go:385") | |
defer cancel() | |
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockShardForAction") | |
diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go | |
index f66154a02..16db7ccb7 100644 | |
--- a/go/vt/topo/memorytopo/election.go | |
+++ b/go/vt/topo/memorytopo/election.go | |
@@ -21,6 +21,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
) | |
@@ -81,7 +82,7 @@ func (mp *cMasterParticipation) WaitForMastership() (context.Context, error) { | |
// We use a cancelable context here. If stop is closed, | |
// we just cancel that context. | |
- lockCtx, lockCancel := context.WithCancel(context.Background()) | |
+ lockCtx, lockCancel := context2.WithCancel(context.Background(), "./go/vt/topo/memorytopo/election.go:84") | |
go func() { | |
<-mp.stop | |
if ld != nil { | |
diff --git a/go/vt/topo/test/lock.go b/go/vt/topo/test/lock.go | |
index 315653ff0..ab5e60043 100644 | |
--- a/go/vt/topo/test/lock.go | |
+++ b/go/vt/topo/test/lock.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/topo" | |
topodatapb "vitess.io/vitess/go/vt/proto/topodata" | |
@@ -92,14 +93,14 @@ func checkLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) { | |
} | |
// test we can't take the lock again | |
- fastCtx, cancel := context.WithTimeout(ctx, timeUntilLockIsTaken) | |
+ fastCtx, cancel := context2.WithTimeout(ctx, timeUntilLockIsTaken, "./go/vt/topo/test/lock.go:95") | |
if _, err := conn.Lock(fastCtx, keyspacePath, "again"); !topo.IsErrType(err, topo.Timeout) { | |
t.Fatalf("Lock(again): %v", err) | |
} | |
cancel() | |
// test we can interrupt taking the lock | |
- interruptCtx, cancel := context.WithCancel(ctx) | |
+ interruptCtx, cancel := context2.WithCancel(ctx, "./go/vt/topo/test/lock.go:102") | |
go func() { | |
time.Sleep(timeUntilLockIsTaken) | |
cancel() | |
diff --git a/go/vt/topo/zk2topo/election.go b/go/vt/topo/zk2topo/election.go | |
index 15975a00f..df408f41e 100644 | |
--- a/go/vt/topo/zk2topo/election.go | |
+++ b/go/vt/topo/zk2topo/election.go | |
@@ -22,6 +22,7 @@ import ( | |
"github.com/z-division/go-zookeeper/zk" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"vitess.io/vitess/go/vt/log" | |
@@ -49,7 +50,7 @@ func (zs *Server) NewMasterParticipation(name, id string) (topo.MasterParticipat | |
id: []byte(id), | |
done: make(chan struct{}), | |
} | |
- result.stopCtx, result.stopCtxCancel = context.WithCancel(context.Background()) | |
+ result.stopCtx, result.stopCtxCancel = context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:52") | |
return result, nil | |
} | |
@@ -122,7 +123,7 @@ func (mp *zkMasterParticipation) WaitForMastership() (context.Context, error) { | |
} | |
// we got the lock, create our background context | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/topo/zk2topo/election.go:125") | |
go mp.watchMastership(ctx, mp.zs.conn, proposal, cancel) | |
return ctx, nil | |
} | |
diff --git a/go/vt/vtctl/throttler.go b/go/vt/vtctl/throttler.go | |
index 1e8452e57..1822f42d5 100644 | |
--- a/go/vt/vtctl/throttler.go | |
+++ b/go/vt/vtctl/throttler.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/olekukonko/tablewriter" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/throttler" | |
"vitess.io/vitess/go/vt/throttler/throttlerclient" | |
@@ -86,7 +87,7 @@ func commandThrottlerMaxRates(ctx context.Context, wr *wrangler.Wrangler, subFla | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:89") | |
defer cancel() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -139,7 +140,7 @@ func commandThrottlerSetMaxRate(ctx context.Context, wr *wrangler.Wrangler, subF | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:142") | |
defer cancel() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -176,7 +177,7 @@ func commandGetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangler | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:179") | |
defer cancel() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -230,7 +231,7 @@ func commandUpdateThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrang | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:233") | |
defer cancel() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
@@ -268,7 +269,7 @@ func commandResetThrottlerConfiguration(ctx context.Context, wr *wrangler.Wrangl | |
} | |
// Connect to the server. | |
- ctx, cancel := context.WithTimeout(ctx, shortTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, shortTimeout, "./go/vt/vtctl/throttler.go:271") | |
defer cancel() | |
client, err := throttlerclient.New(*server) | |
if err != nil { | |
diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go | |
index 94cb8744a..f935fd7db 100644 | |
--- a/go/vt/vtctl/vtctl.go | |
+++ b/go/vt/vtctl/vtctl.go | |
@@ -107,6 +107,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/json2" | |
"vitess.io/vitess/go/mysql" | |
@@ -1010,7 +1011,7 @@ func commandWaitForDrain(ctx context.Context, wr *wrangler.Wrangler, subFlags *f | |
} | |
if *timeout != 0 { | |
var cancel context.CancelFunc | |
- ctx, cancel = context.WithTimeout(ctx, *timeout) | |
+ ctx, cancel = context2.WithTimeout(ctx, *timeout, "./go/vt/vtctl/vtctl.go:1013") | |
defer cancel() | |
} | |
diff --git a/go/vt/vtctld/action_repository.go b/go/vt/vtctld/action_repository.go | |
index 7b484707e..c957b3280 100644 | |
--- a/go/vt/vtctld/action_repository.go | |
+++ b/go/vt/vtctld/action_repository.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
"vitess.io/vitess/go/acl" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/topo" | |
"vitess.io/vitess/go/vt/topo/topoproto" | |
@@ -115,7 +116,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName, | |
return result | |
} | |
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:118") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) | |
output, err := action(ctx, wr, keyspace, r) | |
cancel() | |
@@ -142,7 +143,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke | |
return result | |
} | |
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:145") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) | |
output, err := action(ctx, wr, keyspace, shard, r) | |
cancel() | |
@@ -176,7 +177,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st | |
} | |
// run the action | |
- ctx, cancel := context.WithTimeout(ctx, *actionTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, *actionTimeout, "./go/vt/vtctld/action_repository.go:179") | |
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) | |
output, err := action.method(ctx, wr, tabletAlias, r) | |
cancel() | |
diff --git a/go/vt/vtctld/tablet_data_test.go b/go/vt/vtctld/tablet_data_test.go | |
index edc69d985..e8b32d2d2 100644 | |
--- a/go/vt/vtctld/tablet_data_test.go | |
+++ b/go/vt/vtctld/tablet_data_test.go | |
@@ -24,6 +24,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/logutil" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
topodatapb "vitess.io/vitess/go/vt/proto/topodata" | |
@@ -141,7 +142,7 @@ func TestTabletData(t *testing.T) { | |
}() | |
// Start streaming and wait for the first result. | |
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Second, "./go/vt/vtctld/tablet_data_test.go:144") | |
result, err := thc.Get(ctx, tablet1.Tablet.Alias) | |
cancel() | |
close(stop) | |
diff --git a/go/vt/vtctld/workflow.go b/go/vt/vtctld/workflow.go | |
index 1982e9093..e95b4a5fc 100644 | |
--- a/go/vt/vtctld/workflow.go | |
+++ b/go/vt/vtctld/workflow.go | |
@@ -21,6 +21,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/flagutil" | |
@@ -85,7 +86,7 @@ func initWorkflowManager(ts *topo.Server) { | |
} | |
func runWorkflowManagerAlone() { | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtctld/workflow.go:88") | |
go vtctl.WorkflowManager.Run(ctx) | |
// Running cancel on OnTermSync will cancel the context of any | |
diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go | |
index 5951ebb65..f4d7074e2 100644 | |
--- a/go/vt/vtgate/buffer/buffer_test.go | |
+++ b/go/vt/vtgate/buffer/buffer_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/discovery" | |
"vitess.io/vitess/go/vt/topo/topoproto" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -574,7 +575,7 @@ func testRequestCanceled(t *testing.T, explicitEnd bool) { | |
if err := waitForRequestsInFlight(b, 1); err != nil { | |
t.Fatal(err) | |
} | |
- ctx2, cancel2 := context.WithCancel(context.Background()) | |
+ ctx2, cancel2 := context2.WithCancel(context.Background(), "./go/vt/vtgate/buffer/buffer_test.go:577") | |
stopped2 := issueRequest(ctx2, t, b, failoverErr) | |
if err := waitForRequestsInFlight(b, 2); err != nil { | |
t.Fatal(err) | |
diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go | |
index 0916b225b..8a455b4f8 100644 | |
--- a/go/vt/vtgate/buffer/shard_buffer.go | |
+++ b/go/vt/vtgate/buffer/shard_buffer.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sync2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/logutil" | |
@@ -327,7 +328,7 @@ func (sb *shardBuffer) bufferRequestLocked(ctx context.Context) (*entry, error) | |
done: make(chan struct{}), | |
deadline: sb.now().Add(*window), | |
} | |
- e.bufferCtx, e.bufferCancel = context.WithCancel(ctx) | |
+ e.bufferCtx, e.bufferCancel = context2.WithCancel(ctx, "./go/vt/vtgate/buffer/shard_buffer.go:330") | |
sb.queue = append(sb.queue, e) | |
if max := lastRequestsInFlightMax.Counts()[sb.statsKeyJoined]; max < int64(len(sb.queue)) { | |
diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go | |
index 1d382bc0f..85e553346 100644 | |
--- a/go/vt/vtgate/endtoend/vstream_test.go | |
+++ b/go/vt/vtgate/endtoend/vstream_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"testing" | |
"github.com/golang/protobuf/proto" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
"vitess.io/vitess/go/vt/proto/query" | |
@@ -31,7 +32,7 @@ import ( | |
) | |
func TestVStream(t *testing.T) { | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/endtoend/vstream_test.go:34") | |
defer cancel() | |
gconn, err := vtgateconn.Dial(ctx, grpcAddress) | |
diff --git a/go/vt/vtgate/engine/merge_sort.go b/go/vt/vtgate/engine/merge_sort.go | |
index ee5c9cf1d..1c87cad97 100644 | |
--- a/go/vt/vtgate/engine/merge_sort.go | |
+++ b/go/vt/vtgate/engine/merge_sort.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
@@ -37,7 +38,7 @@ import ( | |
// was pulled out. Since the input streams are sorted the same way that the heap is | |
// sorted, this guarantees that the merged stream will also be sorted the same way. | |
func mergeSort(vcursor VCursor, query string, orderBy []OrderbyParams, rss []*srvtopo.ResolvedShard, bvs []map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { | |
- ctx, cancel := context.WithCancel(vcursor.Context()) | |
+ ctx, cancel := context2.WithCancel(vcursor.Context(), "./go/vt/vtgate/engine/merge_sort.go:40") | |
defer cancel() | |
handles := make([]*streamHandle, len(rss)) | |
diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go | |
index ea41aa88c..aabd41723 100644 | |
--- a/go/vt/vtgate/executor_stream_test.go | |
+++ b/go/vt/vtgate/executor_stream_test.go | |
@@ -21,6 +21,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/discovery" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
@@ -85,7 +86,7 @@ func TestStreamSQLSharded(t *testing.T) { | |
func executorStreamMessages(executor *Executor, sql string) (qr *sqltypes.Result, err error) { | |
results := make(chan *sqltypes.Result, 100) | |
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 10*time.Millisecond, "./go/vt/vtgate/executor_stream_test.go:88") | |
defer cancel() | |
err = executor.StreamExecute( | |
ctx, | |
diff --git a/go/vt/vtgate/gateway/gateway.go b/go/vt/vtgate/gateway/gateway.go | |
index 7b458332f..a9c304969 100644 | |
--- a/go/vt/vtgate/gateway/gateway.go | |
+++ b/go/vt/vtgate/gateway/gateway.go | |
@@ -23,6 +23,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/vt/log" | |
@@ -107,7 +108,7 @@ func GetCreator() Creator { | |
// just calls it. | |
func WaitForTablets(gw Gateway, tabletTypesToWait []topodatapb.TabletType) error { | |
log.Infof("Gateway waiting for serving tablets...") | |
- ctx, cancel := context.WithTimeout(context.Background(), *initialTabletTimeout) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), *initialTabletTimeout, "./go/vt/vtgate/gateway/gateway.go:110") | |
defer cancel() | |
err := gw.WaitForTablets(ctx, tabletTypesToWait) | |
diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go | |
index ea5e16abd..4cdd72c30 100644 | |
--- a/go/vt/vtgate/plugin_mysql_server.go | |
+++ b/go/vt/vtgate/plugin_mysql_server.go | |
@@ -26,6 +26,7 @@ import ( | |
"syscall" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/sqlparser" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -89,7 +90,7 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) { | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:92") | |
defer cancel() | |
} else { | |
ctx = context.Background() | |
@@ -136,7 +137,7 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq | |
ctx := context.Background() | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(ctx, *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(ctx, *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:139") | |
defer cancel() | |
} | |
@@ -203,7 +204,7 @@ func (vh *vtgateHandler) ComPrepare(c *mysql.Conn, query string) ([]*querypb.Fie | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:206") | |
defer cancel() | |
} else { | |
ctx = context.Background() | |
@@ -262,7 +263,7 @@ func (vh *vtgateHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareDat | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtgate/plugin_mysql_server.go:265") | |
defer cancel() | |
} else { | |
ctx = context.Background() | |
diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go | |
index be535df38..813723054 100644 | |
--- a/go/vt/vtgate/resolver.go | |
+++ b/go/vt/vtgate/resolver.go | |
@@ -26,6 +26,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/key" | |
"vitess.io/vitess/go/vt/log" | |
@@ -347,7 +348,7 @@ func (res *Resolver) UpdateStream(ctx context.Context, keyspace string, shard st | |
// VStream streams events from one target. This function ensures that events of each | |
// transaction are streamed together, along with the corresponding GTID. | |
func (res *Resolver) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func(events []*binlogdatapb.VEvent) error) error { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/resolver.go:350") | |
defer cancel() | |
// mu protects sending on ch, err and positions. | |
diff --git a/go/vt/vtgate/resolver_test.go b/go/vt/vtgate/resolver_test.go | |
index e26a77e5e..67748fb38 100644 | |
--- a/go/vt/vtgate/resolver_test.go | |
+++ b/go/vt/vtgate/resolver_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/discovery" | |
"vitess.io/vitess/go/vt/key" | |
@@ -570,7 +571,7 @@ func TestResolverVStream(t *testing.T) { | |
}} | |
sbc0.AddVStreamEvents(send2, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:573") | |
defer cancel() | |
count := 1 | |
@@ -619,7 +620,7 @@ func TestResolverVStreamChunks(t *testing.T) { | |
} | |
sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:622") | |
defer cancel() | |
rowEncountered := false | |
@@ -695,7 +696,7 @@ func TestResolverVStreamMulti(t *testing.T) { | |
} | |
sbc1.AddVStreamEvents(send1, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:698") | |
defer cancel() | |
var got *binlogdatapb.VGtid | |
@@ -760,7 +761,7 @@ func TestResolverVStreamRetry(t *testing.T) { | |
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) | |
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:763") | |
defer cancel() | |
count := 0 | |
@@ -819,7 +820,7 @@ func TestResolverVStreamHeartbeat(t *testing.T) { | |
}} | |
sbc0.AddVStreamEvents(send1, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/resolver_test.go:822") | |
defer cancel() | |
vgtid := &binlogdatapb.VGtid{ | |
diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go | |
index 3bc5cb796..fff03a706 100644 | |
--- a/go/vt/vtgate/scatter_conn.go | |
+++ b/go/vt/vtgate/scatter_conn.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/vt/concurrency" | |
@@ -546,7 +547,7 @@ func (tt *timeTracker) Record(target *querypb.Target) time.Time { | |
func (stc *ScatterConn) MessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error { | |
// The cancelable context is used for handling errors | |
// from individual streams. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vtgate/scatter_conn.go:549") | |
defer cancel() | |
// mu is used to merge multiple callback calls into one. | |
diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go | |
index 8da8069b3..13dbec234 100644 | |
--- a/go/vt/vtgate/vcursor_impl.go | |
+++ b/go/vt/vtgate/vcursor_impl.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/key" | |
"vitess.io/vitess/go/vt/sqlparser" | |
@@ -82,7 +83,7 @@ func (vc *vcursorImpl) MaxMemoryRows() int { | |
// SetContextTimeout updates context and sets a timeout. | |
func (vc *vcursorImpl) SetContextTimeout(timeout time.Duration) context.CancelFunc { | |
- ctx, cancel := context.WithTimeout(vc.ctx, timeout) | |
+ ctx, cancel := context2.WithTimeout(vc.ctx, timeout, "./go/vt/vtgate/vcursor_impl.go:85") | |
vc.ctx = ctx | |
return cancel | |
} | |
diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go | |
index 8a8093666..b90db46d0 100644 | |
--- a/go/vt/vtgate/vtgate_test.go | |
+++ b/go/vt/vtgate/vtgate_test.go | |
@@ -29,6 +29,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/golang/protobuf/proto" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/discovery" | |
"vitess.io/vitess/go/vt/key" | |
@@ -1224,7 +1225,7 @@ func TestVTGateMessageStreamSharded(t *testing.T) { | |
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil) | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1227") | |
go func() { | |
kr := &topodatapb.KeyRange{End: []byte{0x40}} | |
err := rpcVTGate.MessageStream(ctx, ks, "", kr, "msg", func(qr *sqltypes.Result) error { | |
@@ -1269,7 +1270,7 @@ func TestVTGateMessageStreamUnsharded(t *testing.T) { | |
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil) | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1272") | |
go func() { | |
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error { | |
select { | |
@@ -1304,7 +1305,7 @@ func TestVTGateMessageStreamRetry(t *testing.T) { | |
_ = hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, "0", topodatapb.TabletType_MASTER, true, 1, nil) | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1307") | |
go func() { | |
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error { | |
select { | |
@@ -1347,7 +1348,7 @@ func TestVTGateMessageStreamUnavailable(t *testing.T) { | |
tablet.MustFailCodes[vtrpcpb.Code_UNAVAILABLE] = 1 | |
ch := make(chan *sqltypes.Result) | |
done := make(chan struct{}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vtgate/vtgate_test.go:1350") | |
go func() { | |
err := rpcVTGate.MessageStream(ctx, ks, "0", nil, "msg", func(qr *sqltypes.Result) error { | |
select { | |
diff --git a/go/vt/vtqueryserver/plugin_mysql_server.go b/go/vt/vtqueryserver/plugin_mysql_server.go | |
index a2593b95a..85fed96d8 100644 | |
--- a/go/vt/vtqueryserver/plugin_mysql_server.go | |
+++ b/go/vt/vtqueryserver/plugin_mysql_server.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/callerid" | |
@@ -75,7 +76,7 @@ func (mh *proxyHandler) ConnectionClosed(c *mysql.Conn) { | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:78") | |
defer cancel() | |
} else { | |
ctx = context.Background() | |
@@ -90,7 +91,7 @@ func (mh *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql | |
var ctx context.Context | |
var cancel context.CancelFunc | |
if *mysqlQueryTimeout != 0 { | |
- ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout) | |
+ ctx, cancel = context2.WithTimeout(context.Background(), *mysqlQueryTimeout, "./go/vt/vtqueryserver/plugin_mysql_server.go:93") | |
defer cancel() | |
} else { | |
ctx = context.Background() | |
diff --git a/go/vt/vttablet/agentrpctest/test_agent_rpc.go b/go/vt/vttablet/agentrpctest/test_agent_rpc.go | |
index 5632b66ae..44d6d5ac8 100644 | |
--- a/go/vt/vttablet/agentrpctest/test_agent_rpc.go | |
+++ b/go/vt/vttablet/agentrpctest/test_agent_rpc.go | |
@@ -28,6 +28,7 @@ import ( | |
"golang.org/x/net/context" | |
"github.com/golang/protobuf/proto" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/hook" | |
"vitess.io/vitess/go/vt/logutil" | |
@@ -201,7 +202,7 @@ func agentRPCTestPingPanic(ctx context.Context, t *testing.T, client tmclient.Ta | |
// RPCs failed due to an expired context before .Dial(). | |
func agentRPCTestDialExpiredContext(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { | |
// Using a timeout of 0 here such that .Dial() will fail immediately. | |
- expiredCtx, cancel := context.WithTimeout(ctx, 0) | |
+ expiredCtx, cancel := context2.WithTimeout(ctx, 0, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:204") | |
defer cancel() | |
err := client.Ping(expiredCtx, tablet) | |
if err == nil { | |
@@ -227,7 +228,7 @@ func agentRPCTestRPCTimeout(ctx context.Context, t *testing.T, client tmclient.T | |
// NOTE: This might still race, e.g. when test execution takes too long the | |
// context will already be expired in dial(). In such cases coverage | |
// will be reduced but the test will not flake. | |
- shortCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, 10*time.Millisecond, "./go/vt/vttablet/agentrpctest/test_agent_rpc.go:230") | |
defer cancel() | |
fakeAgent.setSlow(true) | |
defer func() { fakeAgent.setSlow(false) }() | |
diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go | |
index 39af7ba16..eb2a8738d 100644 | |
--- a/go/vt/vttablet/endtoend/misc_test.go | |
+++ b/go/vt/vttablet/endtoend/misc_test.go | |
@@ -29,6 +29,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/callerid" | |
@@ -467,7 +468,7 @@ func TestStreamHealth_Expired(t *testing.T) { | |
var health *querypb.StreamHealthResponse | |
framework.Server.BroadcastHealth(0, nil, time.Millisecond) | |
time.Sleep(5 * time.Millisecond) | |
- ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), time.Millisecond*100, "./go/vt/vttablet/endtoend/misc_test.go:470") | |
defer cancel() | |
if err := framework.Server.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { | |
health = shr | |
diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go | |
index aae4da11a..1ef34b9c1 100644 | |
--- a/go/vt/vttablet/grpctabletconn/conn.go | |
+++ b/go/vt/vttablet/grpctabletconn/conn.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
"google.golang.org/grpc" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/netutil" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/callerid" | |
@@ -148,7 +149,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb. | |
// no direct API to end a stream from the client side. If callback | |
// returns an error, we return from the function. The deferred | |
// cancel will then cause the stream to be terminated. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:151") | |
defer cancel() | |
stream, err := func() (queryservicepb.Query_StreamExecuteClient, error) { | |
@@ -493,7 +494,7 @@ func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *quer | |
// MessageStream streams messages. | |
func (conn *gRPCQueryClient) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error { | |
// Please see comments in StreamExecute to see how this works. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:496") | |
defer cancel() | |
stream, err := func() (queryservicepb.Query_MessageStreamClient, error) { | |
@@ -594,7 +595,7 @@ func (conn *gRPCQueryClient) SplitQuery( | |
// StreamHealth starts a streaming RPC for VTTablet health status updates. | |
func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { | |
// Please see comments in StreamExecute to see how this works. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:597") | |
defer cancel() | |
stream, err := func() (queryservicepb.Query_StreamHealthClient, error) { | |
@@ -630,7 +631,7 @@ func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*qu | |
// UpdateStream starts a streaming query to VTTablet. | |
func (conn *gRPCQueryClient) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, callback func(*querypb.StreamEvent) error) error { | |
// Please see comments in StreamExecute to see how this works. | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/grpctabletconn/conn.go:633") | |
defer cancel() | |
stream, err := func() (queryservicepb.Query_UpdateStreamClient, error) { | |
diff --git a/go/vt/vttablet/heartbeat/reader.go b/go/vt/vttablet/heartbeat/reader.go | |
index 54dbc72b8..b66774860 100644 | |
--- a/go/vt/vttablet/heartbeat/reader.go | |
+++ b/go/vt/vttablet/heartbeat/reader.go | |
@@ -21,6 +21,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -149,7 +150,7 @@ func (r *Reader) GetLatest() (time.Duration, error) { | |
func (r *Reader) readHeartbeat() { | |
defer tabletenv.LogError() | |
- ctx, cancel := context.WithDeadline(context.Background(), r.now().Add(r.interval)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), r.now().Add(r.interval), "./go/vt/vttablet/heartbeat/reader.go:152") | |
defer cancel() | |
res, err := r.fetchMostRecentHeartbeat(ctx) | |
diff --git a/go/vt/vttablet/heartbeat/writer.go b/go/vt/vttablet/heartbeat/writer.go | |
index e84f75920..da9ee67b7 100644 | |
--- a/go/vt/vttablet/heartbeat/writer.go | |
+++ b/go/vt/vttablet/heartbeat/writer.go | |
@@ -21,6 +21,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -203,7 +204,7 @@ func (w *Writer) bindHeartbeatVars(query string) (string, error) { | |
// writeHeartbeat updates the heartbeat row for this tablet with the current time in nanoseconds. | |
func (w *Writer) writeHeartbeat() { | |
defer tabletenv.LogError() | |
- ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), w.now().Add(w.interval), "./go/vt/vttablet/heartbeat/writer.go:206") | |
defer cancel() | |
update, err := w.bindHeartbeatVars(sqlUpdateHeartbeat) | |
if err != nil { | |
diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go | |
index 7fe05a7b5..fe01c4d4a 100644 | |
--- a/go/vt/vttablet/tabletmanager/action_agent.go | |
+++ b/go/vt/vttablet/tabletmanager/action_agent.go | |
@@ -44,6 +44,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -746,7 +747,7 @@ func (agent *ActionAgent) checkTabletMysqlPort(ctx context.Context, tablet *topo | |
// Update the port in the topology. Use a shorter timeout, so if | |
// the topo server is busy / throttling us, we don't hang forever here. | |
// The healthcheck goroutine will try again next time. | |
- ctx, cancel := context.WithTimeout(ctx, 10*time.Second) | |
+ ctx, cancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/vttablet/tabletmanager/action_agent.go:749") | |
defer cancel() | |
if !agent.waitingForMysql { | |
log.Warningf("MySQL port has changed from %v to %v, updating it in tablet record", topoproto.MysqlPort(tablet), mport) | |
diff --git a/go/vt/vttablet/tabletmanager/init_tablet.go b/go/vt/vttablet/tabletmanager/init_tablet.go | |
index 47e5c0091..a731a13e0 100644 | |
--- a/go/vt/vttablet/tabletmanager/init_tablet.go | |
+++ b/go/vt/vttablet/tabletmanager/init_tablet.go | |
@@ -24,8 +24,7 @@ import ( | |
"fmt" | |
"time" | |
- "golang.org/x/net/context" | |
- | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/flagutil" | |
"vitess.io/vitess/go/netutil" | |
"vitess.io/vitess/go/vt/log" | |
@@ -84,7 +83,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { | |
// Create a context for this whole operation. Note we will | |
// retry some actions upon failure until this context expires. | |
- ctx, cancel := context.WithTimeout(agent.batchCtx, *initTimeout) | |
+ ctx, cancel := context2.WithTimeout(agent.batchCtx, *initTimeout, "./go/vt/vttablet/tabletmanager/init_tablet.go:87") | |
defer cancel() | |
// Read the shard, create it if necessary. | |
diff --git a/go/vt/vttablet/tabletmanager/replication_reporter.go b/go/vt/vttablet/tabletmanager/replication_reporter.go | |
index 4ec3b1241..5f3ca9ecf 100644 | |
--- a/go/vt/vttablet/tabletmanager/replication_reporter.go | |
+++ b/go/vt/vttablet/tabletmanager/replication_reporter.go | |
@@ -22,6 +22,7 @@ import ( | |
"html/template" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -68,7 +69,7 @@ func (r *replicationReporter) Report(isSlaveType, shouldQueryServiceBeRunning bo | |
log.Infof("Slave is stopped. Running with --disable_active_reparents so will not try to reconnect to master...") | |
} else { | |
log.Infof("Slave is stopped. Trying to reconnect to master...") | |
- ctx, cancel := context.WithTimeout(r.agent.batchCtx, 5*time.Second) | |
+ ctx, cancel := context2.WithTimeout(r.agent.batchCtx, 5*time.Second, "./go/vt/vttablet/tabletmanager/replication_reporter.go:71") | |
if err := repairReplication(ctx, r.agent); err != nil { | |
log.Infof("Failed to reconnect to master: %v", err) | |
} | |
diff --git a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go | |
index 848ba234f..0a3b34e9c 100644 | |
--- a/go/vt/vttablet/tabletmanager/rpc_external_reparent.go | |
+++ b/go/vt/vttablet/tabletmanager/rpc_external_reparent.go | |
@@ -23,6 +23,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/event" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/trace" | |
@@ -112,7 +113,7 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern | |
agent.updateState(ctx, newTablet, "fastTabletExternallyReparented") | |
} | |
// Start the finalize stage with a background context, but connect the trace. | |
- bgCtx, cancel := context.WithTimeout(agent.batchCtx, *finalizeReparentTimeout) | |
+ bgCtx, cancel := context2.WithTimeout(agent.batchCtx, *finalizeReparentTimeout, "./go/vt/vttablet/tabletmanager/rpc_external_reparent.go:115") | |
bgCtx = trace.CopySpan(bgCtx, ctx) | |
agent.finalizeReparentCtx = bgCtx | |
go func() { | |
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go | |
index 9ef143979..850fb57ee 100644 | |
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go | |
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go | |
@@ -21,6 +21,7 @@ import ( | |
"fmt" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -103,7 +104,7 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string, | |
if err != nil { | |
return "", err | |
} | |
- waitCtx, cancel := context.WithTimeout(ctx, waitTime) | |
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:106") | |
defer cancel() | |
if err := agent.MysqlDaemon.WaitMasterPos(waitCtx, pos); err != nil { | |
return "", err | |
@@ -153,7 +154,7 @@ func (agent *ActionAgent) StartSlaveUntilAfter(ctx context.Context, position str | |
} | |
defer agent.unlock() | |
- waitCtx, cancel := context.WithTimeout(ctx, waitTime) | |
+ waitCtx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/vttablet/tabletmanager/rpc_replication.go:156") | |
defer cancel() | |
pos, err := mysql.DecodePosition(position) | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go | |
index 41c333aa9..dc8ee78eb 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/controller.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go | |
@@ -22,6 +22,7 @@ import ( | |
"strconv" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"github.com/golang/protobuf/proto" | |
@@ -106,7 +107,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor | |
ct.tabletPicker = tp | |
// cancel | |
- ctx, ct.cancel = context.WithCancel(ctx) | |
+ ctx, ct.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/controller.go:109") | |
go ct.run(ctx) | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go | |
index f330985c4..acecde60b 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go | |
@@ -24,6 +24,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" | |
@@ -225,7 +226,7 @@ func TestControllerCanceledContext(t *testing.T) { | |
"source": fmt.Sprintf(`keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName), | |
} | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/controller_test.go:228") | |
cancel() | |
ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil) | |
if err != nil { | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go | |
index 4baad2ac6..8b4c0fecd 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/engine.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -110,7 +111,7 @@ func (vre *Engine) Open(ctx context.Context) error { | |
return nil | |
} | |
- vre.ctx, vre.cancel = context.WithCancel(ctx) | |
+ vre.ctx, vre.cancel = context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/engine.go:113") | |
vre.isOpen = true | |
if err := vre.initAll(); err != nil { | |
go vre.Close() | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go | |
index d16ca35cc..11cb9b670 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go | |
@@ -23,6 +23,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -398,7 +399,7 @@ func TestWaitForPosCancel(t *testing.T) { | |
sqltypes.NewVarBinary("Running"), | |
sqltypes.NewVarBinary(""), | |
}}}, nil) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletmanager/vreplication/engine_test.go:401") | |
cancel() | |
err := vre.WaitForPos(ctx, 1, "MariaDB/0-1-1084") | |
if err == nil || err != context.Canceled { | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go | |
index 37152f006..83ebb5bd6 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go | |
@@ -27,6 +27,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -115,7 +116,7 @@ func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSetting | |
} | |
func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.Result) error { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:118") | |
defer cancel() | |
settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id) | |
@@ -182,7 +183,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma | |
} | |
defer vsClient.Close(ctx) | |
- ctx, cancel := context.WithTimeout(ctx, copyTimeout) | |
+ ctx, cancel := context2.WithTimeout(ctx, copyTimeout, "./go/vt/vttablet/tabletmanager/vreplication/vcopier.go:185") | |
defer cancel() | |
target := &querypb.Target{ | |
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go | |
index 3a4bfd1ff..8681eccf8 100644 | |
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go | |
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -113,7 +114,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) error { | |
return fmt.Errorf("error dialing tablet: %v", err) | |
} | |
defer vsClient.Close(ctx) | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletmanager/vreplication/vplayer.go:116") | |
defer cancel() | |
relay := newRelayLog(ctx, relayLogMaxItems, relayLogMaxSize) | |
diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go | |
index aad986ee7..855a5622f 100644 | |
--- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go | |
+++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -62,7 +63,7 @@ func TestDBConnExec(t *testing.T) { | |
connPool := newPool() | |
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) | |
defer connPool.Close() | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:65") | |
defer cancel() | |
dbConn, err := NewDBConn(connPool, db.ConnParams()) | |
if dbConn != nil { | |
@@ -140,7 +141,7 @@ func TestDBConnDeadline(t *testing.T) { | |
defer connPool.Close() | |
db.SetConnDelay(100 * time.Millisecond) | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:143") | |
defer cancel() | |
dbConn, err := NewDBConn(connPool, db.ConnParams()) | |
@@ -163,7 +164,7 @@ func TestDBConnDeadline(t *testing.T) { | |
startCounts = tabletenv.MySQLStats.Counts() | |
- ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) | |
+ ctx, cancel = context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:166") | |
defer cancel() | |
result, err := dbConn.Exec(ctx, sql, 1, false) | |
@@ -300,7 +301,7 @@ func TestDBConnStream(t *testing.T) { | |
connPool := newPool() | |
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) | |
defer connPool.Close() | |
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) | |
+ ctx, cancel := context2.WithDeadline(context.Background(), time.Now().Add(10*time.Second), "./go/vt/vttablet/tabletserver/connpool/dbconn_test.go:303") | |
defer cancel() | |
dbConn, err := NewDBConn(connPool, db.ConnParams()) | |
if dbConn != nil { | |
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go | |
index 2b8b93e4d..a8aadf356 100644 | |
--- a/go/vt/vttablet/tabletserver/messager/message_manager.go | |
+++ b/go/vt/vttablet/tabletserver/messager/message_manager.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/sync2" | |
@@ -56,7 +57,7 @@ type messageReceiver struct { | |
} | |
func newMessageReceiver(ctx context.Context, send func(*sqltypes.Result) error) (*messageReceiver, <-chan struct{}) { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/messager/message_manager.go:59") | |
rcv := &messageReceiver{ | |
ctx: ctx, | |
errChan: make(chan error, 1), | |
@@ -533,7 +534,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t | |
return | |
} | |
defer mm.postponeSema.Release() | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), ackWaitTime, "./go/vt/vttablet/tabletserver/messager/message_manager.go:536") | |
defer cancel() | |
if _, err := tsv.PostponeMessages(ctx, nil, name, ids); err != nil { | |
// This can happen during spikes. Record the incident for monitoring. | |
@@ -542,7 +543,7 @@ func (mm *messageManager) postpone(tsv TabletService, name string, ackWaitTime t | |
} | |
func (mm *messageManager) runPoller() { | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval()) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval(), "./go/vt/vttablet/tabletserver/messager/message_manager.go:545") | |
defer func() { | |
tabletenv.LogError() | |
cancel() | |
@@ -612,7 +613,7 @@ func (mm *messageManager) runPurge() { | |
// purge is a non-member because it should be called asynchronously and should | |
// not rely on members of messageManager. | |
func purge(tsv TabletService, name string, purgeAfter, purgeInterval time.Duration) { | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), purgeInterval) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), purgeInterval, "./go/vt/vttablet/tabletserver/messager/message_manager.go:615") | |
defer func() { | |
tabletenv.LogError() | |
cancel() | |
diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go | |
index aa43af69e..6fd142db7 100644 | |
--- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go | |
+++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go | |
@@ -26,6 +26,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/sync2" | |
@@ -106,7 +107,7 @@ func TestReceiverCancel(t *testing.T) { | |
mm.Open() | |
defer mm.Close() | |
r1 := newTestReceiver(0) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:109") | |
_ = mm.Subscribe(ctx, r1.rcv) | |
cancel() | |
// r1 should eventually be unsubscribed. | |
@@ -232,7 +233,7 @@ func TestMessageManagerSend(t *testing.T) { | |
// Test that mm stops sending to a canceled receiver. | |
r2 := newTestReceiver(1) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:235") | |
mm.Subscribe(ctx, r2.rcv) | |
<-r2.ch | |
mm.Add(&MessageRow{Row: []sqltypes.Value{sqltypes.NewVarBinary("2")}}) | |
@@ -343,7 +344,7 @@ func TestMessageManagerSendEOF(t *testing.T) { | |
mm.Open() | |
defer mm.Close() | |
r1 := newTestReceiver(0) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:346") | |
mm.Subscribe(ctx, r1.rcv) | |
// Pull field info. | |
<-r1.ch | |
@@ -514,7 +515,7 @@ func TestMessageManagerPoller(t *testing.T) { | |
mm.Open() | |
defer mm.Close() | |
r1 := newTestReceiver(1) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/messager/message_manager_test.go:517") | |
mm.Subscribe(ctx, r1.rcv) | |
<-r1.ch | |
mm.pollerTicks.Trigger() | |
diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go | |
index f2994f784..c7eb0e64e 100644 | |
--- a/go/vt/vttablet/tabletserver/query_engine.go | |
+++ b/go/vt/vttablet/tabletserver/query_engine.go | |
@@ -28,6 +28,7 @@ import ( | |
"vitess.io/vitess/go/acl" | |
"vitess.io/vitess/go/cache" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/streamlog" | |
@@ -383,7 +384,7 @@ func (qe *QueryEngine) getQueryConn(ctx context.Context) (*connpool.DBConn, erro | |
timeout := qe.connTimeout.Get() | |
if timeout != 0 { | |
- ctxTimeout, cancel := context.WithTimeout(ctx, timeout) | |
+ ctxTimeout, cancel := context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/query_engine.go:386") | |
defer cancel() | |
conn, err := qe.conns.Get(ctxTimeout) | |
if err != nil { | |
diff --git a/go/vt/vttablet/tabletserver/replication_watcher.go b/go/vt/vttablet/tabletserver/replication_watcher.go | |
index da3bfd9df..615ce96ee 100644 | |
--- a/go/vt/vttablet/tabletserver/replication_watcher.go | |
+++ b/go/vt/vttablet/tabletserver/replication_watcher.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/vt/binlog" | |
@@ -91,7 +92,7 @@ func (rpw *ReplicationWatcher) Open() { | |
if rpw.isOpen || !rpw.watchReplication { | |
return | |
} | |
- ctx, cancel := context.WithCancel(tabletenv.LocalContext()) | |
+ ctx, cancel := context2.WithCancel(tabletenv.LocalContext(), "./go/vt/vttablet/tabletserver/replication_watcher.go:94") | |
rpw.cancel = cancel | |
rpw.wg.Add(1) | |
go rpw.Process(ctx, rpw.dbconfigs) | |
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go | |
index 64711af9b..89393bd19 100644 | |
--- a/go/vt/vttablet/tabletserver/tabletserver.go | |
+++ b/go/vt/vttablet/tabletserver/tabletserver.go | |
@@ -31,6 +31,7 @@ import ( | |
"golang.org/x/net/context" | |
"vitess.io/vitess/go/acl" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/history" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -2016,7 +2017,7 @@ func (tsv *TabletServer) UpdateStream(ctx context.Context, target *querypb.Targe | |
s := binlog.NewEventStreamer(tsv.dbconfigs.DbaWithDB(), tsv.se, p, timestamp, callback) | |
// Create a cancelable wrapping context. | |
- streamCtx, streamCancel := context.WithCancel(ctx) | |
+ streamCtx, streamCancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver.go:2019") | |
i := tsv.updateStreamList.Add(streamCancel) | |
defer tsv.updateStreamList.Delete(i) | |
@@ -2318,7 +2319,7 @@ func withTimeout(ctx context.Context, timeout time.Duration, options *querypb.Ex | |
if timeout == 0 || options.GetWorkload() == querypb.ExecuteOptions_DBA || tabletenv.IsLocalContext(ctx) { | |
return ctx, func() {} | |
} | |
- return context.WithTimeout(ctx, timeout) | |
+ return context2.WithTimeout(ctx, timeout, "./go/vt/vttablet/tabletserver/tabletserver.go:2321") | |
} | |
// skipQueryPlanCache returns true if the query plan should not be cached | |
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go | |
index ca5d2c38d..71f8c4218 100644 | |
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go | |
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go | |
@@ -32,6 +32,7 @@ import ( | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -958,7 +959,7 @@ func TestTabletServerBeginFail(t *testing.T) { | |
t.Fatalf("StartService failed: %v", err) | |
} | |
defer tsv.StopService() | |
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Nanosecond, "./go/vt/vttablet/tabletserver/tabletserver_test.go:961") | |
defer cancel() | |
tsv.Begin(ctx, &target, nil) | |
_, err = tsv.Begin(ctx, &target, nil) | |
@@ -2092,7 +2093,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { | |
}() | |
// tx2. | |
- ctxTx2, cancelTx2 := context.WithCancel(ctx) | |
+ ctxTx2, cancelTx2 := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/tabletserver_test.go:2095") | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go | |
index 148569a02..e9c73da0f 100644 | |
--- a/go/vt/vttablet/tabletserver/tx_engine.go | |
+++ b/go/vt/vttablet/tabletserver/tx_engine.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/timer" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/vt/concurrency" | |
@@ -541,7 +542,7 @@ func (te *TxEngine) rollbackPrepared() { | |
// transactions and calls the notifier on them. | |
func (te *TxEngine) startWatchdog() { | |
te.ticks.Start(func() { | |
- ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4) | |
+ ctx, cancel := context2.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4, "./go/vt/vttablet/tabletserver/tx_engine.go:544") | |
defer cancel() | |
// Raise alerts on prepares that have been unresolved for too long. | |
diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go | |
index bc64e396a..2117fb957 100644 | |
--- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go | |
+++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/streamlog" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -306,7 +307,7 @@ func TestTxSerializerCancel(t *testing.T) { | |
} | |
// tx3 (gets queued and must wait). | |
- ctx3, cancel3 := context.WithCancel(context.Background()) | |
+ ctx3, cancel3 := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go:309") | |
wg := sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go | |
index c88a9aeef..1578ff723 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
) | |
@@ -53,7 +54,7 @@ func TestUpdateVSchema(t *testing.T) { | |
defer env.SetVSchema("{}") | |
// We have to start at least one stream to start the vschema watcher. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/engine_test.go:56") | |
defer cancel() | |
filter := &binlogdatapb.Filter{ | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | |
index c4114d451..ce98a74dd 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | |
@@ -20,6 +20,7 @@ import ( | |
"context" | |
"fmt" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/dbconfigs" | |
@@ -49,7 +50,7 @@ type rowStreamer struct { | |
} | |
func newRowStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, query string, lastpk []sqltypes.Value, kschema *vindexes.KeyspaceSchema, send func(*binlogdatapb.VStreamRowsResponse) error) *rowStreamer { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go:52") | |
return &rowStreamer{ | |
ctx: ctx, | |
cancel: cancel, | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go | |
index c57135f4d..918db73a7 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"testing" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/sqltypes" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
@@ -253,7 +254,7 @@ func TestStreamRowsCancel(t *testing.T) { | |
}) | |
engine.se.Reload(context.Background()) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go:256") | |
defer cancel() | |
err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error { | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
index 91e09e8f3..76ddd1cf8 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
@@ -23,6 +23,7 @@ import ( | |
"io" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/binlog" | |
@@ -70,7 +71,7 @@ type streamerPlan struct { | |
} | |
func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, startPos string, filter *binlogdatapb.Filter, kschema *vindexes.KeyspaceSchema, send func([]*binlogdatapb.VEvent) error) *vstreamer { | |
- ctx, cancel := context.WithCancel(ctx) | |
+ ctx, cancel := context2.WithCancel(ctx, "./go/vt/vttablet/tabletserver/vstreamer/vstreamer.go:73") | |
return &vstreamer{ | |
ctx: ctx, | |
cancel: cancel, | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | |
index baa49af74..eab8df500 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | |
@@ -23,6 +23,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" | |
@@ -193,7 +194,7 @@ func TestREKeyRange(t *testing.T) { | |
} | |
defer env.SetVSchema("{}") | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:196") | |
defer cancel() | |
filter := &binlogdatapb.Filter{ | |
@@ -340,7 +341,7 @@ func TestDDLAddColumn(t *testing.T) { | |
}) | |
engine.se.Reload(context.Background()) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:343") | |
defer cancel() | |
// Test RE as well as select-based filters. | |
@@ -406,7 +407,7 @@ func TestDDLDropColumn(t *testing.T) { | |
}) | |
engine.se.Reload(context.Background()) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:409") | |
defer cancel() | |
ch := make(chan []*binlogdatapb.VEvent) | |
@@ -771,7 +772,7 @@ func TestMinimalMode(t *testing.T) { | |
"set @@session.binlog_row_image='full'", | |
}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:774") | |
defer cancel() | |
ch := make(chan []*binlogdatapb.VEvent) | |
@@ -810,7 +811,7 @@ func TestStatementMode(t *testing.T) { | |
"set @@session.binlog_format='row'", | |
}) | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:813") | |
defer cancel() | |
ch := make(chan []*binlogdatapb.VEvent) | |
@@ -829,7 +830,7 @@ func TestStatementMode(t *testing.T) { | |
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase) { | |
t.Helper() | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:832") | |
defer cancel() | |
ch := startStream(ctx, t, filter) | |
diff --git a/go/vt/vttest/mysqlctl.go b/go/vt/vttest/mysqlctl.go | |
index d7e135dbf..8a7f6b4fa 100644 | |
--- a/go/vt/vttest/mysqlctl.go | |
+++ b/go/vt/vttest/mysqlctl.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
) | |
@@ -52,7 +53,7 @@ type Mysqlctl struct { | |
// Setup spawns a new mysqld service and initializes it with the defaults. | |
// The service is kept running in the background until TearDown() is called. | |
func (ctl *Mysqlctl) Setup() error { | |
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:55") | |
defer cancel() | |
cmd := exec.CommandContext(ctx, | |
@@ -76,7 +77,7 @@ func (ctl *Mysqlctl) Setup() error { | |
// TearDown shuts down the running mysqld service | |
func (ctl *Mysqlctl) TearDown() error { | |
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 60*time.Second, "./go/vt/vttest/mysqlctl.go:79") | |
defer cancel() | |
cmd := exec.CommandContext(ctx, | |
diff --git a/go/vt/worker/chunk.go b/go/vt/worker/chunk.go | |
index 4b775b470..1b9b9e80b 100644 | |
--- a/go/vt/worker/chunk.go | |
+++ b/go/vt/worker/chunk.go | |
@@ -19,6 +19,7 @@ package worker | |
import ( | |
"fmt" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -94,7 +95,7 @@ func generateChunks(ctx context.Context, wr *wrangler.Wrangler, tablet *topodata | |
// Get the MIN and MAX of the leading column of the primary key. | |
query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(topoproto.TabletDbName(tablet)), sqlescape.EscapeID(td.Name)) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/chunk.go:97") | |
qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(shortCtx, tablet, true, []byte(query), 1) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/diff_utils.go b/go/vt/worker/diff_utils.go | |
index 19d2dcbca..895267e0f 100644 | |
--- a/go/vt/worker/diff_utils.go | |
+++ b/go/vt/worker/diff_utils.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
"vitess.io/vitess/go/vt/vttablet/tmclient" | |
@@ -63,7 +64,7 @@ type QueryResultReader struct { | |
// NewQueryResultReaderForTablet creates a new QueryResultReader for | |
// the provided tablet / sql query | |
func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string) (*QueryResultReader, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:66") | |
tablet, err := ts.GetTablet(shortCtx, tabletAlias) | |
cancel() | |
if err != nil { | |
@@ -97,7 +98,7 @@ func NewQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletA | |
// NewTransactionalQueryResultReaderForTablet creates a new QueryResultReader for | |
// the provided tablet / sql query, and runs it in an existing transaction | |
func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, sql string, txID int64) (*QueryResultReader, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:100") | |
tablet, err := ts.GetTablet(shortCtx, tabletAlias) | |
cancel() | |
if err != nil { | |
@@ -130,7 +131,7 @@ func NewTransactionalQueryResultReaderForTablet(ctx context.Context, ts *topo.Se | |
// RollbackTransaction rolls back the transaction | |
func RollbackTransaction(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, txID int64) error { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/diff_utils.go:133") | |
tablet, err := ts.GetTablet(shortCtx, tabletAlias) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/executor.go b/go/vt/worker/executor.go | |
index 07ec11d13..32a6beb2c 100644 | |
--- a/go/vt/worker/executor.go | |
+++ b/go/vt/worker/executor.go | |
@@ -20,6 +20,7 @@ import ( | |
"fmt" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -117,7 +118,7 @@ func (e *executor) refreshState(ctx context.Context) error { | |
func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context.Context, tablet *topodatapb.Tablet) error) error { | |
retryDuration := *retryDuration | |
// We should keep retrying up until the retryCtx runs out. | |
- retryCtx, retryCancel := context.WithTimeout(ctx, retryDuration) | |
+ retryCtx, retryCancel := context2.WithTimeout(ctx, retryDuration, "./go/vt/worker/executor.go:120") | |
defer retryCancel() | |
// Is this current attempt a retry of a previous attempt? | |
isRetry := false | |
@@ -150,7 +151,7 @@ func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context | |
// Run the command (in a block, since the goto above does not allow introducing | |
// new variables until the label is reached). | |
{ | |
- tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute) | |
+ tryCtx, cancel := context2.WithTimeout(retryCtx, 2*time.Minute, "./go/vt/worker/executor.go:153") | |
err = action(tryCtx, master.Tablet) | |
cancel() | |
diff --git a/go/vt/worker/instance.go b/go/vt/worker/instance.go | |
index 371ff734e..0bba03eff 100644 | |
--- a/go/vt/worker/instance.go | |
+++ b/go/vt/worker/instance.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/trace" | |
"vitess.io/vitess/go/tb" | |
@@ -115,7 +116,7 @@ func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrang | |
wi.currentWorker = wrk | |
wi.currentMemoryLogger = logutil.NewMemoryLogger() | |
- wi.currentContext, wi.currentCancelFunc = context.WithCancel(ctx) | |
+ wi.currentContext, wi.currentCancelFunc = context2.WithCancel(ctx, "./go/vt/worker/instance.go:118") | |
wi.lastRunError = nil | |
wi.lastRunStopTime = time.Unix(0, 0) | |
done := make(chan struct{}) | |
diff --git a/go/vt/worker/legacy_split_clone.go b/go/vt/worker/legacy_split_clone.go | |
index a191a1b79..890a71207 100644 | |
--- a/go/vt/worker/legacy_split_clone.go | |
+++ b/go/vt/worker/legacy_split_clone.go | |
@@ -27,6 +27,7 @@ import ( | |
"sync" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -266,7 +267,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error { | |
var err error | |
// read the keyspace and validate it | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:269") | |
scw.keyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.keyspace) | |
cancel() | |
if err != nil { | |
@@ -274,7 +275,7 @@ func (scw *LegacySplitCloneWorker) init(ctx context.Context) error { | |
} | |
// find the OverlappingShards in the keyspace | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:277") | |
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.keyspace) | |
cancel() | |
if err != nil { | |
@@ -367,7 +368,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
// get the tablet info for them, and stop their replication | |
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases)) | |
for i, alias := range scw.sourceAliases { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:370") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias) | |
cancel() | |
if err != nil { | |
@@ -375,7 +376,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
} | |
scw.sourceTablets[i] = ti.Tablet | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:378") | |
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i]) | |
cancel() | |
if err != nil { | |
@@ -398,7 +399,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
// Make sure we find a master for each destination shard and log it. | |
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") | |
for _, si := range scw.destinationShards { | |
- waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second) | |
+ waitCtx, waitCancel := context2.WithTimeout(ctx, 10*time.Second, "./go/vt/worker/legacy_split_clone.go:401") | |
defer waitCancel() | |
if err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER); err != nil { | |
return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v", si.Keyspace(), si.ShardName()) | |
@@ -410,7 +411,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { | |
master := masters[0] | |
// Get the MySQL database name of the tablet. | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:413") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, master.Tablet.Alias) | |
cancel() | |
if err != nil { | |
@@ -453,7 +454,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { | |
// on all source shards. Furthermore, we estimate the number of rows | |
// in each source shard for each table to be about the same | |
// (rowCount is used to estimate an ETA) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:456") | |
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, scw.sourceAliases[0], nil, scw.excludeTables, false /* includeViews */) | |
cancel() | |
if err != nil { | |
@@ -476,7 +477,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { | |
mu := sync.Mutex{} | |
var firstError error | |
- ctx, cancelCopy := context.WithCancel(ctx) | |
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/legacy_split_clone.go:479") | |
processError := func(format string, args ...interface{}) { | |
scw.wr.Logger().Errorf(format, args...) | |
mu.Lock() | |
@@ -605,7 +606,7 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { | |
sourcePositions := make([]string, len(scw.sourceShards)) | |
// get the current position from the sources | |
for shardIndex := range scw.sourceShards { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/legacy_split_clone.go:608") | |
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex]) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/legacy_split_clone_test.go b/go/vt/worker/legacy_split_clone_test.go | |
index 65494ade2..8f721a2c9 100644 | |
--- a/go/vt/worker/legacy_split_clone_test.go | |
+++ b/go/vt/worker/legacy_split_clone_test.go | |
@@ -28,6 +28,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -468,7 +469,7 @@ func TestLegacySplitCloneV2_NoMasterAvailable(t *testing.T) { | |
statsRetryCounters.ResetAll() | |
errs := make(chan error, 1) | |
go func() { | |
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/legacy_split_clone_test.go:471") | |
defer cancel() | |
for { | |
diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go | |
index 9a93ca1e1..c43d6a3d4 100644 | |
--- a/go/vt/worker/multi_split_diff.go | |
+++ b/go/vt/worker/multi_split_diff.go | |
@@ -24,6 +24,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vttablet/queryservice" | |
"vitess.io/vitess/go/vt/vttablet/tabletconn" | |
@@ -200,13 +201,13 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { | |
} | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:203") | |
msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace) | |
cancel() | |
if err != nil { | |
return vterrors.Wrapf(err, "cannot read keyspace %v", msdw.keyspace) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:209") | |
msdw.shardInfo, err = msdw.wr.TopoServer().GetShard(shortCtx, msdw.keyspace, msdw.shard) | |
cancel() | |
if err != nil { | |
@@ -228,7 +229,7 @@ func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { | |
// findDestinationShards finds all the shards that have filtered replication from the source shard | |
func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([]*topo.ShardInfo, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:231") | |
keyspaces, err := msdw.wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -252,7 +253,7 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([] | |
} | |
func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keyspace string) ([]*topo.ShardInfo, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:255") | |
shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -284,7 +285,7 @@ func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keys | |
} | |
func (msdw *MultiSplitDiffWorker) getShardInfo(ctx context.Context, keyspace string, shard string) (*topo.ShardInfo, uint32, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:287") | |
si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
@@ -349,14 +350,14 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab | |
tablet := tabletInfo[i].Tablet | |
msdw.wr.Logger().Infof("stopping master binlog replication on %v", shardInfo.MasterAlias) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:352") | |
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff")) | |
cancel() | |
if err != nil { | |
return nil, vterrors.Wrapf(err, "VReplicationExec(stop) for %v failed", shardInfo.MasterAlias) | |
} | |
wrangler.RecordVReplicationAction(msdw.cleaner, tablet, binlogplayer.StartVReplication(msdw.sourceUID)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:359") | |
p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID)) | |
cancel() | |
if err != nil { | |
@@ -375,7 +376,7 @@ func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tab | |
} | |
func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Context, shardInfo *topo.ShardInfo) (*topo.TabletInfo, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:378") | |
masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias) | |
cancel() | |
if err != nil { | |
@@ -389,7 +390,7 @@ func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Contex | |
// (add a cleanup task to restart binlog replication on the source tablet, and | |
// change the existing ChangeSlaveType cleanup action to 'spare' type) | |
func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Context, destVreplicationPos []string) (string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:392") | |
sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) | |
cancel() | |
if err != nil { | |
@@ -406,14 +407,14 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co | |
msdw.wr.Logger().Infof("stopping slave %v at a minimum of %v", msdw.sourceAlias, vreplicationPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:409") | |
err = msdw.wr.TabletManagerClient().StartSlave(shortCtx, sourceTablet.Tablet) | |
cancel() | |
if err != nil { | |
return "", err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:416") | |
mysqlPos, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout) | |
cancel() | |
if err != nil { | |
@@ -431,21 +432,21 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Co | |
// up to the specified source position, and return the destination position. | |
func (msdw *MultiSplitDiffWorker) stopVreplicationAt(ctx context.Context, shardInfo *topo.ShardInfo, sourcePosition string, masterInfo *topo.TabletInfo) (string, error) { | |
msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, sourcePosition) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:434") | |
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, sourcePosition)) | |
cancel() | |
if err != nil { | |
return "", vterrors.Wrapf(err, "VReplication(start until) for %v until %v failed", shardInfo.MasterAlias, sourcePosition) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:441") | |
err = msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), sourcePosition) | |
cancel() | |
if err != nil { | |
return "", vterrors.Wrapf(err, "VReplicationWaitForPos for %v until %v failed", shardInfo.MasterAlias, sourcePosition) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:448") | |
masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet) | |
cancel() | |
if err != nil { | |
@@ -464,7 +465,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina | |
} else { | |
msdw.wr.Logger().Infof("waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos) | |
} | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:467") | |
destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) | |
cancel() | |
if err != nil { | |
@@ -475,7 +476,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina | |
time.Sleep(1 * time.Minute) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:478") | |
if msdw.waitForFixedTimeRatherThanGtidSet { | |
err = msdw.wr.TabletManagerClient().StopSlave(shortCtx, destinationTablet.Tablet) | |
} else { | |
@@ -493,7 +494,7 @@ func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destina | |
// (remove the cleanup task that does the same) | |
func (msdw *MultiSplitDiffWorker) startVreplication(ctx context.Context, shardInfo *topo.ShardInfo, masterInfo *topo.TabletInfo) error { | |
msdw.wr.Logger().Infof("restarting filtered replication on master %v", shardInfo.MasterAlias) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:496") | |
_, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)) | |
if err != nil { | |
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", shardInfo.MasterAlias) | |
@@ -560,7 +561,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte | |
} | |
} | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:563") | |
source, _ := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) | |
cancel() | |
@@ -604,7 +605,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte | |
return err | |
} | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:607") | |
destTabletInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) | |
cancel() | |
if err != nil { | |
@@ -652,7 +653,7 @@ func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Conte | |
func (msdw *MultiSplitDiffWorker) waitForDestinationTabletToReach(ctx context.Context, tablet *topodatapb.Tablet, mysqlPos string) error { | |
for i := 0; i < 20; i++ { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:655") | |
pos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, tablet) | |
cancel() | |
if err != nil { | |
@@ -758,7 +759,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl | |
wg.Add(1) | |
go func(i int, destinationAlias *topodatapb.TabletAlias) { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:761") | |
destinationSchemaDefinition, err := msdw.wr.GetSchema( | |
shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) | |
cancel() | |
@@ -773,7 +774,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff.go:776") | |
sourceSchemaDefinition, err = msdw.wr.GetSchema( | |
shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) | |
cancel() | |
@@ -811,7 +812,7 @@ func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, des | |
} | |
func (msdw *MultiSplitDiffWorker) loadVSchema(ctx context.Context) (*vindexes.KeyspaceSchema, error) { | |
- shortCtx, cancel := context.WithCancel(ctx) | |
+ shortCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/multi_split_diff.go:814") | |
kschema, err := msdw.wr.TopoServer().GetVSchema(shortCtx, msdw.keyspace) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/multi_split_diff_cmd.go b/go/vt/worker/multi_split_diff_cmd.go | |
index 5a6b880c5..1c6cb4e42 100644 | |
--- a/go/vt/worker/multi_split_diff_cmd.go | |
+++ b/go/vt/worker/multi_split_diff_cmd.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/topodata" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -123,7 +124,7 @@ func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.F | |
// shardSources returns all the shards that are SourceShards of at least one other shard. | |
func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:126") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -139,7 +140,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:142") | |
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -150,7 +151,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri | |
wg.Add(1) | |
go func(keyspace, shard string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/multi_split_diff_cmd.go:153") | |
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go | |
index 04f95695f..995096e96 100644 | |
--- a/go/vt/worker/restartable_result_reader.go | |
+++ b/go/vt/worker/restartable_result_reader.go | |
@@ -22,6 +22,7 @@ import ( | |
"strings" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -225,7 +226,7 @@ func (r *RestartableResultReader) Next() (*sqltypes.Result, error) { | |
func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) { | |
// In case of errors we will keep retrying until retryCtx is done. | |
- retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration) | |
+ retryCtx, retryCancel := context2.WithTimeout(r.ctx, *retryDuration, "./go/vt/worker/restartable_result_reader.go:228") | |
defer retryCancel() | |
// The first retry is the second attempt because we already tried once in Next() | |
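The executor.go and restartable_result_reader.go hunks above share one nested-deadline shape: an outer retryCtx caps the total time spent retrying, while each attempt runs under its own short-lived tryCtx. A generic sketch of that pattern, with an illustrative helper name and backoff that are not part of this patch:

    package worker // hypothetical placement

    import (
        "fmt"
        "time"

        "golang.org/x/net/context"
    )

    // retryWithBudget retries op until it succeeds; perTry bounds each
    // attempt and budget bounds the loop as a whole.
    func retryWithBudget(parent context.Context, budget, perTry time.Duration, op func(context.Context) error) error {
        retryCtx, retryCancel := context.WithTimeout(parent, budget)
        defer retryCancel()
        for {
            tryCtx, cancel := context.WithTimeout(retryCtx, perTry)
            err := op(tryCtx)
            cancel() // release the per-attempt timer promptly
            if err == nil {
                return nil
            }
            select {
            case <-retryCtx.Done():
                return fmt.Errorf("retry budget exhausted, last error: %v", err)
            case <-time.After(time.Second): // brief pause before the next attempt
            }
        }
    }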
diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go | |
index 8b8cd2d00..39fa401c7 100644 | |
--- a/go/vt/worker/split_clone.go | |
+++ b/go/vt/worker/split_clone.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/event" | |
"vitess.io/vitess/go/stats" | |
"vitess.io/vitess/go/vt/binlog/binlogplayer" | |
@@ -527,7 +528,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error { | |
scw.setState(WorkerStateInit) | |
// read the keyspace and validate it | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:530") | |
var err error | |
scw.destinationKeyspaceInfo, err = scw.wr.TopoServer().GetKeyspace(shortCtx, scw.destinationKeyspace) | |
cancel() | |
@@ -577,7 +578,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error { | |
func (scw *SplitCloneWorker) initShardsForHorizontalResharding(ctx context.Context) error { | |
// find the OverlappingShards in the keyspace | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:580") | |
osList, err := topotools.FindOverlappingShards(shortCtx, scw.wr.TopoServer(), scw.destinationKeyspace) | |
cancel() | |
if err != nil { | |
@@ -634,7 +635,7 @@ func (scw *SplitCloneWorker) initShardsForVerticalSplit(ctx context.Context) err | |
} | |
sourceKeyspace := servedFrom | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:637") | |
shardMap, err := scw.wr.TopoServer().FindAllShardsInKeyspace(shortCtx, sourceKeyspace) | |
cancel() | |
if err != nil { | |
@@ -774,7 +775,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error | |
// get the tablet info for them, and stop their replication | |
scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases)) | |
for i, alias := range scw.sourceAliases { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:777") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias) | |
cancel() | |
if err != nil { | |
@@ -782,7 +783,7 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error | |
} | |
scw.sourceTablets[i] = ti.Tablet | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:785") | |
err = scw.wr.TabletManagerClient().StopSlave(shortCtx, scw.sourceTablets[i]) | |
cancel() | |
if err != nil { | |
@@ -817,7 +818,7 @@ func (scw *SplitCloneWorker) findTransactionalSources(ctx context.Context) error | |
// get the tablet info | |
scw.sourceTablets = make([]*topodatapb.Tablet, 1) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:820") | |
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, scw.sourceAliases[0]) | |
cancel() | |
if err != nil { | |
@@ -843,7 +844,7 @@ func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error { | |
// Make sure we find a master for each destination shard and log it. | |
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") | |
for _, si := range scw.destinationShards { | |
- waitCtx, waitCancel := context.WithTimeout(ctx, *waitForHealthyTabletsTimeout) | |
+ waitCtx, waitCancel := context2.WithTimeout(ctx, *waitForHealthyTabletsTimeout, "./go/vt/worker/split_clone.go:846") | |
err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) | |
waitCancel() | |
if err != nil { | |
@@ -1178,7 +1179,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) | |
var firstError error | |
- ctx, cancelCopy := context.WithCancel(ctx) | |
+ ctx, cancelCopy := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1181") | |
defer cancelCopy() | |
processError := func(format string, args ...interface{}) { | |
// in theory we could have two threads see firstError as nil and both write to the variable | |
@@ -1245,7 +1246,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { | |
sourcePositions[0] = scw.lastPos | |
} else { | |
for shardIndex := range scw.sourceShards { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1248") | |
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex]) | |
cancel() | |
if err != nil { | |
@@ -1254,7 +1255,7 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { | |
sourcePositions[shardIndex] = status.Position | |
} | |
} | |
- cancelableCtx, cancel := context.WithCancel(ctx) | |
+ cancelableCtx, cancel := context2.WithCancel(ctx, "./go/vt/worker/split_clone.go:1257") | |
rec := concurrency.AllErrorRecorder{} | |
handleError := func(e error) { | |
rec.RecordError(e) | |
@@ -1319,7 +1320,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda | |
// on all source shards. Furthermore, we estimate the number of rows | |
// in each source shard for each table to be about the same | |
// (rowCount is used to estimate an ETA) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone.go:1322") | |
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go | |
index 4cf6b8795..da7782709 100644 | |
--- a/go/vt/worker/split_clone_cmd.go | |
+++ b/go/vt/worker/split_clone_cmd.go | |
@@ -27,6 +27,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/proto/topodata" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
@@ -151,7 +152,7 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS | |
} | |
func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:154") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -166,7 +167,7 @@ func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler) | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_clone_cmd.go:169") | |
osList, err := topotools.FindOverlappingShards(shortCtx, wr.TopoServer(), keyspace) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go | |
index 88a83ae6e..7cf717db9 100644 | |
--- a/go/vt/worker/split_clone_test.go | |
+++ b/go/vt/worker/split_clone_test.go | |
@@ -27,6 +27,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
@@ -1025,7 +1026,7 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { | |
// late because this Go routine looks at it and can run before the worker. | |
statsRetryCounters.ResetAll() | |
go func() { | |
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 30*time.Second, "./go/vt/worker/split_clone_test.go:1028") | |
defer cancel() | |
for { | |
diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go | |
index 55b5c084f..0eeb44db6 100644 | |
--- a/go/vt/worker/split_diff.go | |
+++ b/go/vt/worker/split_diff.go | |
@@ -21,6 +21,7 @@ import ( | |
"sort" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -187,13 +188,13 @@ func (sdw *SplitDiffWorker) init(ctx context.Context) error { | |
sdw.SetState(WorkerStateInit) | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:190") | |
sdw.keyspaceInfo, err = sdw.wr.TopoServer().GetKeyspace(shortCtx, sdw.keyspace) | |
cancel() | |
if err != nil { | |
return vterrors.Wrapf(err, "cannot read keyspace %v", sdw.keyspace) | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:196") | |
sdw.shardInfo, err = sdw.wr.TopoServer().GetShard(shortCtx, sdw.keyspace, sdw.shard) | |
cancel() | |
if err != nil { | |
@@ -258,7 +259,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error { | |
// to FindWorkerTablet could attempt to set the same tablet to the DRAINED state. Only | |
// one of these calls to FindWorkerTablet will succeed and the rest will fail. | |
// The following makes sure we keep trying to find a worker tablet when this error occurs. | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:261") | |
for { | |
select { | |
case <-shortCtx.Done(): | |
@@ -297,7 +298,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error { | |
func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
sdw.SetState(WorkerStateSyncReplication) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:300") | |
defer cancel() | |
masterInfo, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.shardInfo.MasterAlias) | |
if err != nil { | |
@@ -306,7 +307,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 1 - stop the master binlog replication, get its current position | |
sdw.wr.Logger().Infof("Stopping master binlog replication on %v", sdw.shardInfo.MasterAlias) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:309") | |
defer cancel() | |
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(sdw.sourceShard.Uid, "for split diff")) | |
if err != nil { | |
@@ -331,7 +332,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
return err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:334") | |
defer cancel() | |
mysqlPos, err := sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout) | |
if err != nil { | |
@@ -345,7 +346,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 3 - ask the master of the destination shard to resume filtered | |
// replication up to the new list of positions | |
sdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", sdw.shardInfo.MasterAlias, mysqlPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:348") | |
defer cancel() | |
_, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(sdw.sourceShard.Uid, mysqlPos)) | |
if err != nil { | |
@@ -362,13 +363,13 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 4 - wait until the destination tablet has reached or passed | |
// that master binlog position, and stop its replication. | |
sdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", sdw.destinationAlias, masterPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:365") | |
defer cancel() | |
destinationTablet, err := sdw.wr.TopoServer().GetTablet(shortCtx, sdw.destinationAlias) | |
if err != nil { | |
return err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:371") | |
defer cancel() | |
if _, err = sdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout); err != nil { | |
return vterrors.Wrapf(err, "StopSlaveMinimum for %v at %v failed", sdw.destinationAlias, masterPos) | |
@@ -377,7 +378,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
// 5 - restart filtered replication on destination master | |
sdw.wr.Logger().Infof("Restarting filtered replication on master %v", sdw.shardInfo.MasterAlias) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:380") | |
defer cancel() | |
if _, err = sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(sdw.sourceShard.Uid)); err != nil { | |
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", sdw.shardInfo.MasterAlias) | |
@@ -400,7 +401,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:403") | |
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema( | |
shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) | |
cancel() | |
@@ -413,7 +414,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff.go:416") | |
sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema( | |
shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) | |
cancel() | |
diff --git a/go/vt/worker/split_diff_cmd.go b/go/vt/worker/split_diff_cmd.go | |
index 5ece6f5dc..bd52efae6 100644 | |
--- a/go/vt/worker/split_diff_cmd.go | |
+++ b/go/vt/worker/split_diff_cmd.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/vtrpc" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -114,7 +115,7 @@ func commandSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSe | |
// shardsWithSources returns all the shards that have SourceShards set | |
// with no Tables list. | |
func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:117") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -129,7 +130,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:132") | |
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -140,7 +141,7 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string | |
wg.Add(1) | |
go func(keyspace, shard string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/split_diff_cmd.go:143") | |
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/tablet_provider.go b/go/vt/worker/tablet_provider.go | |
index 2d42bcb7f..47a7b950b 100644 | |
--- a/go/vt/worker/tablet_provider.go | |
+++ b/go/vt/worker/tablet_provider.go | |
@@ -19,6 +19,7 @@ package worker | |
import ( | |
"fmt" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -57,7 +58,7 @@ func newSingleTabletProvider(ctx context.Context, ts *topo.Server, alias *topoda | |
} | |
func (p *singleTabletProvider) getTablet() (*topodatapb.Tablet, error) { | |
- shortCtx, cancel := context.WithTimeout(p.ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(p.ctx, *remoteActionsTimeout, "./go/vt/worker/tablet_provider.go:60") | |
tablet, err := p.ts.GetTablet(shortCtx, p.alias) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go | |
index 4dcc52a1d..aa7b4a182 100644 | |
--- a/go/vt/worker/topo_utils.go | |
+++ b/go/vt/worker/topo_utils.go | |
@@ -22,6 +22,7 @@ import ( | |
"math/rand" | |
"time" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -68,7 +69,7 @@ func FindHealthyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discover | |
} | |
func waitForHealthyTablets(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int, timeout time.Duration, tabletType topodatapb.TabletType) ([]discovery.TabletStats, error) { | |
- busywaitCtx, busywaitCancel := context.WithTimeout(ctx, timeout) | |
+ busywaitCtx, busywaitCancel := context2.WithTimeout(ctx, timeout, "./go/vt/worker/topo_utils.go:71") | |
defer busywaitCancel() | |
start := time.Now() | |
@@ -122,7 +123,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang | |
} | |
wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:125") | |
err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED) | |
cancel() | |
if err != nil { | |
@@ -131,7 +132,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang | |
ourURL := servenv.ListeningURL.String() | |
wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:134") | |
_, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error { | |
if tablet.Tags == nil { | |
tablet.Tags = make(map[string]string) | |
@@ -154,7 +155,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang | |
wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType) | |
// We refresh the destination vttablet so that it reloads the worker URL when it reloads the tablet. | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/topo_utils.go:157") | |
err = wr.RefreshTabletState(shortCtx, tabletAlias) | |
cancel() | |
if err != nil { | |
return nil, err | |
diff --git a/go/vt/worker/utils_test.go b/go/vt/worker/utils_test.go | |
index 98bdd416e..6dbe8a9dc 100644 | |
--- a/go/vt/worker/utils_test.go | |
+++ b/go/vt/worker/utils_test.go | |
@@ -22,6 +22,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/topo" | |
@@ -39,7 +40,7 @@ import ( | |
func runCommand(t *testing.T, wi *Instance, wr *wrangler.Wrangler, args []string) error { | |
// Limit the scope of the context e.g. to implicitly terminate stray Go | |
// routines. | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/utils_test.go:42") | |
defer cancel() | |
worker, done, err := wi.RunCommand(ctx, args, wr, false /* runFromCli */) | |
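One maintenance cost of this patch is visible at every call site: the "file:line" strings are written by hand, so they go stale whenever lines shift. If one extra stack lookup per context creation is acceptable, the tag could instead be derived at runtime. A hypothetical variant, not part of this patch, continuing the context2 sketch from earlier:

    package context2

    import (
        "fmt"
        "log"
        "runtime"
        "time"

        "golang.org/x/net/context"
    )

    // WithTimeoutAuto derives the tag from its caller, so it cannot go stale.
    func WithTimeoutAuto(parent context.Context, d time.Duration) (context.Context, context.CancelFunc) {
        tag := "unknown"
        if _, file, line, ok := runtime.Caller(1); ok { // frame 1 = our caller
            tag = fmt.Sprintf("%s:%d", file, line)
        }
        ctx, cancel := context.WithTimeout(parent, d)
        go func() {
            <-ctx.Done() // returns once the deadline fires or cancel runs
            log.Printf("context created at %s done: %v", tag, ctx.Err())
        }()
        return ctx, cancel
    }

A fixed literal costs nothing at run time, which may be why the patch spells each location out instead.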
diff --git a/go/vt/worker/vertical_split_clone_cmd.go b/go/vt/worker/vertical_split_clone_cmd.go | |
index 7045fb3e6..0298ca959 100644 | |
--- a/go/vt/worker/vertical_split_clone_cmd.go | |
+++ b/go/vt/worker/vertical_split_clone_cmd.go | |
@@ -25,6 +25,7 @@ import ( | |
"strings" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/proto/topodata" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -152,7 +153,7 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl | |
// keyspacesWithServedFrom returns all the keyspaces that have ServedFrom set | |
// to one value. | |
func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:155") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -167,7 +168,7 @@ func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]stri | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:170") | |
ki, err := wr.TopoServer().GetKeyspace(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -288,7 +289,7 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl | |
} | |
// Figure out the shard | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_clone_cmd.go:291") | |
shardMap, err := wr.TopoServer().FindAllShardsInKeyspace(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go | |
index fb5a8e3a0..a99c8ae95 100644 | |
--- a/go/vt/worker/vertical_split_diff.go | |
+++ b/go/vt/worker/vertical_split_diff.go | |
@@ -21,6 +21,7 @@ import ( | |
"html/template" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -261,7 +262,7 @@ func (vsdw *VerticalSplitDiffWorker) findTargets(ctx context.Context) error { | |
func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) error { | |
vsdw.SetState(WorkerStateSyncReplication) | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:264") | |
defer cancel() | |
masterInfo, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.shardInfo.MasterAlias) | |
if err != nil { | |
@@ -272,7 +273,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 1 - stop the master binlog replication, get its current position | |
vsdw.wr.Logger().Infof("Stopping master binlog replication on %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:275") | |
defer cancel() | |
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(ss.Uid, "for split diff")) | |
if err != nil { | |
@@ -291,7 +292,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// stop replication | |
vsdw.wr.Logger().Infof("Stopping slave %v at a minimum of %v", topoproto.TabletAliasString(vsdw.sourceAlias), vreplicationPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:294") | |
defer cancel() | |
sourceTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.sourceAlias) | |
if err != nil { | |
@@ -309,7 +310,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 3 - ask the master of the destination shard to resume filtered | |
// replication up to the new list of positions | |
vsdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias), mysqlPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:312") | |
defer cancel() | |
_, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(ss.Uid, mysqlPos)) | |
if err != nil { | |
@@ -326,13 +327,13 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 4 - wait until the destination tablet has reached or passed | |
// that master binlog position, and stop its replication. | |
vsdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", topoproto.TabletAliasString(vsdw.destinationAlias), masterPos) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:329") | |
defer cancel() | |
destinationTablet, err := vsdw.wr.TopoServer().GetTablet(shortCtx, vsdw.destinationAlias) | |
if err != nil { | |
return err | |
} | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:335") | |
defer cancel() | |
_, err = vsdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout) | |
if err != nil { | |
@@ -342,7 +343,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) | |
// 5 - restart filtered replication on destination master | |
vsdw.wr.Logger().Infof("Restarting filtered replication on master %v", topoproto.TabletAliasString(vsdw.shardInfo.MasterAlias)) | |
- shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel = context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:345") | |
defer cancel() | |
if _, err = vsdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(ss.Uid)); err != nil { | |
return vterrors.Wrapf(err, "VReplicationExec(start) failed for %v", vsdw.shardInfo.MasterAlias) | |
@@ -365,7 +366,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:368") | |
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema( | |
shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) | |
cancel() | |
@@ -378,7 +379,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { | |
wg.Add(1) | |
go func() { | |
var err error | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff.go:381") | |
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema( | |
shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) | |
cancel() | |
diff --git a/go/vt/worker/vertical_split_diff_cmd.go b/go/vt/worker/vertical_split_diff_cmd.go | |
index e4cfd4d34..cc997b9d1 100644 | |
--- a/go/vt/worker/vertical_split_diff_cmd.go | |
+++ b/go/vt/worker/vertical_split_diff_cmd.go | |
@@ -24,6 +24,7 @@ import ( | |
"strconv" | |
"sync" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/vterrors" | |
"golang.org/x/net/context" | |
@@ -101,7 +102,7 @@ func commandVerticalSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *fla | |
// shardsWithTablesSources returns all the shards that have SourceShards set | |
// to one value, with an array of Tables. | |
func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:104") | |
keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) | |
cancel() | |
if err != nil { | |
@@ -116,7 +117,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[ | |
wg.Add(1) | |
go func(keyspace string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:119") | |
shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace) | |
cancel() | |
if err != nil { | |
@@ -127,7 +128,7 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[ | |
wg.Add(1) | |
go func(keyspace, shard string) { | |
defer wg.Done() | |
- shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) | |
+ shortCtx, cancel := context2.WithTimeout(ctx, *remoteActionsTimeout, "./go/vt/worker/vertical_split_diff_cmd.go:130") | |
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) | |
cancel() | |
if err != nil { | |
diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go | |
index b5ff551d5..1ac4b3bff 100644 | |
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go | |
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go | |
@@ -34,6 +34,7 @@ import ( | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/logutil" | |
"vitess.io/vitess/go/vt/topo/memorytopo" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -123,7 +124,7 @@ func runVtworkerCommand(client vtworkerclient.Client, args []string) error { | |
func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, serverSideCancelation bool) { | |
// Run the vtworker "Block" command which blocks until we cancel the context. | |
var wg sync.WaitGroup | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/worker/vtworkerclienttest/client_testsuite.go:126") | |
// blockCommandStarted will be closed after we're sure that vtworker is | |
// running the "Block" command. | |
blockCommandStarted := make(chan struct{}) | |
diff --git a/go/vt/workflow/manager.go b/go/vt/workflow/manager.go | |
index f3f94d39f..c4529a8bd 100644 | |
--- a/go/vt/workflow/manager.go | |
+++ b/go/vt/workflow/manager.go | |
@@ -26,6 +26,7 @@ import ( | |
gouuid "github.com/pborman/uuid" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/log" | |
"vitess.io/vitess/go/vt/topo" | |
@@ -352,7 +353,7 @@ func (m *Manager) Start(ctx context.Context, uuid string) error { | |
func (m *Manager) runWorkflow(rw *runningWorkflow) { | |
// Create a context to run it. | |
var ctx context.Context | |
- ctx, rw.cancel = context.WithCancel(m.ctx) | |
+ ctx, rw.cancel = context2.WithCancel(m.ctx, "./go/vt/workflow/manager.go:355") | |
// And run it in the background. | |
go m.executeWorkflowRun(ctx, rw) | |
@@ -589,7 +590,7 @@ func AvailableFactories() map[string]bool { | |
// StartManager starts a manager. This function should only be used for test purposes. | |
func StartManager(m *Manager) (*sync.WaitGroup, context.Context, context.CancelFunc) { | |
- ctx, cancel := context.WithCancel(context.Background()) | |
+ ctx, cancel := context2.WithCancel(context.Background(), "./go/vt/workflow/manager.go:592") | |
wg := &sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
diff --git a/go/vt/wrangler/cleaner.go b/go/vt/wrangler/cleaner.go | |
index e40b9c08d..837d20a88 100644 | |
--- a/go/vt/wrangler/cleaner.go | |
+++ b/go/vt/wrangler/cleaner.go | |
@@ -22,6 +22,7 @@ import ( | |
"time" | |
"golang.org/x/net/context" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/vt/concurrency" | |
"vitess.io/vitess/go/vt/topo" | |
"vitess.io/vitess/go/vt/topo/topoproto" | |
@@ -96,7 +97,7 @@ type cleanUpHelper struct { | |
// basis. They are then serialized on each target. | |
func (cleaner *Cleaner) CleanUp(wr *Wrangler) error { | |
// we use a background context so we're not dependent on the original context timeout | |
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 5*time.Minute, "./go/vt/wrangler/cleaner.go:99") | |
actionMap := make(map[string]*cleanUpHelper) | |
rec := concurrency.AllErrorRecorder{} | |
cleaner.mu.Lock() | |
diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go | |
index 85adabf6c..fc5ada634 100644 | |
--- a/go/vt/wrangler/fake_tablet_test.go | |
+++ b/go/vt/wrangler/fake_tablet_test.go | |
@@ -25,6 +25,7 @@ import ( | |
"golang.org/x/net/context" | |
"google.golang.org/grpc" | |
+ "vitess.io/vitess/go/context2" | |
"vitess.io/vitess/go/mysql/fakesqldb" | |
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" | |
querypb "vitess.io/vitess/go/vt/proto/query" | |
@@ -183,7 +184,7 @@ func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) { | |
step := 10 * time.Millisecond | |
c := tmclient.NewTabletManagerClient() | |
for timeout >= 0 { | |
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/fake_tablet_test.go:186") | |
err := c.Ping(ctx, ft.Agent.Tablet()) | |
cancel() | |
if err == nil { | |
diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go
index b31f20d83..6d64cb55b 100644
--- a/go/vt/wrangler/keyspace.go
+++ b/go/vt/wrangler/keyspace.go
@@ -25,6 +25,7 @@ import (
"time"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -495,7 +496,7 @@ func (wr *Wrangler) waitForFilteredReplication(ctx context.Context, sourcePositi
wg.Add(1)
go func(si *topo.ShardInfo) {
defer wg.Done()
- ctx, cancel := context.WithTimeout(ctx, waitTime)
+ ctx, cancel := context2.WithTimeout(ctx, waitTime, "./go/vt/wrangler/keyspace.go:498")
defer cancel()
var pos string
@@ -1231,7 +1232,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp
// replication and starts accepting writes
func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, tables []string, ev *events.MigrateServedFrom, filteredReplicationWaitTime time.Duration) error {
// Read the data we need
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/keyspace.go:1234")
defer cancel()
sourceMasterTabletInfo, err := wr.ts.GetTablet(ctx, sourceShard.MasterAlias)
if err != nil {
@@ -1356,7 +1357,7 @@ func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInf
// Setting an upper bound timeout to fail faster in case of an error.
// Using 60 seconds because RefreshState should not take more than 30 seconds.
// (RefreshState will restart the tablet's QueryService and most time will be spent on the shutdown, i.e. waiting up to 30 seconds on transactions (see Config.TransactionTimeout)).
- ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 60*time.Second, "./go/vt/wrangler/keyspace.go:1359")
if err := wr.tmc.RefreshState(ctx, ti.Tablet); err != nil {
wr.Logger().Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
}
diff --git a/go/vt/wrangler/migrater.go b/go/vt/wrangler/migrater.go
index 0f5c368a2..91c7ed462 100644
--- a/go/vt/wrangler/migrater.go
+++ b/go/vt/wrangler/migrater.go
@@ -28,6 +28,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -505,7 +506,7 @@ func (mi *migrater) changeTableSourceWrites(ctx context.Context, access accessTy
}
func (mi *migrater) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
- ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
+ ctx, cancel := context2.WithTimeout(ctx, filteredReplicationWaitTime, "./go/vt/wrangler/migrater.go:508")
defer cancel()
var mu sync.Mutex
diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go
index 06b99e21f..9e752e1f1 100644
--- a/go/vt/wrangler/reparent.go
+++ b/go/vt/wrangler/reparent.go
@@ -26,6 +26,7 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
@@ -206,7 +207,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare
// at a wrong replication spot.
// Create a context for the following RPCs that respects waitSlaveTimeout
- resetCtx, resetCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ resetCtx, resetCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:209")
defer resetCancel()
event.DispatchUpdate(ev, "resetting replication on all tablets")
@@ -249,7 +250,7 @@ func (wr *Wrangler) initShardMasterLocked(ctx context.Context, ev *events.Repare
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:252")
defer replCancel()
// Now tell the new master to insert the reparent_journal row,
@@ -414,7 +415,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
ev.OldMaster = *oldMasterTabletInfo.Tablet
// create a new context for the short running remote operations
- remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
+ remoteCtx, remoteCancel := context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:417")
defer remoteCancel()
// Demote the current master, get its replication position
@@ -425,7 +426,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
return fmt.Errorf("old master tablet %v DemoteMaster failed: %v", topoproto.TabletAliasString(shardInfo.MasterAlias), err)
}
- remoteCtx, remoteCancel = context.WithTimeout(ctx, waitSlaveTimeout)
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:428")
defer remoteCancel()
// Wait on the master-elect tablet until it reaches that position,
@@ -436,7 +437,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
if err != nil || (ctx.Err() != nil && ctx.Err() == context.DeadlineExceeded) {
remoteCancel()
// if this fails it is not enough to return an error. we should rollback all the changes made by DemoteMaster
- remoteCtx, remoteCancel = context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
+ remoteCtx, remoteCancel = context2.WithTimeout(ctx, *topo.RemoteOperationTimeout, "./go/vt/wrangler/reparent.go:439")
defer remoteCancel()
if err1 := wr.tmc.UndoDemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet); err1 != nil {
log.Warningf("Encountered error %v while trying to undo DemoteMaster", err1)
@@ -451,7 +452,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ replCtx, replCancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:454")
defer replCancel()
// Go through all the tablets:
@@ -533,7 +534,7 @@ func (maxPosSearch *maxReplPosSearch) processTablet(tablet *topodatapb.Tablet) {
defer maxPosSearch.waitGroup.Done()
maxPosSearch.wrangler.logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias))
- slaveStatusCtx, cancelSlaveStatus := context.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout)
+ slaveStatusCtx, cancelSlaveStatus := context2.WithTimeout(maxPosSearch.ctx, maxPosSearch.waitSlaveTimeout, "./go/vt/wrangler/reparent.go:536")
defer cancelSlaveStatus()
status, err := maxPosSearch.wrangler.tmc.SlaveStatus(slaveStatusCtx, tablet)
@@ -668,7 +669,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
ev.OldMaster = *oldMasterTabletInfo.Tablet
wr.logger.Infof("deleting old master %v", shardInfoMasterAliasStr)
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:671")
defer cancel()
if err := topotools.DeleteTablet(ctx, wr.ts, oldMasterTabletInfo.Tablet); err != nil {
@@ -688,7 +689,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
go func(alias string, tabletInfo *topo.TabletInfo) {
defer wg.Done()
wr.logger.Infof("getting replication position from %v", alias)
- ctx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ ctx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/reparent.go:691")
defer cancel()
rp, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet)
if err != nil {
@@ -744,7 +745,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
// Create a cancelable context for the following RPCs.
// If error conditions happen, we can cancel all outgoing RPCs.
- replCtx, replCancel := context.WithCancel(ctx)
+ replCtx, replCancel := context2.WithCancel(ctx, "./go/vt/wrangler/reparent.go:747")
defer replCancel()
// Reset replication on all slaves to point to the new master, and
diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go
index fd0411727..ec9201196 100644
--- a/go/vt/wrangler/schema.go
+++ b/go/vt/wrangler/schema.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/concurrency"
@@ -359,7 +360,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo
// Notify slaves to reload schema. This is best-effort.
concurrency := sync2.NewSemaphore(10, 0)
- reloadCtx, cancel := context.WithTimeout(ctx, waitSlaveTimeout)
+ reloadCtx, cancel := context2.WithTimeout(ctx, waitSlaveTimeout, "./go/vt/wrangler/schema.go:362")
defer cancel()
wr.ReloadSchemaShard(reloadCtx, destKeyspace, destShard, destMasterPos, concurrency, true /* includeMaster */)
return nil
@@ -435,7 +436,7 @@ func (wr *Wrangler) applySQLShard(ctx context.Context, tabletInfo *topo.TabletIn
if err != nil {
return fmt.Errorf("fillStringTemplate failed: %v", err)
}
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ ctx, cancel := context2.WithTimeout(ctx, 30*time.Second, "./go/vt/wrangler/schema.go:438")
defer cancel()
// Need to make sure that we enable binlog, since we're only applying the statement on masters.
_, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo.Tablet, false, []byte(filledChange), 0, false, reloadSchema)
diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go
index 75766e27d..557ee2ad0 100644
--- a/go/vt/wrangler/testlib/fake_tablet.go
+++ b/go/vt/wrangler/testlib/fake_tablet.go
@@ -30,6 +30,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "vitess.io/vitess/go/context2"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/topo"
@@ -206,7 +207,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) {
step := 10 * time.Millisecond
c := tmclient.NewTabletManagerClient()
for timeout >= 0 {
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context2.WithTimeout(context.Background(), 1*time.Second, "./go/vt/wrangler/testlib/fake_tablet.go:209")
err := c.Ping(ctx, ft.Agent.Tablet())
cancel()
if err == nil {
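
The context2 package itself is not part of this patch, so the sketch below is only a plausible reading of its API from the call sites above: WithTimeout and WithCancel are assumed to mirror the standard golang.org/x/net/context constructors while recording the caller-supplied "file:line" string on the returned context. The locationKey type and the CreatedAt helper are illustrative names, not confirmed parts of the package.

// Package context2 (sketch, not the actual vitess.io/vitess/go/context2
// implementation) wraps the standard context constructors and tags each
// derived context with the call site that created it.
package context2

import (
	"time"

	"golang.org/x/net/context"
)

// locationKey is an unexported key type, so the stored location cannot
// collide with context values set by other packages.
type locationKey struct{}

// WithTimeout behaves like context.WithTimeout and additionally tags the
// returned context with the location string passed by the caller.
func WithTimeout(parent context.Context, d time.Duration, location string) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(parent, d)
	return context.WithValue(ctx, locationKey{}, location), cancel
}

// WithCancel behaves like context.WithCancel and additionally tags the
// returned context with the location string passed by the caller.
func WithCancel(parent context.Context, location string) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	return context.WithValue(ctx, locationKey{}, location), cancel
}

// CreatedAt returns the location recorded for ctx, or "" if none was set.
func CreatedAt(ctx context.Context) string {
	location, _ := ctx.Value(locationKey{}).(string)
	return location
}

Under that assumption, a caller that observes ctx.Err() == context.DeadlineExceeded can log context2.CreatedAt(ctx) to see which of the tagged call sites created the expired context, which is the apparent point of threading the "./path/to/file.go:NNN" strings through every constructor in this patch.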