Last active
January 23, 2025 22:50
-
-
Save Absolucy/f9d9daee8307ac8a226bfa1cf00125e6 to your computer and use it in GitHub Desktop.
SS13 OBS automatic recorder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip install obs-websocket-py psutil colorlog pystray pillow | |
import json | |
import psutil | |
import os | |
import shutil | |
import threading | |
from colorlog import ColoredFormatter, StreamHandler | |
from logging import INFO, getLogger | |
from obswebsocket import obsws, requests as obsreq | |
from time import sleep, time | |
from pathlib import Path | |
from urllib.request import Request, urlopen | |
from pystray import Icon as icon, Menu as menu, MenuItem as item | |
from PIL import Image, ImageDraw | |
import socket | |
import struct | |
import urllib.parse | |
# --- OBS websocket connection ---
obs_host = "localhost"
obs_port = 4455
obs_password = "hunter2"
# Scene that is made the current program scene before a recording is started.
obs_scene_name = "Space Station 13"

# --- Recording settings ---
# Root folder; finished recordings are moved into a per-server subfolder of it.
output_dir = "M:/Videos/Recordings/SS13"
# Seconds DreamSeeker may stay closed before an active recording is stopped.
stop_recording_after_ds_closes = 95
# User-Agent header sent with the HTTP status request.
user_agent = "Lucy's SS13 OBS Script Thingy"

# --- Mutable state shared by the polling loop and the tray callbacks ---
current_round_id = -1  # -1 means "no round seen yet"
last_status = None  # status used to name the recording that is being stopped
status_reply = None  # most recent status fetched from the server
# Compared against time() by the polling loop.
# NOTE(review): the initial value 95 is not a real timestamp - confirm intent.
last_seen_dreamseeker = 95
name_override = None  # display name that replaces the server-reported one

# --- Logging ---
logger = getLogger("ss13-recorder")
logger.setLevel(INFO)
log_format = "%(asctime)s %(log_color)s%(levelname)s%(reset)s %(name)s: %(message)s"
handler = StreamHandler()
handler.setFormatter(ColoredFormatter(log_format))
logger.addHandler(handler)

# Connect to OBS as soon as the module is loaded; every function below uses it.
obs = obsws(obs_host, obs_port, obs_password)
obs.connect()
# from https://github.com/qwertyquerty/ss13rp/blob/master/util.py
def topic(
    addr: str, port: int, querystr: str, decode: bool = False
) -> "dict[str, str] | str | None":
    """Send a topic query to a game server over TCP and return its reply.

    Args:
        addr: Host name or IP address of the game server.
        port: TCP port of the game server.
        querystr: Query text, e.g. ``"?status"``.
        decode: When true, parse the reply as a URL query string and return
            it as a dict; otherwise return the reply text unchanged.

    Returns:
        The reply (dict or str), or ``None`` when the connection fails, the
        reply is shorter than 6 bytes, or ``decode`` is set and the reply
        text is empty.

    Raises:
        OSError: For socket errors other than ``ConnectionError`` (for
            example a timeout), as before; callers are expected to handle it.
    """
    payload = querystr.encode()
    query = (
        b"\x00\x83"
        # The length field counts bytes on the wire, so it must be computed
        # from the encoded payload, not from the number of characters.
        + struct.pack(">H", len(payload) + 6)
        + b"\x00\x00\x00\x00\x00"
        + payload
        + b"\x00"
    )
    try:
        # The context manager closes the socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(10)
            sock.connect((addr, port))
            sock.sendall(query)
            data = sock.recv(4096)
    except ConnectionError:
        return None
    if not data or len(data) < 6:
        return None
    # Skip the 5 leading bytes and the trailing terminator byte.
    resp = data[5:-1].decode("utf-8")
    if not decode:
        return resp
    if not resp:
        return None
    return dict(urllib.parse.parse_qsl(resp))
def create_image(width, height, outline_color, fill_color):
    """Return a transparent RGBA image with a centred, outlined ellipse.

    The ellipse spans 75% of the image in each direction and is drawn with a
    5-pixel outline in ``outline_color`` and filled with ``fill_color``.
    """
    canvas = Image.new("RGBA", (width, height), (255, 255, 255, 0))

    ellipse_width = width * 0.75
    ellipse_height = height * 0.75
    left = (width / 2) - (ellipse_width / 2)
    top = (height / 2) - (ellipse_height / 2)
    bounding_box = (left, top, left + ellipse_width, top + ellipse_height)

    pen = ImageDraw.Draw(canvas)
    pen.ellipse(bounding_box, fill=fill_color, outline=outline_color, width=5)
    return canvas
def get_status():
    """Fetch the round status of the server DreamSeeker is connected to.

    The server is recognised from the address on DreamSeeker's command line.
    As a side effect ``name_override`` is set to the display name of the
    recognised server (``None`` for BeeStation).

    Returns:
        The status mapping reported by the server, or ``None`` when
        DreamSeeker is not running, the server is not recognised, or the
        request fails.
    """
    global name_override
    ip = get_dreamseeker_ip()
    if not ip:
        return None
    ip = ip.removeprefix("byond://")

    # BeeStation publishes its status over HTTP instead of a topic query.
    if ("beestation" in ip) or (":7878" in ip):
        name_override = None
        req = Request(
            url="https://api.beestation13.com/stats/bs_sage",
            headers={"User-Agent": user_agent},
        )
        try:
            # Bounded like the topic query so a stalled request cannot hang
            # the polling loop.
            with urlopen(req, timeout=10) as data:
                return json.load(data)
        except Exception:
            logger.exception("failed to get json data")
            return None

    # (address markers, display name, query host, query port).
    # Checked in order; the first entry with a matching marker wins.
    topic_servers = (
        ((":1342",), "Monke HRP", "198.37.111.92", 1342),
        ((":4407", ":4408"), "Monkestation Backup", "73.48.196.183", 4407),
        (("monkestation", ":3121"), "Monkestation", "play.monkestation.com", 3121),
        (("manuel", ":1447"), "tgstation Manuel", "manuel.tgstation13.org", 1447),
        (("yog",), "Yogstation", "game.yogstation.net", 4133),
    )
    for markers, display_name, host, port in topic_servers:
        if not any(marker in ip for marker in markers):
            continue
        name_override = display_name
        try:
            return topic(host, port, "?status", True)
        except Exception:
            logger.exception("failed to get json data")
            return None
    return None
def is_dreamseeker_open():
    """Return True when a process named ``dreamseeker.exe`` is running.

    Any failure while listing processes is treated as "not running".
    """
    try:
        # Requesting the name through process_iter() keeps one vanished or
        # inaccessible process from aborting the whole scan.
        return any(
            proc.info["name"] == "dreamseeker.exe"
            for proc in psutil.process_iter(["name"])
        )
    except Exception:
        return False
def get_dreamseeker_ip():
    """Return the first command-line argument of a running DreamSeeker.

    Returns:
        ``cmdline[1]`` of the first ``dreamseeker.exe`` process that has at
        least two command-line entries, or ``None`` when there is no such
        process or the process list cannot be read.
    """
    try:
        for proc in psutil.process_iter(["name"]):
            if proc.info["name"] != "dreamseeker.exe":
                continue
            try:
                cmdline = proc.cmdline()
            except Exception:
                # The process may have exited or be inaccessible; try the next.
                continue
            if len(cmdline) >= 2:
                return cmdline[1]
    except Exception:
        return None
    return None
# Characters that cannot appear in a Windows file name, plus ASCII control
# characters; each is replaced with an underscore.
_UNSAFE_PATH_CHARS = str.maketrans(
    dict.fromkeys('<>:"/\\|?*' + "".join(map(chr, range(32))), "_")
)


def _safe_path_component(value):
    """Return ``value`` as text usable as a single file or folder name."""
    cleaned = str(value).translate(_UNSAFE_PATH_CHARS).strip(" .")
    return cleaned or "unknown"


def generate_output_path(extension, part=1):
    """Build the destination path for a finished recording.

    The path is ``<output_dir>/<server>/<server> - Round <id> on <map>``
    followed by `` (Part N)`` when ``part`` is greater than 1 and then by
    ``extension``. The server folder is created if it does not exist.

    The server name, round ID and map name come from ``last_status`` (that
    is, from the game server), so they are sanitised before being used as
    path components; a name such as ``/tg/Station 13`` would otherwise escape
    ``output_dir``.

    Args:
        extension: File suffix including the dot, or a falsy value for none.
        part: Part number; values above 1 add a "(Part N)" marker.

    Returns:
        The full output path as a string.
    """
    server_name = _safe_path_component(name_override or last_status["version"])
    round_id = _safe_path_component(last_status["round_id"])
    map_name = _safe_path_component(last_status["map_name"])

    output_subdir = os.path.join(output_dir, server_name)
    os.makedirs(output_subdir, exist_ok=True)

    output_file = f"{server_name} - Round {round_id} on {map_name}"
    if part and part > 1:
        output_file += f" (Part {part})"
    if extension:
        output_file += extension
    return os.path.join(output_subdir, output_file)
def stop_recording(discard=False) -> bool:
    """Stop the OBS recording and either file or delete the result.

    Args:
        discard: When true the recorded file is deleted; otherwise it is
            moved (on a background thread) to the path produced by
            ``generate_output_path``, using the first part number whose
            path does not exist yet.

    Returns:
        ``False`` when there is no ``last_status`` or OBS reports that the
        stop request failed; ``True`` otherwise.
    """
    if not last_status:
        return False
    resp = obs.call(obsreq.StopRecord())
    if not resp.status:
        logger.error("failed to stop recording")
        return False

    recording_path = resp.getOutputPath()
    if not os.path.exists(recording_path):
        logger.warning(f"recording stopped, but {recording_path} does not exist")
        return True

    # Pause before touching the file OBS has just stopped writing.
    sleep(2.5)
    if discard:
        os.remove(recording_path)
        logger.info(f"recording stopped, discarded {recording_path}")
        return True

    recording_ext = Path(recording_path).suffix
    part = 1
    output_path = generate_output_path(extension=recording_ext, part=part)
    while os.path.exists(output_path):
        part += 1
        output_path = generate_output_path(extension=recording_ext, part=part)

    # The move runs on its own thread so the caller is not blocked by it.
    threading.Thread(
        target=shutil.move, args=(recording_path, output_path, shutil.copy)
    ).start()
    logger.info(f"recording stopped, saved to {output_path}")
    return True
def set_blur(blur: bool):
    """Enable or disable the "SEKRIT" filter on the OBS source named "Main"."""
    request = obsreq.SetSourceFilterEnabled(
        sourceName="Main",
        filterName="SEKRIT",
        filterEnabled=blur,
    )
    obs.call(request)
def is_recording() -> bool:
    """Return whether OBS reports an active recording.

    Any error while asking OBS is treated as "not recording".
    """
    try:
        status = obs.call(obsreq.GetRecordStatus())
        return status.getOutputActive()
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit propagate.
        return False
def do_exit(_tray, _item):
    """Tray callback: discard the current recording, then exit the process.

    Discarding is best-effort; a failure is logged and the process still
    exits.
    """
    try:
        stop_recording(discard=True)
    except Exception:
        logger.exception("failed to discard recording while exiting")
    finally:
        # os._exit ends the process immediately, without normal interpreter cleanup.
        os._exit(0)
def do_stop(tray, _item):
    """Tray callback: turn the blur filter off and stop (and keep) the recording."""
    set_blur(False)
    stopped = stop_recording()
    if stopped:
        update_tray(tray)
def do_start(tray, _item):
    """Tray callback: switch to the recording scene and start recording.

    Nothing is started when OBS is already recording. When the start request
    fails, the error is logged and the tray is left unchanged.
    """
    scene_request = obsreq.SetCurrentProgramScene(sceneName=obs_scene_name)
    obs.call(scene_request)
    set_blur(False)
    already_recording = is_recording()
    if not already_recording:
        started = obs.call(obsreq.StartRecord()).status
        if not started:
            logger.error("failed to start recording")
            sleep(5)
            return
        logger.info("recording started")
    update_tray(tray)
def do_discard(tray, _item):
    """Tray callback: stop the recording, delete its file, refresh the tray."""
    discarded = stop_recording(discard=True)
    if discarded:
        update_tray(tray)
def do_force_update(tray, _item):
    """Tray callback: fetch the server status right now and redraw the tray."""
    global status_reply
    fresh_status = get_status()
    status_reply = fresh_status
    update_tray(tray)
# outline,fill
# green,red = dreamseeker closed, or active but not recording
# red,red = dreamseeker open but no usable status
# yellow,green = round finished
# red,green = recording
# white,red = setting up
def update_tray(tray):
    """Refresh the tray icon and menu from the current state.

    The state is derived from whether DreamSeeker is running, the global
    ``status_reply`` and whether OBS is recording. A ``status_reply`` that
    lacks ``map_name``, ``round_id`` or ``gamestate``, or whose numeric
    fields cannot be parsed, is shown as "No Status" instead of raising;
    ``status_reply`` can be set by the "Force Update" callback without
    validation.
    """
    round_info = None
    if status_reply:
        try:
            round_info = (
                status_reply["map_name"],
                int(status_reply["round_id"]),
                int(status_reply["gamestate"]),
            )
        except (KeyError, TypeError, ValueError):
            round_info = None

    if not is_dreamseeker_open():
        colors = ("green", "red")
        entries = [item("DreamSeeker Closed", None, enabled=False)]
    elif round_info is None:
        colors = ("red", "red")
        entries = [item("No Status", None, enabled=False)]
    else:
        map_name, round_id, game_state = round_info
        if game_state == 4:
            # gamestate 4 is treated as "round finished" throughout this file.
            colors = ("yellow", "green")
            entries = [
                item("Recording Ended", None, enabled=False),
                item(f"Round {round_id} finished on {map_name}", None, enabled=False),
                item("Start Recording", do_start),
            ]
        elif is_recording():
            colors = ("red", "green")
            entries = [
                item("Recording", None, enabled=False),
                item(f"Round {round_id} on {map_name}", None, enabled=False),
                item("Stop Recording", do_stop),
                item("Discard Recording", do_discard),
            ]
        else:
            colors = ("green", "red")
            entries = [
                item("Not Recording", None, enabled=False),
                item(f"Round {round_id} on {map_name}", None, enabled=False),
                item("Start Recording", do_start),
            ]

    outline_color, fill_color = colors
    tray.icon = create_image(64, 64, outline_color, fill_color)
    # Every menu ends with the same two actions.
    tray.menu = menu(
        *entries,
        item("Force Update", do_force_update),
        item("Exit", do_exit),
    )
def main_loop(tray):
    """Poll DreamSeeker and the game server forever, driving OBS recording.

    Each pass:
      * refreshes the tray;
      * if DreamSeeker is closed, stops an active recording once it has been
        closed for ``stop_recording_after_ds_closes`` seconds;
      * otherwise fetches the server status and stops the recording when the
        round has ended (gamestate 4), or starts one when a round ID is seen
        for the first time or the round ID changes.

    Args:
        tray: The tray icon object; it is made visible and kept up to date.
    """
    # last_seen_dreamseeker is assigned below, so it must be declared global;
    # otherwise reading it before the first assignment raises
    # UnboundLocalError.
    global status_reply, last_status, current_round_id, last_seen_dreamseeker
    required_keys = ("round_id", "gamestate", "version", "map_name")
    tray.visible = True
    while True:
        update_tray(tray)

        if not is_dreamseeker_open():
            recording = is_recording()
            if recording and time() >= (
                last_seen_dreamseeker + stop_recording_after_ds_closes
            ):
                closed_for = time() - last_seen_dreamseeker
                logger.info(f"dreamseeker has been closed for {closed_for} seconds")
                stop_recording()
                # Cleared only after stopping: stop_recording() needs
                # last_status to name the output file.
                # NOTE(review): current_round_id is left untouched, so
                # rejoining the same round later does not auto-start a new
                # recording - confirm this is intended.
                status_reply = None
                last_status = None
                update_tray(tray)
            sleep(5)
            continue

        last_seen_dreamseeker = time()
        new_status = get_status()
        if not new_status or any(key not in new_status for key in required_keys):
            sleep(5)
            continue
        try:
            new_round_id = int(new_status["round_id"])
            current_game_state = int(new_status["gamestate"])
        except (TypeError, ValueError):
            # A malformed reply must not kill the polling loop.
            logger.warning("ignoring status with non-numeric round_id/gamestate")
            sleep(5)
            continue

        # Keep the previous status: it describes the recording that may be
        # stopped below, while status_reply describes the current round.
        last_status = status_reply
        status_reply = new_status
        old_round_id = current_round_id
        current_round_id = new_round_id
        update_tray(tray)

        if is_recording() and current_game_state == 4:
            logger.info(f"round {current_round_id} has ended")
            if not stop_recording():
                sleep(5)
                continue
        elif current_game_state < 4 and current_round_id != -1:
            if old_round_id == -1:
                logger.info(f"detected round ID {current_round_id}")
                obs.call(obsreq.SetCurrentProgramScene(sceneName=obs_scene_name))
                set_blur(False)
                if not is_recording():
                    resp = obs.call(obsreq.StartRecord())
                    if not resp.status:
                        logger.error("failed to start recording")
                        sleep(5)
                        continue
                    logger.info("recording started")
            elif old_round_id != current_round_id:
                logger.info(
                    f"detected new round ID {current_round_id}, old round ID {old_round_id}"
                )
                set_blur(False)
                if is_recording():
                    # Finish the previous round's recording first.
                    if not stop_recording():
                        sleep(5)
                        continue
                if not is_recording():
                    resp = obs.call(obsreq.StartRecord())
                    if not resp.status:
                        logger.error("failed to start recording")
                        sleep(5)
                        continue
                    logger.info("recording started")

        # From here on, a stop (e.g. from the tray) files under this status.
        last_status = status_reply
        update_tray(tray)
        sleep(10)
# Build the tray icon in its "setting up" look (white outline, red fill) with
# a minimal menu, then run it with main_loop passed as the setup callback.
initial_icon = create_image(64, 64, "white", "red")
initial_menu = menu(
    item("Exit", do_exit),
    item("Stop Recording", do_stop),
)
tray = icon("SS13 OBS Recorder", icon=initial_icon, menu=initial_menu)
tray.run(setup=main_loop)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment