Last active
March 23, 2018 10:31
-
-
Save mortie/099154ef2011fc806bb8ba0592f472b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3

###################
# <Configuration> #
###################

# Run 'command' while the temperature is less than 'maxTemp'.
maxTemp = 80  # Celsius

# When the temperature reaches 'maxTemp', wait until it drops below 'minTemp'
# before running 'command' again.
minTemp = 60  # Celsius

# What sensor should be used? The value is compared against each reading's
# identifier, which on Linux has the form "<device name>/<sensor name>".
sensor = "coretemp-isa-0000/Core 0"

# A custom sensor reading command, in order to support sensors which
# don't show up in OpenHardwareMonitor (Windows) or lm_sensors (Linux).
# This should be an argv-style array of arguments, and the command should
# print a temperature in Celsius as the very last word of its output.
customSensor = None

# The command which should be run when the temperature is low.
# This should be an argv-style array of arguments.
command = ["echo", "No command specified."]

# The directory to run 'command' in.
directory = r"."

# The number of lines to output from 'command'.
outputLines = 30

####################
# </Configuration> #
####################
import os
import re
import sys
import time
import traceback
from subprocess import PIPE, Popen, check_output
from threading import Thread
# State shared between the main thread and the output reader threads.

# Reader threads keep polling while this is true; it is set to False on exit.
running = True

# The Popen object for the currently managed 'command', or None when the
# command has not been started (or has been killed by the main loop).
proc = None

# Raw lines (bytes) read from the command's stdout and stderr pipes.
procOutput = []
def readProc(stream):
    """Collect lines from one pipe of the managed process into procOutput.

    Intended to run in its own thread. While the module-level 'running' flag
    is true, it polls once per second for a process and appends every line
    read from the named pipe to the module-level 'procOutput' list.

    stream -- attribute name of the pipe on the Popen object
              ("stdout" or "stderr").
    """
    while running:
        # Take a single snapshot of the global: the main thread may reset
        # 'proc' to None between the check and the attribute lookup, which
        # would otherwise raise AttributeError and end this thread.
        current = proc
        if current:
            for line in getattr(current, stream):
                procOutput.append(line)
        time.sleep(1)
# One reader thread per pipe of the managed process; started by loop().
# They are daemon threads so that a reader blocked on a pipe that never
# reaches end-of-file cannot keep the interpreter alive at exit.
readThreads = [
    Thread(target=readProc, args=("stdout",), daemon=True),
    Thread(target=readProc, args=("stderr",), daemon=True),
]
class SensorReading:
    """A single temperature reading.

    The attribute names (Name, Identifier, Value, SensorType) are the ones
    the rest of the script reads from sensor objects, so instances of this
    class can be handled the same way as the objects returned by the WMI
    sensor query on Windows.

    name  -- display name of the sensor
    ident -- identifier that is compared against the configured 'sensor'
    temp  -- temperature value
    """

    def __init__(self, name, ident, temp):
        self.Name = name
        self.Identifier = ident
        self.Value = temp
        self.SensorType = "Temperature"

    def __repr__(self):
        return "SensorReading({!r}, {!r}, {!r})".format(
            self.Name, self.Identifier, self.Value)
if sys.platform == "win32":
    # Windows: readings come from the OpenHardwareMonitor WMI namespace.
    import wmi
    hwmon = wmi.WMI(namespace="root/OpenHardwareMonitor")

    def readSensors():
        """Return every WMI sensor object whose SensorType is "Temperature"."""
        return [sens for sens in hwmon.sensor()
                if sens.SensorType == "Temperature"]

    def clearScreen():
        """Clear the console with the 'cls' command."""
        os.system("cls")
else:
    def readSensors(output=None):
        """Parse the output of the 'sensors' command into SensorReading objects.

        output -- text to parse; when None (the default) the 'sensors'
                  command is run and its output is decoded as UTF-8.

        A line without a colon starts a new device; a "name: value" line is
        turned into a reading identified as "<device>/<name>". Only values
        that contain a degree sign and start with a parsable number are kept,
        so other lines (fan speeds, voltages, "N/A", error messages) are
        skipped instead of raising ValueError.
        """
        if output is None:
            output = check_output(["sensors"]).decode("utf-8")

        readings = []
        devname = ""
        for line in output.split("\n"):
            if not line.strip() or line.startswith("Adapter:"):
                continue
            if line[0].isspace():
                # Indented line: a wrapped continuation of the previous
                # reading, not a device name and not a reading of its own.
                continue

            # Split on the first colon only, so a value that itself contains
            # a colon cannot break the unpacking.
            name, colon, value = line.partition(":")
            if not colon:
                devname = line
                continue
            if "°" not in value:
                continue
            try:
                temp = float(value.split("°")[0])
            except ValueError:
                continue
            readings.append(SensorReading(name, devname + "/" + name, temp))
        return readings

    def clearScreen():
        """Clear the terminal with the 'clear' command."""
        os.system("clear")
def sensorTable(sensors, strs, selected=None):
    """Append one aligned table line per sensor to 'strs'.

    sensors  -- list of objects with Name, Identifier and Value attributes
    strs     -- list of output lines; extended in place
    selected -- identifier of the sensor to look for; defaults to the
                configured module-level 'sensor'

    Each line has the form "Name (Identifier):", padded so that the values
    line up, followed by the value and "°C".

    Returns the sensor whose Identifier equals 'selected' (the last one if
    several match), or None if there is no such sensor.
    """
    if selected is None:
        selected = sensor

    currentSensor = None
    labels = []
    for sens in sensors:
        if sens.Identifier == selected:
            currentSensor = sens
        labels.append(sens.Name + " (" + sens.Identifier + "):")

    # The longest label is followed by exactly one space before its value.
    width = max((len(label) for label in labels), default=0) + 1
    for sens, label in zip(sensors, labels):
        strs.append(label.ljust(width) + str(sens.Value) + "°C")

    return currentSensor
def sensors():
    """Redraw the status screen and return the watched sensor's temperature.

    Reads all sensors (plus the optional custom sensor command), builds the
    sensor table, the "current sensor" summary and the process status, clears
    the screen and prints everything, followed by the last 'outputLines'
    lines captured from the managed process.

    Returns the Value of the sensor whose identifier matches the configured
    'sensor', or None when no such sensor was found.
    """
    global procOutput

    print("...")

    # Read sensors
    strs = []
    readings = readSensors()
    readings.sort(key=lambda s: s.Name)
    if customSensor:
        # The temperature is the last whitespace-separated word of the output.
        custom = check_output(customSensor).decode("utf-8").strip()
        value = float(re.split(r"\s+", custom)[-1])
        readings.append(SensorReading("Custom Sensor", "custom", value))
    currentSensor = sensorTable(readings, strs)

    # Current sensor
    strs.append("")
    if currentSensor:
        strs.append(
            "Current sensor: " + currentSensor.Name +
            " (" + currentSensor.Identifier + "):")
        strs.append(
            str(currentSensor.Value) + "°C, range: " +
            str(minTemp) + "-" + str(maxTemp))
    else:
        strs.append("No sensor named " + sensor + " found.")

    # Process status
    strs.append("")
    printOutput = len(procOutput) != 0
    if proc is None:
        printOutput = False
        strs.append("Status: not running")
    else:
        exitCode = proc.poll()
        if exitCode is None:
            strs.append("Status: running")
        elif exitCode == 0:
            # A clean exit is not a crash.
            strs.append("Status: finished")
        else:
            strs.append("Status: crashed (exit code " + str(exitCode) + ")")

    # Process output
    if printOutput:
        # Keep only the newest 'outputLines' lines; trimmed in place because
        # the reader threads append to this same list.
        if len(procOutput) > outputLines:
            del procOutput[:len(procOutput) - outputLines]
        strs.append("Process Output:")

    clearScreen()
    for s in strs:
        print(s)

    if printOutput:
        # The captured lines are bytes and go to the binary layer directly;
        # flush the text layer first so the header lines stay in front.
        sys.stdout.flush()
        for line in procOutput:
            sys.stdout.buffer.write(line)
        sys.stdout.buffer.flush()

    if currentSensor:
        return currentSensor.Value
    return None
# Switch to the configured directory so that 'command' is started from it.
os.chdir(directory)
def loop():
    """Start the reader threads and run the temperature control loop forever.

    Every two seconds the screen is refreshed via sensors() and:
    - a managed process is killed when the temperature is unknown or has
      reached 'maxTemp';
    - with no managed process, 'command' is started once the temperature is
      known and below 'minTemp'.

    Does not return; it ends only through an exception such as
    KeyboardInterrupt.
    """
    global proc
    global procOutput

    for t in readThreads:
        t.start()

    while True:
        temp = sensors()
        if proc:
            if temp is None or temp >= maxTemp:
                proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                proc.wait()
                proc = None
        elif temp is not None and temp < minTemp:
            procOutput = []
            proc = Popen(command, stdout=PIPE, stderr=PIPE)
        time.sleep(2)
# Entry point: run the control loop until it is interrupted or fails, then
# stop the reader threads and the managed process and exit with status 1.
try:
    loop()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the script; no traceback needed.
    pass
except Exception:
    traceback.print_exc()
finally:
    print("Stopping...")
    running = False
    if proc:
        proc.kill()
        # Reap the killed child so it does not linger as a zombie.
        proc.wait()
    sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment