Skip to content

Instantly share code, notes, and snippets.

@mwvent
Last active April 15, 2025 14:31
Show Gist options
  • Save mwvent/920029460d3854ce667eca6cae7df521 to your computer and use it in GitHub Desktop.
Save mwvent/920029460d3854ce667eca6cae7df521 to your computer and use it in GitHub Desktop.
Grid cctv viewer/CCTV wall player with no black borders on cams (scaled to grid space) and cameras can cover variable amount of rows/cols - handy for cams that output double height/width streams. Requires mpv.py from https://github.com/jaseg/python-mpv and Gtk / mpv
#!/usr/bin/env python3
import time
import os
import sys
import locale
import mpv
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GLib, Gdk
import logging
import cairo
import asyncio
import threading
import json
logging.basicConfig(level=logging.INFO)
class Camera() :
def __init__(self, config, parent) :
self._config = config
self._parent = parent
self._connected = False
self._started = False
self._drawing_area = None
required_keys = ["name", "url", "col", "row"]
for key in required_keys:
if key not in self._config:
logging.error(f"Missing a required key '{key}' in camera config.")
if "textprovider" in self._config :
self._textprovider = CamTextProvider(self, self._config["textprovider"])
GLib.timeout_add_seconds(1, self.poll)
@property
def name(self) :
return self._config["name"]
@property
def url(self) :
return self._config["url"]
@property
def col(self) :
return self._config["col"]
@property
def row(self) :
return self._config["row"]
@property
def cols(self) :
if not "cols" in self._config : return 1
return self._config["cols"]
@property
def rows(self) :
if not "rows" in self._config : return 1
return self._config["rows"]
@property
def stall_threshold(self) :
if "stall_threshold" in self._config :
return self._config["stall_threshold"]
else :
return 9
@property
def audio(self) :
if "audio" in self._config :
cfg = self._config["audio"]
if cfg == False or cfg == True :
return cfg
elif cfg.lower() == "yes" or cfg.lower() == "no" :
return cfg.lower() == "yes"
elif cfg.lower() == "true" or cfg.lower() == "false" :
return cfg.lower() == "true"
else :
logging.error(f"Audio option for {self.name} invalid : {cfg}")
return False
return self._config["stall_threshold"]
else :
return False
@property
def drawing_area(self) :
if not self._drawing_area :
logging.info(f"Creating drawing area for {self.name}")
self._drawing_area = Gtk.DrawingArea()
parentCols = self._parent.usedCols - 1 if self._parent.usedCols > 1 else 1
parentRows = self._parent.usedRows - 1 if self._parent.usedRows > 1 else 1
x = int(self._parent.winX / parentCols) * self.cols
y = int(self._parent.winY / parentRows ) * self.rows
self._drawing_area.set_size_request(x, y)
self._drawing_area.show()
self._drawing_area.connect("realize", self.on_drawing_area_realize)
self._drawing_area.connect("draw", self.on_draw)
return self._drawing_area
@property
def player(self) :
return self._player
@property
def connected(self) :
return self._connected
def attachToGrid(self, grid) :
grid.attach(self.drawing_area, self.col, self.row, self.cols, self.rows)
def on_drawing_area_realize(self, drawing_area):
logging.info("Realise " + self.name)
self._player = mpv.MPV(wid=str(drawing_area.get_property("window").get_xid()))
# TODO some of these propties should be overridable in config
self._player['cache'] = 'no'
self._player['cache-secs'] = 0
self._player['demuxer-readahead-secs'] = 0
self._player['untimed'] = True
self._player['cache-pause'] = "no"
self._player['demuxer-max-bytes'] = "1M"
self._player['keepaspect'] = "no"
self._player['vo'] = "gpu"
self._player['framedrop'] = "decoder"
if self.audio == False :
self._player['audio'] = "no"
self._player['title'] = self.name
self._player.loop = True
self._player.observe_property('time-pos', self.on_time_pos_change)
self._started = False
self._connected = False
def on_draw(self, drawing_area, cr):
if self._connected:
# allow mpv to draw a frame
pass
else:
# mpv not connected to stream - draw disconnected instead
width = self._drawing_area.get_allocated_width()
height = self._drawing_area.get_allocated_height()
cr.set_source_rgb(0, 0, 0)
cr.rectangle(0, 0, width, height)
cr.fill()
cr.set_source_rgb(1, 1, 1)
cr.set_font_size(30)
cr.move_to(width / 2 - 100, height / 2 - 15)
cr.show_text("Disconnected")
_poll_lastcheck= 0
_poll_latest = 0
_poll_stall_count = 0
def on_time_pos_change(self, name, value):
self._poll_latest = value if value is not None else 0
def poll(self) :
# Start mpv if not running
if not self._started :
self._poll_lastcheck= 0
self._poll_latest = 0
self._poll_stall_count = 0
self._connected = False
try :
logging.info(f"{self.name} begin playing {self.url}")
self._player.play(self.url)
self._started = True
except Exception as e:
logging.error(f"Error starting stream for {self.name}: {e}")
finally :
return True
if self._poll_latest > self._poll_lastcheck :
# no stall
self._poll_stall_count = 0
self._connected = True
self._poll_lastcheck = self._poll_latest
return True
# time has not increased - possible stall
self._poll_stall_count += 1
logging.warning(f"{self.name} {self._poll_latest} {self._poll_lastcheck} StallCounter: {self._poll_stall_count}")
# stall thresh reached
if self._poll_stall_count > self.stall_threshold :
logging.error(f"Stalled - Restarting stream for {self.name}, last_check: {self._poll_lastcheck}, latest: {self._poll_latest}")
self._connected = False
self._started = False
try :
self._player.stop()
except Exception as e:
logging.warn(f"Error stopping stream for {self.name}: {e}")
return True
class Clock(Camera) :
def __init__(self, config, parent) :
self._config = config
self._parent = parent
self._connected = True
self._started = True
self._drawing_area = None
required_keys = ["col", "row"]
for key in required_keys:
if key not in self._config:
logging.error(f"Missing a required key '{key}' in camera config.")
if "textprovider" in self._config :
logging.error(f"textprovider is not valid in clock config.")
self._label = Gtk.Label()
def attachToGrid(self, grid) :
self._label.set_hexpand(False)
self._label.set_halign(Gtk.Align.FILL)
self._label.set_xalign(0.5)
self._label.set_yalign(0.5)
self._label.set_name("time_label")
self._label.set_ellipsize(3)
self._label.show()
self._label.get_style_context().add_provider(self._parent.css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
GLib.timeout_add_seconds(1, self.poll)
grid.attach(self._label, self.col, self.row, self.cols, self.rows)
#self.grid.attach(self.label, 3, 1, 1, 1)
@property
def name(self) :
return "clock"
@property
def url(self) :
return "clock"
def poll(self) :
current_time = time.strftime("%H:%M")
self._label.set_text(current_time)
return True
class CamTextProvider():
def __init__(self, cam, config) :
self._cam = cam
self._config = config
self._engine = None
self._text = ""
if "type" in self._config :
if self._config["type"] == "moonraker" :
self._engine = TextProvider_moonraker(self, cam, config)
@property
def text(self) :
if self._engine is None :
return ""
return self._text
@text.setter
def text(self, newtext) :
self._text = newtext
self.sendTextToMpv()
def sendTextToMpv(self) :
if self._text == "" : return
# print(self._text, self._cam.player, self._cam.connected)
if self._cam.player is not None and self._cam.connected :
self._cam.player.show_text(self._text, "1500")
class TextProvider_moonraker(CamTextProvider):
def __init__(self, parent, cam, config) :
try :
import websockets
except Exception as e:
logging.error(f"Cannot load websocket - moonraker text cannot be shown")
return False
self._parent = parent
self._cam = cam
self._config = config
required_keys = ["type", "url", "retry_delay", "timeout"]
for key in required_keys:
if key not in config:
logging.error(f"Missing required key '{key}' in Moonraker config for {cam.name}.")
return False
self.printer_state = {
"extruder_temp": -1, "state": "unknown", "heater_bed_temp": -1, "is_active": False, "progress": 0
}
self.start_websocket()
async def printer_websocket(self):
try :
import websockets
except Exception as e:
logging.error(f"Cannot load websocket - moonraker text cannot be shown")
return False
uri = self._config["url"]
retry_delay = self._config["retry_delay"]
timeout = self._config["timeout"]
while True:
try:
async with websockets.connect(uri) as websocket:
logging.info("WebSocket connection established.")
subscribe_message = {
"jsonrpc": "2.0",
"method": "printer.objects.subscribe",
"params": {
"objects": {
"virtual_sdcard": ["progress", "is_active"],
"idle_timeout": ["state", "printing_time"],
"heater_bed": ["temperature", "target"],
"extruder": ["temperature", "target", "can_extrude"],
}
},
"id": 123,
}
logging.info(f"Sending subscribe message: {json.dumps(subscribe_message)}")
await websocket.send(json.dumps(subscribe_message))
start_time = asyncio.get_event_loop().time()
initial_state_received = False
while not initial_state_received:
message = await asyncio.wait_for(websocket.recv(), timeout=timeout)
logging.info(f"Initial message: {message}")
try:
data = json.loads(message)
if 'result' in data and 'status' in data['result']:
self.process_printer_data(data['result']['status'])
initial_state_received = True
elif 'method' in data and data['method'] == 'printer.objects.updated' and 'params' in data and 'status' in data['params']:
self.process_printer_data(data['params']['status'])
initial_state_received = True
elif 'error' in data :
logging.error(f"Error from Moonraker: {data['error']}")
if "method not found" in str(data['error']).lower():
logging.error("Method not found error, exiting")
return #exit if method not found
else:
logging.error("Mobileraker or other error, attempting resubscription")
await asyncio.sleep(retry_delay)
break
except json.JSONDecodeError as e:
logging.error(f"JSON decode error: {e}, message: {message}")
await asyncio.sleep(retry_delay)
break
if (asyncio.get_event_loop().time() - start_time) > timeout:
logging.error("Timeout receiving initial printer status")
await asyncio.sleep(retry_delay)
break
if not initial_state_received:
continue #restart the connection loop.
while True:
message = await websocket.recv()
try:
data = json.loads(message)
except json.JSONDecodeError as e:
logging.error(f"JSON decode error: {e}, message: {message}")
continue
if 'params' in data and 'status' in data['params']:
self.process_printer_data(data['params']['status'])
elif 'params' in data and 'method' in data and data['method'] == 'notify_status_update':
self.process_notify_status_update(data['params'][0])
except websockets.exceptions.ConnectionClosedError as e:
logging.error(f"WebSocket connection closed: {e}")
await asyncio.sleep(retry_delay)
continue
except Exception as e:
logging.error(f"WebSocket error: {e}")
await asyncio.sleep(retry_delay)
continue
finally:
logging.info("WebSocket connection closed or resubscribing.")
def process_notify_status_update(self, update_data):
try:
if "heater_bed" in update_data and "temperature" in update_data["heater_bed"]:
self.printer_state["heater_bed_temp"] = update_data["heater_bed"]["temperature"]
if "extruder" in update_data and "temperature" in update_data["extruder"]:
self.printer_state["extruder_temp"] = update_data["extruder"]["temperature"]
if "virtual_sdcard" in update_data:
if "progress" in update_data["virtual_sdcard"]:
self.printer_state["progress"] = f"{update_data['virtual_sdcard']['progress'] * 100:.2f}%"
if "is_active" in update_data["virtual_sdcard"]:
self.printer_state["is_active"] = update_data["virtual_sdcard"]["is_active"]
if "idle_timeout" in update_data and "state" in update_data["idle_timeout"]:
self.printer_state["state"] = update_data["idle_timeout"]["state"]
self.update_printer_cam_text()
except KeyError as e:
logging.error(f"KeyError in process_notify_status_update: {e}")
except Exception as e:
logging.error(f"Error in process_notify_status_update: {e}")
def update_printer_cam_text(self):
try:
state = self.printer_state['state']
extruder_temp = f"{self.printer_state['extruder_temp']:.1f}C"
heater_bed_temp = f"{self.printer_state['heater_bed_temp']:.1f}C"
sd_active = self.printer_state['is_active']
sd_progress = self.printer_state['progress']
status_lines = [
f"{state}",
f"Ext: {extruder_temp}",
f"Bed: {heater_bed_temp}"
]
if sd_active:
status_lines.append(f"SD: {sd_progress}")
status = " ".join(status_lines)
self._parent.text = status
except KeyError as e:
logging.error(f"KeyError in update_printer_cam_text: {e}")
except Exception as e:
logging.error(f"Error in update_printer_cam_text: {e}")
def process_printer_data(self, printer_data):
try:
if "idle_timeout" in printer_data and "state" in printer_data["idle_timeout"]:
self.printer_state["state"] = printer_data["idle_timeout"]["state"]
if "heater_bed" in printer_data and "temperature" in printer_data["heater_bed"]:
self.printer_state["heater_bed_temp"] = printer_data["heater_bed"]["temperature"]
if "extruder" in printer_data and "temperature" in printer_data["extruder"]:
self.printer_state["extruder_temp"] = printer_data["extruder"]["temperature"]
if "virtual_sdcard" in printer_data:
if "is_active" in printer_data["virtual_sdcard"]:
self.printer_state["is_active"] = printer_data["virtual_sdcard"]["is_active"]
if "progress" in printer_data["virtual_sdcard"]:
self.printer_state["progress"] = f"{printer_data['virtual_sdcard']['progress'] * 100:.2f}%"
self.update_printer_cam_text()
except KeyError as e:
logging.error(f"KeyError processing printer data: {e}")
except Exception as e:
logging.error(f"Error processing printer data: {e}")
def start_websocket(self):
self.websocket_thread = threading.Thread(target=self.run_websocket_loop)
self.websocket_thread.daemon = True
self.websocket_thread.start()
def run_websocket_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.printer_websocket())
loop.close()
class MainClass(Gtk.Window):
usedCols = 0
usedRows = 0
def __init__(self):
super(MainClass, self).__init__()
if not self.load_config():
logging.error("Config load failed, exiting.")
sys.exit(1)
self.fullscreen()
self.connect("realize", self.on_realize)
self.show_all()
def load_config(self):
config_path = "config.json"
if not os.path.exists(config_path):
logging.error(f"Config file not found: {config_path}")
return False
try:
with open(config_path, "r") as f:
config = json.load(f)
self._cams_config = config.get("cameras", []) #default to empty list.
self.cams = []
for camconfig in self._cams_config:
try :
if "url" not in camconfig:
logging.error(f"ERROR: Cam has missing url")
continue
if camconfig["url"] == "clock":
self.cams.append(Clock(camconfig, self))
else:
newCamera = Camera(camconfig, self)
self.usedRows = max(self.usedRows, newCamera.row + newCamera.rows)
self.usedCols = max(self.usedCols, newCamera.col + newCamera.cols)
self.cams.append(newCamera)
except KeyError as e:
logging.error(f"KeyError in cam config: {e}")
continue #skip this cam.
return True
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Error loading config: {e}")
return False
_winX = None
_winY = None
def _init_winXY(self) :
screen = Gdk.Screen.get_default()
num_monitors = screen.get_n_monitors()
if num_monitors > 0:
monitor_num = 0
geometry = screen.get_monitor_geometry(monitor_num)
winX = geometry.width
winY = geometry.height
else:
winX = 1920
winY = 1080
self._winX = winX
self._winY = winY
@property
def winX(self) :
if self._winX is None :
self._init_winXY()
return self._winX
@property
def winY(self) :
if self._winY is None :
self._init_winXY()
return self._winY
_css_provider = None
@property
def css_provider(self) :
if self._css_provider is None :
self._css_provider = Gtk.CssProvider()
gridCols = self.usedCols - 1
gridCols = gridCols if gridCols > 0 else 1
fontsz = int((self.winX / gridCols) / gridCols)
css = """
GtkWindow {
background-color: black;
}
GtkGrid {
background-color: black;
}
#time_label {
color: white;
font-size: """ + str(fontsz) + """px;
min-width: 100px;
font-family: monospace;
}
"""
css = css.encode('utf-8')
self._css_provider.load_from_data(css)
self.get_style_context().add_provider(self._css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
return self._css_provider
_grid = None
@property
def grid(self) :
if self._grid is None :
self._grid = Gtk.Grid()
self._grid.get_style_context().add_provider(self.css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
return self._grid
def on_realize(self, widget):
logging.info("Realise main")
self.set_app_paintable(True)
self.set_default_size(self.winX, self.winY)
self.connect("destroy", self.on_destroy)
self.grid.show()
for cam in self.cams:
cam.attachToGrid(self.grid)
self.add(self.grid)
def on_destroy(self, widget, data=None):
Gtk.main_quit()
if __name__ == '__main__':
locale.setlocale(locale.LC_NUMERIC, 'C')
application = MainClass()
Gtk.main()
{
"cameras": [
{
"name": "drive",
"url": "rtsp:/XXXX:554/user=XXXX_password=XXXX_channel=0_stream=1&onvif=0.sdp?real_stream",
"col": 1,
"row": 1,
"cols": 1,
"rows": 3,
"stall_threshold": 9
},
{
"name": "door",
"url": "rtsp:/XXXX:XXXX@XXXX:554/stream1",
"col": 2,
"row": 1,
"stall_threshold": 9
},
{
"name": "printer1",
"url": "rtsp://XXXX:XXXX@XXXX:554/user=XXXX_password=XXXX_channel=1_stream=1.sdp?real_stream",
"col": 3,
"row": 1,
"stall_threshold": 9
},
{
"name": "clock",
"url": "clock",
"col": 2,
"row": 2
},
{
"name": "roadl",
"url": "rtsp://XXXXXXXXu@XXXX:554/realmonitor?channel=0&stream=1.sdp",
"col": 2,
"row": 3,
"stall_threshold": 9
},
{
"name": "roadr",
"url": "rtsp://XXXX:XXXX@XXXX:554/realmonitor?channel=1&stream=1.sdp",
"col": 3,
"row": 3,
"stall_threshold": 9
},
{
"name": "printer2",
"url": "http:/XXXX:8080/stream",
"col": 3,
"row": 2,
"stall_threshold": 9,
"textprovider": {
"type": "moonraker",
"url": "ws://XXXX:7125/websocket",
"retry_delay": 5,
"timeout": 10
}
}
]
}
@mwvent
Copy link
Author

mwvent commented Mar 29, 2025

Also text provider classes can be added to overlay text on the stream. The example one connects to a 3d printer running klipper / moonraker and overlays some stats on the image. A simple clock class is also added to use in one of the grid spaces and can be invoked by setting the cam url to 'clock'

@mwvent
Copy link
Author

mwvent commented Mar 29, 2025

20250329_205332

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment