Skip to content

Instantly share code, notes, and snippets.

@PierreZ
Created September 15, 2016 16:05
Show Gist options
  • Save PierreZ/7b2913414562bb686bc1fe0f41a56dec to your computer and use it in GitHub Desktop.
Save PierreZ/7b2913414562bb686bc1fe0f41a56dec to your computer and use it in GitHub Desktop.
warp10push
package main
import (
"bytes"
"fmt"
"log"
"net/http"
"net/http/httputil"
"os"
"strings"
"time"
)
var (
// Env vars
warp10Endpoint = os.Getenv("WARP10_ENDPOINT")
warp10Token = os.Getenv("WARP10_TOKEN")
// Channel for GTS
ch chan (*GTS)
)
// GTS is a representation of a Geo Time Series
// Please see http://www.warp10.io/apis/gts-input-format/
type GTS struct {
TS int64 // Timestamp of the reading, in microseconds since the Unix Epoch
Name string // Class name
Labels map[string]string // Comma separated list of labels, using the syntax `key=value`
Value int64 // The value of the reading
}
// NewGTS is creating a new GTS with a name and a value
func NewGTS(name string, value int64) *GTS {
now := time.Now()
nanos := now.UnixNano() / 1000
return &GTS{Name: name, Value: value, TS: nanos}
}
// AddLabel is pushing a new label to the GTS
func (gts *GTS) AddLabel(key string, value string) {
gts.Labels[key] = value
}
// Publish is pushing a new GTS on the channel
func Publish(gts *GTS) {
ch <- gts
}
// Print respects the following format:
// TS/LAT:LON/ELEV NAME{LABELS} VALUE
func (gts GTS) Print() []byte {
var s string
fmt.Println(gts)
s = fmt.Sprintf("%d// %s{%s} %d", gts.TS, gts.Name, gts.getLabels(), gts.Value)
return []byte(s)
}
// getLabels format the map into the right form
func (gts GTS) getLabels() string {
s := ""
for key, value := range gts.Labels {
s = s + key + "=" + value + ","
}
// Removing last comma
s = strings.TrimSuffix(s, ",")
return s
}
// sendToWarp10 is used to push a GTS to a Warp10 datastore
func sendToWarp10(gts *GTS) {
req, err := http.NewRequest("POST", warp10Endpoint+"/api/v0/update", bytes.NewBuffer(gts.Print()))
if err != nil {
log.Println(err)
return
}
req.Header.Set("X-Warp10-Token", warp10Token)
client := &http.Client{}
resp, err := client.Do(req)
if resp.StatusCode != 200 {
dump, err := httputil.DumpResponse(resp, true)
if err != nil {
log.Fatalln("error dumping request", err)
}
fmt.Println(string(dump))
return
}
log.Println("HTTP Return code", resp.StatusCode, "-", gts.Name, ":", gts.Value)
defer resp.Body.Close()
}
// worker is starting the event-loop for reveiving GTS
func worker() {
var gts *GTS
for {
gts = <-ch
go sendToWarp10(gts)
}
}
// init is called at starting of the app
func init() {
// Checking env
if len(warp10Endpoint) == 0 || len(warp10Token) == 0 {
panic("WARP10_ENDPOINT or WARP10_TOKEN is not set, quitting...")
}
// Creating Channel
ch = make(chan *GTS, 255)
// Starting the event-loop
go worker()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment