Created
August 25, 2020 04:45
-
-
Save ledmonster/7d0de61a788b164a510c0359465a41f4 to your computer and use it in GitHub Desktop.
Push Prometheus exposition-format metrics to the VictoriaMetrics remote_write endpoint.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package metrics | |
import ( | |
"bytes" | |
"context" | |
"fmt" | |
"net/http" | |
"os" | |
"time" | |
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" | |
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" | |
"github.com/golang/snappy" | |
log "github.com/sirupsen/logrus" | |
) | |
var (
	// VictoriaMetricsEndpoint is the base URL of the VictoriaMetrics server
	// that metrics are pushed to. It is read once, at package initialization,
	// from the VICTORIA_METRICS_ENDPOINT environment variable and is empty
	// when that variable is unset.
	VictoriaMetricsEndpoint = os.Getenv("VICTORIA_METRICS_ENDPOINT")
)
func pushToVictoriaMetrics(ctx context.Context, job, instance, data string) error { | |
series := parseData(string(data), time.Now().Unix()*1000, func(s string) { | |
log.Errorf("parse error") | |
}) | |
// append same labels as pushgateway | |
addLabels(series, []prompbmarshal.Label{ | |
{Name: "job", Value: job}, | |
{Name: "instance", Value: instance}, | |
}) | |
var raw, compressed []byte | |
req := prompbmarshal.WriteRequest{Timeseries: series} | |
raw = prompbmarshal.MarshalWriteRequest(raw, &req) | |
compressed = snappy.Encode(compressed, raw) | |
res, err := http.Post(VictoriaMetricsEndpoint+"/api/v1/write", "application/json", bytes.NewBuffer(compressed)) | |
if err != nil { | |
return fmt.Errorf("failed to post metrics to VictoriaMetrics: %s", err) | |
} | |
defer res.Body.Close() | |
if res.StatusCode >= 400 { | |
return fmt.Errorf("failed to post metrics to VictoriaMetrics. response code: %d", res.StatusCode) | |
} | |
return nil | |
} | |
// brought from: https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/lib/promscrape/scrapework_test.go#L314 | |
func parseData(data string, timestamp int64, errLogger func(s string)) []prompbmarshal.TimeSeries { | |
var rows parser.Rows | |
rows.UnmarshalWithErrLogger(data, errLogger) | |
var tss []prompbmarshal.TimeSeries | |
for _, r := range rows.Rows { | |
labels := []prompbmarshal.Label{ | |
{ | |
Name: "__name__", | |
Value: r.Metric, | |
}, | |
} | |
for _, tag := range r.Tags { | |
labels = append(labels, prompbmarshal.Label{ | |
Name: tag.Key, | |
Value: tag.Value, | |
}) | |
} | |
var ts prompbmarshal.TimeSeries | |
ts.Labels = labels | |
ts.Samples = []prompbmarshal.Sample{ | |
{ | |
Value: r.Value, | |
Timestamp: timestamp, | |
}, | |
} | |
tss = append(tss, ts) | |
} | |
return tss | |
} | |
func addLabels(series []prompbmarshal.TimeSeries, labels []prompbmarshal.Label) { | |
for i, _ := range series { | |
labelLoop: | |
for _, label := range labels { | |
for j, _label := range series[i].Labels { | |
// overwrite label value if label already exists | |
if _label.Name == label.Name { | |
series[i].Labels[j].Value = label.Value | |
continue labelLoop | |
} | |
} | |
series[i].Labels = append(series[i].Labels, label) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment