Skip to content

Instantly share code, notes, and snippets.

@jprante
Created October 17, 2015 15:11
Show Gist options
  • Save jprante/6fe1dbe53509b6ac2afc to your computer and use it in GitHub Desktop.
Save jprante/6fe1dbe53509b6ac2afc to your computer and use it in GitHub Desktop.
Testing an HTTP ES client in Go
package main
import (
"fmt"
"github.com/satori/go.uuid"
"os"
//"crypto/tls"
"bytes"
"encoding/json"
"github.com/Sirupsen/logrus"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
)
// Package-level state shared by the queueing loop in main, the worker
// goroutines and SendToES.
var (
	// log is the logrus logger used for every diagnostic in this program.
	log = logrus.New()
	// esHttpClient is the single HTTP client reused for all requests.
	esHttpClient = GetHttpClient()

	// wg counts the worker goroutines started by startEsWorkers.
	wg sync.WaitGroup
	// esQueue carries records from main to the workers (buffer of 10).
	esQueue = make(chan *Record, 10)

	// total_es_records is incremented by the workers while holding mutex.
	total_es_records int
	mutex            sync.Mutex

	// done receives one value from startEsWorkers; main blocks on it.
	done = make(chan bool, 1)

	// resp_counter is incremented by SendToES for each response it reads.
	resp_counter int
)
// DefaultDialer opens the TCP connections for the client built by
// GetHttpClient: 10-second connect timeout and 10-second keep-alive period.
var DefaultDialer = &net.Dialer{Timeout: 10 * time.Second, KeepAlive: 10 * time.Second}

// GetHttpClient builds the HTTP client used to talk to Elasticsearch.
//
// The transport dials through DefaultDialer and keeps up to 1100 idle
// connections per host. When the HTTPS_PROXY environment variable holds a
// parseable URL, requests are routed through that proxy; a value that does not
// parse is reported on stderr and ignored, so the client connects directly
// instead of silently installing a nil proxy URL.
func GetHttpClient() http.Client {
	tr := &http.Transport{
		//TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		Dial: DefaultDialer.Dial,
		// TLSHandshakeTimeout: 2 * time.Second,
		MaxIdleConnsPerHost: 1100,
	}

	// Read the variable once so the emptiness check and the parse see the
	// same value.
	if raw := os.Getenv("HTTPS_PROXY"); raw != "" {
		proxyURL, err := url.Parse(raw)
		if err != nil {
			fmt.Fprintf(os.Stderr, "ignoring invalid HTTPS_PROXY %q: %v\n", raw, err)
		} else {
			tr.Proxy = http.ProxyURL(proxyURL)
		}
	}

	return http.Client{Transport: tr}
}
// Record is one document to be indexed.
type Record struct {
	// id is unexported, so encoding/json leaves it out of the marshalled
	// body; SendToES places it in the request URL instead.
	id string
	// Name and Time are marshalled under their Go field names.
	Name, Time string
}
// respCounterMu serialises access to resp_counter, which SendToES reads and
// writes from every worker goroutine.
var respCounterMu sync.Mutex

// maxSendRetries is the number of retries after the first failed attempt
// before SendToES gives up on a record.
const maxSendRetries = 100

// SendToES marshals the record to JSON and POSTs it to the local
// Elasticsearch index, using the record's id as the document ID.
//
// Transport-level failures are retried with a linearly growing pause. A record
// that cannot be marshalled, or whose request cannot be built, is logged and
// dropped instead of being sent empty or dereferencing a nil request. The
// response body is echoed to stdout, and the running response count is printed
// afterwards.
func (s *Record) SendToES() {
	// The payload does not change between attempts, so marshal it once.
	payload, err := json.Marshal(s)
	if err != nil {
		log.Error("Error marshalling record: ", err)
		return
	}
	endpoint := fmt.Sprintf("http://localhost:9200/foobar/example_type/%s", s.id)

	if resp := postWithRetry(endpoint, payload); resp != nil {
		consumeResponse(resp)
	}

	respCounterMu.Lock()
	seen := resp_counter
	respCounterMu.Unlock()
	fmt.Printf("\nresp_counter=%d\n", seen)
}

// postWithRetry POSTs payload to endpoint until the client returns a response,
// sleeping attempt seconds between tries. It returns nil once the retry budget
// is exhausted or the request cannot be constructed.
func postWithRetry(endpoint string, payload []byte) *http.Response {
	for attempt := 0; ; attempt++ {
		// A fresh request, and a fresh reader over the payload, is needed for
		// every attempt because the previous body has already been consumed.
		req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payload))
		if err != nil {
			log.Error(err)
			return nil
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Connection", "close") // 997,120 docs
		//req.Header.Set("Connection", "keep-alive") // 416,700

		resp, err := esHttpClient.Do(req)
		if err == nil {
			return resp
		}
		// Decide to give up before sleeping, so the final failure is not
		// preceded by a pointless pause.
		if attempt >= maxSendRetries {
			log.Error(fmt.Sprintf("ES errored %dx: ", attempt+1), err)
			return nil
		}
		time.Sleep(time.Duration(attempt) * time.Second)
	}
}

// consumeResponse reads and closes the response body, echoes it to stdout,
// bumps resp_counter and logs read errors and non-2xx statuses.
func consumeResponse(resp *http.Response) {
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error(err)
	}
	// ReadAll returns whatever was read before an error, so echo it anyway.
	fmt.Printf("%s", body)

	respCounterMu.Lock()
	resp_counter++
	respCounterMu.Unlock()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		log.Error("ES returned unexpected status: ", resp.Status)
	}
	if os.Getenv("DEBUG") == "true" {
		log.Info(string(body))
	}
}
// startEsWorkers launches ten goroutines that drain esQueue, sending each
// record to Elasticsearch and counting it in total_es_records under mutex.
// It then waits for all of them to exit, which happens once esQueue is closed
// and empty, and only then signals done. Signalling right after spawning would
// let main return while requests are still in flight.
func startEsWorkers() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			// Deferred so the WaitGroup is released even if SendToES panics.
			defer wg.Done()
			for r := range esQueue {
				r.SendToES()
				mutex.Lock()
				total_es_records++
				mutex.Unlock()
			}
		}()
	}
	wg.Wait()
	done <- true
}

// main queues 1000 records, closes the queue so the workers can finish, and
// blocks until startEsWorkers reports that every worker has exited.
func main() {
	go startEsWorkers()
	for i := 0; i < 1000; i++ {
		esQueue <- &Record{id: uuid.NewV4().String(), Name: "John Doe", Time: time.Now().String()}
	}

	// Workers update total_es_records concurrently, so read it under the same
	// mutex they hold while incrementing it.
	mutex.Lock()
	sentSoFar := total_es_records
	mutex.Unlock()
	log.Info("Finished queueing: ", sentSoFar)

	close(esQueue)
	<-done

	mutex.Lock()
	sentTotal := total_es_records
	mutex.Unlock()
	log.Info("Finished sending: ", sentTotal)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment