Skip to content

Instantly share code, notes, and snippets.

@axw
Created August 28, 2018 07:42
Show Gist options
  • Save axw/379f3ad10528d76adea9bf1d1c11150e to your computer and use it in GitHub Desktop.
Save axw/379f3ad10528d76adea9bf1d1c11150e to your computer and use it in GitHub Desktop.
// Test server for the Elastic APM v2 protocol.
package main
import (
"compress/gzip"
"compress/zlib"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"net/http/httputil"
"os"
"os/signal"
"sync"
)
// verboseFlag is the -v command-line switch. When set, the request handler
// dumps each incoming request (without its body) before processing it.
var verboseFlag = flag.Bool("v", false, "be verbose")
// main starts an HTTP server on :8201 that reads each request body
// (decompressing "deflate" and "gzip" Content-Encoding), prints how many
// times each top-level JSON key occurred in it, and replies 202 Accepted.
//
// On os.Interrupt the server is shut down gracefully and the key counts
// accumulated across all requests are printed under a "TOTAL" header.
// Pass -v to also dump each request (without its body).
func main() {
	flag.Parse()

	var server http.Server
	var countsMu sync.Mutex // guards totalCounts, which handlers update concurrently
	totalCounts := make(map[string]int)

	// shutdownDone is closed once Shutdown has returned. ListenAndServe
	// returns as soon as Shutdown is *called*, so main has to wait on this
	// channel before exiting; otherwise in-flight requests would be cut off
	// and missing from the totals.
	shutdownDone := make(chan struct{})
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	go func() {
		defer close(shutdownDone)
		<-signals
		if err := server.Shutdown(context.Background()); err != nil {
			log.Println(err)
		}
	}()

	// Print the grand totals on the way out, after the server has stopped.
	defer func() {
		countsMu.Lock()
		defer countsMu.Unlock()
		fmt.Println("\nTOTAL")
		for name, count := range totalCounts {
			// Plain Println: the totals belong on stdout next to the header.
			fmt.Println(" ", count, name)
		}
	}()

	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
		defer fmt.Println() // blank line between the output of consecutive requests
		if *verboseFlag {
			out, err := httputil.DumpRequest(req, false)
			if err != nil {
				log.Println(err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			fmt.Print(string(out))
		}

		// rawbytes counts bytes as they come off the wire; uncompressed counts
		// bytes after decompression and stays zero for unencoded bodies.
		var rawbytes, uncompressed counter
		body := io.TeeReader(req.Body, &rawbytes)
		defer func() {
			fmt.Println("read", rawbytes.n, "bytes (raw)")
			if uncompressed.n > 0 {
				fmt.Printf(
					"read %d bytes (uncompressed; %.2fx ratio)\n",
					uncompressed.n, float64(uncompressed.n)/float64(rawbytes.n),
				)
			}
		}()

		switch req.Header.Get("Content-Encoding") {
		case "deflate":
			reader, err := zlib.NewReader(body)
			if err != nil {
				log.Println(err)
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			body = io.TeeReader(reader, &uncompressed)
		case "gzip":
			reader, err := gzip.NewReader(body)
			if err != nil {
				log.Println(err)
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			body = io.TeeReader(reader, &uncompressed)
		}

		//w.Header().Set("Connection", "close")

		// Set consumeBody to false to respond without consuming the request body.
		const consumeBody = true
		if consumeBody {
			counts := make(map[string]int)
			dumpStats(body, counts)
			countsMu.Lock()
			for k, v := range counts {
				totalCounts[k] += v
			}
			countsMu.Unlock()
		}
		w.WriteHeader(http.StatusAccepted)
	})

	server.Addr = ":8201"
	server.Handler = mux
	if err := server.ListenAndServe(); err != http.ErrServerClosed {
		// The server never started or failed on its own; no shutdown is in
		// progress, so there is nothing to wait for.
		log.Println(err)
		return
	}
	// ErrServerClosed is only returned after Shutdown was called by the
	// signal goroutine above; wait for it to finish draining connections.
	<-shutdownDone
}
// dumpStats decodes a stream of JSON objects from r and, for every object,
// increments counts[key] once per top-level key. When the stream ends
// cleanly (io.EOF) it prints one "<count>x <key>" line per entry of counts
// to stdout.
//
// Any other decode error is treated as fatal for the caller: dumpStats
// panics with that error and prints nothing. Entries already added to counts
// before the failure are left in place.
func dumpStats(r io.Reader, counts map[string]int) {
	decoder := json.NewDecoder(r)
	for {
		var object map[string]json.RawMessage
		err := decoder.Decode(&object)
		if err == io.EOF {
			break // clean end of stream
		}
		if err != nil {
			panic(err)
		}
		for key := range object {
			counts[key]++
		}
	}
	for name, count := range counts {
		fmt.Printf("%dx %s\n", count, name)
	}
}
// counter is a write sink that discards the data it is given and only
// records how many bytes have passed through it.
type counter struct {
	n int64 // running total of bytes handed to Write
}

// Write adds len(buf) to the running total. It always reports the whole
// slice as written and never returns an error.
func (c *counter) Write(buf []byte) (int, error) {
	size := len(buf)
	c.n += int64(size)
	return size, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment