Skip to content

Instantly share code, notes, and snippets.

@maurorappa
Last active February 25, 2019 22:49
Show Gist options
  • Save maurorappa/4f91da3331795b8dd7b089607fd7244d to your computer and use it in GitHub Desktop.
Save maurorappa/4f91da3331795b8dd7b089607fd7244d to your computer and use it in GitHub Desktop.
Prometheus CSV storage
package main
import (
"bufio"
"bytes"
"compress/zlib"
"flag"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/prometheus/prometheus/prompb"
)
// Package-level state shared by main, the /receive handler and savetofile.
var (
// filename is the name of the CSV file most recently opened by savetofile.
filename string
// incremental is the numeric suffix of the current CSV file, maintained by
// savetofile; max holds the -x flag ("Max entries per file").
incremental,max int
// metric_name, label_name and label_value hold the -m, -n and -l flags that
// the /receive handler uses to select time series.
metric_name,label_name,label_value string
// start is set to time.Now() in main and is formatted into every file name.
start time.Time
// todisk carries "<item>><csv line>" records from the /receive handler to
// savetofile; init creates it with a buffer of 10.
todisk chan (string)
// verbose holds the -v flag.
// NOTE(review): nothing in the visible code reads it.
verbose bool
// file is not referenced in the visible code; savetofile declares its own
// local variable with the same name.
file os.File
)
// init allocates the buffered channel through which records travel from the
// HTTP handler to the file-writing goroutine.
func init() {
	// Number of records that may be queued before a sender blocks.
	const queueDepth = 10
	todisk = make(chan string, queueDepth)
}
func savetofile(todisk <-chan string) {
for {
datais := <-todisk
line:=strings.Split(datais,">")
item,_:=strconv.Atoi(line[0])
if (item % max) == 0 {
incremental = item/max
fmt.Printf("time for new file_%d\n",incremental)
}
filename = "data_" + metric_name + "_" + start.Format("2006-01-02_15-04-05") + "_" + strconv.Itoa(incremental) + ".csv"
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println("Cannot create file, ", err)
os.Exit(17)
}
fmt.Fprintf(file,"%s\n", line[1])
file.Close()
}
}
// main parses the command-line flags, starts the background file writer and
// serves a Prometheus remote-write receiver on :1234/receive.
//
// Flags:
//
//	-m  metric name to capture; when empty every series is captured
//	-n  label name to capture
//	-l  label value to capture ("*" accepts any value of the -n label)
//	-x  maximum number of rows per CSV file (must be positive)
//	-v  parsed into verbose
//
// A series is captured when its metric name matches -m OR its -n label
// matches -l. Each captured series becomes one record
// "<item>><name>,<value>,...,<timestamp>,<sample>,..." that is echoed to
// stdout and queued on todisk for savetofile.
//
// NOTE(review): verbose is parsed but nothing here consults it; records are
// always echoed to stdout.
// NOTE(review): label values are written verbatim, so a value containing a
// comma yields an ambiguous CSV row.
func main() {
	flag.StringVar(&metric_name, "m", "", "metric name")
	flag.StringVar(&label_name, "n", "*", "label name")
	flag.StringVar(&label_value, "l", "*", "label value")
	flag.IntVar(&max, "x", 100, "Max entries per file")
	flag.BoolVar(&verbose, "v", false, "Enable logging")
	flag.Parse()
	if max <= 0 {
		// The writer divides by max to pick the output file.
		log.Fatal("-x must be a positive number of entries per file")
	}
	start = time.Now()

	// sequence holds the number of the last record queued. net/http runs
	// handlers concurrently, so the one-slot channel doubles as a lock:
	// a handler takes the value, queues its record and puts the incremented
	// value back, keeping numbering and queue order consistent.
	sequence := make(chan int, 1)
	sequence <- 0

	go savetofile(todisk)

	http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
		compressed, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		reqBuf, err := snappy.Decode(nil, compressed)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var req prompb.WriteRequest
		if err := proto.Unmarshal(reqBuf, &req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		for _, ts := range req.Timeseries {
			if len(ts.Labels) == 0 {
				continue
			}

			// Collect the complete label set first; the filters below must
			// see every label regardless of the order they arrive in.
			m := make(model.Metric, len(ts.Labels))
			fields := make([]string, 0, 2*(len(ts.Labels)+len(ts.Samples)))
			for _, l := range ts.Labels {
				m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
				fields = append(fields, l.Name, l.Value)
			}

			// An empty -m keeps the historical "capture everything" default.
			nameMatch := metric_name == "" ||
				m[model.LabelName("__name__")] == model.LabelValue(metric_name)
			value, present := m[model.LabelName(label_name)]
			labelMatch := value == model.LabelValue(label_value) ||
				(present && label_value == "*")
			if !nameMatch && !labelMatch {
				continue
			}

			for _, s := range ts.Samples {
				// Sample values are float64; format them as such so that
				// fractional values are not truncated. Integral values
				// print exactly as before (e.g. "42").
				fields = append(fields,
					strconv.FormatInt(s.Timestamp, 10),
					strconv.FormatFloat(s.Value, 'f', -1, 64))
			}

			item := (<-sequence) + 1 // acquire
			allofit := strconv.Itoa(item) + ">" + strings.Join(fields, ",")
			fmt.Printf("%s\n", allofit)
			todisk <- allofit
			sequence <- item // release
		}
	})
	log.Fatal(http.ListenAndServe(":1234", nil))
}
// compress writes a zlib-compressed copy of filename to filename+".zlib",
// giving the copy the same file mode as the original. On success it prints
// "<filename> compressed"; on any error it prints the error and terminates
// the process with exit status 1.
func compress(filename string) {
	if err := writeZlibCopy(filename); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Printf("%s compressed\n", filename)
}

// writeZlibCopy does the work of compress and reports failures as an error,
// so that the deferred Close runs before the caller decides to exit.
func writeZlibCopy(filename string) error {
	rawfile, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer rawfile.Close()

	info, err := rawfile.Stat()
	if err != nil {
		return err
	}

	// Stream the whole file through the compressor. WriteTo keeps reading
	// until EOF, so a short read cannot truncate the data.
	var buf bytes.Buffer
	writer := zlib.NewWriter(&buf)
	if _, err := bufio.NewReader(rawfile).WriteTo(writer); err != nil {
		return err
	}
	// Close flushes the remaining compressed bytes into buf.
	if err := writer.Close(); err != nil {
		return err
	}

	// use 0666 to replace info.Mode() if you prefer
	return ioutil.WriteFile(filename+".zlib", buf.Bytes(), info.Mode())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment