Skip to content

Instantly share code, notes, and snippets.

@wulab
Created May 13, 2020 16:06
Show Gist options
  • Save wulab/71eaeaddc0c00a263fb68a1b04f9da04 to your computer and use it in GitHub Desktop.
Save wulab/71eaeaddc0c00a263fb68a1b04f9da04 to your computer and use it in GitHub Desktop.
from elasticsearch import Elasticsearch
import re
import subprocess
import time
def build_doc():
    """Assemble the document to index: battery readings plus a timestamp.

    Returns:
        A dict holding every key produced by ``current_batt()`` followed by
        the ``"timestamp"`` entry from ``current_time()``. Should both
        helpers ever produce the same key, the value from ``current_time()``
        wins, as it is merged last.
    """
    # Unpacking evaluates the helpers left to right, so the battery query
    # still runs before the timestamp is taken.
    return {**current_batt(), **current_time()}
def current_batt():
    """Query ``/sbin/apcaccess`` and return selected readings as floats.

    Returns:
        A dict keyed by ``LINEV``, ``LOADPCT``, ``BCHARGE``, ``TIMELEFT`` and
        ``BATTV``. Each value is whatever ``extract()`` yields for that key:
        a float, or None when the key is not found in the command output.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status.
        OSError: if the command cannot be executed.
    """
    fields = ("LINEV", "LOADPCT", "BCHARGE", "TIMELEFT", "BATTV")
    # Decode the command output as text so the regex in extract() can scan it.
    status_text = subprocess.check_output(["/sbin/apcaccess"], encoding='utf-8')
    return {field: extract(field, status_text) for field in fields}
def current_time():
    """Return the current time as a one-entry dict.

    Returns:
        ``{"timestamp": <int>}`` where the value is ``time.time()`` truncated
        to whole seconds.
    """
    now = time.time()
    whole_seconds = int(now)
    return {"timestamp": whole_seconds}
def extract(key, string):
    """Return the numeric value reported for ``key`` in ``KEY : value`` text.

    The text is scanned line by line for a line that starts with ``key``
    (optionally indented), followed by a colon and a number. Any trailing
    unit such as ``Volts`` or ``Percent`` is ignored.

    Args:
        key: Field name to look up, e.g. ``"BATTV"``. It is matched
            literally; regex metacharacters in it have no special meaning.
        string: The text to search.

    Returns:
        The first matching value as a float, or None when no line for
        ``key`` carries a number or the matched text is not a valid float.
    """
    # Anchor the key to the start of a line so that "BATTV" cannot match
    # inside "NOMBATTV" (or "LINEV" inside "MAXLINEV"), which would return
    # the wrong reading depending on line order. Whitespace around the colon
    # is optional so keys that fill the whole padded column still match.
    pattern = re.compile(
        rf"^[ \t]*{re.escape(key)}[ \t]*:[ \t]*([0-9,.]+)",
        re.MULTILINE,
    )
    match = pattern.search(string)
    if match is None:
        return None
    try:
        return float(match.group(1))
    except ValueError:
        # The character class also admits text such as "." or "1,234" that
        # float() rejects; report it as "no reading" instead of crashing.
        return None
def main():
    """Index one battery-status document into the ``batteries`` index.

    Creates an Elasticsearch client with default settings, issues a cluster
    health request with ``wait_for_status="yellow"``, then indexes the
    document built by ``build_doc()``.
    """
    client = Elasticsearch()
    # Health request first, so indexing is only attempted afterwards.
    client.cluster.health(wait_for_status="yellow")
    document = build_doc()
    client.index(index="batteries", body=document)
# Run the collection only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment