Created
August 30, 2019 16:43
-
-
Save dasl-/6c274a79ecb96e6d24c171aa82ac9c92 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go | |
index 0350d8bc7..b0721d10b 100644 | |
--- a/go/vt/vtgate/executor.go | |
+++ b/go/vt/vtgate/executor.go | |
@@ -649,6 +649,49 @@ func (e *Executor) handleSet(ctx context.Context, safeSession *SafeSession, sql | |
return &sqltypes.Result{}, nil | |
} | |
+// handleSetWorkload like handleSet above but handles only SET workload queries, all other sets are unsupported | |
+// needed for StreamExecute which does not support transactions | |
+func (e *Executor) handleSetWorkload(safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) { | |
+ vals, scope, err := sqlparser.ExtractSetValues(sql) | |
+ execStart := time.Now() | |
+ logStats.PlanTime = execStart.Sub(logStats.StartTime) | |
+ defer func() { | |
+ logStats.ExecuteTime = time.Since(execStart) | |
+ }() | |
+ | |
+ if err != nil { | |
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, err.Error()) | |
+ } | |
+ | |
+ if scope == sqlparser.GlobalStr { | |
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global") | |
+ } | |
+ | |
+ for k, v := range vals { | |
+ if k.Scope == sqlparser.GlobalStr { | |
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global") | |
+ } | |
+ | |
+ if k.Key != "workload" { | |
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported construct: %s", sql) | |
+ } | |
+ | |
+ val, ok := v.(string) | |
+ if !ok { | |
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected value type for workload: %T", v) | |
+ } | |
+ out, ok := querypb.ExecuteOptions_Workload_value[strings.ToUpper(val)] | |
+ if !ok { | |
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid workload: %s", val) | |
+ } | |
+ if safeSession.Options == nil { | |
+ safeSession.Options = &querypb.ExecuteOptions{} | |
+ } | |
+ safeSession.Options.Workload = querypb.ExecuteOptions_Workload(out) | |
+ } | |
+ return &sqltypes.Result{}, nil | |
+} | |
+ | |
func (e *Executor) handleSetVitessMetadata(ctx context.Context, session *SafeSession, k sqlparser.SetKey, v interface{}) (*sqltypes.Result, error) { | |
//TODO(kalfonso): move to its own acl check and consolidate into an acl component that can handle multiple operations (vschema, metadata) | |
allowed := vschemaacl.Authorized(callerid.ImmediateCallerIDFromContext(ctx)) | |
@@ -1126,7 +1169,8 @@ func (e *Executor) handleComment(sql string) (*sqltypes.Result, error) { | |
// StreamExecute executes a streaming query. | |
func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, callback func(*sqltypes.Result) error) (err error) { | |
logStats := NewLogStats(ctx, method, sql, bindVars) | |
- logStats.StmtType = sqlparser.StmtType(sqlparser.Preview(sql)) | |
+ stmtType := sqlparser.Preview(sql) | |
+ logStats.StmtType = sqlparser.StmtType(stmtType) | |
defer logStats.Send() | |
if bindVars == nil { | |
@@ -1135,10 +1179,21 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession | |
query, comments := sqlparser.SplitMarginComments(sql) | |
vcursor := newVCursorImpl(ctx, safeSession, target.Keyspace, target.TabletType, comments, e, logStats) | |
- // check if this is a stream statement for messaging | |
- // TODO: support keyRange syntax | |
- if logStats.StmtType == sqlparser.StmtType(sqlparser.StmtStream) { | |
+ switch stmtType { | |
+ case sqlparser.StmtStream: | |
+ // check if this is a stream statement for messaging | |
+ // TODO: support keyRange syntax | |
return e.handleMessageStream(ctx, safeSession, sql, target, callback, vcursor, logStats) | |
+ case sqlparser.StmtSet: | |
+ // handle SET workload queries in order to switch between oltp and olap modes | |
+ result, setErr := e.handleSetWorkload(safeSession, sql, logStats) | |
+ if setErr != nil { | |
+ return setErr | |
+ } | |
+ if err := callback(result); err != nil { | |
+ return err | |
+ } | |
+ return nil | |
} | |
plan, err := e.getPlan( | |
diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go | |
index ea41aa88c..fa7a86e4f 100644 | |
--- a/go/vt/vtgate/executor_stream_test.go | |
+++ b/go/vt/vtgate/executor_stream_test.go | |
@@ -23,10 +23,12 @@ import ( | |
"golang.org/x/net/context" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/discovery" | |
- querypb "vitess.io/vitess/go/vt/proto/query" | |
- topodatapb "vitess.io/vitess/go/vt/proto/topodata" | |
_ "vitess.io/vitess/go/vt/vtgate/vindexes" | |
"vitess.io/vitess/go/vt/vttablet/sandboxconn" | |
+ | |
+ querypb "vitess.io/vitess/go/vt/proto/query" | |
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata" | |
+ vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" | |
) | |
func TestStreamSQLUnsharded(t *testing.T) { | |
@@ -83,6 +85,54 @@ func TestStreamSQLSharded(t *testing.T) { | |
} | |
} | |
+func TestStreamSQLSet(t *testing.T) { | |
+ executor, _, _, _ := createExecutorEnv() | |
+ logChan := QueryLogger.Subscribe("Test") | |
+ defer QueryLogger.Unsubscribe(logChan) | |
+ | |
+ testcases := []struct { | |
+ in string | |
+ out *vtgatepb.Session | |
+ err string | |
+ }{{ | |
+ in: "set workload = 'unspecified'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_UNSPECIFIED}}, | |
+ }, { | |
+ in: "set workload = 'oltp'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLTP}}, | |
+ }, { | |
+ in: "set workload = 'olap'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLAP}}, | |
+ }, { | |
+ in: "set workload = 'dba'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_DBA}}, | |
+ }, { | |
+ in: "set workload = 'aa'", | |
+ err: "invalid workload: aa", | |
+ }, { | |
+ in: "set workload = 1", | |
+ err: "unexpected value type for workload: int64", | |
+ }, { | |
+ in: "set autocommit = 1", | |
+ err: "unsupported construct: set autocommit = 1", | |
+ }, { | |
+ in: "set names utf8", | |
+ err: "unsupported construct: set names utf8", | |
+ }} | |
+ for _, tcase := range testcases { | |
+ _, err := executorStreamMessages(executor, tcase.in) | |
+ if err != nil { | |
+ if err.Error() != tcase.err { | |
+ t.Errorf("%s error: %v, want %s", tcase.in, err, tcase.err) | |
+ } | |
+ continue | |
+ } | |
+ if masterSession.Options.Workload != tcase.out.Options.Workload { | |
+ t.Errorf("%s: %v, want %s", tcase.in, masterSession, tcase.out) | |
+ } | |
+ } | |
+} | |
+ | |
func executorStreamMessages(executor *Executor, sql string) (qr *sqltypes.Result, err error) { | |
results := make(chan *sqltypes.Result, 100) | |
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go | |
index be535df38..5d83ea47a 100644 | |
--- a/go/vt/vtgate/resolver.go | |
+++ b/go/vt/vtgate/resolver.go | |
@@ -23,12 +23,14 @@ import ( | |
"sort" | |
"strings" | |
"sync" | |
+ "time" | |
"github.com/golang/protobuf/proto" | |
"golang.org/x/net/context" | |
"vitess.io/vitess/go/sqltypes" | |
"vitess.io/vitess/go/vt/key" | |
"vitess.io/vitess/go/vt/log" | |
+ "vitess.io/vitess/go/vt/sqlparser" | |
"vitess.io/vitess/go/vt/srvtopo" | |
"vitess.io/vitess/go/vt/topo" | |
"vitess.io/vitess/go/vt/vterrors" | |
@@ -250,9 +252,28 @@ func (res *Resolver) StreamExecute( | |
keyspace string, | |
tabletType topodatapb.TabletType, | |
destination key.Destination, | |
+ safeSession *SafeSession, | |
options *querypb.ExecuteOptions, | |
callback func(*sqltypes.Result) error, | |
) error { | |
+ | |
+ logStats := NewLogStats(ctx, "StreamExecute", sql, bindVars) | |
+ stmtType := sqlparser.Preview(sql) | |
+ logStats.StmtType = sqlparser.StmtType(stmtType) | |
+ defer logStats.Send() | |
+ | |
+ // handle SET workload queries in order to switch between oltp and olap modes | |
+ if stmtType == sqlparser.StmtSet { | |
+ result, setErr := res.handleSetWorkload(safeSession, sql, logStats) | |
+ if setErr != nil { | |
+ return setErr | |
+ } | |
+ if err := callback(result); err != nil { | |
+ return err | |
+ } | |
+ return nil | |
+ } | |
+ | |
rss, err := res.resolver.ResolveDestination(ctx, keyspace, tabletType, destination) | |
if err != nil { | |
return err | |
@@ -495,6 +516,49 @@ func (res *Resolver) GetGatewayCacheStatus() gateway.TabletCacheStatusList { | |
return res.scatterConn.GetGatewayCacheStatus() | |
} | |
+// handleSetWorkload like handleSet above but handles only SET workload queries, all other sets are unsupported | |
+// needed for StreamExecute which does not support transactions | |
+func (res *Resolver) handleSetWorkload(safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) { | |
+ vals, scope, err := sqlparser.ExtractSetValues(sql) | |
+ execStart := time.Now() | |
+ logStats.PlanTime = execStart.Sub(logStats.StartTime) | |
+ defer func() { | |
+ logStats.ExecuteTime = time.Since(execStart) | |
+ }() | |
+ | |
+ if err != nil { | |
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, err.Error()) | |
+ } | |
+ | |
+ if scope == sqlparser.GlobalStr { | |
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global") | |
+ } | |
+ | |
+ for k, v := range vals { | |
+ if k.Scope == sqlparser.GlobalStr { | |
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global") | |
+ } | |
+ | |
+ if k.Key != "workload" { | |
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported construct: %s", sql) | |
+ } | |
+ | |
+ val, ok := v.(string) | |
+ if !ok { | |
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected value type for workload: %T", v) | |
+ } | |
+ out, ok := querypb.ExecuteOptions_Workload_value[strings.ToUpper(val)] | |
+ if !ok { | |
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid workload: %s", val) | |
+ } | |
+ if safeSession.Options == nil { | |
+ safeSession.Options = &querypb.ExecuteOptions{} | |
+ } | |
+ safeSession.Options.Workload = querypb.ExecuteOptions_Workload(out) | |
+ } | |
+ return &sqltypes.Result{}, nil | |
+} | |
+ | |
// StrsEquals compares contents of two string slices. | |
func StrsEquals(a, b []string) bool { | |
if len(a) != len(b) { | |
diff --git a/go/vt/vtgate/resolver_test.go b/go/vt/vtgate/resolver_test.go | |
index e26a77e5e..2315bcccd 100644 | |
--- a/go/vt/vtgate/resolver_test.go | |
+++ b/go/vt/vtgate/resolver_test.go | |
@@ -136,6 +136,7 @@ func TestResolverStreamExecuteKeyspaceIds(t *testing.T) { | |
keyspace, | |
topodatapb.TabletType_MASTER, | |
key.DestinationKeyspaceIDs([][]byte{{0x10}, {0x15}}), | |
+ NewSafeSession(masterSession), | |
nil, | |
func(r *sqltypes.Result) error { | |
qr.AppendResult(r) | |
@@ -151,6 +152,7 @@ func TestResolverStreamExecuteKeyspaceIds(t *testing.T) { | |
keyspace, | |
topodatapb.TabletType_MASTER, | |
key.DestinationKeyspaceIDs([][]byte{{0x10}, {0x15}, {0x25}}), | |
+ NewSafeSession(masterSession), | |
nil, | |
func(r *sqltypes.Result) error { | |
qr.AppendResult(r) | |
@@ -171,6 +173,7 @@ func TestResolverStreamExecuteKeyRanges(t *testing.T) { | |
keyspace, | |
topodatapb.TabletType_MASTER, | |
key.DestinationKeyRanges([]*topodatapb.KeyRange{{Start: []byte{0x10}, End: []byte{0x15}}}), | |
+ NewSafeSession(masterSession), | |
nil, | |
func(r *sqltypes.Result) error { | |
qr.AppendResult(r) | |
@@ -187,6 +190,7 @@ func TestResolverStreamExecuteKeyRanges(t *testing.T) { | |
keyspace, | |
topodatapb.TabletType_MASTER, | |
key.DestinationKeyRanges([]*topodatapb.KeyRange{{Start: []byte{0x10}, End: []byte{0x25}}}), | |
+ NewSafeSession(masterSession), | |
nil, | |
func(r *sqltypes.Result) error { | |
qr.AppendResult(r) | |
@@ -196,6 +200,68 @@ func TestResolverStreamExecuteKeyRanges(t *testing.T) { | |
}) | |
} | |
+func TestResolverStreamSetWorkload(t *testing.T) { | |
+ keyspace := "TestResolverStreamSetWorkload" | |
+ createSandbox(keyspace) | |
+ hc := discovery.NewFakeHealthCheck() | |
+ res := newTestResolver(hc, new(sandboxTopo), "aa") | |
+ | |
+ testcases := []struct { | |
+ in string | |
+ out *vtgatepb.Session | |
+ err string | |
+ }{{ | |
+ in: "set workload = 'unspecified'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_UNSPECIFIED}}, | |
+ }, { | |
+ in: "set workload = 'oltp'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLTP}}, | |
+ }, { | |
+ in: "set workload = 'olap'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLAP}}, | |
+ }, { | |
+ in: "set workload = 'dba'", | |
+ out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_DBA}}, | |
+ }, { | |
+ in: "set workload = 'aa'", | |
+ err: "invalid workload: aa", | |
+ }, { | |
+ in: "set workload = 1", | |
+ err: "unexpected value type for workload: int64", | |
+ }, { | |
+ in: "set autocommit = 1", | |
+ err: "unsupported construct: set autocommit = 1", | |
+ }, { | |
+ in: "set names utf8", | |
+ err: "unsupported construct: set names utf8", | |
+ }} | |
+ for _, tcase := range testcases { | |
+ qr := new(sqltypes.Result) | |
+ err := res.StreamExecute(context.Background(), | |
+ tcase.in, | |
+ nil, | |
+ keyspace, | |
+ topodatapb.TabletType_MASTER, | |
+ key.DestinationKeyspaceIDs([][]byte{{0x10}, {0x15}}), | |
+ NewSafeSession(masterSession), | |
+ nil, | |
+ func(r *sqltypes.Result) error { | |
+ qr.AppendResult(r) | |
+ return nil | |
+ }) | |
+ | |
+ if err != nil { | |
+ if err.Error() != tcase.err { | |
+ t.Errorf("%s error: %v, want %s", tcase.in, err, tcase.err) | |
+ } | |
+ continue | |
+ } | |
+ if masterSession.Options.Workload != tcase.out.Options.Workload { | |
+ t.Errorf("%s: %v, want %s", tcase.in, masterSession, tcase.out) | |
+ } | |
+ } | |
+} | |
+ | |
func testResolverGeneric(t *testing.T, name string, action func(res *Resolver) (*sqltypes.Result, error)) { | |
t.Run("successful execute", func(t *testing.T) { | |
createSandbox(name) | |
diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go | |
index 5b5094d96..efd03b59f 100644 | |
--- a/go/vt/vtgate/vtgate.go | |
+++ b/go/vt/vtgate/vtgate.go | |
@@ -336,6 +336,7 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session, | |
// In this context, we don't care if we can't fully parse destination | |
destKeyspace, destTabletType, dest, _ := vtg.executor.ParseDestinationTarget(session.TargetString) | |
statsKey := []string{"StreamExecute", destKeyspace, topoproto.TabletTypeLString(destTabletType)} | |
+ safeSession := NewSafeSession(session) | |
defer vtg.timings.Record(statsKey, time.Now()) | |
@@ -356,6 +357,7 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session, | |
destKeyspace, | |
destTabletType, | |
dest, | |
+ safeSession, | |
session.Options, | |
func(reply *sqltypes.Result) error { | |
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows))) | |
@@ -365,7 +367,7 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session, | |
err = vtg.executor.StreamExecute( | |
ctx, | |
"StreamExecute", | |
- NewSafeSession(session), | |
+ safeSession, | |
sql, | |
bindVariables, | |
querypb.Target{ | |
@@ -670,6 +672,11 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bin | |
statsKey := []string{"StreamExecuteKeyspaceIds", keyspace, ltt} | |
defer vtg.timings.Record(statsKey, startTime) | |
+ session := &vtgatepb.Session{ | |
+ Options: options, | |
+ Autocommit: true, | |
+ } | |
+ | |
var err error | |
if bvErr := sqltypes.ValidateBindVariables(bindVariables); bvErr != nil { | |
@@ -684,6 +691,7 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bin | |
keyspace, | |
tabletType, | |
key.DestinationKeyspaceIDs(keyspaceIds), | |
+ NewSafeSession(session), | |
options, | |
func(reply *sqltypes.Result) error { | |
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows))) | |
@@ -719,6 +727,11 @@ func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, sql string, bindV | |
statsKey := []string{"StreamExecuteKeyRanges", keyspace, ltt} | |
defer vtg.timings.Record(statsKey, startTime) | |
+ session := &vtgatepb.Session{ | |
+ Options: options, | |
+ Autocommit: true, | |
+ } | |
+ | |
var err error | |
if bvErr := sqltypes.ValidateBindVariables(bindVariables); bvErr != nil { | |
@@ -733,6 +746,7 @@ func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, sql string, bindV | |
keyspace, | |
tabletType, | |
key.DestinationKeyRanges(keyRanges), | |
+ NewSafeSession(session), | |
options, | |
func(reply *sqltypes.Result) error { | |
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows))) | |
@@ -763,6 +777,11 @@ func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVari | |
statsKey := []string{"StreamExecuteShards", keyspace, ltt} | |
defer vtg.timings.Record(statsKey, startTime) | |
+ session := &vtgatepb.Session{ | |
+ Options: options, | |
+ Autocommit: true, | |
+ } | |
+ | |
var err error | |
if bvErr := sqltypes.ValidateBindVariables(bindVariables); bvErr != nil { | |
@@ -777,6 +796,7 @@ func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVari | |
keyspace, | |
tabletType, | |
key.DestinationShards(shards), | |
+ NewSafeSession(session), | |
options, | |
func(reply *sqltypes.Result) error { | |
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows))) | |
diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan.go b/go/vt/vttablet/tabletserver/planbuilder/plan.go | |
index b332c67dc..40e8566dc 100644 | |
--- a/go/vt/vttablet/tabletserver/planbuilder/plan.go | |
+++ b/go/vt/vttablet/tabletserver/planbuilder/plan.go | |
@@ -317,7 +317,7 @@ func BuildStreaming(sql string, tables map[string]*schema.Table) (*Plan, error) | |
if tableName := analyzeFrom(stmt.From); !tableName.IsEmpty() { | |
plan.setTable(tableName, tables) | |
} | |
- case *sqlparser.OtherRead, *sqlparser.Show, *sqlparser.Union: | |
+ case *sqlparser.OtherRead, *sqlparser.Show, *sqlparser.Union, *sqlparser.Set: | |
// pass | |
default: | |
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "'%v' not allowed for streaming", sqlparser.String(stmt)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment