Skip to content

Instantly share code, notes, and snippets.

@YOUR-WORST-TACO
Last active March 24, 2025 01:33
Show Gist options
  • Save YOUR-WORST-TACO/1d2d86a286012d9dfcffc834f79ea88e to your computer and use it in GitHub Desktop.
Save YOUR-WORST-TACO/1d2d86a286012d9dfcffc834f79ea88e to your computer and use it in GitHub Desktop.
Fan Controller for PWM and Bambu X1C
#!/fancontrol/bin/python
import bambulabs_api as bl
import logging
import math
import os
import signal
import sys
import time
from cysystemd import journal
from dotenv import load_dotenv
from enum import Enum
from rpi_hardware_pwm import HardwarePWM
# Load settings from the .env file into the process environment up front,
# because FanController.__init__ reads its configuration via os.getenv().
load_dotenv()
def clamp(n, min, max):
    """Return ``n`` limited to the range ``min``..``max`` (inclusive).

    The lower bound is tested first, so a value below ``min`` always yields
    ``min`` even if the bounds are passed in the wrong order.
    """
    # NOTE: the parameter names shadow the builtins of the same name, so
    # min()/max() cannot be called inside this function.
    if n < min:
        return min
    return max if n > max else n
class FanState(Enum):
    """Reason the exhaust fan was last set, used to skip redundant updates."""

    # Printer idle and bed cool: fan switched off.
    OFF = 0
    # Bed target at or above 90: fan at the high speed.
    TARGET_HIGH = 1
    # Bed target set but below 90: fan at the low speed.
    TARGET_LOW = 2
    # No bed target but the bed is still at or above 40: fan at the high speed.
    COOLING = 3
    # Chamber fan running: exhaust fan follows its speed.
    CHAMBER = 4
class FanController:
    """Drive a PWM exhaust fan from the state reported by a Bambu printer.

    Configuration comes from environment variables:

    * ``IP``, ``SERIAL``, ``ACCESS_CODE`` -- required printer connection data.
    * ``DELAY`` -- seconds between polls (default ``1``).
    * ``LOW_SPEED`` / ``HIGH_SPEED`` -- fan duty cycles in percent
      (defaults ``50`` / ``100``).
    """

    def __init__(self):
        """Read the configuration, then create the printer client and PWM channel.

        Exits the process if a required connection setting is missing.

        Raises:
            ValueError: if ``DELAY``, ``LOW_SPEED`` or ``HIGH_SPEED`` is set
                to something that is not a number.
        """
        self.logger = self.__init_logger()

        self.printer_ip = os.getenv('IP')
        self.printer_serial = os.getenv('SERIAL')
        self.printer_access_code = os.getenv('ACCESS_CODE')

        # Check the required settings before parsing the optional ones so a
        # missing value is reported clearly instead of as a conversion error.
        required = (self.printer_ip, self.printer_serial, self.printer_access_code)
        if any(value is None for value in required):
            self.logger.error('IP, SERIAL, or ACCESS_CODE not defined in .env')
            # Exit status is deliberately left at 0, as before, so existing
            # service-manager restart behaviour does not change.
            sys.exit(0)

        # time.sleep() needs a number, and an unset variable must fall back
        # to the default rather than being passed to int()/float() as None.
        self.delay = self._env_number('DELAY', 1, float)
        self.low_fan_speed = self._env_number('LOW_SPEED', 50)
        self.high_fan_speed = self._env_number('HIGH_SPEED', 100)

        self.printer = bl.PrinterMQTTClient(
            self.printer_ip, self.printer_access_code, self.printer_serial
        )

        # The channel is created at 25 Hz and immediately switched to 25 kHz.
        # NOTE(review): presumably 25 kHz is the fan's PWM frequency -- confirm.
        self.pwm = HardwarePWM(pwm_channel=0, hz=25, chip=0)
        self.pwm.change_frequency(25_000)

        # Last applied state/speed; lets handlePrinterTemp skip no-op updates.
        self.last_state = None
        self.last_speed = None

        signal.signal(signal.SIGTERM, self._handle_sigterm)

    @staticmethod
    def _env_number(name, default, cast=int):
        """Return environment variable ``name`` converted with ``cast``.

        Args:
            name: environment variable to read.
            default: value returned when the variable is unset or blank.
            cast: conversion applied to the raw string (``int`` or ``float``).

        Returns:
            The converted value, or ``default``. An explicit ``0`` is kept.

        Raises:
            ValueError: if the variable is set but cannot be converted.
        """
        raw = os.getenv(name)
        if raw is None or not raw.strip():
            return default
        try:
            return cast(raw)
        except ValueError as err:
            raise ValueError(f'{name} must be a number, got {raw!r}') from err

    def __init_logger(self):
        """Configure root logging at INFO and attach a journald handler."""
        logging.basicConfig(level=logging.INFO)
        logger = logging.getLogger()
        logger.addHandler(journal.JournaldLogHandler())
        return logger

    def _handle_sigterm(self, sig, frame):
        """Signal handler for SIGTERM: log it and shut down via stop()."""
        self.logger.warning('Keyboard interrupt (SIGTERM) received...')
        self.stop()

    def start(self):
        """Start the fan at 0%, connect to the printer and poll until interrupted."""
        try:
            self.pwm.start(0)
            self.logger.info('Connecting to printer...')
            self.printer.connect()
            self.printer.start()
            # NOTE(review): presumably gives the client time to receive its
            # first report before polling -- confirm.
            time.sleep(2)
            while True:
                self.handlePrinterTemp()
                time.sleep(self.delay)
        except KeyboardInterrupt:
            self.logger.warning('Keyboard interrupt (SIGINT) received...')
            self.stop()

    def stop(self):
        """Disconnect from the printer, stop the PWM output and exit the process."""
        try:
            self.logger.info('Disconnecting from printer...')
            self.printer.stop()
        finally:
            # Always release the PWM channel, even if the disconnect fails.
            self.logger.info('Resetting fan...')
            self.pwm.stop()
        sys.exit(0)

    def handlePrinterTemp(self):
        """Poll the printer once and update the exhaust fan if the state changed.

        Priority order: a running chamber fan, then an active bed target,
        then a bed that is still warm, and finally off.
        """
        bed_temp = self.printer.get_bed_temperature()
        bed_temp_target = self.printer.get_bed_temperature_target()
        chamber_fan = self.printer.get_chamber_fan_speed()

        # NOTE(review): assumes the client may return None while no report is
        # available -- confirm. Skip the cycle rather than crash on comparison.
        if bed_temp is None or bed_temp_target is None or chamber_fan is None:
            self.logger.debug('Printer readings not available yet, skipping.')
            return

        if chamber_fan > 0:
            # Scale the reading against 255 to get a whole-number percentage.
            chamber_fan_percentile = math.floor((chamber_fan / 255) * 100)
            exhaust_fan_speed = clamp(
                chamber_fan_percentile, self.low_fan_speed, self.high_fan_speed
            )
            if (
                self.last_speed == chamber_fan_percentile
                and self.last_state == FanState.CHAMBER
            ):
                return
            self.logger.info(f'Detected chamber fan at {chamber_fan_percentile}%, Exhaust fan set to {exhaust_fan_speed}.')
            self.pwm.change_duty_cycle(exhaust_fan_speed)
            self.last_speed = chamber_fan_percentile
            self.last_state = FanState.CHAMBER
            return

        if bed_temp_target > 0:
            if bed_temp_target >= 90:
                if self.last_state == FanState.TARGET_HIGH:
                    return
                self.logger.info(f'Bed temp target ({bed_temp_target}) is >= 90, Exhaust fan set to {self.high_fan_speed}%')
                self.pwm.change_duty_cycle(self.high_fan_speed)
                self.last_state = FanState.TARGET_HIGH
                return

            if self.last_state == FanState.TARGET_LOW:
                return
            self.logger.info(f'Bed temp target ({bed_temp_target}) is < 90, Exhaust fan set to {self.low_fan_speed}%')
            self.pwm.change_duty_cycle(self.low_fan_speed)
            self.last_state = FanState.TARGET_LOW
            return

        # No target set: keep extracting while the bed is still warm.
        if bed_temp >= 40:
            if self.last_state == FanState.COOLING:
                return
            self.logger.info(f'Printer is cooling down, bed ({bed_temp}), Exhaust fan set to {self.high_fan_speed}%')
            self.pwm.change_duty_cycle(self.high_fan_speed)
            self.last_state = FanState.COOLING
            return

        if self.last_state == FanState.OFF:
            return
        self.logger.info('Printer is idle and cool, turning off fan.')
        self.pwm.change_duty_cycle(0)
        self.last_state = FanState.OFF
        return
# Script entry point: build the controller and run its polling loop.
if __name__ == '__main__':
    FanController().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment