Last active
April 29, 2022 00:27
-
-
Save stevekuznetsov/5063c6863e4d2d9a900a2aaeca2f89f6 to your computer and use it in GitHub Desktop.
CRDB Contention Reproducer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"fmt" | |
"net/url" | |
"strings" | |
"sync" | |
"time" | |
"github.com/cockroachdb/apd" | |
"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" | |
"github.com/cockroachdb/cockroach-go/v2/testserver" | |
"github.com/google/uuid" | |
"github.com/jackc/pgx/v4" | |
"github.com/jackc/pgx/v4/log/logrusadapter" | |
"github.com/jackc/pgx/v4/pgxpool" | |
"github.com/sirupsen/logrus" | |
) | |
var sqlPort int | |
func main() { | |
flag.IntVar(&sqlPort, "sql-port", 0, "Connection port to CRDB cluster.") | |
flag.Parse() | |
var conn string | |
if sqlPort == 0 { | |
ts, err := testserver.NewTestServer(testserver.ExposeConsoleOpt(8080)) | |
if err != nil { | |
logrus.WithError(err).Fatal("failed to start crdb") | |
} | |
defer func() { | |
ts.Stop() | |
}() | |
conn = ts.PGURL().String() | |
} else { | |
u := url.URL{ | |
Scheme: "postgresql", | |
User: url.User("root"), | |
Host: fmt.Sprintf("localhost:%d", sqlPort), | |
Path: "defaultdb", | |
} | |
conn = u.String() | |
} | |
ctx := context.Background() | |
cfg, err := pgxpool.ParseConfig(conn) | |
if err != nil { | |
logrus.WithError(err).Fatal("failed to parse test connection") | |
} | |
cfg.ConnConfig.LogLevel = pgx.LogLevelError | |
cfg.ConnConfig.Logger = logrusadapter.NewLogger(logrus.New()) | |
cfg.MaxConns = 128 | |
client, err := pgxpool.ConnectConfig(ctx, cfg) | |
if err != nil { | |
logrus.WithError(err).Fatal("failed to connect to crdb") | |
} | |
for _, stmt := range []string{ | |
`CREATE TABLE IF NOT EXISTS kv | |
( | |
key VARCHAR(512) NOT NULL PRIMARY KEY, | |
value BLOB NOT NULL | |
);`, | |
`CREATE TABLE IF NOT EXISTS test | |
( | |
id UUID DEFAULT gen_random_uuid() NOT NULL PRIMARY KEY | |
);`, | |
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`, | |
`SET CLUSTER SETTING changefeed.experimental_poll_interval = '0.2s';`, | |
} { | |
if _, err := client.Exec(ctx, stmt); err != nil { | |
logrus.WithError(err).Fatal("error initializing the database") | |
} | |
} | |
const numEvents = 100000 | |
const parallelWorkers = 100 | |
for name, op := range map[string]func(context.Context) error { | |
"raw insertions": raw(client), | |
//"with selects": withRead(client), | |
//"with transaction": withTxn(client), | |
} { | |
if err := do(ctx, numEvents, parallelWorkers, op, name); err != nil { | |
logrus.WithError(err).Fatal("failed to interact with CRDB") | |
} | |
} | |
select { | |
case <-time.After(120 * time.Second): | |
case <-ctx.Done(): | |
} | |
} | |
// raw inserts, no contention since we have no SELECT | |
func raw(client *pgxpool.Pool) func(ctx context.Context) error { | |
return func(ctx context.Context) error { | |
_, err := client.Exec(ctx, `INSERT INTO test (id) VALUES (DEFAULT);`) | |
return err | |
} | |
} | |
// withRead has a SELECT afterwards, no contention | |
func withRead(client *pgxpool.Pool) func(ctx context.Context) error { | |
return func(ctx context.Context) error { | |
var id uuid.UUID | |
err := client.QueryRow(ctx, `INSERT INTO test (id) VALUES (DEFAULT) RETURNING id;`).Scan(&id) | |
if err != nil { | |
return err | |
} | |
var hybridLogicalTimestamp apd.Decimal | |
return client.QueryRow(ctx, `SELECT crdb_internal_mvcc_timestamp FROM test WHERE id=$1;`, id).Scan(&hybridLogicalTimestamp) | |
} | |
} | |
// withTxn has a transaction on the insert, all of a sudden we see contention on the SELECT | |
func withTxn(client *pgxpool.Pool) func(ctx context.Context) error { | |
return func(ctx context.Context) error { | |
var id uuid.UUID | |
if err := crdbpgx.ExecuteTx(ctx, client, pgx.TxOptions{}, func(tx pgx.Tx) error { | |
if _, err := tx.Exec(ctx, `INSERT INTO kv (key, value) VALUES ($1, $2);`, uuid.New().String(), strings.Repeat("x", 10000)); err != nil { | |
return err | |
} | |
return tx.QueryRow(ctx, `INSERT INTO test (id) VALUES (DEFAULT) RETURNING id;`).Scan(&id) | |
}); err != nil { | |
return err | |
} | |
var hybridLogicalTimestamp apd.Decimal | |
return client.QueryRow(ctx, `SELECT crdb_internal_mvcc_timestamp FROM test WHERE id=$1;`, id).Scan(&hybridLogicalTimestamp) | |
} | |
} | |
func do(ctx context.Context, iterations, parallelism int, operation func(context.Context) error, msg string) error { | |
var durations []time.Duration | |
sink := make(chan time.Duration) | |
wg := sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case duration, ok := <-sink: | |
if !ok { | |
return | |
} | |
durations = append(durations, duration) | |
if len(durations)%(iterations/10) == 0 { | |
summarize(durations, "processing metrics") | |
} | |
} | |
} | |
}() | |
if err := workers(ctx, iterations, parallelism, func(ctx context.Context) error { | |
before := time.Now() | |
err := operation(ctx) | |
duration := time.Since(before) | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case sink <- duration: | |
} | |
return err | |
}); err != nil { | |
return err | |
} | |
close(sink) | |
wg.Wait() | |
summarize(durations, msg) | |
return nil | |
} | |
func workers(ctx context.Context, iterations, parallelism int, operation func(context.Context) error) error { | |
wg := &sync.WaitGroup{} | |
operations := make(chan struct{}) | |
for i := 0; i < parallelism; i++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case _, ok := <-operations: | |
if !ok { | |
return | |
} | |
if err := operation(ctx); err != nil { | |
logrus.WithError(err).Error("failed to interact with the API server") | |
} | |
} | |
} | |
}() | |
} | |
start := time.Now() | |
for i := 0; i < iterations; i++ { | |
start = display(i, iterations, start) | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case operations <- struct{}{}: | |
} | |
} | |
close(operations) | |
wg.Wait() | |
return nil | |
} | |
func display(i, max int, start time.Time) time.Time { | |
if (i+1)%(max/10) == 0 { | |
logrus.Infof("progress: %d/%d (%.0f%%); %s per ", i+1, max, 100*(float64(i+1)/float64(max)), time.Since(start)/time.Duration(max/10)) | |
return time.Now() | |
} | |
return start | |
} | |
func summarize(durations []time.Duration, msg string) { | |
var all time.Duration | |
for _, duration := range durations { | |
all += duration | |
} | |
logrus.WithFields(logrus.Fields{ | |
"count": len(durations), | |
"mean": all / time.Duration(len(durations)), | |
}).Info(msg) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment