Created
July 21, 2019 04:01
-
-
Save siddontang/7a5c8081abc5b3bb02193253c33dcbf6 to your computer and use it in GitHub Desktop.
Sync TiDB logs to Loki
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io" | |
"net/http" | |
"os" | |
"regexp" | |
"strings" | |
"time" | |
) | |
// addr is the URL that log entries are POSTed to; it is filled from the
// -addr command-line flag.
var addr string

// path is the log file to read; it is filled from the -path flag.
var path string

// job is the value of the "job" label attached to every parsed line; it is
// filled from the -job flag.
var job string

// expr is the compiled logFormat pattern. It is nil until main assigns it.
var expr *regexp.Regexp
// logFormat matches one log line shaped like
//
//	[2019/07/21 04:01:00.000 +08:00] [INFO] [server.go:123] message
//
// and captures the named groups time, level, source, line and message.
// The UTC offset may carry either sign, so logs written in zones west of UTC
// (for example -05:00) are recognised as well.
const logFormat = `\[(?P<time>\d{4}\/\d{2}\/\d{2}.\d{2}:\d{2}:\d{2}.\d{3}.[+-]\d{2}:\d{2})\].\[(?P<level>\w*)\].\[(?P<source>\w.*):(?P<line>\d*)\].(?P<message>.*$)`

// timeFormat is the time.Parse layout for the text captured by the time group
// of logFormat.
const timeFormat = "2006/01/02 15:04:05.000 -07:00"
func init() { | |
flag.StringVar(&addr, "addr", "http://127.0.0.1:3100/api/prom/push", "Loki Address") | |
flag.StringVar(&path, "path", "", "Log to be synced to Loki") | |
flag.StringVar(&job, "job", "tikv", "Job name") | |
} | |
// perr is the program's fatal-error helper: when err is non-nil it prints the
// error text with the builtin println and exits with status 1. A nil err is a
// no-op.
func perr(err error) {
	if err != nil {
		println(err.Error())
		os.Exit(1)
	}
}
// JSON payload types for the push request built by postLog.
type (
	// entryBody is one log entry: its timestamp text and the line itself.
	entryBody struct {
		Timestamp string `json:"ts"`
		Line      string `json:"line"`
	}

	// streamBody groups entries that share one label set.
	streamBody struct {
		Labels  string      `json:"labels"`
		Entries []entryBody `json:"entries"`
	}

	// jsonBody is the top-level request document.
	jsonBody struct {
		Streams []streamBody `json:"streams"`
	}
)
func postLog(labels string, ts time.Time, line string) { | |
body := jsonBody{ | |
Streams: []streamBody{ | |
{ | |
Labels: labels, | |
Entries: []entryBody{ | |
{ | |
Timestamp: ts.Format("2006-01-02T15:04:05.000000-07:00"), | |
Line: line, | |
}, | |
}, | |
}, | |
}, | |
} | |
jsonValue, err := json.Marshal(body) | |
perr(err) | |
_, err = http.Post(addr, "application/json", bytes.NewBuffer(jsonValue)) | |
perr(err) | |
} | |
func watchLog() { | |
f, err := os.Open(path) | |
perr(err) | |
defer f.Close() | |
reader := bufio.NewReader(f) | |
for { | |
line, _, err := reader.ReadLine() | |
if err == io.EOF { | |
// Very simple way, sleep and try again | |
time.Sleep(time.Second) | |
continue | |
} | |
perr(err) | |
labels, ts, newLine := parseLine(string(line)) | |
fmt.Printf("%v %s %s\n", labels, ts, newLine) | |
postLog(labels, ts, newLine) | |
} | |
} | |
// Label is a single key/value pair of a stream's label set.
type Label struct {
	Key, Value string
}
func parseLine(line string) (labelString string, t time.Time, s string) { | |
s = line | |
match := expr.FindStringSubmatch(line) | |
if match == nil { | |
return | |
} | |
var err error | |
labels := []Label{ | |
{Key: "job", Value: job}, | |
} | |
for i, name := range expr.SubexpNames() { | |
if i != 0 && name != "" { | |
if name == "time" { | |
t, err = time.Parse(timeFormat, match[i]) | |
perr(err) | |
s = strings.TrimSpace(line) | |
s = s[len(fmt.Sprintf("[%s]", match[i])):] | |
} else if name == "level" { | |
labels = append(labels, Label{Key: name, Value: match[i]}) | |
} | |
} | |
} | |
var buf bytes.Buffer | |
buf.WriteString("{") | |
for i, label := range labels { | |
buf.WriteString(fmt.Sprintf("%s=\"%s\"", label.Key, label.Value)) | |
if i != len(labels)-1 { | |
buf.WriteString(",") | |
} | |
} | |
buf.WriteString("}") | |
labelString = buf.String() | |
return | |
} | |
func main() { | |
flag.Parse() | |
expr = regexp.MustCompile(logFormat) | |
watchLog() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment