Last active
July 14, 2022 11:39
-
-
Save fabiand/6ab55588487a2ce00320cc560b74d136 to your computer and use it in GitHub Desktop.
Index pod/container logs into loki running locally
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Loki taken from:
#   https://github.com/grafana/loki/tree/v2.6.0
# Run from the root of an extracted must-gather directory:
#   $ pwd
#   <somewhere>/ocp-must-gather.local.614777<...>openshift-origin-must-gather-sha256-<...>77c9c15e76eca
#   $ python3 mgloki.py
#   namespaces/openshift-apiserver-operator/pods/openshift-apiserver-operator-76bd66d74b-v22tc/openshift-apiserver-operator-55bdd9d74b-v63tc.yaml
#   …
#
# Query the ingested logs:
#   $ ./logcli-linux-amd64 query 'rate({host_name =~ ".+"}[5m] |= "client-side")'
import yaml | |
from pprint import pprint | |
from glob import glob | |
import os.path | |
import json | |
import re | |
import requests | |
import datetime | |
import timestring | |
import time | |
def yffile(name):
    """Parse the YAML file at path *name* and return the loaded document.

    The file is read with ``yaml.safe_load``.
    """
    with open(name, "r") as stream:
        document = yaml.safe_load(stream)
    return document
class Container():
    """One container of a pod, together with the path of its log file.

    The attributes are plain placeholders that the creator of the instance
    fills in: ``yaml`` is assigned the container's entry from the pod spec
    and ``logsfn`` the path of the log file to read.
    """
    name = None
    yaml = None
    logsfn = None

    def logs(self):
        """Yield the lines of ``self.logsfn`` one by one, line endings kept."""
        with open(self.logsfn, "r") as logstream:
            # Iterate the file object instead of calling readlines(), so a
            # large log is streamed rather than loaded into memory at once.
            yield from logstream
class Pod():
    """A pod read from a dump: its parsed manifest plus its containers."""
    yaml = None

    def __init__(self):
        # Per-instance list. A class-level ``containers = []`` would be one
        # list shared by every Pod, so each pod would also replay the logs
        # of all pods loaded before it, labelled with the wrong metadata.
        self.containers = []

    @classmethod
    def from_file(cls, fn):
        """Build a pod from the manifest file *fn*.

        Prints *fn*, loads the manifest and creates one ``Container`` per
        entry in ``spec.containers``. Each container's log is expected at
        ``<dir of fn>/<name>/<name>/logs/current.log``.

        Raises:
            AssertionError: if a container's log file does not exist.
        """
        p = cls()
        p.yaml = yffile(fn)
        print(fn)
        pod_dir = os.path.dirname(fn)
        for cy in p.yaml["spec"]["containers"]:
            c = Container()
            c.yaml = cy
            c.logsfn = os.path.join(pod_dir, cy["name"], cy["name"], "logs", "current.log")
            assert os.path.exists(c.logsfn), "missing log file: %s" % c.logsfn
            p.containers.append(c)
        return p

    def structured_logs(self):
        """Yield one dict per log line of every container of this pod.

        Each dict holds the line's first whitespace-separated field as
        ``@timestamp``, the remaining fields joined by single spaces as
        ``message``, and pod/container metadata taken from the manifest.

        Raises:
            AssertionError: if the pod has no containers.
        """
        assert len(self.containers) > 0
        for c in self.containers:
            for line in c.logs():
                # This line below needs to be specific per container!
                # NOTE: a trailing newline produces an empty last field, so
                # the joined message then ends with a single space.
                splitLine = re.split(r"\s+", line)
                # Originally for ES, then in LokiIngester mangled to match loki
                yield {
                    "@timestamp": splitLine[0],
                    "message": " ".join(splitLine[1:]),
                    "host.name": self.yaml["spec"]["nodeName"],
                    "host.ip": self.yaml["status"]["hostIP"],
                    "pod.uid": self.yaml["metadata"]["uid"],
                    "pod.name": self.yaml["metadata"]["name"],
                    "pod.namespace": self.yaml["metadata"]["namespace"],
                    "container.name": c.yaml["name"],
                    # "pod.ownerReferences": self.yaml["metadata"]["ownerReferences"],
                }
class Pods():
    """The collection of pods whose manifests match ``glob``."""
    # Relative glob pattern used to find the pod manifest files.
    glob = "namespaces/*/pods/*/*.yaml"

    def structured_logs(self):
        """Yield the structured log records of every matching pod.

        Pods are loaded one at a time with ``Pod.from_file`` in the order
        the glob returns their manifests.
        """
        for manifest_path in glob(self.glob):
            yield from Pod.from_file(manifest_path).structured_logs()
class LokiIngester():
    """Push structured log lines to the Loki push endpoint on localhost:3100."""

    def __init__(self):
        # One session is reused for all pushes.
        self.session = requests.Session()

    def post(self, json):
        """POST *json* to the push endpoint and return the response.

        Raises:
            AssertionError: if the response status code is not 204.
        """
        #pprint(json)
        resp = self.session.post("http://localhost:3100/loki/api/v1/push", json=json)
        assert resp.status_code == 204
        #print(resp.content)
        return resp

    def ingestOne(self, line):
        """Push one structured log line as a single-entry stream.

        ``line["message"]`` becomes the log text. Every other key except
        ``@timestamp`` becomes a stream label, with dots in the key replaced
        by underscores. *line* itself is not modified.
        """
        msg = line["message"]
        labels = {
            k.replace(".", "_"): v
            for k, v in line.items()
            if k not in ("message", "@timestamp")
        }
        # FIXME this must be parsed from the logfile
        # How do we circumvent that loki just accepts "recent" logs?
        #ts = line["@timestamp"]
        ts = time.time_ns()
        streams = {"streams": [{
            "stream": labels,
            "values": [[str(ts), msg]]
        }]}
        self.post(json=streams)

    def ingest(self, structuredLogs):
        """Push every line of the iterable *structuredLogs*, one at a time."""
        for line in structuredLogs:
            # Pass the log line; the ingester itself was passed here before.
            self.ingestOne(line)
# Script body: push every structured log line produced by Pods() to Loki,
# one request per line.
loki = LokiIngester()
# Manual smoke test of the push endpoint:
#loki.post({"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ str(int(time.time() * 1e9)), "fizzbuzz" ]]}]})
for record in Pods().structured_logs():
    loki.ingestOne(record)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment