Created
October 15, 2021 08:31
-
-
Save henvic/3a4873734c4ffd1ff066f797280d0f41 to your computer and use it in GitHub Desktop.
search test infrastructure with Replica still using testing.TB (to be replaced)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package searchtest can be used to write integration tests with OpenSearch and search-replica. | |
// | |
// You can set the following environment variables to control logging: | |
// VERBOSE_OPENSEARCH=true to enable logging of requests and responses made with the OpenSearch client. | |
// VERBOSE_SEARCH_REPLICA=true to print verbose output from the search-replica process. | |
// | |
// If search-replica isn't on your $PATH, or to run a different command, set the environment variable:
// SEARCH_REPLICA_COMMAND=search-replica | |
// | |
// search-replica works by subscribing to a PostgreSQL publication using logical replication.
// https://www.postgresql.org/docs/current/logical-replication.html | |
// | |
// The easiest way to have OpenSearch running on your machine is with Docker Compose.
// For more information, see https://opensearch.org/downloads.html | |
// | |
// Install search-replica with: | |
// $ go install github.com/hatch-studio/search-replica@latest
package searchtest | |
import ( | |
"bytes" | |
"context" | |
"crypto/tls" | |
"fmt" | |
"io" | |
"net" | |
"net/http" | |
"os" | |
"os/exec" | |
"strings" | |
"syscall" | |
"testing" | |
"time" | |
"github.com/opensearch-project/opensearch-go" | |
"github.com/opensearch-project/opensearch-go/opensearchapi" | |
"github.com/opensearch-project/opensearch-go/opensearchtransport" | |
) | |
// IndexPrefix is the prefix every search index used by integration tests must
// start with; Infrastructure refuses to touch any other index.
// In practice, it has the same value as sqltest.DatabasePrefix.
var IndexPrefix = "test"

// Verbosity switches populated from the environment by init.
// They are deliberately unexported: the environment is meant to be the single
// way to set them, since offering several ways would only cause confusion.
var (
	verboseOpenSearch    bool // VERBOSE_OPENSEARCH
	verboseSearchReplica bool // VERBOSE_SEARCH_REPLICA
)

// init enables each verbosity switch when its environment variable is present
// with any value other than "false" (an empty value enables it too).
func init() {
	enabled := func(name string) bool {
		value, present := os.LookupEnv(name)
		return present && value != "false"
	}
	verboseOpenSearch = enabled("VERBOSE_OPENSEARCH")
	verboseSearchReplica = enabled("VERBOSE_SEARCH_REPLICA")
}
// DefaultClient for OpenSearch. | |
// Not safe for use on production code due to allowing insecure TLS connections. | |
func DefaultClient(t testing.TB) *opensearch.Client { | |
cfg := opensearch.Config{ | |
Addresses: []string{os.Getenv("OPENSEARCH_URL")}, | |
Username: os.Getenv("OPENSEARCH_USERNAME"), | |
Password: os.Getenv("OPENSEARCH_PASSWORD"), | |
Transport: searchTransport, | |
} | |
if verboseOpenSearch { | |
cfg.Logger = &opensearchtransport.CurlLogger{ | |
Output: Logger(t), | |
EnableRequestBody: true, | |
EnableResponseBody: true, | |
} | |
} | |
search, err := opensearch.NewClient(cfg) | |
if err != nil { | |
t.Fatalf("Error creating the client: %s", err) | |
} | |
return search | |
} | |
// searchTransport mirrors the settings of http.DefaultTransport, with one
// exception: it does not verify server certificates. That lets developers and
// CI talk to an OpenSearch instance with self-signed certificates right away,
// without configuring any certificates first.
var searchTransport = func() *http.Transport {
	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	return &http.Transport{
		Proxy:       http.ProxyFromEnvironment,
		DialContext: dialer.DialContext,
		// Certificate verification is intentionally disabled (test use only).
		TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
		ForceAttemptHTTP2:     true,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: time.Second,
	}
}()
// Replica manages the search-replica process that is called when tests are executed. | |
type Replica struct { | |
// Publication is essentially a group of PostgreSQL tables whose data changes are intended to be replicated through logical replication. | |
Publication string | |
// Slot that search-replica should use. | |
Slot string | |
// GracefulStop can be set to call t.Cleanup automatically once tests are done. | |
// If this field is not set, you must call Stop() manually. | |
// If search-replica cannot be stopped gracefully within this duration, it's killed. | |
GracefulStop time.Duration | |
// cmd is the search-replica process. | |
cmd *exec.Cmd | |
// test controlling the replica. | |
t testing.TB | |
// stopCtx manages graceful shutdown of the search-replica process. | |
stopCtx context.Context | |
} | |
// Start search-replica. | |
// It waits until receiving an indication that that streaming replication has started before returning. | |
// | |
// The field StopTimeout can be set to stop execution automatically once tests are done. | |
func (r *Replica) Start(ctx context.Context, t testing.TB) { | |
if r.t != nil { | |
panic("search-replica runner already started") | |
} | |
command := os.Getenv("SEARCH_REPLICA_COMMAND") | |
if command == "" { | |
command = "search-replica" | |
} | |
var stopCancel context.CancelFunc | |
r.stopCtx, stopCancel = context.WithCancel(ctx) | |
p, err := exec.LookPath(command) | |
if err != nil { | |
t.Fatalf("cannot find search-replica on your system: %v", err) | |
} | |
r.cmd = exec.CommandContext(r.stopCtx, p, "-recreate") | |
r.cmd.Env = append( | |
os.Environ(), | |
"SEARCH_PUSH_PERIOD=1s", | |
fmt.Sprintf("PG_PUBLICATION=%s", r.Publication), | |
fmt.Sprintf("PG_SLOT=%s", r.Slot), | |
) | |
// Wait until search-replica announces it's ready. | |
var success bool | |
done := make(chan struct{}, 1) | |
ew := &expectWriter{ | |
find: []byte("Started streaming replication"), | |
ready: func() { | |
success = true | |
done <- struct{}{} | |
}, | |
} | |
if verboseSearchReplica { | |
r.cmd.Stdout = os.Stdout | |
r.cmd.Stderr = io.MultiWriter(os.Stderr, ew) | |
} else { | |
r.cmd.Stderr = ew | |
} | |
go func() { | |
defer stopCancel() | |
t.Log("starting up search-replica") | |
if err := r.cmd.Run(); err != nil && !strings.Contains(err.Error(), "signal: terminated") { | |
t.Errorf("search-replica died: %v", err) | |
done <- struct{}{} | |
} | |
}() | |
if r.GracefulStop != 0 { | |
t.Cleanup(func() { | |
ctx, cancel := context.WithTimeout(context.Background(), r.GracefulStop) | |
defer cancel() | |
r.Stop(ctx) | |
}) | |
} | |
<-done | |
if !success { | |
t.Fatal("search-replica failed to start") | |
} | |
} | |
// Stop tries to gracefully shutdown search-replica. | |
// Should be called with t.Cleanup. | |
// | |
// If it doesn't terminate gracefully before the received context expires, | |
// it kills the process. | |
func (r *Replica) Stop(ctx context.Context) { | |
if r.cmd == nil || r.cmd.Process == nil || r.cmd.Process.Pid == 0 { | |
return | |
} | |
r.cmd.Process.Signal(syscall.SIGTERM) | |
select { | |
case <-r.stopCtx.Done(): | |
return | |
case <-ctx.Done(): | |
r.t.Error("failed to shutdown search-replica gracefully") | |
} | |
} | |
// expectWriter is an io.Writer that watches a stream for the byte sequence
// find and calls ready the first time it appears. A match split across
// several Write calls is also detected. Writes never fail.
type expectWriter struct {
	find    []byte
	success bool // set once find has been seen; ready is not called again
	ready   func()
	memory  []byte // owned copy of the tail of earlier writes that may begin a match
}

// Write scans p, together with the tail kept from earlier writes, for find.
// Anything it keeps is copied, because io.Writer implementations must not
// retain p (callers such as io.Copy reuse the buffer).
func (w *expectWriter) Write(p []byte) (int, error) {
	if w.success {
		return len(p), nil
	}
	// w.memory is always our own buffer, so appending to it never touches p.
	data := append(w.memory, p...)
	if bytes.Contains(data, w.find) {
		w.success = true
		w.memory = nil
		w.ready()
		return len(p), nil
	}
	// A match continuing into the next write can begin at most len(find)-1
	// bytes before the end of what has been seen so far.
	if keep := len(w.find) - 1; len(data) > keep {
		data = data[len(data)-keep:]
	}
	w.memory = append(w.memory[:0], data...)
	return len(p), nil
}
// NewInfrastructure creates a new search engine infrastructure. | |
func NewInfrastructure(t testing.TB, c *opensearch.Client) *Infrastructure { | |
return &Infrastructure{ | |
t: t, | |
client: c, | |
} | |
} | |
// Infrastructure for automating integration testing with OpenSearch. | |
type Infrastructure struct { | |
t testing.TB | |
client *opensearch.Client | |
} | |
// Reindex recreates the search engine index. | |
// | |
// You probably want to use the database name as the index name. | |
// To make development easier, the index isn't deleted after the tests are done so you can manually check them easily. | |
// You can use the OpenSearch Dashboard or CLI tool for that. | |
// | |
// After table schema changes, the settings or mapping might get outdated. | |
// If this happens, you might find the following command useful to understand what is going on: | |
// $ opensearch-cli curl get --path "/<index>" | jq --indent 4 | |
func (i *Infrastructure) Reindex(ctx context.Context, index string, body []byte) { | |
i.deleteIndex(ctx, index) | |
i.createIndex(ctx, index, body) | |
} | |
// createdIndex creates the search index. | |
func (i *Infrastructure) createIndex(ctx context.Context, index string, body []byte) { | |
i.verifyIndex(index) | |
req := opensearchapi.IndicesCreateRequest{ | |
Index: index, | |
Body: bytes.NewBuffer(body), | |
} | |
resp, err := req.Do(ctx, i.client) | |
if err != nil { | |
i.t.Fatalf("map index: %v", err) | |
} | |
defer resp.Body.Close() // nolint: errcheck | |
if resp.IsError() { | |
i.t.Fatalf("search: mapping response with status code %v", resp.StatusCode) | |
} | |
} | |
// deleteIndex deletes the search index. | |
// If the index doesn't exist, it does nothing. | |
func (i *Infrastructure) deleteIndex(ctx context.Context, index string) { | |
i.verifyIndex(index) | |
req := opensearchapi.IndicesDeleteRequest{ | |
Index: []string{index}, | |
} | |
resp, err := req.Do(ctx, i.client) | |
if err != nil { | |
i.t.Fatalf("cleanup index: %v", err) | |
} | |
defer resp.Body.Close() // nolint: errcheck | |
// Ignore index_not_found_exception error. | |
if resp.IsError() && resp.StatusCode != 404 { | |
i.t.Fatalf("search: cleanup response with status code %v", resp.StatusCode) | |
} | |
} | |
// verifyIndex checks if the index starts with "test" with the purpose of making this fail safely if called on a production environment by mistake. | |
func (i *Infrastructure) verifyIndex(index string) { | |
if !strings.HasPrefix(index, IndexPrefix) { | |
i.t.Fatalf(`refusing to run integration tests: search engine index name is %q (%q prefix is required)`, index, IndexPrefix) | |
} | |
} | |
// Logger returns an io.Writer that forwards everything written to it to t.Log.
// It is used for OpenSearch requests executed during tests.
//
// For tests, t.Log output only appears with `go test -v` or when the test
// fails. DefaultClient attaches this logger only when the environment variable
// VERBOSE_OPENSEARCH is enabled.
func Logger(t testing.TB) io.Writer {
	return &testingLogger{t: t}
}

// testingLogger adapts testing.TB to io.Writer.
type testingLogger struct {
	t testing.TB
}

// Write logs p with a single t.Log call and reports all of p as written.
// io.Writer requires a non-nil error whenever n < len(p), so returning 0
// made wrapping writers (e.g. io.MultiWriter) treat every write as short.
func (l *testingLogger) Write(p []byte) (int, error) {
	l.t.Log(string(p))
	return len(p), nil
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment