Example of how to ingest sensord logs (from journalctl) to EventModel
from event_model import compose_run
import datetime
import dateutil.parser
import itertools
from io import StringIO
import subprocess
import numpy as np
import functools


def extract_event(ts, batch):
    """
    Extract and format the data and timestamp dictionaries.

    Parameters
    ----------
    ts : float
        unix timestamp as a float
    batch : List[str]
        Assumed to be of the form ::

            Adapter: group
              a: 12 V
              b: 43 C (min = 127.0 C, max = 127.0 C)
            Adapter: group2
              c: 15 RPM

    Returns
    -------
    data, timestamps : Dict[str, float]
        The data and timestamp dictionaries as you would find in an Event
    """
    out = {}
    current_section = ""
    for entry in batch:
        head, _, tail = entry.partition(": ")
        if not head.startswith(" "):
            # un-indented lines name the section (the "Adapter")
            current_section = tail.strip()
            continue
        key = (current_section, head.strip())
        # keep only the leading number, dropping units and (min/max) notes
        out["_".join(key)] = float(tail.strip().split(" ")[0])
    return out, {k: ts for k in out}
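

# A hedged sanity check of extract_event (hand-written sample lines, not from
# the original gist): section headers become key prefixes, and only the
# leading number of each value line is kept.
def _demo_extract_event():
    data, timestamps = extract_event(
        1589150119.0,
        [
            "Adapter: group",
            "  a: 12 V",
            "  b: 43 C (min = 127.0 C, max = 127.0 C)",
        ],
    )
    assert data == {"group_a": 12.0, "group_b": 43.0}
    assert timestamps == {"group_a": 1589150119.0, "group_b": 1589150119.0}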


def extract_datakeys(machine, system, batch):
    """
    Extract and format the data keys.

    Parameters
    ----------
    machine : str
        The name of the machine
    system : str
        The name of the logging system
    batch : List[str]
        Assumed to be of the form ::

            Adapter: group
              a: 12 V
              b: 43 C (min = 127.0 C, max = 127.0 C)
            Adapter: group2
              c: 15 RPM

    Returns
    -------
    data_keys : Dict[str, Dict[str, Any]]
        The data keys as you would find in a Descriptor
    """
    out = {}
    current_section = ""
    for entry in batch:
        head, _, tail = entry.partition(": ")
        if not head.startswith(" "):
            current_section = tail.strip()
            continue
        key = (current_section, head.strip())
        out["_".join(key)] = {
            "source": f"{machine}_{system}",
            "dtype": "number",
            "shape": [],
            # the units are the token right after the number
            "units": tail.strip().split(" ")[1],
        }
    return out
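

# Hedged companion example (not in the original gist): the same kind of batch
# rendered as data keys, as they would land in a descriptor document.
def _demo_extract_datakeys():
    data_keys = extract_datakeys(
        "jupiter", "sensord", ["Adapter: group", "  a: 12 V"]
    )
    assert data_keys == {
        "group_a": {
            "source": "jupiter_sensord",
            "dtype": "number",
            "shape": [],
            "units": "V",
        }
    }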


def _process_batch(machine, system, dt, batch, *, descs, run_bundle):
    """
    Process the list of strings to an event (and maybe a descriptor).

    Parameters
    ----------
    machine : str
        The name of the machine
    system : str
        The name of the logging system
    dt : datetime.datetime
        The timestamp of the batch
    batch : List[str]
        Assumed to be of the form ::

            Adapter: group
              a: 12 V
              b: 43 C (min = 127.0 C, max = 127.0 C)
            Adapter: group2
              c: 15 RPM

    descs : Dict[frozenset, ComposeDescriptorBundle]
        Cache of known descriptor bundles, keyed on the frozenset of
        data key names.

        .. warning::

            This input will be mutated

    run_bundle : ComposeRunBundle
        The run we are making events for

        .. warning::

            This input will be mutated (implicitly by its use)
    """
    # first, get the event data
    data, ts = extract_event(dt.timestamp(), batch)
    # check if we need to make a new descriptor
    key = frozenset(data)
    if key not in descs:
        # sometimes the logs are missing an entry; handle that by matching
        # a known superset descriptor and padding the gaps with NaN
        for dk in descs:
            if dk > key:
                missing = dk - key
                for missing_key in missing:
                    data[missing_key] = np.nan
                    ts[missing_key] = dt.timestamp()
                key = dk
                break
        else:
            # otherwise get the data keys
            data_keys = extract_datakeys(machine, system, batch)
            # make the descriptor
            descriptor_bundle = run_bundle.compose_descriptor(
                name=system, data_keys=data_keys
            )
            # emit it
            yield "descriptor", descriptor_bundle.descriptor_doc
            # and stash it
            descs[key] = descriptor_bundle
    # get the descriptor from the cache
    descriptor_bundle = descs[key]
    # mint the event
    ev = descriptor_bundle.compose_event(data=data, timestamps=ts)
    # and emit it
    yield "event", ev


def parse_sensord_log(fin, *, md=None):
    """
    Parse sensord log data to the event model.

    Assumes the input logs look like::

        ...
        May 10 19:35:19 jupiter sensord[610]: Chip: coretemp-isa-0000
        May 10 19:35:19 jupiter sensord[610]: Adapter: ISA adapter
        May 10 19:35:19 jupiter sensord[610]:   Package id 0: 57.0 C
        May 10 19:35:19 jupiter sensord[610]:   Core 0: 57.0 C
        May 10 19:35:19 jupiter sensord[610]:   Core 1: 48.0 C
        May 10 19:35:19 jupiter sensord[610]: Adapter: ACPI interface
        May 10 19:35:19 jupiter sensord[610]:   temp1: 27.8 C
        May 10 19:35:19 jupiter sensord[610]: Adapter: ISA adapter
        May 10 19:35:19 jupiter sensord[610]:   in0: +1.19 V (min = +0.00 V, max = +3.06 V)
        May 10 19:35:19 jupiter sensord[610]:   fan1: 866 RPM (min = 0 RPM) (beep)
        May 10 19:35:19 jupiter sensord[610]:   temp1: 39.0 C (min = 127.0 C, max = 127.0 C) (beep)
        May 10 19:35:19 jupiter sensord[610]: Adapter: PCI adapter
        May 10 19:35:19 jupiter sensord[610]:   Composite: 38.9 C (min = -0.1 C, max = 76.8 C)
        ...

    Parameters
    ----------
    fin : file-like of the log
        Expects `fin.readline` and `iter(fin)` to give one line at a time.
    md : Dict[str, Any], optional
        Any extra metadata to inject into the start document

    Yields
    ------
    name : str
    doc : Dict[str, Any]
        The name / doc pairs of the data
    """
    md = md or {}
    # get the first line to get a start time
    ln = None
    while not ln:
        ln = fin.readline().strip()
        if ln.startswith("--"):
            # skip the journalctl header line
            ln = None
    month, day, time, rest = ln.split(" ", 3)
    machine, _, rest = rest.partition(" ")
    md.setdefault("machine", machine)
    dt = dateutil.parser.parse(" ".join([month, day, time]))
    # local running state
    run_bundle = compose_run(time=dt.timestamp(), metadata=md)
    descs = {}
    current_stamp = dt
    batch_data = []
    # this closes over the descs cache and the run bundle
    process_batch = functools.partial(
        _process_batch, descs=descs, run_bundle=run_bundle
    )
    yield "start", run_bundle.start_doc
    # stick the first line back in, and process the remaining text
    for ln in itertools.chain([ln], fin):
        # skip blank lines
        if not ln.strip():
            continue
        # get the timestamp on this line
        month, day, time, rest = ln.split(" ", 3)
        # TODO will have issues on year :/
        dt = dateutil.parser.parse(" ".join([month, day, time]))
        # strip off the machine / system
        machine, _, rest = rest.partition(" ")
        system, _, rest = rest.partition(": ")
        # drop the pid
        system, *_ = system.partition("[")
        # check if we are still in the current batch.  These time stamps are
        # when the line was emitted to the log (not when sensord sampled), so
        # one batch can be spread across a second or two; use a 5 s window to
        # be safe.
        if (dt - current_stamp).total_seconds() > 5:
            # if we have accumulated lines, generate the event
            # (and maybe a descriptor)
            if batch_data:
                yield from process_batch(machine, system, current_stamp, batch_data)
            # reset local state
            batch_data = []
            current_stamp = dt
        # add the current line to the running set
        batch_data.append(rest)
    # make sure we get the last batch
    if batch_data:
        yield from process_batch(machine, system, current_stamp, batch_data)
    # emit the stop document
    yield "stop", run_bundle.compose_stop(time=current_stamp.timestamp())


def fill_history(boot_index, callback):
    """
    Extract per-boot "runs" of sensord from journalctl.

    Runs ::

        journalctl -b -{boot_index} -u sensord

    and then pipes the results through parse_sensord_log to the callback.

    Parameters
    ----------
    boot_index : int
        How many boots back to extract the data from
    callback : Callable[[str, Dict], None]
        Function to receive the documents

    Examples
    --------
    >>> import databroker
    >>> db = databroker.Broker.named('temp')
    >>> # get the data from the last boot and store it
    >>> fill_history(1, db.insert)
    """
    ret = subprocess.check_output(
        f"journalctl -b -{boot_index} -u sensord".split(" ")
    ).decode()
    for name, doc in parse_sensord_log(StringIO(ret)):
        callback(name, doc)
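

if __name__ == "__main__":
    # Hedged smoke test (not in the original gist): print the document names
    # from the previous boot instead of inserting into a databroker.
    fill_history(1, lambda name, doc: print(name))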