Created
April 9, 2018 22:04
-
-
Save xoseperez/c29072f4ce548f7bf7ce5129642fcffb to your computer and use it in GitHub Desktop.
Geekworm battery monitor script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python | |
# -*- coding: utf-8 -*- | |
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 | |
# Geekworks RPi Power Pack Hat Monitor Service | |
# Copyright (C) 2018 by Xose Pérez <[email protected]> | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
import os | |
import sys | |
import signal | |
import threading | |
import time | |
import argparse | |
from lib.smbus import SMBus | |
from lib.config import Config | |
from lib.utils import ServiceExit, service_shutdown, write_pid_file | |
from lib.utils import printd, Level, Color, clr, set_debug_level | |
# ------------------------------------------------------------------------------ | |
class Battery(threading.Thread): | |
bus = SMBus(1) | |
address = 0x62 | |
capacity = 50 | |
state = 1 | |
count = 0 | |
flag = False | |
def __init__(self, interval, threshold, mincount): | |
threading.Thread.__init__(self) | |
self.interval = interval | |
self.threshold = threshold | |
self.mincount = mincount | |
self.shutdown_flag = threading.Event() | |
self.bus.write_byte_data(self.address, 0x0A, 0x00) | |
def run(self): | |
while not self.shutdown_flag.is_set(): | |
# Calculate current charge | |
msb = int(self.bus.read_byte_data(self.address, 0x04)) | |
lsb = int(self.bus.read_byte_data(self.address, 0x05)) | |
capacity = msb + lsb / 255.0 | |
# Calculate tendency | |
if (self.capacity > capacity): | |
state = 0 | |
if (self.capacity < capacity): | |
state = 1 | |
if (self.state != state): | |
self.count = 0 | |
self.state = state | |
self.capacity = capacity | |
self.count = self.count + 1 | |
# Debug | |
printd(clr(Color.LIGHT_GREY, "Battery capacity: %6.2f%% (%s)" % (self.capacity, "charging" if self.state == 1 else "discharging")), Level.DEBUG) | |
# Alerts | |
if not self.flag and self.state == 0 and self.capacity < self.threshold and self.count > self.mincount: | |
self.flag = True | |
message = "Shutting down system in 60 seconds due to critical battery level" | |
printd(clr(Color.GREEN, message), Level.INFO) | |
os.system("sudo shutdown -h +1 %s" % message) | |
if self.flag and self.state == 1: | |
self.flag = False | |
message = "Battery charging: shutdown cancelled" | |
printd(clr(Color.GREEN, message), Level.INFO) | |
os.system("sudo shutdown -c") | |
# Check again in X seconds | |
time.sleep(self.interval) | |
# ------------------------------------------------------------------------------ | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-c", dest="config", action="store", help="configuration file", default="config.yml") | |
parser.add_argument("-v", dest="verbose", action="count", help="verbose mode", default=0) | |
arg = parser.parse_args(sys.argv[1:]) | |
# set debug level | |
set_debug_level(Level.INFO if 0 == arg.verbose else Level.DEBUG) | |
# Welcome message | |
printd(clr(Color.WHITE, "\nGeekworm Hat Battery Monitor\n"), Level.INFO) | |
# Load configuration | |
config = Config(arg.config) | |
config.load(); | |
# Configuration values | |
check_interval = config.get("battery", "check_interval", 10) | |
charge_threshold = config.get("battery", "charge_threshold", 25) | |
threshold_count = config.get("battery", "threshold_count", 6) | |
try: | |
# Init job and signal callbacks | |
signal.signal(signal.SIGTERM, service_shutdown) | |
signal.signal(signal.SIGINT, service_shutdown) | |
job = Battery(check_interval, charge_threshold, threshold_count) | |
job.start() | |
# Keep on running | |
while True: | |
time.sleep(1) | |
except ServiceExit: | |
job.shutdown_flag.set() | |
None | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ruamel.yaml | |
from ruamel.yaml.util import load_yaml_guess_indent | |
class Config(object): | |
filename = None | |
def __init__(self, filename): | |
self.filename = filename | |
self.load() | |
def load(self): | |
self.config, self.ind, self.bsi = load_yaml_guess_indent(open(self.filename)) | |
def get(self, section, key, default): | |
try: | |
return self.config[section][key] | |
except: | |
None | |
return default | |
def set(self, section, key, value): | |
self.config[section][key] = value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
battery: | |
# How often should we check the battery level (in seconds) | |
check_interval: 10 | |
# Charge threshold | |
charge_threshold: 100 | |
# Shut down after these many intervals discharging below the threshold | |
threshold_count: 180 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sys | |
import glob | |
from time import sleep | |
from threading import Thread | |
# ------------------------------------------------------------------------------ | |
# USB | |
# ------------------------------------------------------------------------------ | |
def find_devices(vendor_id = None, product_id = None): | |
""" | |
Looks for USB devices | |
optionally filtering by with the provided vendor and product IDs | |
""" | |
devices = [] | |
for dn in glob.glob('/sys/bus/usb/devices/*'): | |
try: | |
vid = int(open(os.path.join(dn, "idVendor" )).read().strip(), 16) | |
pid = int(open(os.path.join(dn, "idProduct")).read().strip(), 16) | |
if ((vendor_id is None) or (vid == vendor_id)) and ((product_id is None) or (pid == product_id)): | |
dns = glob.glob(os.path.join(dn, os.path.basename(dn) + "*")) | |
for sdn in dns: | |
for fn in glob.glob(os.path.join(sdn, "*")): | |
if re.search(r"\/ttyUSB[0-9]+$", fn): | |
devices.append(os.path.join("/dev", os.path.basename(fn))) | |
pass | |
pass | |
pass | |
pass | |
except ( ValueError, TypeError, AttributeError, OSError, IOError ): | |
pass | |
pass | |
return devices | |
# ------------------------------------------------------------------------------ | |
# TIMER | |
# ------------------------------------------------------------------------------ | |
class IntervalTimer(Thread): | |
def __init__(self, interval, callback, c_kwargs={}): | |
Thread.__init__(self) | |
self.interval = interval | |
self.callback = callback | |
self.c_kwargs = c_kwargs | |
self.daemon = True | |
self.running = True | |
self.start() | |
def stop(self): | |
self.running = False | |
def run(self): | |
while self.running: | |
self.callback(**self.c_kwargs) | |
sleep(self.interval) | |
# ------------------------------------------------------------------------------ | |
# EXCEPTIONS | |
# ------------------------------------------------------------------------------ | |
class ServiceExit(Exception): | |
""" | |
Custom exception which is used to trigger the clean exit | |
of all running threads and the main program. | |
""" | |
pass | |
def service_shutdown(signum, frame): | |
print('Caught signal %d' % signum) | |
raise ServiceExit | |
# ------------------------------------------------------------------------------ | |
# PID | |
# ------------------------------------------------------------------------------ | |
def write_pid_file(name): | |
pid = str(os.getpid()) | |
h = open("/tmp/%s.pid" % name, "w") | |
h.write(pid) | |
h.close() | |
# ------------------------------------------------------------------------------ | |
# DEBUG | |
# ------------------------------------------------------------------------------ | |
class Level(object): | |
CRITICAL = 0 | |
WARNING = 1 | |
INFO = 2 | |
DEBUG = 3 | |
BLOAT = 4 | |
VERBOSITY = Level.INFO | |
def printd(string, level): | |
if VERBOSITY >= level: | |
print(string) | |
def set_debug_level(lvl): | |
global VERBOSITY | |
VERBOSITY = lvl | |
# ------------------------------------------------------------------------------ | |
# OTHER | |
# ------------------------------------------------------------------------------ | |
def isRPi(): | |
return (os.uname()[4] == 'armv7l') | |
class Color(object): | |
BLACK = '\x1b[1;30m' | |
RED = '\x1b[1;31m' | |
GREEN = '\x1b[1;32m' | |
YELLOW = '\x1b[1;33m' | |
BLUE = '\x1b[1;34m' | |
MAGENTA = '\x1b[1;35m' | |
CYAN = '\x1b[1;36m' | |
WHITE = '\x1b[1;37m' | |
LIGHT_GREY = '\x1b[0;30m' | |
LIGHT_RED = '\x1b[0;31m' | |
LIGHT_GREEN = '\x1b[0;32m' | |
LIGHT_YELLOW = '\x1b[0;33m' | |
LIGHT_BLUE = '\x1b[0;34m' | |
LIGHT_MAGENTA = '\x1b[0;35m' | |
LIGHT_CYAN = '\x1b[0;36m' | |
LIGHT_WHITE = '\x1b[0;37m' | |
def clr(color, text): | |
return color + str(text) + '\x1b[0m' | |
def check_root(): | |
if not os.geteuid() == 0: | |
printd(clr(Color.RED, "Run as root."), Level.CRITICAL) | |
exit(1) | |
def hex_offset_to_string(byte_array): | |
temp = byte_array.replace("\n", "") | |
temp = temp.replace(" ", "") | |
return temp.decode("hex") | |
def mac_to_bytes(mac): | |
return ''.join(chr(int(x, 16)) for x in mac.split(':')) | |
def bytes_to_mac(byte_array): | |
return ':'.join("{:02x}".format(ord(byte)) for byte in byte_array) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment