Skip to content

Instantly share code, notes, and snippets.

@maurorappa
Created March 16, 2019 20:19
Show Gist options
  • Select an option

  • Save maurorappa/07ffc2e29c71ac4df92ed1f3331f7b34 to your computer and use it in GitHub Desktop.

Select an option

Save maurorappa/07ffc2e29c71ac4df92ed1f3331f7b34 to your computer and use it in GitHub Desktop.
Prometheus CSV uploader to AWS S3 as external storage
package main
//based on https://github.com/prometheus/prometheus/blob/release-2.8/documentation/examples/remote_storage/example_write_adapter/server.go
import (
"bufio"
"bytes"
"compress/zlib"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awsutil"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
)
var (
filename string
incremental, max int
metric_name, label_name, label_value, aws_key, aws_secret, aws_bucket string
todisk chan (string)
verbose bool
file os.File
)
func init() {
todisk = make(chan string, 10)
}
func savetofile(todisk <-chan string) {
for {
datais := <-todisk
line := strings.Split(datais, ">")
item, _ := strconv.Atoi(line[0])
datetime := time.Now()
if (item % max) == 0 {
incremental = item / max
if verbose {
fmt.Printf("time for new file_%d\n", incremental)
}
filename = "data_" + metric_name + "_" + datetime.Format("2006-01-02_15") + "_" + strconv.Itoa(incremental-1) + ".csv"
compress(filename)
err := os.Remove(filename)
if err != nil {
fmt.Printf("Error delete file %s\n", filename)
}
s3upload(filename + ".zlib")
}
filename = "data_" + metric_name + "_" + datetime.Format("2006-01-02_15") + "_" + strconv.Itoa(incremental) + ".csv"
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println("Cannot create file, ", err)
os.Exit(17)
}
fmt.Fprintf(file, "%s\n", line[1])
file.Close()
}
}
func main() {
flag.StringVar(&metric_name, "m", "", "metric name")
flag.StringVar(&label_name, "n", "*", "label name")
flag.StringVar(&label_value, "l", "*", "label value")
flag.StringVar(&aws_key, "k", "", "AWS S3 Key")
flag.StringVar(&aws_bucket, "b", "", "AWS S3 Bucket name")
flag.StringVar(&aws_secret, "s", "", "AWS S3 Secret")
flag.IntVar(&max, "x", 100, "Max entries per file")
flag.BoolVar(&verbose, "v", false, "Enable logging")
flag.Parse()
item := 0
go savetofile(todisk)
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
hit := false
text := ""
for _, ts := range req.Timeseries {
m := make(model.Metric, len(ts.Labels))
text = ""
for _, l := range ts.Labels {
text = text + l.Name + "," + l.Value + ","
if m[model.LabelName("__name__")] == model.LabelValue(metric_name) {
hit = true
}
m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
if m[model.LabelName(label_name)] == model.LabelValue(label_value) {
hit = true
}
if l.Name == label_name && (l.Value == label_value || label_value == "*") {
hit = true
}
}
if hit {
samples := ""
for _, s := range ts.Samples {
samples = samples + "," + strconv.FormatInt(s.Timestamp, 10) + "," + strconv.FormatInt(int64(s.Value), 10)
}
item++
allofit := strconv.Itoa(item) + ">" + strings.TrimSuffix(text, ",") + samples
if verbose {
fmt.Printf("received: %s\n", allofit)
}
todisk <- allofit
}
hit = false
}
})
log.Fatal(http.ListenAndServe(":1234", nil))
}
func compress(filename string) {
rawfile, err := os.Open(filename)
if err != nil {
fmt.Println(err)
return
}
defer rawfile.Close()
// calculate the buffer size for rawfile
info, _ := rawfile.Stat()
var size int64 = info.Size()
rawbytes := make([]byte, size)
// read rawfile content into buffer
buffer := bufio.NewReader(rawfile)
_, err = buffer.Read(rawbytes)
if err != nil {
fmt.Println(err)
}
var buf bytes.Buffer
writer := zlib.NewWriter(&buf)
writer.Write(rawbytes)
writer.Close()
err = ioutil.WriteFile(filename+".zlib", buf.Bytes(), info.Mode())
// use 0666 to replace info.Mode() if you prefer
if err != nil {
fmt.Println(err)
}
fmt.Printf("%s compressed\n", filename)
}
func s3upload(filename string) {
token := ""
creds := credentials.NewStaticCredentials(aws_key, aws_secret, token)
_, err := creds.Get()
if err != nil {
fmt.Println(err)
}
cfg := aws.NewConfig().WithRegion("eu-west-2").WithCredentials(creds)
svc := s3.New(session.New(), cfg)
file, err := os.Open(filename)
if err != nil {
fmt.Println(err)
}
defer file.Close()
fileInfo, _ := file.Stat()
size := fileInfo.Size()
buffer := make([]byte, size) // read file content to buffer
file.Read(buffer)
fileBytes := bytes.NewReader(buffer)
fileType := http.DetectContentType(buffer)
path := "/prometheus/" + file.Name()
params := &s3.PutObjectInput{
Bucket: aws.String(aws_bucket),
Key: aws.String(path),
Body: fileBytes,
ContentLength: aws.Int64(size),
ContentType: aws.String(fileType),
}
resp, err := svc.PutObject(params)
if err != nil {
fmt.Println(err)
}
if verbose {
fmt.Printf("response %s\n", awsutil.StringValue(resp))
} else {
fmt.Printf("uploaded to %s\n", aws_bucket)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment