Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created August 30, 2019 16:43
Show Gist options
  • Save dasl-/6c274a79ecb96e6d24c171aa82ac9c92 to your computer and use it in GitHub Desktop.
diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go
index 0350d8bc7..b0721d10b 100644
--- a/go/vt/vtgate/executor.go
+++ b/go/vt/vtgate/executor.go
@@ -649,6 +649,49 @@ func (e *Executor) handleSet(ctx context.Context, safeSession *SafeSession, sql
return &sqltypes.Result{}, nil
}
+// handleSetWorkload like handleSet above but handles only SET workload queries, all other sets are unsupported
+// needed for StreamExecute which does not support transactions
+func (e *Executor) handleSetWorkload(safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) {
+ vals, scope, err := sqlparser.ExtractSetValues(sql)
+ execStart := time.Now()
+ logStats.PlanTime = execStart.Sub(logStats.StartTime)
+ defer func() {
+ logStats.ExecuteTime = time.Since(execStart)
+ }()
+
+ if err != nil {
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, err.Error())
+ }
+
+ if scope == sqlparser.GlobalStr {
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global")
+ }
+
+ for k, v := range vals {
+ if k.Scope == sqlparser.GlobalStr {
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global")
+ }
+
+ if k.Key != "workload" {
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported construct: %s", sql)
+ }
+
+ val, ok := v.(string)
+ if !ok {
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected value type for workload: %T", v)
+ }
+ out, ok := querypb.ExecuteOptions_Workload_value[strings.ToUpper(val)]
+ if !ok {
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid workload: %s", val)
+ }
+ if safeSession.Options == nil {
+ safeSession.Options = &querypb.ExecuteOptions{}
+ }
+ safeSession.Options.Workload = querypb.ExecuteOptions_Workload(out)
+ }
+ return &sqltypes.Result{}, nil
+}
+
func (e *Executor) handleSetVitessMetadata(ctx context.Context, session *SafeSession, k sqlparser.SetKey, v interface{}) (*sqltypes.Result, error) {
//TODO(kalfonso): move to its own acl check and consolidate into an acl component that can handle multiple operations (vschema, metadata)
allowed := vschemaacl.Authorized(callerid.ImmediateCallerIDFromContext(ctx))
@@ -1126,7 +1169,8 @@ func (e *Executor) handleComment(sql string) (*sqltypes.Result, error) {
// StreamExecute executes a streaming query.
func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, callback func(*sqltypes.Result) error) (err error) {
logStats := NewLogStats(ctx, method, sql, bindVars)
- logStats.StmtType = sqlparser.StmtType(sqlparser.Preview(sql))
+ stmtType := sqlparser.Preview(sql)
+ logStats.StmtType = sqlparser.StmtType(stmtType)
defer logStats.Send()
if bindVars == nil {
@@ -1135,10 +1179,21 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
query, comments := sqlparser.SplitMarginComments(sql)
vcursor := newVCursorImpl(ctx, safeSession, target.Keyspace, target.TabletType, comments, e, logStats)
- // check if this is a stream statement for messaging
- // TODO: support keyRange syntax
- if logStats.StmtType == sqlparser.StmtType(sqlparser.StmtStream) {
+ switch stmtType {
+ case sqlparser.StmtStream:
+ // check if this is a stream statement for messaging
+ // TODO: support keyRange syntax
return e.handleMessageStream(ctx, safeSession, sql, target, callback, vcursor, logStats)
+ case sqlparser.StmtSet:
+ // handle SET workload queries in order to switch between oltp and olap modes
+ result, setErr := e.handleSetWorkload(safeSession, sql, logStats)
+ if setErr != nil {
+ return setErr
+ }
+ if err := callback(result); err != nil {
+ return err
+ }
+ return nil
}
plan, err := e.getPlan(
diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go
index ea41aa88c..fa7a86e4f 100644
--- a/go/vt/vtgate/executor_stream_test.go
+++ b/go/vt/vtgate/executor_stream_test.go
@@ -23,10 +23,12 @@ import (
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
- querypb "vitess.io/vitess/go/vt/proto/query"
- topodatapb "vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"
+
+ querypb "vitess.io/vitess/go/vt/proto/query"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+ vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)
func TestStreamSQLUnsharded(t *testing.T) {
@@ -83,6 +85,54 @@ func TestStreamSQLSharded(t *testing.T) {
}
}
+// TestStreamSQLSet checks that SET statements sent through the streaming
+// executor path accept only `set workload = ...` and reject every other SET
+// with the expected error text.
+//
+// NOTE(review): the success assertion reads the package-level masterSession,
+// which is presumably the session executorStreamMessages runs with — confirm.
+func TestStreamSQLSet(t *testing.T) {
+	executor, _, _, _ := createExecutorEnv()
+	logChan := QueryLogger.Subscribe("Test")
+	defer QueryLogger.Unsubscribe(logChan)
+
+	testcases := []struct {
+		in  string
+		out *vtgatepb.Session
+		err string
+	}{{
+		in:  "set workload = 'unspecified'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_UNSPECIFIED}},
+	}, {
+		in:  "set workload = 'oltp'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLTP}},
+	}, {
+		in:  "set workload = 'olap'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLAP}},
+	}, {
+		in:  "set workload = 'dba'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_DBA}},
+	}, {
+		in:  "set workload = 'aa'",
+		err: "invalid workload: aa",
+	}, {
+		in:  "set workload = 1",
+		err: "unexpected value type for workload: int64",
+	}, {
+		in:  "set autocommit = 1",
+		err: "unsupported construct: set autocommit = 1",
+	}, {
+		in:  "set names utf8",
+		err: "unsupported construct: set names utf8",
+	}}
+	for _, tcase := range testcases {
+		_, err := executorStreamMessages(executor, tcase.in)
+
+		// Error cases carry no expected session (out is nil), so they must
+		// never reach the workload comparison below.
+		if tcase.err != "" {
+			if err == nil {
+				t.Errorf("%s: got no error, want %s", tcase.in, tcase.err)
+			} else if err.Error() != tcase.err {
+				t.Errorf("%s error: %v, want %s", tcase.in, err, tcase.err)
+			}
+			continue
+		}
+		if err != nil {
+			t.Errorf("%s: unexpected error: %v", tcase.in, err)
+			continue
+		}
+		if masterSession.Options == nil || masterSession.Options.Workload != tcase.out.Options.Workload {
+			t.Errorf("%s: %v, want %s", tcase.in, masterSession, tcase.out)
+		}
+	}
+}
+
+
func executorStreamMessages(executor *Executor, sql string) (qr *sqltypes.Result, err error) {
results := make(chan *sqltypes.Result, 100)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go
index be535df38..5d83ea47a 100644
--- a/go/vt/vtgate/resolver.go
+++ b/go/vt/vtgate/resolver.go
@@ -23,12 +23,14 @@ import (
"sort"
"strings"
"sync"
+ "time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
+ "vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
@@ -250,9 +252,28 @@ func (res *Resolver) StreamExecute(
keyspace string,
tabletType topodatapb.TabletType,
destination key.Destination,
+ safeSession *SafeSession,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) error {
+
+ logStats := NewLogStats(ctx, "StreamExecute", sql, bindVars)
+ stmtType := sqlparser.Preview(sql)
+ logStats.StmtType = sqlparser.StmtType(stmtType)
+ defer logStats.Send()
+
+ // handle SET workload queries in order to switch between oltp and olap modes
+ if stmtType == sqlparser.StmtSet {
+ result, setErr := res.handleSetWorkload(safeSession, sql, logStats)
+ if setErr != nil {
+ return setErr
+ }
+ if err := callback(result); err != nil {
+ return err
+ }
+ return nil
+ }
+
rss, err := res.resolver.ResolveDestination(ctx, keyspace, tabletType, destination)
if err != nil {
return err
@@ -495,6 +516,49 @@ func (res *Resolver) GetGatewayCacheStatus() gateway.TabletCacheStatusList {
return res.scatterConn.GetGatewayCacheStatus()
}
+// handleSetWorkload like handleSet above but handles only SET workload queries, all other sets are unsupported
+// needed for StreamExecute which does not support transactions
+func (res *Resolver) handleSetWorkload(safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) {
+ vals, scope, err := sqlparser.ExtractSetValues(sql)
+ execStart := time.Now()
+ logStats.PlanTime = execStart.Sub(logStats.StartTime)
+ defer func() {
+ logStats.ExecuteTime = time.Since(execStart)
+ }()
+
+ if err != nil {
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, err.Error())
+ }
+
+ if scope == sqlparser.GlobalStr {
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global")
+ }
+
+ for k, v := range vals {
+ if k.Scope == sqlparser.GlobalStr {
+ return &sqltypes.Result{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported in set: global")
+ }
+
+ if k.Key != "workload" {
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported construct: %s", sql)
+ }
+
+ val, ok := v.(string)
+ if !ok {
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected value type for workload: %T", v)
+ }
+ out, ok := querypb.ExecuteOptions_Workload_value[strings.ToUpper(val)]
+ if !ok {
+ return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid workload: %s", val)
+ }
+ if safeSession.Options == nil {
+ safeSession.Options = &querypb.ExecuteOptions{}
+ }
+ safeSession.Options.Workload = querypb.ExecuteOptions_Workload(out)
+ }
+ return &sqltypes.Result{}, nil
+}
+
// StrsEquals compares contents of two string slices.
func StrsEquals(a, b []string) bool {
if len(a) != len(b) {
diff --git a/go/vt/vtgate/resolver_test.go b/go/vt/vtgate/resolver_test.go
index e26a77e5e..2315bcccd 100644
--- a/go/vt/vtgate/resolver_test.go
+++ b/go/vt/vtgate/resolver_test.go
@@ -136,6 +136,7 @@ func TestResolverStreamExecuteKeyspaceIds(t *testing.T) {
keyspace,
topodatapb.TabletType_MASTER,
key.DestinationKeyspaceIDs([][]byte{{0x10}, {0x15}}),
+ NewSafeSession(masterSession),
nil,
func(r *sqltypes.Result) error {
qr.AppendResult(r)
@@ -151,6 +152,7 @@ func TestResolverStreamExecuteKeyspaceIds(t *testing.T) {
keyspace,
topodatapb.TabletType_MASTER,
key.DestinationKeyspaceIDs([][]byte{{0x10}, {0x15}, {0x25}}),
+ NewSafeSession(masterSession),
nil,
func(r *sqltypes.Result) error {
qr.AppendResult(r)
@@ -171,6 +173,7 @@ func TestResolverStreamExecuteKeyRanges(t *testing.T) {
keyspace,
topodatapb.TabletType_MASTER,
key.DestinationKeyRanges([]*topodatapb.KeyRange{{Start: []byte{0x10}, End: []byte{0x15}}}),
+ NewSafeSession(masterSession),
nil,
func(r *sqltypes.Result) error {
qr.AppendResult(r)
@@ -187,6 +190,7 @@ func TestResolverStreamExecuteKeyRanges(t *testing.T) {
keyspace,
topodatapb.TabletType_MASTER,
key.DestinationKeyRanges([]*topodatapb.KeyRange{{Start: []byte{0x10}, End: []byte{0x25}}}),
+ NewSafeSession(masterSession),
nil,
func(r *sqltypes.Result) error {
qr.AppendResult(r)
@@ -196,6 +200,68 @@ func TestResolverStreamExecuteKeyRanges(t *testing.T) {
})
}
+// TestResolverStreamSetWorkload checks that SET statements sent through
+// Resolver.StreamExecute accept only `set workload = ...` and reject every
+// other SET with the expected error text.
+//
+// NOTE(review): the success assertion reads the package-level masterSession
+// after wrapping it with NewSafeSession; this presumes the wrapper shares the
+// underlying session rather than copying it — confirm.
+func TestResolverStreamSetWorkload(t *testing.T) {
+	keyspace := "TestResolverStreamSetWorkload"
+	createSandbox(keyspace)
+	hc := discovery.NewFakeHealthCheck()
+	res := newTestResolver(hc, new(sandboxTopo), "aa")
+
+	testcases := []struct {
+		in  string
+		out *vtgatepb.Session
+		err string
+	}{{
+		in:  "set workload = 'unspecified'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_UNSPECIFIED}},
+	}, {
+		in:  "set workload = 'oltp'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLTP}},
+	}, {
+		in:  "set workload = 'olap'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_OLAP}},
+	}, {
+		in:  "set workload = 'dba'",
+		out: &vtgatepb.Session{Autocommit: true, Options: &querypb.ExecuteOptions{Workload: querypb.ExecuteOptions_DBA}},
+	}, {
+		in:  "set workload = 'aa'",
+		err: "invalid workload: aa",
+	}, {
+		in:  "set workload = 1",
+		err: "unexpected value type for workload: int64",
+	}, {
+		in:  "set autocommit = 1",
+		err: "unsupported construct: set autocommit = 1",
+	}, {
+		in:  "set names utf8",
+		err: "unsupported construct: set names utf8",
+	}}
+	for _, tcase := range testcases {
+		qr := new(sqltypes.Result)
+		err := res.StreamExecute(context.Background(),
+			tcase.in,
+			nil,
+			keyspace,
+			topodatapb.TabletType_MASTER,
+			key.DestinationKeyspaceIDs([][]byte{{0x10}, {0x15}}),
+			NewSafeSession(masterSession),
+			nil,
+			func(r *sqltypes.Result) error {
+				qr.AppendResult(r)
+				return nil
+			})
+
+		// Error cases carry no expected session (out is nil), so they must
+		// never reach the workload comparison below.
+		if tcase.err != "" {
+			if err == nil {
+				t.Errorf("%s: got no error, want %s", tcase.in, tcase.err)
+			} else if err.Error() != tcase.err {
+				t.Errorf("%s error: %v, want %s", tcase.in, err, tcase.err)
+			}
+			continue
+		}
+		if err != nil {
+			t.Errorf("%s: unexpected error: %v", tcase.in, err)
+			continue
+		}
+		if masterSession.Options == nil || masterSession.Options.Workload != tcase.out.Options.Workload {
+			t.Errorf("%s: %v, want %s", tcase.in, masterSession, tcase.out)
+		}
+	}
+}
+
+
func testResolverGeneric(t *testing.T, name string, action func(res *Resolver) (*sqltypes.Result, error)) {
t.Run("successful execute", func(t *testing.T) {
createSandbox(name)
diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go
index 5b5094d96..efd03b59f 100644
--- a/go/vt/vtgate/vtgate.go
+++ b/go/vt/vtgate/vtgate.go
@@ -336,6 +336,7 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session,
// In this context, we don't care if we can't fully parse destination
destKeyspace, destTabletType, dest, _ := vtg.executor.ParseDestinationTarget(session.TargetString)
statsKey := []string{"StreamExecute", destKeyspace, topoproto.TabletTypeLString(destTabletType)}
+ safeSession := NewSafeSession(session)
defer vtg.timings.Record(statsKey, time.Now())
@@ -356,6 +357,7 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session,
destKeyspace,
destTabletType,
dest,
+ safeSession,
session.Options,
func(reply *sqltypes.Result) error {
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows)))
@@ -365,7 +367,7 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session,
err = vtg.executor.StreamExecute(
ctx,
"StreamExecute",
- NewSafeSession(session),
+ safeSession,
sql,
bindVariables,
querypb.Target{
@@ -670,6 +672,11 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bin
statsKey := []string{"StreamExecuteKeyspaceIds", keyspace, ltt}
defer vtg.timings.Record(statsKey, startTime)
+ session := &vtgatepb.Session{
+ Options: options,
+ Autocommit: true,
+ }
+
var err error
if bvErr := sqltypes.ValidateBindVariables(bindVariables); bvErr != nil {
@@ -684,6 +691,7 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bin
keyspace,
tabletType,
key.DestinationKeyspaceIDs(keyspaceIds),
+ NewSafeSession(session),
options,
func(reply *sqltypes.Result) error {
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows)))
@@ -719,6 +727,11 @@ func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, sql string, bindV
statsKey := []string{"StreamExecuteKeyRanges", keyspace, ltt}
defer vtg.timings.Record(statsKey, startTime)
+ session := &vtgatepb.Session{
+ Options: options,
+ Autocommit: true,
+ }
+
var err error
if bvErr := sqltypes.ValidateBindVariables(bindVariables); bvErr != nil {
@@ -733,6 +746,7 @@ func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, sql string, bindV
keyspace,
tabletType,
key.DestinationKeyRanges(keyRanges),
+ NewSafeSession(session),
options,
func(reply *sqltypes.Result) error {
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows)))
@@ -763,6 +777,11 @@ func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVari
statsKey := []string{"StreamExecuteShards", keyspace, ltt}
defer vtg.timings.Record(statsKey, startTime)
+ session := &vtgatepb.Session{
+ Options: options,
+ Autocommit: true,
+ }
+
var err error
if bvErr := sqltypes.ValidateBindVariables(bindVariables); bvErr != nil {
@@ -777,6 +796,7 @@ func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVari
keyspace,
tabletType,
key.DestinationShards(shards),
+ NewSafeSession(session),
options,
func(reply *sqltypes.Result) error {
vtg.rowsReturned.Add(statsKey, int64(len(reply.Rows)))
diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan.go b/go/vt/vttablet/tabletserver/planbuilder/plan.go
index b332c67dc..40e8566dc 100644
--- a/go/vt/vttablet/tabletserver/planbuilder/plan.go
+++ b/go/vt/vttablet/tabletserver/planbuilder/plan.go
@@ -317,7 +317,7 @@ func BuildStreaming(sql string, tables map[string]*schema.Table) (*Plan, error)
if tableName := analyzeFrom(stmt.From); !tableName.IsEmpty() {
plan.setTable(tableName, tables)
}
- case *sqlparser.OtherRead, *sqlparser.Show, *sqlparser.Union:
+ case *sqlparser.OtherRead, *sqlparser.Show, *sqlparser.Union, *sqlparser.Set:
// pass
default:
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "'%v' not allowed for streaming", sqlparser.String(stmt))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment