Last active
April 15, 2025 14:31
-
-
Save mwvent/920029460d3854ce667eca6cae7df521 to your computer and use it in GitHub Desktop.
Grid CCTV viewer / CCTV wall player with no black borders around the cameras (each stream is scaled to fill its grid space). A camera can span a variable number of rows/columns, which is handy for cameras that output double-height or double-width streams. Requires mpv.py from https://github.com/jaseg/python-mpv, plus GTK 3 and mpv.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import os | |
import sys | |
import locale | |
import mpv | |
import gi | |
gi.require_version('Gtk', '3.0') | |
from gi.repository import Gtk, GLib, Gdk | |
import logging | |
import cairo | |
import asyncio | |
import threading | |
import json | |
logging.basicConfig(level=logging.INFO) | |
class Camera:
    """A single camera tile: an mpv player embedded in a Gtk.DrawingArea.

    The tile is described by a config dict with the keys ``name``, ``url``,
    ``col`` and ``row`` plus the optional keys ``cols``, ``rows``,
    ``stall_threshold``, ``audio`` and ``textprovider``. ``poll`` is scheduled
    once per second; it starts playback and restarts the stream when the
    playback position stops advancing.
    """

    # mpv player instance; stays None until the drawing area has been
    # realized (see on_drawing_area_realize). Declared at class level so that
    # subclasses which skip Camera.__init__ still expose ``player`` safely.
    _player = None
    # Stall detection state: position seen at the previous poll, latest
    # position reported by mpv, and consecutive polls without progress.
    _poll_lastcheck = 0
    _poll_latest = 0
    _poll_stall_count = 0

    def __init__(self, config, parent):
        """Store the config, create the optional text provider, start polling.

        Args:
            config: dict describing this camera.
            parent: owner object; ``usedCols``, ``usedRows``, ``winX`` and
                ``winY`` are read from it when the drawing area is created.

        Missing required keys are only logged here; accessing the matching
        property later raises KeyError.
        """
        self._config = config
        self._parent = parent
        self._connected = False
        self._started = False
        self._drawing_area = None
        self._player = None
        for key in ("name", "url", "col", "row"):
            if key not in self._config:
                logging.error(f"Missing a required key '{key}' in camera config.")
        if "textprovider" in self._config:
            self._textprovider = CamTextProvider(self, self._config["textprovider"])
        GLib.timeout_add_seconds(1, self.poll)

    @property
    def name(self):
        """Camera name from the config."""
        return self._config["name"]

    @property
    def url(self):
        """Stream URL from the config."""
        return self._config["url"]

    @property
    def col(self):
        """Grid column the tile is attached at."""
        return self._config["col"]

    @property
    def row(self):
        """Grid row the tile is attached at."""
        return self._config["row"]

    @property
    def cols(self):
        """Number of grid columns the tile spans (default 1)."""
        return self._config.get("cols", 1)

    @property
    def rows(self):
        """Number of grid rows the tile spans (default 1)."""
        return self._config.get("rows", 1)

    @property
    def stall_threshold(self):
        """Polls without progress tolerated before a restart (default 9)."""
        return self._config.get("stall_threshold", 9)

    @property
    def audio(self):
        """Whether audio is enabled for this camera.

        Accepts booleans, the numbers 0/1 and the case-insensitive strings
        "yes"/"no"/"true"/"false". A missing key means False; any other
        value is logged as an error and treated as False.
        """
        if "audio" not in self._config:
            return False
        cfg = self._config["audio"]
        if isinstance(cfg, bool):
            return cfg
        if isinstance(cfg, str):
            lowered = cfg.lower()
            if lowered in ("yes", "true"):
                return True
            if lowered in ("no", "false"):
                return False
        elif isinstance(cfg, (int, float)) and cfg in (0, 1):
            # Numeric 0/1 compared equal to False/True in the old check.
            return bool(cfg)
        logging.error(f"Audio option for {self.name} invalid : {cfg}")
        return False

    @property
    def drawing_area(self):
        """The Gtk.DrawingArea hosting the player, created on first access."""
        if self._drawing_area is None:
            logging.info(f"Creating drawing area for {self.name}")
            self._drawing_area = Gtk.DrawingArea()
            # The parent's used* counters are reduced by one before dividing
            # the window size (presumably because col/row are 1-based in the
            # config - confirm against the config convention).
            parentCols = self._parent.usedCols - 1 if self._parent.usedCols > 1 else 1
            parentRows = self._parent.usedRows - 1 if self._parent.usedRows > 1 else 1
            x = int(self._parent.winX / parentCols) * self.cols
            y = int(self._parent.winY / parentRows) * self.rows
            self._drawing_area.set_size_request(x, y)
            self._drawing_area.show()
            self._drawing_area.connect("realize", self.on_drawing_area_realize)
            self._drawing_area.connect("draw", self.on_draw)
        return self._drawing_area

    @property
    def player(self):
        """The mpv player, or None while the drawing area is not realized."""
        return self._player

    @property
    def connected(self):
        """True once the playback position has been seen to advance."""
        return self._connected

    def attachToGrid(self, grid):
        """Attach the drawing area to *grid* at the configured cell and span."""
        grid.attach(self.drawing_area, self.col, self.row, self.cols, self.rows)

    def on_drawing_area_realize(self, drawing_area):
        """Create the mpv player bound to the realized widget's window id."""
        logging.info("Realise " + self.name)
        self._player = mpv.MPV(wid=str(drawing_area.get_property("window").get_xid()))
        # TODO some of these properties should be overridable in config
        self._player['cache'] = 'no'
        self._player['cache-secs'] = 0
        self._player['demuxer-readahead-secs'] = 0
        self._player['untimed'] = True
        self._player['cache-pause'] = "no"
        self._player['demuxer-max-bytes'] = "1M"
        # Stretch the picture to the tile instead of letterboxing it.
        self._player['keepaspect'] = "no"
        self._player['vo'] = "gpu"
        self._player['framedrop'] = "decoder"
        if not self.audio:
            self._player['audio'] = "no"
        self._player['title'] = self.name
        self._player.loop = True
        self._player.observe_property('time-pos', self.on_time_pos_change)
        # Let the next poll() start playback from a clean state.
        self._started = False
        self._connected = False

    def on_draw(self, drawing_area, cr):
        """Draw a "Disconnected" placeholder while no stream is connected."""
        if self._connected:
            # allow mpv to draw a frame
            return
        # mpv not connected to stream - draw disconnected instead
        width = self._drawing_area.get_allocated_width()
        height = self._drawing_area.get_allocated_height()
        cr.set_source_rgb(0, 0, 0)
        cr.rectangle(0, 0, width, height)
        cr.fill()
        cr.set_source_rgb(1, 1, 1)
        cr.set_font_size(30)
        cr.move_to(width / 2 - 100, height / 2 - 15)
        cr.show_text("Disconnected")

    def on_time_pos_change(self, name, value):
        """mpv 'time-pos' observer: remember the latest position (None -> 0)."""
        self._poll_latest = value if value is not None else 0

    def poll(self):
        """Start the stream if needed and restart it when it stalls.

        Returns:
            Always True, so the GLib timeout keeps firing.
        """
        # Start mpv if not running
        if not self._started:
            self._poll_lastcheck = 0
            self._poll_latest = 0
            self._poll_stall_count = 0
            self._connected = False
            if self._player is None:
                # Drawing area not realized yet; try again on the next tick.
                return True
            try:
                logging.info(f"{self.name} begin playing {self.url}")
                self._player.play(self.url)
                self._started = True
            except Exception as e:
                logging.error(f"Error starting stream for {self.name}: {e}")
            return True
        if self._poll_latest > self._poll_lastcheck:
            # no stall
            self._poll_stall_count = 0
            self._connected = True
            self._poll_lastcheck = self._poll_latest
            return True
        # time has not increased - possible stall
        self._poll_stall_count += 1
        logging.warning(f"{self.name} {self._poll_latest} {self._poll_lastcheck} StallCounter: {self._poll_stall_count}")
        # stall thresh reached
        if self._poll_stall_count > self.stall_threshold:
            logging.error(f"Stalled - Restarting stream for {self.name}, last_check: {self._poll_lastcheck}, latest: {self._poll_latest}")
            self._connected = False
            self._started = False
            if self._player is not None:
                try:
                    self._player.stop()
                except Exception as e:
                    logging.warning(f"Error stopping stream for {self.name}: {e}")
        return True
class Clock(Camera):
    """Grid tile that shows the current time in a Gtk.Label instead of a stream."""

    def __init__(self, config, parent):
        """Store config/parent and create the label.

        Camera.__init__ is not called; the fields it would set are assigned
        here directly, with the tile marked as already started and connected.
        """
        self._config = config
        self._parent = parent
        self._connected = True
        self._started = True
        self._drawing_area = None
        absent = [key for key in ("col", "row") if key not in config]
        for key in absent:
            logging.error(f"Missing a required key '{key}' in camera config.")
        if "textprovider" in config:
            logging.error("textprovider is not valid in clock config.")
        self._label = Gtk.Label()

    @property
    def name(self):
        """Fixed tile name."""
        return "clock"

    @property
    def url(self):
        """Fixed pseudo-url that identifies a clock tile."""
        return "clock"

    def attachToGrid(self, grid):
        """Configure the label, start the 1-second refresh and attach to *grid*."""
        label = self._label
        label.set_hexpand(False)
        label.set_halign(Gtk.Align.FILL)
        label.set_xalign(0.5)
        label.set_yalign(0.5)
        # The widget name is what the "#time_label" CSS rule selects on.
        label.set_name("time_label")
        # NOTE(review): 3 looks like Pango.EllipsizeMode.END - confirm.
        label.set_ellipsize(3)
        label.show()
        style_context = label.get_style_context()
        style_context.add_provider(
            self._parent.css_provider,
            Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
        )
        GLib.timeout_add_seconds(1, self.poll)
        grid.attach(label, self.col, self.row, self.cols, self.rows)

    def poll(self):
        """Refresh the label with the current HH:MM time; True keeps the timer."""
        self._label.set_text(time.strftime("%H:%M"))
        return True
class CamTextProvider:
    """Holds overlay text for a camera and pushes it to the camera's player.

    The text is produced by an optional engine selected through the ``type``
    key of the provider config; "moonraker" is the only supported type.
    """

    def __init__(self, cam, config):
        """Create the provider and, if configured, its engine.

        Args:
            cam: camera object; its ``player`` and ``connected`` attributes
                are read when text is sent.
            config: provider config dict (the ``textprovider`` section).

        A failure while creating the engine is logged and leaves the provider
        without an engine, so a broken overlay cannot stop the camera itself
        from loading.
        """
        self._cam = cam
        self._config = config
        self._engine = None
        self._text = ""
        if "type" in self._config:
            provider_type = self._config["type"]
            if provider_type == "moonraker":
                try:
                    self._engine = TextProvider_moonraker(self, cam, config)
                except Exception:
                    logging.exception("Could not create moonraker text provider")
                    self._engine = None
            else:
                logging.error(f"Unknown textprovider type: {provider_type}")

    @property
    def text(self):
        """Current overlay text; always "" when no engine is active."""
        if self._engine is None:
            return ""
        return self._text

    @text.setter
    def text(self, newtext):
        self._text = newtext
        self.sendTextToMpv()

    def sendTextToMpv(self):
        """Show the current text on the camera's player for 1500 ms.

        Does nothing when the text is empty, the camera has no player yet, or
        the camera is not connected.
        """
        if self._text == "":
            return
        # getattr with a default also covers a camera whose player attribute
        # is not available yet.
        player = getattr(self._cam, "player", None)
        if player is not None and self._cam.connected:
            player.show_text(self._text, "1500")
class TextProvider_moonraker(CamTextProvider):
    """Text engine that follows a Moonraker websocket and reports printer state.

    A daemon thread runs an asyncio loop that subscribes to printer objects
    and, on every status message, writes a one-line summary to
    ``parent.text``.
    """

    def __init__(self, parent, cam, config):
        """Validate the config and start the websocket thread.

        Args:
            parent: the CamTextProvider whose ``text`` is updated.
            cam: the camera this provider belongs to (its name is used in logs).
            config: dict with the keys ``type``, ``url``, ``retry_delay`` and
                ``timeout``.

        When the ``websockets`` package or a required key is missing, the
        problem is logged and no thread is started. CamTextProvider.__init__
        is not called because it would construct another engine.
        """
        self._parent = parent
        self._cam = cam
        self._config = config
        self.websocket_thread = None
        self.printer_state = {
            "extruder_temp": -1,
            "state": "unknown",
            "heater_bed_temp": -1,
            "is_active": False,
            # Same string form that status messages store later.
            "progress": "0.00%",
        }
        try:
            import websockets  # noqa: F401 - availability check only
        except Exception:
            logging.error("Cannot load websocket - moonraker text cannot be shown")
            return
        missing = [key for key in ("type", "url", "retry_delay", "timeout") if key not in config]
        for key in missing:
            logging.error(f"Missing required key '{key}' in Moonraker config for {cam.name}.")
        if missing:
            return
        self.start_websocket()

    async def printer_websocket(self):
        """Connect, subscribe and process updates; reconnect after any error.

        Returns only when the ``websockets`` import fails (False) or Moonraker
        answers the subscription with a "method not found" error (None).
        """
        try:
            import websockets
        except Exception:
            logging.error("Cannot load websocket - moonraker text cannot be shown")
            return False
        uri = self._config["url"]
        retry_delay = self._config["retry_delay"]
        timeout = self._config["timeout"]
        while True:
            try:
                async with websockets.connect(uri) as websocket:
                    logging.info("WebSocket connection established.")
                    outcome = await self._await_initial_state(websocket, timeout, retry_delay)
                    if outcome == "fatal":
                        return
                    if outcome == "retry":
                        continue  # restart the connection loop.
                    await self._consume_updates(websocket)
            except websockets.exceptions.ConnectionClosedError as e:
                logging.error(f"WebSocket connection closed: {e}")
                await asyncio.sleep(retry_delay)
            except Exception as e:
                logging.error(f"WebSocket error: {e}")
                await asyncio.sleep(retry_delay)
            finally:
                logging.info("WebSocket connection closed or resubscribing.")

    async def _await_initial_state(self, websocket, timeout, retry_delay):
        """Send the subscription and wait for the first printer status.

        Returns:
            "ready" once a status was processed, "retry" when the connection
            should be re-established (the retry delay has already been
            slept), or "fatal" when Moonraker reports "method not found".
        """
        subscribe_message = {
            "jsonrpc": "2.0",
            "method": "printer.objects.subscribe",
            "params": {
                "objects": {
                    "virtual_sdcard": ["progress", "is_active"],
                    "idle_timeout": ["state", "printing_time"],
                    "heater_bed": ["temperature", "target"],
                    "extruder": ["temperature", "target", "can_extrude"],
                }
            },
            "id": 123,
        }
        logging.info(f"Sending subscribe message: {json.dumps(subscribe_message)}")
        await websocket.send(json.dumps(subscribe_message))
        start_time = asyncio.get_event_loop().time()
        while True:
            # A recv that exceeds the timeout raises and is handled by the caller.
            message = await asyncio.wait_for(websocket.recv(), timeout=timeout)
            logging.info(f"Initial message: {message}")
            try:
                data = json.loads(message)
            except json.JSONDecodeError as e:
                logging.error(f"JSON decode error: {e}, message: {message}")
                await asyncio.sleep(retry_delay)
                return "retry"
            if 'result' in data and 'status' in data['result']:
                self.process_printer_data(data['result']['status'])
                return "ready"
            if 'method' in data and data['method'] == 'printer.objects.updated' and 'params' in data and 'status' in data['params']:
                self.process_printer_data(data['params']['status'])
                return "ready"
            if 'error' in data:
                logging.error(f"Error from Moonraker: {data['error']}")
                if "method not found" in str(data['error']).lower():
                    logging.error("Method not found error, exiting")
                    return "fatal"
                logging.error("Mobileraker or other error, attempting resubscription")
                await asyncio.sleep(retry_delay)
                return "retry"
            # Unrelated messages keep arriving: give up after the overall timeout.
            if (asyncio.get_event_loop().time() - start_time) > timeout:
                logging.error("Timeout receiving initial printer status")
                await asyncio.sleep(retry_delay)
                return "retry"

    async def _consume_updates(self, websocket):
        """Process status messages until the connection raises."""
        while True:
            message = await websocket.recv()
            try:
                data = json.loads(message)
            except json.JSONDecodeError as e:
                logging.error(f"JSON decode error: {e}, message: {message}")
                continue
            if not isinstance(data, dict):
                continue
            params = data.get('params')
            if isinstance(params, dict) and 'status' in params:
                self.process_printer_data(params['status'])
            elif data.get('method') == 'notify_status_update' and isinstance(params, (list, tuple)) and params:
                # The status dict is read from the first element of params.
                self.process_notify_status_update(params[0])

    def _apply_status(self, status):
        """Copy the fields present in *status* into printer_state."""
        if "idle_timeout" in status and "state" in status["idle_timeout"]:
            self.printer_state["state"] = status["idle_timeout"]["state"]
        if "heater_bed" in status and "temperature" in status["heater_bed"]:
            self.printer_state["heater_bed_temp"] = status["heater_bed"]["temperature"]
        if "extruder" in status and "temperature" in status["extruder"]:
            self.printer_state["extruder_temp"] = status["extruder"]["temperature"]
        if "virtual_sdcard" in status:
            sdcard = status["virtual_sdcard"]
            if "is_active" in sdcard:
                self.printer_state["is_active"] = sdcard["is_active"]
            if "progress" in sdcard:
                self.printer_state["progress"] = f"{sdcard['progress'] * 100:.2f}%"

    def process_notify_status_update(self, update_data):
        """Merge a notify_status_update payload and refresh the overlay text."""
        try:
            self._apply_status(update_data)
            self.update_printer_cam_text()
        except KeyError as e:
            logging.error(f"KeyError in process_notify_status_update: {e}")
        except Exception as e:
            logging.error(f"Error in process_notify_status_update: {e}")

    def update_printer_cam_text(self):
        """Build the one-line status string and hand it to the parent provider."""
        try:
            state = self.printer_state['state']
            extruder_temp = f"{self.printer_state['extruder_temp']:.1f}C"
            heater_bed_temp = f"{self.printer_state['heater_bed_temp']:.1f}C"
            status_lines = [
                f"{state}",
                f"Ext: {extruder_temp}",
                f"Bed: {heater_bed_temp}",
            ]
            if self.printer_state['is_active']:
                status_lines.append(f"SD: {self.printer_state['progress']}")
            # Assigning parent.text also sends the text to the player.
            self._parent.text = " ".join(status_lines)
        except KeyError as e:
            logging.error(f"KeyError in update_printer_cam_text: {e}")
        except Exception as e:
            logging.error(f"Error in update_printer_cam_text: {e}")

    def process_printer_data(self, printer_data):
        """Merge a full status dict and refresh the overlay text."""
        try:
            self._apply_status(printer_data)
            self.update_printer_cam_text()
        except KeyError as e:
            logging.error(f"KeyError processing printer data: {e}")
        except Exception as e:
            logging.error(f"Error processing printer data: {e}")

    def start_websocket(self):
        """Run the websocket loop in a daemon thread."""
        self.websocket_thread = threading.Thread(target=self.run_websocket_loop, daemon=True)
        self.websocket_thread.start()

    def run_websocket_loop(self):
        """Thread body: run printer_websocket on a private event loop."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.printer_websocket())
        finally:
            # Close the loop even when the coroutine raised.
            loop.close()
class MainClass(Gtk.Window):
    """Fullscreen window that lays the configured tiles out in a Gtk.Grid."""

    # Largest col + cols / row + rows over all tiles; tiles read these to
    # size themselves.
    usedCols = 0
    usedRows = 0
    # Lazily initialised caches behind the winX/winY, css_provider and grid
    # properties.
    _winX = None
    _winY = None
    _css_provider = None
    _grid = None

    def __init__(self):
        """Load config.json (exit status 1 on failure) and show the window."""
        super().__init__()
        if not self.load_config():
            logging.error("Config load failed, exiting.")
            sys.exit(1)
        self.fullscreen()
        self.connect("realize", self.on_realize)
        self.show_all()

    def load_config(self):
        """Read ``config.json`` and build the tile objects in ``self.cams``.

        Entries with missing required keys or unusable values are logged and
        skipped *before* a tile object is created, so a rejected camera never
        leaves a poll timer running. Clock tiles count toward
        ``usedCols``/``usedRows`` just like cameras.

        Returns:
            True when the file was read and parsed, False otherwise.
        """
        config_path = "config.json"
        if not os.path.exists(config_path):
            logging.error(f"Config file not found: {config_path}")
            return False
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logging.error(f"Error loading config: {e}")
            return False
        if not isinstance(config, dict):
            logging.error("Error loading config: top-level JSON value must be an object")
            return False
        self._cams_config = config.get("cameras", [])  # default to empty list.
        self.cams = []
        for camconfig in self._cams_config:
            try:
                if "url" not in camconfig:
                    logging.error("ERROR: Cam has missing url")
                    continue
                is_clock = camconfig["url"] == "clock"
                required = ("col", "row") if is_clock else ("name", "url", "col", "row")
                missing = [key for key in required if key not in camconfig]
                if missing:
                    logging.error(f"Skipping cam {camconfig.get('name', '?')}: missing required key(s) {missing}")
                    continue
                # Computed before constructing the tile so that bad values are
                # rejected without side effects.
                row_end = camconfig["row"] + camconfig.get("rows", 1)
                col_end = camconfig["col"] + camconfig.get("cols", 1)
                tile = Clock(camconfig, self) if is_clock else Camera(camconfig, self)
                self.usedRows = max(self.usedRows, row_end)
                self.usedCols = max(self.usedCols, col_end)
                self.cams.append(tile)
            except (KeyError, TypeError, AttributeError) as e:
                logging.error(f"KeyError in cam config: {e}")
                continue  # skip this cam.
        return True

    def _init_winXY(self):
        """Cache the size of monitor 0, falling back to 1920x1080."""
        winX, winY = 1920, 1080
        screen = Gdk.Screen.get_default()
        if screen is not None and screen.get_n_monitors() > 0:
            geometry = screen.get_monitor_geometry(0)
            winX = geometry.width
            winY = geometry.height
        self._winX = winX
        self._winY = winY

    @property
    def winX(self):
        """Width in pixels used for layout."""
        if self._winX is None:
            self._init_winXY()
        return self._winX

    @property
    def winY(self):
        """Height in pixels used for layout."""
        if self._winY is None:
            self._init_winXY()
        return self._winY

    @property
    def css_provider(self):
        """Shared Gtk.CssProvider, created and added to the window on first use.

        The clock font size is derived from the window width and the number
        of grid columns.
        """
        if self._css_provider is None:
            self._css_provider = Gtk.CssProvider()
            gridCols = self.usedCols - 1
            gridCols = gridCols if gridCols > 0 else 1
            fontsz = int((self.winX / gridCols) / gridCols)
            # GTK 3.20+ matches CSS element names ("window", "grid"), not the
            # GtkWindow/GtkGrid type names; min-width below already needs 3.20.
            css = """
            window {
                background-color: black;
            }
            grid {
                background-color: black;
            }
            #time_label {
                color: white;
                font-size: """ + str(fontsz) + """px;
                min-width: 100px;
                font-family: monospace;
            }
            """
            self._css_provider.load_from_data(css.encode('utf-8'))
            self.get_style_context().add_provider(self._css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
        return self._css_provider

    @property
    def grid(self):
        """The Gtk.Grid holding all tiles, created on first use."""
        if self._grid is None:
            self._grid = Gtk.Grid()
            self._grid.get_style_context().add_provider(self.css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
        return self._grid

    def on_realize(self, widget):
        """Attach every tile to the grid and add the grid to the window."""
        logging.info("Realise main")
        self.set_app_paintable(True)
        self.set_default_size(self.winX, self.winY)
        self.connect("destroy", self.on_destroy)
        self.grid.show()
        for cam in self.cams:
            cam.attachToGrid(self.grid)
        self.add(self.grid)

    def on_destroy(self, widget, data=None):
        """Quit the GTK main loop when the window is destroyed."""
        Gtk.main_quit()
if __name__ == "__main__":
    # Force the C numeric locale before any player is created.
    # NOTE(review): presumably required by libmpv/python-mpv - confirm.
    locale.setlocale(locale.LC_NUMERIC, "C")
    # Keep a reference to the window for the lifetime of the main loop.
    main_window = MainClass()
    Gtk.main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
    "cameras": [
        {
            "name": "drive",
            "url": "rtsp://XXXX:554/user=XXXX_password=XXXX_channel=0_stream=1&onvif=0.sdp?real_stream",
            "col": 1,
            "row": 1,
            "cols": 1,
            "rows": 3,
            "stall_threshold": 9
        },
        {
            "name": "door",
            "url": "rtsp://XXXX:XXXX@XXXX:554/stream1",
            "col": 2,
            "row": 1,
            "stall_threshold": 9
        },
        {
            "name": "printer1",
            "url": "rtsp://XXXX:XXXX@XXXX:554/user=XXXX_password=XXXX_channel=1_stream=1.sdp?real_stream",
            "col": 3,
            "row": 1,
            "stall_threshold": 9
        },
        {
            "name": "clock",
            "url": "clock",
            "col": 2,
            "row": 2
        },
        {
            "name": "roadl",
            "url": "rtsp://XXXXXXXXu@XXXX:554/realmonitor?channel=0&stream=1.sdp",
            "col": 2,
            "row": 3,
            "stall_threshold": 9
        },
        {
            "name": "roadr",
            "url": "rtsp://XXXX:XXXX@XXXX:554/realmonitor?channel=1&stream=1.sdp",
            "col": 3,
            "row": 3,
            "stall_threshold": 9
        },
        {
            "name": "printer2",
            "url": "http://XXXX:8080/stream",
            "col": 3,
            "row": 2,
            "stall_threshold": 9,
            "textprovider": {
                "type": "moonraker",
                "url": "ws://XXXX:7125/websocket",
                "retry_delay": 5,
                "timeout": 10
            }
        }
    ]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Text provider classes can also be added to overlay text on the stream. The example provider connects to a 3D printer running Klipper/Moonraker and overlays some stats on the image. A simple clock class is also included for use in one of the grid spaces; it is enabled by setting the camera's url to 'clock'.