Skip to content

Instantly share code, notes, and snippets.

@Absolucy
Last active January 23, 2025 22:50
Show Gist options
  • Save Absolucy/f9d9daee8307ac8a226bfa1cf00125e6 to your computer and use it in GitHub Desktop.
Save Absolucy/f9d9daee8307ac8a226bfa1cf00125e6 to your computer and use it in GitHub Desktop.
SS13 OBS automatic recorder
# pip install obs-websocket-py psutil colorlog pystray pillow
import json
import psutil
import os
import shutil
import threading
from colorlog import ColoredFormatter, StreamHandler
from logging import INFO, getLogger
from obswebsocket import obsws, requests as obsreq
from time import sleep, time
from pathlib import Path
from urllib.request import Request, urlopen
from pystray import Icon as icon, Menu as menu, MenuItem as item
from PIL import Image, ImageDraw
import socket
import struct
import urllib.parse
obs_host = "localhost"
obs_port = 4455
obs_password = "hunter2"
obs_scene_name = "Space Station 13"
output_dir = "M:/Videos/Recordings/SS13"
stop_recording_after_ds_closes = 95
user_agent = "Lucy's SS13 OBS Script Thingy"
current_round_id = -1
last_status = None
status_reply = None
last_seen_dreamseeker = 95
name_override = None
logger = getLogger("ss13-recorder")
logger.setLevel(INFO)
handler = StreamHandler()
handler.setFormatter(
ColoredFormatter(
"%(asctime)s %(log_color)s%(levelname)s%(reset)s %(name)s: %(message)s"
)
)
logger.addHandler(handler)
obs = obsws(obs_host, obs_port, obs_password)
obs.connect()
# from https://github.com/qwertyquerty/ss13rp/blob/master/util.py
def topic(addr: str, port: int, querystr: str, decode: bool = False) -> str:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
query = (
b"\x00\x83"
+ struct.pack(">H", len(querystr) + 6)
+ b"\x00\x00\x00\x00\x00"
+ querystr.encode()
+ b"\x00"
)
sock.settimeout(10)
sock.connect((addr, port))
sock.sendall(query)
data = sock.recv(4096)
if not data or len(data) < 6:
return None
if decode:
resp = data[5:-1].decode("utf-8")
if not resp:
return None
return dict(urllib.parse.parse_qsl(resp))
else:
return data[5:-1].decode("utf-8")
except ConnectionError:
return None
def create_image(width, height, outline_color, fill_color):
circle_width = width * 0.75
circle_height = height * 0.75
# Generate an image and draw a pattern
image = Image.new("RGBA", (width, height), (255, 255, 255, 0))
draw = ImageDraw.Draw(image)
start_x = (width / 2) - (circle_width / 2)
start_y = (height / 2) - (circle_height / 2)
draw.ellipse(
(start_x, start_y, start_x + circle_width, start_y + circle_height),
fill=fill_color,
outline=outline_color,
width=5,
)
return image
def get_status():
global name_override
ip = get_dreamseeker_ip()
if not ip:
return None
ip = ip.removeprefix("byond://")
if ("beestation" in ip) or (":7878" in ip):
name_override = None
req = Request(
url="https://api.beestation13.com/stats/bs_sage",
headers={"User-Agent": user_agent},
)
try:
with urlopen(req) as data:
return json.load(data)
except:
logger.exception("failed to get json data")
return None
elif ":1342" in ip:
name_override = "Monke HRP"
try:
x = topic("198.37.111.92", 1342, "?status", True)
return x
except:
logger.exception("failed to get json data")
return None
elif (":4407" in ip) or (":4408" in ip):
name_override = "Monkestation Backup"
try:
x = topic(
"73.48.196.183",
4407,
"?status",
True,
)
return x
except:
logger.exception("failed to get json data")
return None
elif ("monkestation" in ip) or (":3121" in ip):
name_override = "Monkestation"
try:
x = topic(
"play.monkestation.com",
3121,
"?status",
True,
)
return x
except:
logger.exception("failed to get json data")
return None
elif ("manuel" in ip) or (":1447" in ip):
name_override = "tgstation Manuel"
try:
x = topic("manuel.tgstation13.org", 1447, "?status", True)
return x
except:
logger.exception("failed to get json data")
return None
elif "yog" in ip:
name_override = "Yogstation"
try:
return topic("game.yogstation.net", 4133, "?status", True)
except:
logger.exception("failed to get json data")
return None
else:
return None
def is_dreamseeker_open():
try:
return "dreamseeker.exe" in (p.name() for p in psutil.process_iter())
except:
return False
def get_dreamseeker_ip():
try:
for p in psutil.process_iter():
try:
if p.name() != "dreamseeker.exe":
continue
cmdline = p.cmdline()
if len(cmdline) < 2:
continue
return cmdline[1]
except:
continue
except:
return None
def generate_output_path(extension, part=1):
server_name = last_status["version"]
if name_override:
server_name = name_override
round_id = last_status["round_id"]
map_name = last_status["map_name"]
output_subdir = os.path.join(output_dir, server_name)
os.makedirs(output_subdir, exist_ok=True)
output_file = f"{server_name} - Round {round_id} on {map_name}"
if part and part > 1:
output_file = output_file + f" (Part {part})"
if extension:
output_file = output_file + extension
return os.path.join(output_subdir, output_file)
def stop_recording(discard=False) -> bool:
if not last_status:
return False
resp = obs.call(obsreq.StopRecord())
if not resp.status:
logger.error("failed to stop recording")
return False
recording_path = resp.getOutputPath()
recording_ext = Path(recording_path).suffix
output_path = None
if os.path.exists(recording_path):
sleep(2.5)
if discard:
os.remove(recording_path)
else:
part = 1
while not output_path or os.path.exists(output_path):
output_path = generate_output_path(extension=recording_ext, part=part)
part = part + 1
threading.Thread(
target=shutil.move, args=(recording_path, output_path, shutil.copy)
).start()
logger.info(f"recording stopped, saved to {output_path}")
return True
def set_blur(blur: bool):
obs.call(
obsreq.SetSourceFilterEnabled(
sourceName="Main", filterName="SEKRIT", filterEnabled=blur
)
)
def is_recording() -> bool:
try:
status = obs.call(obsreq.GetRecordStatus())
return status.getOutputActive()
except:
return False
def do_exit(_tray, _item):
try:
stop_recording(discard=True)
except:
pass
os._exit(0)
def do_stop(tray, _item):
set_blur(False)
if stop_recording():
update_tray(tray)
def do_start(tray, _item):
obs.call(obsreq.SetCurrentProgramScene(sceneName=obs_scene_name))
set_blur(False)
if not is_recording():
resp = obs.call(obsreq.StartRecord())
if not resp.status:
logger.error("failed to start recording")
sleep(5)
return
logger.info("recording started")
update_tray(tray)
def do_discard(tray, _item):
if stop_recording(discard=True):
update_tray(tray)
def do_force_update(tray, _item):
global status_reply
status_reply = get_status()
update_tray(tray)
# outline,fill
# green,red = active but not recording
# red,red = dreamseeker closed
# red,green = recording
# white,red = setting up
def update_tray(tray):
if not is_dreamseeker_open():
tray.icon = tray.icon = create_image(64, 64, "green", "red")
tray.menu = menu(
item("DreamSeeker Closed", None, enabled=False),
item("Force Update", do_force_update),
item("Exit", do_exit),
)
elif not status_reply:
tray.icon = tray.icon = create_image(64, 64, "red", "red")
tray.menu = menu(
item("No Status", None, enabled=False),
item("Force Update", do_force_update),
item("Exit", do_exit),
)
else:
map_name = status_reply["map_name"]
round_id = int(status_reply["round_id"])
game_state = int(status_reply["gamestate"])
if game_state == 4:
tray.icon = tray.icon = create_image(64, 64, "yellow", "green")
tray.menu = menu(
item("Recording Ended", None, enabled=False),
item(f"Round {round_id} finished on {map_name}", None, enabled=False),
item("Start Recording", do_start),
item("Force Update", do_force_update),
item("Exit", do_exit),
)
elif is_recording():
tray.icon = tray.icon = create_image(64, 64, "red", "green")
tray.menu = menu(
item("Recording", None, enabled=False),
item(
f"Round {round_id} on {map_name}",
None,
enabled=False,
),
item("Stop Recording", do_stop),
item("Discard Recording", do_discard),
item("Force Update", do_force_update),
item("Exit", do_exit),
)
else:
tray.icon = tray.icon = create_image(64, 64, "green", "red")
tray.menu = menu(
item("Not Recording", None, enabled=False),
item(f"Round {round_id} on {map_name}", None, enabled=False),
item("Start Recording", do_start),
item("Force Update", do_force_update),
item("Exit", do_exit),
)
def main_loop(tray):
global status_reply, last_status, current_round_id
tray.visible = True
while True:
update_tray(tray)
if not is_dreamseeker_open():
recording = is_recording()
if recording and time() >= (
last_seen_dreamseeker + stop_recording_after_ds_closes
):
closed_for = time() - last_seen_dreamseeker
logger.info(f"dreamseeker has been closed for {closed_for} seconds")
stop_recording()
status_reply = None
last_status = None
update_tray(tray)
sleep(5)
continue
last_seen_dreamseeker = time()
new_status = get_status()
if (
not new_status
or not "round_id" in new_status
or not "gamestate" in new_status
or not "version" in new_status
or not "map_name" in new_status
):
sleep(5)
continue
last_status = status_reply
status_reply = new_status
old_round_id = current_round_id
current_round_id = int(status_reply.get("round_id", -1))
current_game_state = int(status_reply.get("gamestate", 4))
update_tray(tray)
if is_recording() and current_game_state == 4:
logger.info(f"round {current_round_id} has ended")
if not stop_recording():
sleep(5)
continue
elif current_game_state < 4 and current_round_id != -1:
if old_round_id == -1:
logger.info(f"detected round ID {current_round_id}")
obs.call(obsreq.SetCurrentProgramScene(sceneName=obs_scene_name))
set_blur(False)
if not is_recording():
resp = obs.call(obsreq.StartRecord())
if not resp.status:
logger.error("failed to start recording")
sleep(5)
continue
logger.info("recording started")
elif old_round_id != current_round_id:
logger.info(
f"detected new round ID {current_round_id}, old round ID {old_round_id}"
)
set_blur(False)
if is_recording():
if not stop_recording():
sleep(5)
continue
if not is_recording():
resp = obs.call(obsreq.StartRecord())
if not resp.status:
logger.error("failed to start recording")
sleep(5)
continue
last_status = status_reply
update_tray(tray)
sleep(10)
tray = icon(
"SS13 OBS Recorder",
icon=create_image(64, 64, "white", "red"),
menu=menu(item("Exit", do_exit), item("Stop Recording", do_stop)),
)
tray.run(setup=main_loop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment