Skip to content

Instantly share code, notes, and snippets.

@lin-ycv
Last active July 1, 2025 08:10
Show Gist options
  • Save lin-ycv/14a8b091ed5d16ed9d519fa3ebc0caf7 to your computer and use it in GitHub Desktop.
Save lin-ycv/14a8b091ed5d16ed9d519fa3ebc0caf7 to your computer and use it in GitHub Desktop.
Python scripts to monitor Bambu lab printer print status over local MQTT (LAN), sends print info to a backend server when a new print starts, and allows the backender server to terminate a print if needed
import json
import time
import urllib.request
import ssl
import paho.mqtt.client as mqtt
# --- Shared utility ---
print_keys = {"gcode_file", "gcode_state", "mc_print_stage", "mc_remaining_time"}
endpoint = "https://....."
terminate_cmd = '{"print": {"sequence_id": "0", "command": "stop"}}'
def update(original, new):
for key, value in new.items():
if key not in print_keys:
continue
if isinstance(value, dict) and isinstance(original.get(key), dict):
update(original[key], value)
else:
original[key] = value
# --- Class per printer ---
class PrinterWatcher:
def __init__(self, printer):
self.printer = printer
self.status = {"should_update": True}
self.client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2,
None,
clean_session=True,
protocol=mqtt.MQTTv311,
transport='tcp'
)
self.client.username_pw_set('bblp', printer['code'])
self.client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
self.client.on_message = self.on_message
self.client.on_connect = self.on_connect
def on_connect(self, client, userdata, flags, reason_code, properties=None):
print(f"[{self.printer['label']}] Connected: {reason_code}")
client.subscribe(f"device/{self.printer['serial']}/report")
def on_message(self, client, userdata, msg):
payload = msg.payload.decode('utf-8')
jobj = json.loads(payload)
if 'print' in jobj:
update(self.status, jobj['print'])
if all(key in self.status for key in print_keys):
if self.status["mc_print_stage"] == '1' or self.status["gcode_state"] in {"IDLE", "FAILED"}:
self.status["should_update"] = True
elif self.status["should_update"] and (
self.status["mc_print_stage"] == '2' or
self.status["gcode_state"] in {"PREPARE", "RUNNING"}
):
self.status["should_update"] = False
#print(self.status) #DEBUG
stop_data = self.check_server_stop()
if stop_data and 'end' in stop_data:
print("! [",self.printer['label'],"] Stopping as per backend request !")
client.publish(f"device/{self.printer['serial']}/request", terminate_cmd)
def check_server_stop(self):
try:
file = self.status.get("gcode_file", "").lower()
for suffix in (".3mf", ".gcode"):
if file.endswith(suffix):
file = file.removesuffix(suffix)
print("[",self.printer['label'],"]",
"New print started:", file,
"[",self.status['mc_remaining_time'], "min ]")
ts = int(time.time())
payload = {
'printer': self.printer['label'],
'file': file,
'start': str(ts),
'end': str(ts + self.status["mc_remaining_time"] * 60)
}
req = urllib.request.Request(endpoint, data=json.dumps(payload).encode(), method='POST')
with urllib.request.urlopen(req, timeout=10) as response:
return json.loads(response.read().decode())
except Exception as e:
print(f"[{self.printer['label']}] Error: {e}")
self.status["should_update"] = True
return None
def run(self):
while True:
try:
self.client.connect(self.printer['ip'], 8883)
self.client.loop_forever()
except KeyboardInterrupt:
self.client.disconnect()
print(f"[{self.printer['label']}] Gracefully disconnected.")
break
except Exception as e:
print(f"[{self.printer['label']}] Exception: {e}")
self.client.disconnect()
time.sleep(30)
from multiprocessing import Process
from bambu_print_watcher import PrinterWatcher
import os
import signal
printers = [
{'label': "AnythingYouWant", 'code': 'REPLACE_ACCESS_CODE', 'serial': "REPLACE_SERIAL_NUMBER", 'ip': "192.168.X.XX"},
{'label': "HumanIdentifiableName", 'code': 'REPLACE_ACCESS_CODE', 'serial': "REPLACE_SERIAL_NUMBER", 'ip': "192.168.X.XX"},
.....
]
processes = []
try:
for printer in printers:
watcher = PrinterWatcher(printer)
p = Process(target=watcher.run)
p.start()
processes.append(p)
for p in processes:
p.join()
except:
for p in processes:
os.kill(p.pid, signal.SIGINT)
p.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment