Last active
July 1, 2025 08:10
-
-
Save lin-ycv/14a8b091ed5d16ed9d519fa3ebc0caf7 to your computer and use it in GitHub Desktop.
Python scripts to monitor Bambu lab printer print status over local MQTT (LAN), sends print info to a backend server when a new print starts, and allows the backender server to terminate a print if needed
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
import urllib.request | |
import ssl | |
import paho.mqtt.client as mqtt | |
# --- Shared utility --- | |
print_keys = {"gcode_file", "gcode_state", "mc_print_stage", "mc_remaining_time"} | |
endpoint = "https://....." | |
terminate_cmd = '{"print": {"sequence_id": "0", "command": "stop"}}' | |
def update(original, new): | |
for key, value in new.items(): | |
if key not in print_keys: | |
continue | |
if isinstance(value, dict) and isinstance(original.get(key), dict): | |
update(original[key], value) | |
else: | |
original[key] = value | |
# --- Class per printer --- | |
class PrinterWatcher: | |
def __init__(self, printer): | |
self.printer = printer | |
self.status = {"should_update": True} | |
self.client = mqtt.Client( | |
mqtt.CallbackAPIVersion.VERSION2, | |
None, | |
clean_session=True, | |
protocol=mqtt.MQTTv311, | |
transport='tcp' | |
) | |
self.client.username_pw_set('bblp', printer['code']) | |
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE) | |
self.client.on_message = self.on_message | |
self.client.on_connect = self.on_connect | |
def on_connect(self, client, userdata, flags, reason_code, properties=None): | |
print(f"[{self.printer['label']}] Connected: {reason_code}") | |
client.subscribe(f"device/{self.printer['serial']}/report") | |
def on_message(self, client, userdata, msg): | |
payload = msg.payload.decode('utf-8') | |
jobj = json.loads(payload) | |
if 'print' in jobj: | |
update(self.status, jobj['print']) | |
if all(key in self.status for key in print_keys): | |
if self.status["mc_print_stage"] == '1' or self.status["gcode_state"] in {"IDLE", "FAILED"}: | |
self.status["should_update"] = True | |
elif self.status["should_update"] and ( | |
self.status["mc_print_stage"] == '2' or | |
self.status["gcode_state"] in {"PREPARE", "RUNNING"} | |
): | |
self.status["should_update"] = False | |
#print(self.status) #DEBUG | |
stop_data = self.check_server_stop() | |
if stop_data and 'end' in stop_data: | |
print("! [",self.printer['label'],"] Stopping as per backend request !") | |
client.publish(f"device/{self.printer['serial']}/request", terminate_cmd) | |
def check_server_stop(self): | |
try: | |
file = self.status.get("gcode_file", "").lower() | |
for suffix in (".3mf", ".gcode"): | |
if file.endswith(suffix): | |
file = file.removesuffix(suffix) | |
print("[",self.printer['label'],"]", | |
"New print started:", file, | |
"[",self.status['mc_remaining_time'], "min ]") | |
ts = int(time.time()) | |
payload = { | |
'printer': self.printer['label'], | |
'file': file, | |
'start': str(ts), | |
'end': str(ts + self.status["mc_remaining_time"] * 60) | |
} | |
req = urllib.request.Request(endpoint, data=json.dumps(payload).encode(), method='POST') | |
with urllib.request.urlopen(req, timeout=10) as response: | |
return json.loads(response.read().decode()) | |
except Exception as e: | |
print(f"[{self.printer['label']}] Error: {e}") | |
self.status["should_update"] = True | |
return None | |
def run(self): | |
while True: | |
try: | |
self.client.connect(self.printer['ip'], 8883) | |
self.client.loop_forever() | |
except KeyboardInterrupt: | |
self.client.disconnect() | |
print(f"[{self.printer['label']}] Gracefully disconnected.") | |
break | |
except Exception as e: | |
print(f"[{self.printer['label']}] Exception: {e}") | |
self.client.disconnect() | |
time.sleep(30) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Process | |
from bambu_print_watcher import PrinterWatcher | |
import os | |
import signal | |
printers = [ | |
{'label': "AnythingYouWant", 'code': 'REPLACE_ACCESS_CODE', 'serial': "REPLACE_SERIAL_NUMBER", 'ip': "192.168.X.XX"}, | |
{'label': "HumanIdentifiableName", 'code': 'REPLACE_ACCESS_CODE', 'serial': "REPLACE_SERIAL_NUMBER", 'ip': "192.168.X.XX"}, | |
..... | |
] | |
processes = [] | |
try: | |
for printer in printers: | |
watcher = PrinterWatcher(printer) | |
p = Process(target=watcher.run) | |
p.start() | |
processes.append(p) | |
for p in processes: | |
p.join() | |
except: | |
for p in processes: | |
os.kill(p.pid, signal.SIGINT) | |
p.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment