Skip to content

Instantly share code, notes, and snippets.

@matiasinsaurralde
Created June 5, 2020 13:53
Show Gist options
  • Save matiasinsaurralde/4721f17432e466008a6ad301ca2d5182 to your computer and use it in GitHub Desktop.
Save matiasinsaurralde/4721f17432e466008a6ad301ca2d5182 to your computer and use it in GitHub Desktop.
es-test.go
package pumps
import(
"testing"
"encoding/json"
"github.com/ory/dockertest/v3"
"github.com/TykTechnologies/tyk-pump/analytics"
"context"
"fmt"
"time"
"os"
elasticv6 "gopkg.in/olivere/elastic.v6"
stdlog "log"
)
const(
esDockerImage = "elasticsearch"
)
var(
esTestPool *dockertest.Pool
)
type recordMapping struct {
APIID string `json:"api_id"`
Timestamp time.Time `json:"@timestamp"`
HTTPMethod string `json:"http_method"`
IPAddress string `json:"ip_address"`
RequestURI string `json:"request_uri"`
RequestTime int64 `json:"request_time_ms"`
}
func init() {
var err error
esTestPool, err = dockertest.NewPool("")
if err != nil {
log.Fatal(err)
}
}
func TestElasticSearchPump(t *testing.T) {
resource, err := esTestPool.Run(esDockerImage, "6.8.10", []string{"discovery.type=single-node"})
if err != nil {
t.Fatal(err)
}
err = esTestPool.Retry(func() error {
// Initialize test client:
esPort := resource.GetPort("9200/tcp")
esURL := fmt.Sprintf("http://localhost:%s", esPort)
urls := []string{esURL}
esClient, err := elasticv6.NewClient(elasticv6.SetURL(urls...),
elasticv6.SetErrorLog(stdlog.New(os.Stderr, "ERROR ", stdlog.LstdFlags)),
elasticv6.SetInfoLog(stdlog.New(os.Stdout, "INFO ", stdlog.LstdFlags)))
if err != nil {
return err
}
// Initialize ES pump:
pmp := ElasticsearchPump{}
cfg := map[string]interface{}{
"elasticsearch_url": esURL,
"version": "6",
"disable_bulk": true,
}
err = pmp.Init(cfg)
if err != nil {
t.Fatal(err)
}
// Write a sample record:
record := analytics.AnalyticsRecord{
APIID: "41433797848f41a558c1573d3e55a410",
IPAddress: "127.0.0.1",
Method: "GET",
Path: "/test",
TimeStamp: time.Now(),
RequestTime: 100,
}
data := make([]interface{}, 0)
for n := 0; n < 20; n++ {
data = append(data, record)
}
err = pmp.WriteData(context.Background(), data)
if err != nil {
t.Fatal(err)
}
stats, err := esClient.IndexStats("tyk_analytics").Human(true).Pretty(true).Do(context.Background())
stat := stats.Indices["tyk_analytics"]
statJSON, _ := json.Marshal(stat)
fmt.Println("ES STATS: ", string(statJSON))
time.Sleep(5*time.Second)
// Retrieve records:
result, err := esClient.Search().Index("tyk_analytics").Do(context.Background())
if err != nil {
t.Fatal(err)
}
if result.TotalHits() == 0 {
// TODO: Handle this error
}
for _, hit := range result.Hits.Hits {
var record recordMapping
err := json.Unmarshal([]byte(*hit.Source), &record)
if err != nil {
t.Fatalf("Couldn't unmarshal stored record: %s", err.Error())
}
if record.HTTPMethod != "GET" {
t.Fatalf("Stored record HTTP method doesn't match, got %s, expected %s", record.HTTPMethod, "GET")
}
fmt.Println("Found record:", string(*hit.Source))
}
return nil
})
if err != nil {
t.Fatal(err)
}
esTestPool.Purge(resource)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment