Created
September 15, 2016 16:05
-
-
Save PierreZ/7b2913414562bb686bc1fe0f41a56dec to your computer and use it in GitHub Desktop.
warp10push
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "bytes" | |
| "fmt" | |
| "log" | |
| "net/http" | |
| "net/http/httputil" | |
| "os" | |
| "strings" | |
| "time" | |
| ) | |
| var ( | |
| // Env vars | |
| warp10Endpoint = os.Getenv("WARP10_ENDPOINT") | |
| warp10Token = os.Getenv("WARP10_TOKEN") | |
| // Channel for GTS | |
| ch chan (*GTS) | |
| ) | |
| // GTS is a representation of a Geo Time Series | |
| // Please see http://www.warp10.io/apis/gts-input-format/ | |
| type GTS struct { | |
| TS int64 // Timestamp of the reading, in microseconds since the Unix Epoch | |
| Name string // Class name | |
| Labels map[string]string // Comma separated list of labels, using the syntax `key=value` | |
| Value int64 // The value of the reading | |
| } | |
| // NewGTS is creating a new GTS with a name and a value | |
| func NewGTS(name string, value int64) *GTS { | |
| now := time.Now() | |
| nanos := now.UnixNano() / 1000 | |
| return >S{Name: name, Value: value, TS: nanos} | |
| } | |
| // AddLabel is pushing a new label to the GTS | |
| func (gts *GTS) AddLabel(key string, value string) { | |
| gts.Labels[key] = value | |
| } | |
| // Publish is pushing a new GTS on the channel | |
| func Publish(gts *GTS) { | |
| ch <- gts | |
| } | |
| // Print respects the following format: | |
| // TS/LAT:LON/ELEV NAME{LABELS} VALUE | |
| func (gts GTS) Print() []byte { | |
| var s string | |
| fmt.Println(gts) | |
| s = fmt.Sprintf("%d// %s{%s} %d", gts.TS, gts.Name, gts.getLabels(), gts.Value) | |
| return []byte(s) | |
| } | |
| // getLabels format the map into the right form | |
| func (gts GTS) getLabels() string { | |
| s := "" | |
| for key, value := range gts.Labels { | |
| s = s + key + "=" + value + "," | |
| } | |
| // Removing last comma | |
| s = strings.TrimSuffix(s, ",") | |
| return s | |
| } | |
| // sendToWarp10 is used to push a GTS to a Warp10 datastore | |
| func sendToWarp10(gts *GTS) { | |
| req, err := http.NewRequest("POST", warp10Endpoint+"/api/v0/update", bytes.NewBuffer(gts.Print())) | |
| if err != nil { | |
| log.Println(err) | |
| return | |
| } | |
| req.Header.Set("X-Warp10-Token", warp10Token) | |
| client := &http.Client{} | |
| resp, err := client.Do(req) | |
| if resp.StatusCode != 200 { | |
| dump, err := httputil.DumpResponse(resp, true) | |
| if err != nil { | |
| log.Fatalln("error dumping request", err) | |
| } | |
| fmt.Println(string(dump)) | |
| return | |
| } | |
| log.Println("HTTP Return code", resp.StatusCode, "-", gts.Name, ":", gts.Value) | |
| defer resp.Body.Close() | |
| } | |
| // worker is starting the event-loop for reveiving GTS | |
| func worker() { | |
| var gts *GTS | |
| for { | |
| gts = <-ch | |
| go sendToWarp10(gts) | |
| } | |
| } | |
| // init is called at starting of the app | |
| func init() { | |
| // Checking env | |
| if len(warp10Endpoint) == 0 || len(warp10Token) == 0 { | |
| panic("WARP10_ENDPOINT or WARP10_TOKEN is not set, quitting...") | |
| } | |
| // Creating Channel | |
| ch = make(chan *GTS, 255) | |
| // Starting the event-loop | |
| go worker() | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment