Last active
March 9, 2025 12:07
-
-
Save DaniAsh551/83e2dec775cfba1b9d55263e3e5ba034 to your computer and use it in GitHub Desktop.
Automatic CPU governor switching in Linux userspace, based on mouse and keyboard activity.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import glob | |
import os | |
from time import time | |
import signal | |
# Configuration parameters, read from environment variables with defaults.
_raw_idle_threshold = os.environ.get("IDLE_THRESHOLD", "30")
try:
    # Time in seconds before considering the system idle.
    IDLE_THRESHOLD = int(_raw_idle_threshold)
except ValueError:
    # Fail with a readable message instead of an int() traceback.
    print(f"Error: IDLE_THRESHOLD must be an integer number of seconds, got '{_raw_idle_threshold}'")
    raise SystemExit(1)
ACTIVE_GOVERNOR = os.environ.get("ACTIVE_GOVERNOR", "performance")  # CPU governor to use when active.
IDLE_GOVERNOR = os.environ.get("IDLE_GOVERNOR", "powersave")  # CPU governor to use when idle.

# Check if the specified CPU governors are available on the system.
# SystemExit is raised directly because the exit() helper is installed by the
# `site` module for interactive use and is not guaranteed to exist.
try:
    with open("/sys/devices/system/cpu/cpu0/cpufreq/scaling_available_governors", "r") as f:
        # split() without an argument ignores repeated/trailing whitespace,
        # so no empty governor names end up in the list.
        available_governors = f.readline().split()
except FileNotFoundError:
    print("Error: /sys/devices/system/cpu/cpu0/cpufreq/scaling_available_governors not found. Please ensure cpufreq is enabled.")
    raise SystemExit(1)

if ACTIVE_GOVERNOR not in available_governors:
    print(f"Error: ACTIVE_GOVERNOR '{ACTIVE_GOVERNOR}' not available in: {available_governors}")
    raise SystemExit(1)
if IDLE_GOVERNOR not in available_governors:
    print(f"Error: IDLE_GOVERNOR '{IDLE_GOVERNOR}' not available in: {available_governors}")
    raise SystemExit(1)

# Timestamp (whole seconds, as returned by int(time())) of the last input event.
# It starts at 0, so the system counts as idle until the first event is seen.
last_event_time = 0
def is_active():
    """Report whether the last input event is recent enough to count as activity.

    Returns:
        True when at most IDLE_THRESHOLD whole seconds separate the current
        time from the module-level ``last_event_time``; False otherwise.
    """
    seconds_since_event = int(time()) - last_event_time
    return seconds_since_event <= IDLE_THRESHOLD
def get_governor():
    """Read the current CPU governor of cpu0 from sysfs.

    Returns:
        The governor name with surrounding whitespace removed, or None when
        the sysfs file is missing or cannot be read.
    """
    governor_path = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"
    try:
        with open(governor_path, "r") as f:
            return f.readline().strip()  # Remove trailing whitespace/newline.
    except FileNotFoundError:
        print("Error: /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor not found. Please ensure cpufreq is enabled.")
    except OSError as e:
        # Any other read failure (e.g. PermissionError) is reported instead of
        # propagating, matching how set_governor() tolerates I/O errors.
        print(f"Error reading {governor_path}: {e}")
    return None  # Signal to the caller that the governor is unknown.
def set_governor(governor: str):
    """Write ``governor`` to the scaling_governor file of every matching CPU.

    Args:
        governor: Governor name to write to each sysfs file.

    A failure on one CPU is printed and does not stop the remaining CPUs
    from being updated.
    """
    sysfs_pattern = "/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor"
    for node in glob.glob(sysfs_pattern):
        try:
            with open(node, "w") as sysfs_file:
                sysfs_file.write(governor)
        except OSError as err:  # IOError is an alias of OSError in Python 3.
            print(f"Error setting governor for {node}: {err}")
def sync_governor():
    """Switch the CPU governor to match the current activity state.

    Reads the current governor, decides once whether the system is active,
    and calls set_governor() only when the current governor differs from the
    one wanted for that state. Does nothing if the current governor cannot
    be read.
    """
    current_governor = get_governor()
    if current_governor is None:
        return  # Exit if cannot get governor.

    # Evaluate the time-dependent check a single time so the decision below
    # is based on one consistent snapshot.
    if is_active():
        state, wanted_governor = "Activity", ACTIVE_GOVERNOR
    else:
        state, wanted_governor = "Idle", IDLE_GOVERNOR

    if current_governor != wanted_governor:
        print(f"{state} detected, setting gov: '{wanted_governor}'")
        set_governor(wanted_governor)
async def auto_governor():
    """Coroutine that checks and updates the CPU governor once per second.

    Runs forever; each pass executes sync_governor() in the default executor
    so its file I/O does not block the event loop.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; fetch it once rather than on every iteration.
    loop = asyncio.get_running_loop()
    while True:
        await loop.run_in_executor(None, sync_governor)
        await asyncio.sleep(1)
def discover_inputs():
    """Build the list of input device paths to monitor.

    Returns:
        Every path matching ``/dev/input/by-path/*kbd*`` followed by
        ``/dev/input/mice``, which is always included.
    """
    keyboard_nodes = glob.glob("/dev/input/by-path/*kbd*")
    # The mice node is added explicitly rather than discovered.
    return [*keyboard_nodes, "/dev/input/mice"]
async def watch_input(input_path: str):
    """Coroutine that monitors one input device and records when it produces data.

    Args:
        input_path: Path of the device file to read from.

    Each non-empty read stores the current time (whole seconds) in the
    module-level ``last_event_time``. A missing device or any other error is
    printed and ends the coroutine normally instead of raising.
    """
    global last_event_time
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; fetch it once rather than on every read.
    loop = asyncio.get_running_loop()
    try:
        with open(input_path, "rb") as fh:
            while True:
                # The blocking read runs in the default executor to keep the
                # event loop responsive.
                buf = await loop.run_in_executor(None, fh.read, 3)
                if buf:  # Data was read: treat it as user activity.
                    last_event_time = int(time())
                else:
                    await asyncio.sleep(0.1)  # Avoid busy loop if nothing to read.
    except FileNotFoundError:
        print(f"{input_path} device not found.")
    except Exception as e:  # Catch other errors
        print(f"Error monitoring {input_path}: {e}")
async def main():
    """Main coroutine that orchestrates input monitoring and governor management.

    Starts one watch_input() coroutine per discovered device plus
    auto_governor(), and waits on all of them. On SIGINT every task
    (including this one) is cancelled, so the caller sees
    asyncio.CancelledError.
    """
    inputs = discover_inputs()
    if not inputs:
        print("No input devices found.")
        return

    def cancel_all_tasks():
        """Handles Ctrl+C by cancelling every task on the running loop."""
        print("Ctrl+C detected. Cancelling tasks...")
        for task in asyncio.all_tasks():
            task.cancel()

    # Handlers installed with signal.signal() are not allowed to interact with
    # the event loop; loop.add_signal_handler() runs the callback inside it.
    asyncio.get_running_loop().add_signal_handler(signal.SIGINT, cancel_all_tasks)

    tasks = [watch_input(path) for path in inputs]
    tasks.append(auto_governor())
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. This replaces asyncio.get_event_loop(), which
    # is deprecated when no loop is running.
    try:
        asyncio.run(main())
    except asyncio.CancelledError:
        # Raised when main()'s task is cancelled.
        print("Main task cancelled.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit]
Description=Background activity_gov.py task
After=network.target

[Service]
Type=simple
# Python block-buffers stdout when it is not a terminal; run unbuffered so the
# script's print() messages show up in the journal as they happen.
Environment=PYTHONUNBUFFERED=1
ExecStart=/usr/local/bin/activity_gov.py
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Save `activity_gov.py` to `/usr/local/bin/activity_gov.py`.
Make the file executable: `sudo chmod +x /usr/local/bin/activity_gov.py`
Add it to system startup for best results.
Save the systemd unit file shown above as
/etc/systemd/system/activity_gov.service
and then run: `sudo systemctl daemon-reload && sudo systemctl enable --now activity_gov.service`
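If you prefer not to use the systemd unit file, you can instead add this line to root's crontab (`sudo crontab -e`), since writing the governor requires root: `@reboot /usr/local/bin/activity_gov.py`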