Created
March 29, 2017 12:27
-
-
Save bbeaudreault/729887a513bc0e54dcccb5cbdb93ed4b to your computer and use it in GitHub Desktop.
Disclaimer: This was the first piece of Go I wrote, and I have not run it in a few months. It may not work as is, but it could be used as a starting point for some hacking. It expects to be run against the MySQL general log, and makes some assumptions about what you care about. For instance, it ignores a long list of DBA queries.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From a89a8d6144136512574681f36e5e9828c757090d Mon Sep 17 00:00:00 2001 | |
From: Bryan Beaudreault <[email protected]> | |
Date: Wed, 4 Jan 2017 11:23:12 -0500 | |
Subject: [PATCH] vtparse -- a tool for analyzing query logs, sending them | |
through vitess planbuilders, and outputting errors | |
--- | |
go/cmd/vtparse/main.go | 10 + | |
go/cmd/vtparse/plugin_cephbackupstorage.go | 5 + | |
go/cmd/vtparse/plugin_etcd2topo.go | 22 + | |
go/cmd/vtparse/plugin_etcdtopo.go | 22 + | |
go/cmd/vtparse/plugin_filebackupstorage.go | 9 + | |
go/cmd/vtparse/plugin_gcsbackupstorage.go | 9 + | |
go/cmd/vtparse/plugin_grpctabletconn.go | 11 + | |
go/cmd/vtparse/plugin_grpcthrottlerclient.go | 11 + | |
go/cmd/vtparse/plugin_grpctmclient.go | 11 + | |
go/cmd/vtparse/plugin_influxdbbackend.go | 11 + | |
go/cmd/vtparse/plugin_s3backupstorage.go | 5 + | |
go/cmd/vtparse/plugin_zk2topo.go | 18 + | |
go/cmd/vtparse/plugin_zktopo.go | 22 + | |
go/cmd/vtparse/vtparse.go | 651 +++++++++++++++++++++++++++ | |
vendor/vendor.json | 6 + | |
15 files changed, 823 insertions(+) | |
create mode 100644 go/cmd/vtparse/main.go | |
create mode 100644 go/cmd/vtparse/plugin_cephbackupstorage.go | |
create mode 100644 go/cmd/vtparse/plugin_etcd2topo.go | |
create mode 100644 go/cmd/vtparse/plugin_etcdtopo.go | |
create mode 100644 go/cmd/vtparse/plugin_filebackupstorage.go | |
create mode 100644 go/cmd/vtparse/plugin_gcsbackupstorage.go | |
create mode 100644 go/cmd/vtparse/plugin_grpctabletconn.go | |
create mode 100644 go/cmd/vtparse/plugin_grpcthrottlerclient.go | |
create mode 100644 go/cmd/vtparse/plugin_grpctmclient.go | |
create mode 100644 go/cmd/vtparse/plugin_influxdbbackend.go | |
create mode 100644 go/cmd/vtparse/plugin_s3backupstorage.go | |
create mode 100644 go/cmd/vtparse/plugin_zk2topo.go | |
create mode 100644 go/cmd/vtparse/plugin_zktopo.go | |
create mode 100644 go/cmd/vtparse/vtparse.go | |
diff --git a/go/cmd/vtparse/main.go b/go/cmd/vtparse/main.go | |
new file mode 100644 | |
index 000000000..6d8d78ee4 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/main.go | |
@@ -0,0 +1,10 @@ | |
+package main | |
+ | |
+import ( | |
+ "github.com/youtube/vitess/go/vt/topo" | |
+) | |
+ | |
+// used at runtime by plug-ins | |
+var ( | |
+ ts topo.Server | |
+) | |
\ No newline at end of file | |
diff --git a/go/cmd/vtparse/plugin_cephbackupstorage.go b/go/cmd/vtparse/plugin_cephbackupstorage.go | |
new file mode 100644 | |
index 000000000..bc5d7b427 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_cephbackupstorage.go | |
@@ -0,0 +1,5 @@ | |
+package main | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/vt/mysqlctl/cephbackupstorage" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_etcd2topo.go b/go/cmd/vtparse/plugin_etcd2topo.go | |
new file mode 100644 | |
index 000000000..a3bb8759d | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_etcd2topo.go | |
@@ -0,0 +1,22 @@ | |
+// Copyright 2014, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+// This plugin imports etcd2topo to register the etcd2 implementation of TopoServer. | |
+ | |
+import ( | |
+ "github.com/youtube/vitess/go/vt/servenv" | |
+ "github.com/youtube/vitess/go/vt/topo/etcd2topo" | |
+ "github.com/youtube/vitess/go/vt/vtctld" | |
+) | |
+ | |
+func init() { | |
+ // Wait until flags are parsed, so we can check which topo server is in use. | |
+ servenv.OnRun(func() { | |
+ if s, ok := ts.Impl.(*etcd2topo.Server); ok { | |
+ vtctld.HandleExplorer("etcd2", vtctld.NewBackendExplorer(s)) | |
+ } | |
+ }) | |
+} | |
diff --git a/go/cmd/vtparse/plugin_etcdtopo.go b/go/cmd/vtparse/plugin_etcdtopo.go | |
new file mode 100644 | |
index 000000000..e3f4bc774 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_etcdtopo.go | |
@@ -0,0 +1,22 @@ | |
+// Copyright 2014, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+// This plugin imports etcdtopo to register the etcd implementation of TopoServer. | |
+ | |
+import ( | |
+ "github.com/youtube/vitess/go/vt/etcdtopo" | |
+ "github.com/youtube/vitess/go/vt/servenv" | |
+ "github.com/youtube/vitess/go/vt/vtctld" | |
+) | |
+ | |
+func init() { | |
+ // Wait until flags are parsed, so we can check which topo server is in use. | |
+ servenv.OnRun(func() { | |
+ if etcdServer, ok := ts.Impl.(*etcdtopo.Server); ok { | |
+ vtctld.HandleExplorer("etcd", etcdtopo.NewExplorer(etcdServer)) | |
+ } | |
+ }) | |
+} | |
diff --git a/go/cmd/vtparse/plugin_filebackupstorage.go b/go/cmd/vtparse/plugin_filebackupstorage.go | |
new file mode 100644 | |
index 000000000..cf6c96313 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_filebackupstorage.go | |
@@ -0,0 +1,9 @@ | |
+// Copyright 2015, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/vt/mysqlctl/filebackupstorage" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_gcsbackupstorage.go b/go/cmd/vtparse/plugin_gcsbackupstorage.go | |
new file mode 100644 | |
index 000000000..de22788fd | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_gcsbackupstorage.go | |
@@ -0,0 +1,9 @@ | |
+// Copyright 2015, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/vt/mysqlctl/gcsbackupstorage" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_grpctabletconn.go b/go/cmd/vtparse/plugin_grpctabletconn.go | |
new file mode 100644 | |
index 000000000..282f56094 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_grpctabletconn.go | |
@@ -0,0 +1,11 @@ | |
+// Copyright 2013, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+// Imports and register the gRPC tabletconn client | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/vt/tabletserver/grpctabletconn" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_grpcthrottlerclient.go b/go/cmd/vtparse/plugin_grpcthrottlerclient.go | |
new file mode 100644 | |
index 000000000..e62ca45b4 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_grpcthrottlerclient.go | |
@@ -0,0 +1,11 @@ | |
+// Copyright 2016, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+// Imports and register the gRPC throttler client. | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/vt/throttler/grpcthrottlerclient" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_grpctmclient.go b/go/cmd/vtparse/plugin_grpctmclient.go | |
new file mode 100644 | |
index 000000000..a41f5ccad | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_grpctmclient.go | |
@@ -0,0 +1,11 @@ | |
+// Copyright 2013, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+// Imports and register the gRPC tabletmanager client | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/vt/tabletmanager/grpctmclient" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_influxdbbackend.go b/go/cmd/vtparse/plugin_influxdbbackend.go | |
new file mode 100644 | |
index 000000000..0c1605e4e | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_influxdbbackend.go | |
@@ -0,0 +1,11 @@ | |
+// Copyright 2014, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+// This plugin imports influxdbbackend to register the influxdbbackend stats backend. | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/stats/influxdbbackend" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_s3backupstorage.go b/go/cmd/vtparse/plugin_s3backupstorage.go | |
new file mode 100644 | |
index 000000000..42b627e35 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_s3backupstorage.go | |
@@ -0,0 +1,5 @@ | |
+package main | |
+ | |
+import ( | |
+ _ "github.com/youtube/vitess/go/vt/mysqlctl/s3backupstorage" | |
+) | |
diff --git a/go/cmd/vtparse/plugin_zk2topo.go b/go/cmd/vtparse/plugin_zk2topo.go | |
new file mode 100644 | |
index 000000000..d9f63b357 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_zk2topo.go | |
@@ -0,0 +1,18 @@ | |
+package main | |
+ | |
+// Imports and register the Zookeeper TopologyServer and its Explorer. | |
+ | |
+import ( | |
+ "github.com/youtube/vitess/go/vt/servenv" | |
+ "github.com/youtube/vitess/go/vt/topo/zk2topo" | |
+ "github.com/youtube/vitess/go/vt/vtctld" | |
+) | |
+ | |
+func init() { | |
+ // Wait until flags are parsed, so we can check which topo server is in use. | |
+ servenv.OnRun(func() { | |
+ if s, ok := ts.Impl.(*zk2topo.Server); ok { | |
+ vtctld.HandleExplorer("zk2", vtctld.NewBackendExplorer(s)) | |
+ } | |
+ }) | |
+} | |
diff --git a/go/cmd/vtparse/plugin_zktopo.go b/go/cmd/vtparse/plugin_zktopo.go | |
new file mode 100644 | |
index 000000000..9b4626a91 | |
--- /dev/null | |
+++ b/go/cmd/vtparse/plugin_zktopo.go | |
@@ -0,0 +1,22 @@ | |
+// Copyright 2013, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+// Imports and register the Zookeeper TopologyServer and its Explorer. | |
+ | |
+import ( | |
+ "github.com/youtube/vitess/go/vt/servenv" | |
+ "github.com/youtube/vitess/go/vt/vtctld" | |
+ "github.com/youtube/vitess/go/vt/zktopo" | |
+) | |
+ | |
+func init() { | |
+ // Wait until flags are parsed, so we can check which topo server is in use. | |
+ servenv.OnRun(func() { | |
+ if zkServer, ok := ts.Impl.(*zktopo.Server); ok { | |
+ vtctld.HandleExplorer("zk", zktopo.NewZkExplorer(zkServer.GetZConn())) | |
+ } | |
+ }) | |
+} | |
diff --git a/go/cmd/vtparse/vtparse.go b/go/cmd/vtparse/vtparse.go | |
new file mode 100644 | |
index 000000000..de17bf00b | |
--- /dev/null | |
+++ b/go/cmd/vtparse/vtparse.go | |
@@ -0,0 +1,651 @@ | |
+// Copyright 2017, Google Inc. All rights reserved. | |
+// Use of this source code is governed by a BSD-style | |
+// license that can be found in the LICENSE file. | |
+ | |
+package main | |
+ | |
+import ( | |
+ "bufio" | |
+ "bytes" | |
+ "database/sql" | |
+ "flag" | |
+ "fmt" | |
+ "io" | |
+ "os" | |
+ "os/signal" | |
+ "path/filepath" | |
+ "regexp" | |
+ "strings" | |
+ "syscall" | |
+ "time" | |
+ | |
+ "runtime" | |
+ | |
+ "reflect" | |
+ | |
+ _ "github.com/go-sql-driver/mysql" | |
+ log "github.com/golang/glog" | |
+ "github.com/youtube/vitess/go/exit" | |
+ "github.com/youtube/vitess/go/vt/dbconfigs" | |
+ "github.com/youtube/vitess/go/vt/logutil" | |
+ "github.com/youtube/vitess/go/vt/proto/topodata" | |
+ "github.com/youtube/vitess/go/vt/servenv" | |
+ "github.com/youtube/vitess/go/vt/sqlparser" | |
+ "github.com/youtube/vitess/go/vt/tabletmanager/tmclient" | |
+ "github.com/youtube/vitess/go/vt/tabletserver" | |
+ "github.com/youtube/vitess/go/vt/topo" | |
+ "github.com/youtube/vitess/go/vt/topo/topoproto" | |
+ "github.com/youtube/vitess/go/vt/topotools" | |
+ "github.com/youtube/vitess/go/vt/vtgate/engine" | |
+ "github.com/youtube/vitess/go/vt/vtgate/planbuilder" | |
+ "github.com/youtube/vitess/go/vt/vtgate/vindexes" | |
+ "github.com/youtube/vitess/go/vt/wrangler" | |
+ "golang.org/x/net/context" | |
+) | |
+ | |
+var ( | |
+ tailBytes = flag.Int64("tailBytes", 10000, "bytes to parse of input file, starting at the end") | |
+ waitTime = flag.Duration("wait-time", 1*time.Minute, "time to wait while parsing lines") | |
+ cell = flag.String("parse-cell", "", "cell to execute against") | |
+ keyspace = flag.String("parse-keyspace", "", "keyspace to execute against") | |
+ connUser = flag.String("parse-conn-user", "root", "User to connect to db, i.e. root") | |
+ connPassword = flag.String("parse-conn-password", "", "Password to connect to db") | |
+ connHost = flag.String("parse-conn-host", "localhost:3306", "Host and port to connect to db, i.e. localhost:3306") | |
+ connSocket = flag.String("parse-conn-socket", "", "Unix socket file to connect on, i.e. /var/run/mysql.sock") | |
+ connFile = flag.String("parse-conn-file", "", "Path to file containing connect info for db. Should contain at least one each of user=,host=,password=, on separate lines") | |
+ outFile = flag.String("parse-out-file", "", "Path to a file where any unparseable sql queries and their corresponding error output will be sent") | |
+ compareExplain = flag.Bool("parse-compare-explain", true, "Whether to run explains of queries against the db to check that the parsed queries make sense") | |
+ verbosity = flag.Int("parse-verbosity", 1, "How verbose should logging be? 1 == just final stats, 2 == ignored errors, 3 == all") | |
+ alias = flag.String("alias", "", "Tablet alias to use in initializing tablet before parsing") | |
+ hostname = flag.String("hostname", "", "Hostname to use in initializing tablet before parsing") | |
+ justInitialize = flag.Bool("parse-just-init", false, "If set, will initialize the tablet and schema then exit") | |
+ | |
+ ignoredErrorPatterns = []*regexp.Regexp{ | |
+ regexp.MustCompile("(keyspace) (\\w+) not found in vschema"), | |
+ regexp.MustCompile("(symbol) (@@[\\w.]+) not found"), | |
+ regexp.MustCompile("(Table) ('\\w+\\.\\w+') doesn't exist"), | |
+ } | |
+ | |
+ handledErrorPatterns = []handledError{ | |
+ {regexp.MustCompile("(?i)syntax error at position (\\d+)"), "unspecified syntax error"}, | |
+ {regexp.MustCompile("(?i)syntax error at position (?:\\d+) near '(.*)'$"), "syntax error near token"}, | |
+ {regexp.MustCompile("(?i)unsupported: (.*)"), "unsupported construct"}, | |
+ {regexp.MustCompile("(?i)symbol (\\S+) not found"), "symbol not found"}, | |
+ } | |
+ | |
+ informationSchema = regexp.MustCompile("(?i)SELECT .+ FROM INFORMATION_SCHEMA") | |
+ setVariables = regexp.MustCompile("(?i)/\\*!\\d+ SET .*\\*/") | |
+ | |
+ flushLocalLogsPatterns = []*regexp.Regexp{ | |
+ regexp.MustCompile("(?i)^/usr/sbin/mysqld,\\s*Version:.*\\.\\s*started with:"), | |
+ regexp.MustCompile("(?i)^Tcp port:\\s*\\d+\\s*Unix socket:.*"), | |
+ regexp.MustCompile("(?i)^Time\\s*Id\\s*Command\\s*Argument"), | |
+ } | |
+ | |
+ queryCache = make(map[string]int) | |
+ ignoreCache = make(map[string]bool) | |
+ unknownErrorExamples = make(map[string]string) | |
+ handledErrorExamples = make(map[string]map[string]string) | |
+ | |
+ unknownErrorCounts = make(map[string]int) | |
+ handledErrorCounts = make(map[string]map[string]int) | |
+ ignoredErrorCounts = make(map[string]map[string]int) | |
+ | |
+ linesSeen, linesMatched, linesWithoutQuery int | |
+ unknownErrors, handledErrors, ignoredErrors int | |
+ totalQueries, attemptedQueries, successfulQueries, skippedQueries int | |
+ | |
+ // todo: parse as both vttablet and vtgate? | |
+) | |
+ | |
+func main() { | |
+ defer exit.Recover() | |
+ defer logutil.Flush() | |
+ | |
+ sigChan := make(chan os.Signal) | |
+ go func() { | |
+ stacktrace := make([]byte, 8192) | |
+ for range sigChan { | |
+ length := runtime.Stack(stacktrace, true) | |
+ fmt.Println(string(stacktrace[:length])) | |
+ } | |
+ }() | |
+ signal.Notify(sigChan, syscall.SIGQUIT) | |
+ | |
+ // flag parsing | |
+ dbconfigFlags := dbconfigs.DbaConfig | |
+ dbconfigs.RegisterFlags(dbconfigFlags) | |
+ | |
+ flag.Usage = func() { | |
+ fmt.Fprintf(os.Stdout, "Usage: %s [options] input_file \n", os.Args[0]) | |
+ fmt.Fprint(os.Stdout, "\nGiven an input_file general log, goes back tailBytes (default 10000) and attempts to parse queries in the log\n") | |
+ fmt.Fprint(os.Stdout, "\nOptions:\n") | |
+ flag.PrintDefaults() | |
+ fmt.Fprint(os.Stdout, "\n") | |
+ } | |
+ | |
+ flag.Parse() | |
+ | |
+ if len(flag.Args()) != 1 { | |
+ flag.Usage() | |
+ log.Errorf("vtparse requires an input_file positional argument") | |
+ exit.Return(1) | |
+ } | |
+ servenv.FireRunHooks() | |
+ | |
+ ts := topo.Open() | |
+ defer ts.Close() | |
+ | |
+ ctx, cancel := context.WithTimeout(context.Background(), *waitTime) | |
+ wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) | |
+ | |
+ installSignalHandlers(cancel) | |
+ | |
+ srv, err := wr.TopoServer().GetSrvVSchema(ctx, *cell) | |
+ exitOnError(err, "get wrangler") | |
+ vs, err := vindexes.BuildVSchema(srv) | |
+ exitOnError(err, "build vschema") | |
+ | |
+ fileName := flag.Arg(0) | |
+ stat, err := os.Stat(fileName) | |
+ startSize := stat.Size() | |
+ in, pos, err := openToLoc(fileName, startSize) | |
+ | |
+ exitOnError(err, "seek file %s", fileName) | |
+ | |
+ reader := bufio.NewReader(in) | |
+ | |
+ lineRe := regexp.MustCompile("(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z\\d*)(?:\\s+\\d+)?\\s+(\\w+)\\t*(?:(.+))?$") | |
+ | |
+ var buffer bytes.Buffer | |
+ started := false | |
+ | |
+ creds := getCredentials() | |
+ // simplify format string below by prepending : | |
+ if val, ok := creds["password"]; ok && len(val) > 0 { | |
+ creds["password"] = fmt.Sprintf(":%s", creds["password"]) | |
+ } | |
+ | |
+ var address string | |
+ if val, ok := creds["socket"]; ok && len(val) > 0 { | |
+ address = fmt.Sprintf("unix(%s)", creds["socket"]) | |
+ } else if val, ok := creds["host"]; ok && len(val) > 0 { | |
+ address = fmt.Sprintf("tcp(%s)", creds["host"]) | |
+ } else { | |
+ panic("Need to specify either socket or host, through -parse-conn-host, -parse-conn-socket, or -parse-conn-file") | |
+ } | |
+ db, err := sql.Open("mysql", fmt.Sprintf("%s%s@%s/%s", creds["user"], creds["password"], address, *keyspace)) | |
+ defer db.Close() | |
+ | |
+ err = db.Ping() | |
+ if err != nil { | |
+ panic(err.Error()) // proper error handling instead of panic in your app | |
+ } | |
+ | |
+ tabletAlias, err := topoproto.ParseTabletAlias(*alias) | |
+ exitOnError(err, "parse alias") | |
+ | |
+ tablet := &topodata.Tablet{ | |
+ Alias: tabletAlias, | |
+ Hostname: *hostname, | |
+ PortMap: make(map[string]int32), | |
+ Keyspace: *keyspace, | |
+ Shard: "0", | |
+ Type: topodata.TabletType_REPLICA, | |
+ DbNameOverride: *keyspace, | |
+ Tags: make(map[string]string), | |
+ } | |
+ | |
+ err = wr.InitTablet(ctx, tablet, false, true, false) | |
+ if err != nil && err != topo.ErrNodeExists { | |
+ exitOnError(err, "init tablet") | |
+ } else { | |
+ defer cleanupTabletAndShard(ctx, wr, tabletAlias) | |
+ } | |
+ | |
+ dbcfgs, err := dbconfigs.Init(creds["socket"], dbconfigFlags) | |
+ exitOnError(err, "get dbconfigs") | |
+ queryServiceStats := tabletserver.NewQueryServiceStats("vtparse", false) | |
+ si := tabletserver.NewSchemaInfo("vtparse", dummyChecker{}, 0, 0, 0, make(map[string]string), false, queryServiceStats) | |
+ cfg, err := dbconfigs.WithCredentials(&dbcfgs.Dba) | |
+ si.Open(&cfg, false) | |
+ err = si.Reload(ctx) | |
+ exitOnError(err, "reload schema") | |
+ | |
+ err = topotools.RebuildVSchema(ctx, wr.Logger(), wr.TopoServer(), []string{*cell}) | |
+ exitOnError(err, "rebuild vschema") | |
+ | |
+ if *justInitialize { | |
+ time.Sleep(10 * time.Second) | |
+ fmt.Println("Initialized, exiting.") | |
+ os.Exit(0) | |
+ } | |
+ out := os.Stdout | |
+ if outFile != nil && len(*outFile) > 0 { | |
+ dir := filepath.Dir(*outFile) | |
+ os.MkdirAll(dir, 0755) | |
+ out, err = os.OpenFile(*outFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) | |
+ defer out.Close() | |
+ } | |
+ | |
+ fmt.Fprintf(os.Stdout, "Reading from %d to %d\n", pos, startSize) | |
+ | |
+ var totalRead int64 | |
+ | |
+ line, read, e := readLine(reader) | |
+ | |
+ var startTime string | |
+OUTER: | |
+ for e == nil && pos <= startSize { | |
+ pos += read | |
+ totalRead += read | |
+ linesSeen++ | |
+ | |
+ matches := lineRe.FindStringSubmatch(line) | |
+ if matches != nil { | |
+ if startTime == "" { | |
+ startTime = matches[1] | |
+ } | |
+ linesMatched++ | |
+ started = true | |
+ if matches[2] != "Query" || strings.Contains(matches[3], "FLUSH LOCAL LOGS") { | |
+ linesWithoutQuery++ | |
+ line, read, e = readLine(reader) | |
+ continue | |
+ } | |
+ | |
+ if buffer.Len() > 0 { | |
+ totalQueries++ | |
+ data := string(buffer.Bytes()) | |
+ if ignored := ignored(data); ignored { | |
+ if *verbosity > 1 { | |
+ fmt.Fprintf(os.Stdout, "Skipped: %s\n", data) | |
+ } | |
+ skippedQueries++ | |
+ } else { | |
+ attemptedQueries++ | |
+ if success := tryParse(data, vs, db); success { | |
+ successfulQueries++ | |
+ } | |
+ } | |
+ buffer.Reset() | |
+ } | |
+ buffer.WriteString(matches[3] + "\n") | |
+ } else if started { | |
+ for _, pattern := range flushLocalLogsPatterns { | |
+ if pattern.MatchString(line) { | |
+ line, read, e = readLine(reader) | |
+ continue OUTER | |
+ } | |
+ } | |
+ | |
+ buffer.WriteString(line + "\n") | |
+ } | |
+ | |
+ line, read, e = readLine(reader) | |
+ } | |
+ | |
+ if len(unknownErrorCounts) > 0 { | |
+ out.WriteString("\n\n####\nUNKNOWN ERRORS\n####\n\n") | |
+ for errString, query := range unknownErrorExamples { | |
+ count := unknownErrorCounts[errString] | |
+ out.WriteString(fmt.Sprintf("\nQuery:\n%s\nError: %v\nCount: %d\n-----", query, errString, count)) | |
+ } | |
+ } | |
+ | |
+ if len(handledErrorCounts) > 0 { | |
+ out.WriteString("\n\n####\nHANDLED ERRORS\n####\n\n") | |
+ for errType, innerMap := range handledErrorExamples { | |
+ out.WriteString(fmt.Sprintf("\n##%s:\n", errType)) | |
+ for errUnique, query := range innerMap { | |
+ count := handledErrorCounts[errType][errUnique] | |
+ out.WriteString(fmt.Sprintf("\nQuery:\n%s\nError Type: %s\nUnique String: %s\nCount: %d\n-----", query, errType, errUnique, count)) | |
+ } | |
+ | |
+ } | |
+ } | |
+ | |
+ fmt.Fprintf(os.Stdout, "Finished.\n\n"+ | |
+ "Log start time: %s\n"+ | |
+ "Lines read: %d\n"+ | |
+ "Lines matched: %d\n"+ | |
+ "Lines without queries: %d\n"+ | |
+ "Queries seen: %d\n"+ | |
+ "Queries attempted: %d\n"+ | |
+ "Queries succeeded: %d\n"+ | |
+ "Queries skipped: %d\n"+ | |
+ "Unknown errors: %d\n"+ | |
+ "Ignored errors: %d\n"+ | |
+ "Handled errors: %v\n"+ | |
+ "Ignored error breakdowns: %v\n"+ | |
+ "Handled error breakdowns: %v\n"+ | |
+ "Cache size: %d\n"+ | |
+ "Bytes read: %d\n"+ | |
+ "\n", | |
+ startTime, linesSeen, linesMatched, linesWithoutQuery, | |
+ totalQueries, attemptedQueries, successfulQueries, skippedQueries, | |
+ unknownErrors, ignoredErrors, handledErrors, ignoredErrorCounts, handledErrorCounts, len(queryCache), totalRead) | |
+ | |
+ exit.Return(unknownErrors) | |
+} | |
+ | |
+func cleanupTabletAndShard(ctx context.Context, wr *wrangler.Wrangler, tabletAlias *topodata.TabletAlias) { | |
+ err := wr.DeleteTablet(ctx, tabletAlias, false) | |
+ exitOnError(err, "delete tablet") | |
+ err = wr.DeleteShard(ctx, *keyspace, "0", true, true) | |
+ exitOnError(err, "delete shard") | |
+} | |
+ | |
// readLine reads one line of arbitrary length from r.
//
// It returns the line without its terminator ("\n" or "\r\n"), the number of
// bytes actually consumed from r INCLUDING the terminator, and any read
// error. Callers add the count to a file offset, so it has to reflect every
// byte consumed, not just the visible text.
//
// A final line that lacks a terminator is returned with a nil error; io.EOF
// is reported by the following call, together with an empty line and a zero
// count.
func readLine(r *bufio.Reader) (string, int64, error) {
	raw, err := r.ReadString('\n')
	if err == io.EOF && raw != "" {
		// Unterminated last line: hand it out now, EOF comes next call.
		err = nil
	}
	consumed := int64(len(raw))
	if strings.HasSuffix(raw, "\n") {
		// Drop "\n" and, only when it directly precedes it, one "\r".
		raw = strings.TrimSuffix(raw[:len(raw)-1], "\r")
	}
	return raw, consumed, err
}
+ | |
+func getCredentials() map[string]string { | |
+ creds := make(map[string]string) | |
+ creds["user"] = *connUser | |
+ creds["password"] = *connPassword | |
+ creds["host"] = *connHost | |
+ creds["socket"] = *connSocket | |
+ | |
+ if connFile != nil && len(*connFile) > 0 { | |
+ cin, cerr := os.Open(*connFile) | |
+ exitOnError(cerr, "open conn file: %s", *connFile) | |
+ cscanner := bufio.NewScanner(cin) | |
+ | |
+ re := regexp.MustCompile("(^[^=]+)=(.+)$") | |
+ for cscanner.Scan() { | |
+ line := cscanner.Text() | |
+ matches := re.FindStringSubmatch(line) | |
+ if matches != nil && len(matches) == 3 { | |
+ creds[matches[1]] = matches[2] | |
+ } | |
+ } | |
+ | |
+ } | |
+ | |
+ // account for possibly quoted values | |
+ re := regexp.MustCompile("'|\"") | |
+ for key, value := range creds { | |
+ if len(value) == 0 { | |
+ continue | |
+ } | |
+ if re.MatchString(string(value[0])) && value[0] == value[len(value)-1] { | |
+ creds[key] = value[1 : len(value)-1] | |
+ | |
+ } | |
+ } | |
+ | |
+ // Default port | |
+ re = regexp.MustCompile(":\\d+$") | |
+ if !re.MatchString(creds["host"]) { | |
+ creds["host"] = fmt.Sprintf("%s:3306", creds["host"]) | |
+ } | |
+ | |
+ return creds | |
+} | |
+ | |
+func ignored(sql string) bool { | |
+ if ignored, ok := ignoreCache[sql]; ok { | |
+ return ignored | |
+ } | |
+ | |
+ sql = strings.TrimSpace(strings.ToLower(sql)) | |
+ shouldIgnore := strings.HasPrefix(sql, "set ") || | |
+ strings.HasPrefix(sql, "/*!40103 set time_zone='+00:00' */") || | |
+ strings.HasPrefix(sql, "/*!40100 set @@sql_mode='' */") || | |
+ strings.HasPrefix(sql, "kill ") || | |
+ strings.HasPrefix(sql, "use ") || | |
+ strings.HasPrefix(sql, "show ") || | |
+ strings.HasPrefix(sql, "begin") || | |
+ strings.HasPrefix(sql, "commit") || | |
+ strings.HasPrefix(sql, "rollback") || | |
+ strings.HasPrefix(sql, "explain") || | |
+ strings.HasPrefix(sql, "flush") || | |
+ strings.HasPrefix(sql, "lock") || | |
+ strings.HasPrefix(sql, "unlock") || | |
+ strings.HasPrefix(sql, "purge") || | |
+ strings.HasPrefix(sql, "reset") || | |
+ strings.HasPrefix(sql, "change") || | |
+ strings.HasPrefix(sql, "start") || | |
+ strings.HasPrefix(sql, "stop") || | |
+ strings.HasPrefix(sql, "declare") || | |
+ strings.HasPrefix(sql, "desc") || | |
+ strings.HasPrefix(sql, "help") || | |
+ strings.HasPrefix(sql, "create") || | |
+ strings.HasPrefix(sql, "alter") || | |
+ strings.HasPrefix(sql, "drop") || | |
+ strings.HasPrefix(sql, "rename") || | |
+ strings.HasPrefix(sql, "truncate") || | |
+ strings.HasPrefix(sql, "load") || | |
+ strings.HasPrefix(sql, "grant") || | |
+ strings.HasPrefix(sql, "revoke") || | |
+ strings.HasPrefix(sql, "analyze") || | |
+ strings.HasPrefix(sql, "check") || | |
+ strings.HasPrefix(sql, "optimize") || | |
+ strings.HasPrefix(sql, "repair") || | |
+ strings.HasPrefix(sql, "install") || | |
+ strings.HasPrefix(sql, "uninstall") || | |
+ strings.HasPrefix(sql, "savepoint") || | |
+ strings.HasPrefix(sql, "rollback savepoint") || | |
+ strings.HasPrefix(sql, "release savepoint") || | |
+ strings.Contains(sql, "memory_global_by_current_bytes") || | |
+ strings.Contains(sql, "heartbeat.heartbeat") || | |
+ strings.Contains(sql, "`heartbeat`.`heartbeat`") || | |
+ strings.Contains(sql, "`percona`.`checksums`") || | |
+ informationSchema.MatchString(sql) || | |
+ setVariables.MatchString(sql) | |
+ | |
+ ignoreCache[sql] = shouldIgnore | |
+ | |
+ return shouldIgnore | |
+} | |
+ | |
+func exitOnError(err error, msg string, args ...interface{}) { | |
+ if err != nil { | |
+ if len(args) > 0 { | |
+ msg = fmt.Sprintf(msg, args) | |
+ } | |
+ log.Errorf("Failed for: '%s', %v", msg, err) | |
+ exit.Return(1) | |
+ } | |
+} | |
+ | |
+func openToLoc(fileName string, size int64) (*os.File, int64, error) { | |
+ in, err := os.Open(fileName) | |
+ var pos int64 | |
+ if *tailBytes > 0 && *tailBytes < size { | |
+ pos, err = in.Seek((*tailBytes)*-1, io.SeekEnd) | |
+ } | |
+ return in, pos, err | |
+} | |
+ | |
+func tryParse(sql string, vs *vindexes.VSchema, db *sql.DB) bool { | |
+ if res, ok := queryCache[sql]; ok { | |
+ switch res { | |
+ case 1: | |
+ ignoredErrors++ | |
+ case 2: | |
+ handledErrors++ | |
+ case 3: | |
+ unknownErrors++ | |
+ } | |
+ return res == 0 | |
+ } | |
+ | |
+ plan, err := planbuilder.Build(sql, &wrappedVSchema{ | |
+ vschema: vs, | |
+ keyspace: sqlparser.NewTableIdent(*keyspace), | |
+ }) | |
+ | |
+ if err != nil { | |
+ if errIsIgnored(err) { | |
+ if *verbosity > 1 { | |
+ fmt.Fprintf(os.Stdout, "\nQuery:\n%s\nIgnored Error: %v\n-----", sql, err.Error()) | |
+ } | |
+ ignoredErrors++ | |
+ queryCache[sql] = 1 | |
+ } else if _, ok := unknownErrorExamples[err.Error()]; ok { | |
+ unknownErrorCounts[err.Error()]++ | |
+ } else { | |
+ if errUnique, errType, ok := errIsHandled(err.Error()); ok { | |
+ handledErrors++ | |
+ | |
+ if _, ok := handledErrorExamples[errType]; !ok { | |
+ handledErrorExamples[errType] = make(map[string]string) | |
+ handledErrorCounts[errType] = make(map[string]int) | |
+ } | |
+ handledErrorCounts[errType][errUnique]++ | |
+ if _, ok := handledErrorExamples[errType][errUnique]; !ok { | |
+ handledErrorExamples[errUnique][errUnique] = sql | |
+ } | |
+ queryCache[sql] = 2 | |
+ } else { | |
+ unknownErrors++ | |
+ unknownErrorCounts[err.Error()]++ | |
+ unknownErrorExamples[err.Error()] = sql | |
+ queryCache[sql] = 3 | |
+ } | |
+ } | |
+ return false | |
+ } | |
+ | |
+ route, _ := plan.Instructions.(*engine.Route) | |
+ | |
+ if *verbosity > 2 { | |
+ fmt.Fprintf(os.Stdout, "Original: %s\n", sql) | |
+ fmt.Fprintf(os.Stdout, "Parsed: %s\n", route.Query) | |
+ fmt.Fprintf(os.Stdout, "Field: %s\n", route.FieldQuery) | |
+ } | |
+ | |
+ if *compareExplain { | |
+ res, err := db.Query(fmt.Sprintf("EXPLAIN %s", sql)) | |
+ if err == nil || !errIsIgnored(err) { | |
+ source := parseRows(res, err) | |
+ comp := parseRows(db.Query(fmt.Sprintf("EXPLAIN %s", route.Query))) | |
+ | |
+ if !reflect.DeepEqual(source, comp) { | |
+ fmt.Fprintf(os.Stderr, "original explain: %v\n,parsed explain: %v", source, comp) | |
+ panic("Explains not equal") | |
+ } | |
+ | |
+ if len(route.FieldQuery) > 0 { | |
+ fields := parseRows(db.Query(fmt.Sprintf("EXPLAIN %s", route.FieldQuery))) | |
+ for _, field := range fields { | |
+ if !field.extra.Valid || strings.ToLower(field.extra.String) != "impossible where" { | |
+ fmt.Fprintf(os.Stderr, "field explain: %v", field) | |
+ panic("FieldQuery not simple Impossible WHERE") | |
+ } | |
+ } | |
+ } else if re := regexp.MustCompile("(?i)^\\s*INSERT|UPDATE|DELETE|SHOW|EXPLAIN|ALTER|DROP|CREATE"); !re.MatchString(sql) { | |
+ panic("query should have a field query, but doesn't") | |
+ } | |
+ } | |
+ } | |
+ | |
+ queryCache[sql] = 0 | |
+ return true | |
+ | |
+} | |
+ | |
+type explainResult struct { | |
+ id sql.NullInt64 | |
+ selectType sql.NullString | |
+ table sql.NullString | |
+ partitions sql.NullString | |
+ _type sql.NullString | |
+ possibleKeys sql.NullString | |
+ key sql.NullString | |
+ keyLen sql.NullString | |
+ ref sql.NullString | |
+ rows sql.NullInt64 | |
+ filtered sql.NullString | |
+ extra sql.NullString | |
+} | |
+ | |
+func parseRows(rows *sql.Rows, err error) []explainResult { | |
+ exitOnError(err, "executing query") | |
+ all := []explainResult{} | |
+ | |
+ count := 0 | |
+ for rows.Next() { | |
+ parsed := explainResult{} | |
+ err := rows.Scan(&parsed.id, &parsed.selectType, &parsed.table, &parsed.partitions, &parsed._type, &parsed.possibleKeys, &parsed.key, &parsed.keyLen, &parsed.ref, &parsed.rows, &parsed.filtered, &parsed.extra) | |
+ count++ | |
+ exitOnError(err, "parsing row %d", count) | |
+ all = append(all, parsed) | |
+ | |
+ } | |
+ | |
+ return all | |
+} | |
+ | |
+func errIsIgnored(err error) bool { | |
+ for _, pattern := range ignoredErrorPatterns { | |
+ match := pattern.FindStringSubmatch(err.Error()) | |
+ if match != nil { | |
+ if val, ok := ignoredErrorCounts[match[1]]; ok { | |
+ val[match[2]]++ | |
+ } else { | |
+ val := make(map[string]int) | |
+ val[match[2]]++ | |
+ ignoredErrorCounts[match[1]] = val | |
+ } | |
+ return true | |
+ } | |
+ } | |
+ | |
+ return false | |
+} | |
+ | |
+type handledError struct { | |
+ pattern *regexp.Regexp | |
+ errType string | |
+} | |
+ | |
+func errIsHandled(sql string) (string, string, bool) { | |
+ for _, handled := range handledErrorPatterns { | |
+ match := handled.pattern.FindStringSubmatch(sql) | |
+ if match != nil { | |
+ return match[1], handled.errType, true | |
+ } | |
+ } | |
+ | |
+ return "", "", false | |
+} | |
+ | |
+type wrappedVSchema struct { | |
+ vschema *vindexes.VSchema | |
+ keyspace sqlparser.TableIdent | |
+} | |
+ | |
+func (vs *wrappedVSchema) Find(keyspace, tablename sqlparser.TableIdent) (table *vindexes.Table, err error) { | |
+ if keyspace.IsEmpty() { | |
+ keyspace = vs.keyspace | |
+ } | |
+ return vs.vschema.Find(keyspace.String(), tablename.String()) | |
+} | |
+ | |
// installSignalHandlers arranges for cancel to be called once, on the first
// SIGTERM or SIGINT the process receives. It returns immediately; the wait
// happens on a background goroutine.
func installSignalHandlers(cancel func()) {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)

	waitThenCancel := func() {
		<-signals
		// we got a signal, cancel the current ctx
		cancel()
	}
	go waitThenCancel()
}
+ | |
// dummyChecker is the checker value main passes to
// tabletserver.NewSchemaInfo; it carries no state.
type dummyChecker struct{}

// CheckMySQL intentionally does nothing.
func (dummyChecker) CheckMySQL() {}
diff --git a/vendor/vendor.json b/vendor/vendor.json | |
index 129b4421b..ab97061a5 100644 | |
--- a/vendor/vendor.json | |
+++ b/vendor/vendor.json | |
@@ -341,6 +341,12 @@ | |
"revisionTime": "2016-06-01T19:11:21Z" | |
}, | |
{ | |
+ "checksumSHA1": "xmGg3ttN2R+k3oITmXDLtGXA/LA=", | |
+ "path": "github.com/go-sql-driver/mysql", | |
+ "revision": "2e00b5cd70399450106cec6431c2e2ce3cae5034", | |
+ "revisionTime": "2016-12-24T12:10:19Z" | |
+ }, | |
+ { | |
"checksumSHA1": "HmbftipkadrLlCfzzVQ+iFHbl6g=", | |
"path": "github.com/golang/glog", | |
"revision": "23def4e6c14b4da8ac2ed8007337bc5eb5007998", | |
-- | |
2.11.0 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment