Created
June 20, 2016 17:53
-
-
Save pauldix/616fef6052f499b466c312b3100747b9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"flag" | |
"fmt" | |
"github.com/influxdata/influxdb/client" | |
"math/rand" | |
"net/url" | |
"os" | |
"strings" | |
"sync" | |
"syscall" | |
"time" | |
) | |
var ( | |
userVar string | |
hostsVar string | |
tags = map[string]string{"foo": "bar"} | |
) | |
const ( | |
valuesPerPost = 100 | |
outputAfterPosts = 10 | |
queriesPerPeriod = 50 | |
pauseBetweenPeriod = time.Second | |
outputAfterPeriods = 5 | |
measurement = "thing" | |
fieldName = "val" | |
) | |
func main() { | |
fmt.Println("starting") | |
flag.StringVar(&userVar, "user", "", "InfluxDB user to make requests to database. You will be prompted to enter the password.") | |
flag.StringVar(&hostsVar, "hosts", "localhost:8086", "Comma separated list of <host>:<port> of hosts to write and query during test") | |
flag.Parse() | |
var password string | |
if userVar != "" { | |
password = getPassword("Enter password: ") | |
} | |
hosts := strings.Split(hostsVar, ",") | |
clients := make([]*client.Client, len(hosts)) | |
for i, h := range hosts { | |
config := client.NewConfig() | |
if userVar != "" { | |
config.Username = userVar | |
config.Password = password | |
} | |
u, err := url.Parse(fmt.Sprintf("http://%s", h)) | |
if err != nil { | |
fmt.Println(err.Error()) | |
flag.Usage() | |
os.Exit(1) | |
} | |
config.URL = *u | |
c, err := client.NewClient(config) | |
if err != nil { | |
fmt.Println(err) | |
flag.Usage() | |
os.Exit(1) | |
} | |
clients[i] = c | |
} | |
if err := writeHistorical(clients[0]); err != nil { | |
fmt.Println(err.Error()) | |
os.Exit(1) | |
} | |
for _, c := range clients { | |
go write(c) | |
go query(c) | |
} | |
// wait forever... | |
var wg sync.WaitGroup | |
wg.Add(1) | |
wg.Wait() | |
} | |
// writeHistorical will create the database and retention policy for the demo | |
// and then write in enough historical data to create multiple shards. | |
func writeHistorical(c *client.Client) error { | |
q := client.Query{ | |
Command: "CREATE DATABASE demo WITH DURATION 3d REPLICATION 1 SHARD DURATION 1h NAME \"default\"", | |
} | |
if res, err := c.Query(q); err != nil { | |
return err | |
} else if res.Error() != nil { | |
return res.Error() | |
} | |
fmt.Println("demo database created") | |
bp := client.BatchPoints{Database: "demo"} | |
// write 5 hours of historical data to create multiple shards for the demo | |
t := time.Now().Add(-5 * time.Hour) | |
for t.Before(time.Now()) { | |
bp.Points = append(bp.Points, client.Point{ | |
Measurement: measurement, | |
Tags: tags, | |
Fields: map[string]interface{}{fieldName: 1}, | |
Time: t, | |
}) | |
t = t.Add(100 * time.Millisecond) | |
} | |
if _, err := c.Write(bp); err != nil { | |
return err | |
} | |
fmt.Printf("wrote %d historical points\n", len(bp.Points)) | |
return nil | |
} | |
// continuously writes data at a set rate | |
func write(c *client.Client) { | |
runCount := 0 | |
errorCount := 0 | |
lastOutput := time.Now() | |
for { | |
bp := client.BatchPoints{ | |
Database: "demo", | |
} | |
for i := 0; i < valuesPerPost; i++ { | |
bp.Points = append(bp.Points, client.Point{ | |
Measurement: measurement, | |
Tags: tags, | |
Fields: map[string]interface{}{fieldName: i}, | |
Time: time.Now(), | |
}) | |
} | |
if _, err := c.Write(bp); err != nil { | |
errorCount++ | |
} | |
runCount++ | |
if runCount == outputAfterPosts { | |
valuesPosted := (valuesPerPost * runCount) - (valuesPerPost - errorCount) | |
fmt.Printf("%s: posted %d values in %d requests in %s. %d errors\n", | |
c.Addr(), | |
valuesPosted, | |
runCount, | |
time.Since(lastOutput), | |
errorCount) | |
lastOutput = time.Now() | |
errorCount = 0 | |
runCount = 0 | |
} | |
time.Sleep(time.Duration(100+rand.Intn(900)) * time.Millisecond) | |
} | |
} | |
// continuously queries the database at a set rate | |
func query(c *client.Client) { | |
runCount := 0 | |
errorCount := 0 | |
queryCount := 0 | |
lastOutput := time.Now() | |
for { | |
var r *client.Response | |
var err error | |
queriesToRun := rand.Intn(queriesPerPeriod) + 1 | |
queryCount += queriesToRun | |
for i := 0; i < queriesToRun; i++ { | |
q := client.Query{ | |
Command: fmt.Sprintf("select count(%s) from \"demo\".\"default\".\"%s\" where time > now() - 20s", fieldName, measurement), | |
Database: "demo", | |
} | |
r, err = c.Query(q) | |
if err != nil || r.Error() != nil { | |
errorCount++ | |
err = nil | |
} | |
} | |
runCount++ | |
if runCount == outputAfterPeriods { | |
var lastResponse string | |
if r == nil { | |
lastResponse = "NaN" | |
} else if r.Error() != nil { | |
lastResponse = r.Error().Error() | |
} else if len(r.Results) > 0 { | |
s := r.Results[0].Series | |
if len(s) == 0 { | |
lastResponse = "no results" | |
} | |
v := s[0].Values | |
if len(v) == 0 { | |
lastResponse = "no results" | |
} | |
lastResponse = fmt.Sprintf("%v", v[0][1]) | |
} else { | |
lastResponse = "no results" | |
} | |
fmt.Printf("%s: queried %d times with %d errors in %s. last count: %s\n", | |
c.Addr(), | |
queryCount, | |
errorCount, | |
time.Since(lastOutput), | |
lastResponse) | |
runCount = 0 | |
queryCount = 0 | |
errorCount = 0 | |
lastOutput = time.Now() | |
} | |
time.Sleep(pauseBetweenPeriod) | |
} | |
} | |
// getPassword - Prompt for password. Use stty to disable echoing. | |
func getPassword(prompt string) string { | |
fmt.Print(prompt) | |
// Common settings and variables for both stty calls. | |
attrs := syscall.ProcAttr{ | |
Dir: "", | |
Env: []string{}, | |
Files: []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}, | |
Sys: nil} | |
var ws syscall.WaitStatus | |
// Disable echoing. | |
pid, err := syscall.ForkExec( | |
"/bin/stty", | |
[]string{"stty", "-echo"}, | |
&attrs) | |
if err != nil { | |
panic(err) | |
} | |
// Wait for the stty process to complete. | |
_, err = syscall.Wait4(pid, &ws, 0, nil) | |
if err != nil { | |
panic(err) | |
} | |
// Echo is disabled, now grab the data. | |
reader := bufio.NewReader(os.Stdin) | |
text, err := reader.ReadString('\n') | |
if err != nil { | |
panic(err) | |
} | |
// Re-enable echo. | |
pid, err = syscall.ForkExec( | |
"/bin/stty", | |
[]string{"stty", "echo"}, | |
&attrs) | |
if err != nil { | |
panic(err) | |
} | |
// Wait for the stty process to complete. | |
_, err = syscall.Wait4(pid, &ws, 0, nil) | |
if err != nil { | |
panic(err) | |
} | |
return strings.TrimSpace(text) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment