Skip to content

Instantly share code, notes, and snippets.

@yfuruyama
Last active May 13, 2019 10:02
Show Gist options
  • Save yfuruyama/aaeb2559b450674dcccd682f37e50cae to your computer and use it in GitHub Desktop.
Save yfuruyama/aaeb2559b450674dcccd682f37e50cae to your computer and use it in GitHub Desktop.
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"strings"
"time"
"cloud.google.com/go/spanner"
"go.opencensus.io/trace"
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
)
type customExporter struct{}
// Compile time assertion that the exporter implements trace.Exporter
var _ trace.Exporter = (*customExporter)(nil)
func (cse *customExporter) ExportSpan(sd *trace.SpanData) {
// only print logs for session wait
for _, annotation := range sd.Annotations {
if strings.HasPrefix(annotation.Message, "Waiting for") {
log.Println(annotation.Message)
}
}
}
func main() {
var project string
var instance string
var database string
flag.StringVar(&project, "project", "", "")
flag.StringVar(&instance, "instance", "", "")
flag.StringVar(&database, "database", "", "")
flag.Parse()
if project == "" || instance == "" || database == "" {
flag.Usage()
os.Exit(1)
}
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
trace.RegisterExporter(new(customExporter))
ctx := context.Background()
sessionLabelKey := "client_ts"
sessionLabelValue := fmt.Sprintf("%d", time.Now().Unix())
sessionFilter := fmt.Sprintf("labels.%s:%s\n", sessionLabelKey, sessionLabelValue)
dbPath := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, database)
client, err := spanner.NewClientWithConfig(ctx, dbPath, spanner.ClientConfig{
SessionPoolConfig: spanner.SessionPoolConfig{
MinOpened: 5,
MaxOpened: 30,
MaxIdle: 10,
MaxBurst: 30,
},
SessionLabels: map[string]string{
sessionLabelKey: sessionLabelValue,
},
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// do transactions after 10 sec
time.AfterFunc(time.Second*10, func() {
log.Printf("Run 100 transactions in parallel\n")
for i := 0; i < 100; i++ {
go func() {
client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
time.Sleep(time.Second * 1)
return nil
})
}()
}
})
// check sessions periodically
for {
count, err := getCurrentSessionCount(ctx, dbPath, sessionFilter)
if err != nil {
log.Fatal(err)
}
log.Printf("Number of sessions: %d\n", count)
}
}
func getCurrentSessionCount(ctx context.Context, dbPath string, filter string) (int, error) {
allOpts := []option.ClientOption{
option.WithEndpoint("spanner.googleapis.com:443"),
}
conn, err := gtransport.Dial(ctx, allOpts...)
if err != nil {
return 0, err
}
rpcClient := sppb.NewSpannerClient(conn)
resp, err := rpcClient.ListSessions(ctx, &sppb.ListSessionsRequest{
Database: dbPath,
Filter: filter,
})
if err != nil {
return 0, err
}
return len(resp.Sessions), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment