Last active
March 6, 2023 06:01
-
-
Save Absolucy/96da39e181f63cd9a8337240bf7a2d19 to your computer and use it in GitHub Desktop.
SS13 OBS automatic recorder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import struct | |
import json | |
import psutil | |
import os | |
import shutil | |
from colorlog import ColoredFormatter, StreamHandler | |
from logging import INFO, getLogger | |
from obswebsocket import obsws, requests as obsreq | |
from time import sleep, time | |
from pathlib import Path | |
# --- Configuration ----------------------------------------------------------

# Sent as the "source" field of the status query.
source = "Lucy's Automatic SS13 Recorder (uses status query to check for round ID change, which toggles OBS)"
# JSON body of the status query that fetch_status() sends to the game server.
querystr = json.dumps({"query": "status", "source": source, "auth": "anonymous"})

# OBS websocket endpoint and credentials.
obs_host = "localhost"
obs_port = 4455
obs_password = "[insert OBS websocket password here]"
# Scene that is made the current program scene when a round is first detected.
obs_scene_name = "Space Station 13"

# Game server that is polled for its status.
ss13_host = "sage.beestation13.com"
ss13_port = 7878

# Directory that finished recordings are moved into.
output_dir = "C:/Users/Lucy/Videos/SS13"
# Seconds dreamseeker may stay closed before an active recording is stopped.
stop_recording_after_ds_closes = 10

# --- Runtime state ----------------------------------------------------------

# Round ID from the most recent status reply; -1 until a round has been seen.
current_round_id = -1
# Status reply kept from the last completed loop iteration; used to name files.
last_status = None
# Most recent reply stored by update_status().
status_reply = None
# Timestamp (time()) at which dreamseeker was last seen running.  Starting
# from "now" gives the normal grace period on launch instead of treating
# dreamseeker as having been closed since (almost) the epoch.
last_seen_dreamseeker = time()

# --- Logging ----------------------------------------------------------------

logger = getLogger("ss13-recorder")
logger.setLevel(INFO)
handler = StreamHandler()
handler.setFormatter(ColoredFormatter("%(asctime)s %(log_color)s%(levelname)s%(reset)s %(name)s: %(message)s"))
logger.addHandler(handler)
def fetch_status(addr, port):
    """Send the status query to ``addr:port`` and return the reply's payload.

    The module-level ``querystr`` is wrapped in a binary packet, sent over
    TCP, and the reply is decoded as JSON.

    Args:
        addr: Host name or IP address of the game server.
        port: TCP port of the game server.

    Returns:
        The value stored under ``"data"`` in the JSON reply.

    Raises:
        OSError: If connecting, sending or the first receive fails or times
            out (10 second timeout).
        ValueError: If the reply is not valid UTF-8 / JSON.
        KeyError: If the reply has no ``"data"`` member.
    """
    payload = querystr.encode()
    # Packet layout: 2 magic bytes, big-endian uint16 length, 5 zero bytes,
    # the payload, and a trailing NUL.  The length counts everything after
    # the length field (5 + payload + 1), measured in bytes, not characters.
    query = b"\x00\x83" + struct.pack(">H", len(payload) + 6) + b"\x00\x00\x00\x00\x00" + payload + b"\x00"

    # The context manager closes the socket on every path, including errors.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(10)
        sock.connect((addr, port))
        sock.sendall(query)
        data = sock.recv(4096)
        # NOTE(review): assumes the reply header uses the same framing as the
        # request (bytes 2-3 = length of everything after the 4-byte header).
        # If that holds, keep reading until the whole reply has arrived; a
        # timeout or closed connection simply falls back to what we have.
        if len(data) >= 4:
            expected = 4 + struct.unpack(">H", data[2:4])[0]
            while len(data) < expected:
                try:
                    chunk = sock.recv(4096)
                except socket.timeout:
                    break
                if not chunk:
                    break
                data += chunk

    # Skip the 5-byte reply header and the trailing NUL terminator.
    return json.loads(data[5:-1].decode())["data"]
# Open the websocket session to OBS; all obs.call() requests in this script
# go through this connection.
obs = obsws(host=obs_host, port=obs_port, password=obs_password)
obs.connect()
def is_dreamseeker_open():
    """Return True if a process named ``dreamseeker.exe`` is currently running.

    The comparison ignores case.
    """
    # Asking process_iter() to prefetch "name" avoids calling p.name() on a
    # process that has exited or cannot be inspected, which would raise in
    # the middle of the scan; such processes report None instead.
    for proc in psutil.process_iter(["name"]):
        name = proc.info.get("name")
        if name and name.lower() == "dreamseeker.exe":
            return True
    return False
def generate_output_path(extension, part=1):
    """Build the destination path for a recording from ``last_status``.

    The file name has the form ``"<version> - Round <round_id> on <map_name>"``
    using the fields of the module-level ``last_status`` dict, followed by
    ``" (Part N)"`` when ``part`` is greater than 1, followed by ``extension``.

    Args:
        extension: File suffix including the dot (e.g. ``".mkv"``); may be
            empty or None to omit it.
        part: 1-based part number; values above 1 add a "(Part N)" suffix.

    Returns:
        The file name joined onto ``output_dir``.
    """
    # The status fields come from a remote server; replace path separators
    # and characters that are not allowed in Windows file names so the
    # result is always a single valid file name inside output_dir.
    unsafe_chars = {ord(ch): "_" for ch in '<>:"/\\|?*'}

    server_name = last_status["version"]
    round_id = last_status["round_id"]
    map_name = last_status["map_name"]

    output_file = f"{server_name} - Round {round_id} on {map_name}".translate(unsafe_chars)
    if part and part > 1:
        output_file += f" (Part {part})"
    if extension:
        output_file += extension
    return os.path.join(output_dir, output_file)
def stop_recording():
    """Stop the OBS recording and move the file to a descriptive name.

    The destination comes from generate_output_path(); if that file already
    exists the part number is increased until an unused name is found.

    Returns:
        True if OBS stopped the recording (even if the file could not be
        moved afterwards), False if there is no ``last_status`` to name the
        file after or OBS refused the stop request.
    """
    if not last_status:
        # Nothing to name the recording after; leave OBS untouched.
        return False

    resp = obs.call(obsreq.StopRecord())
    if not resp.status:
        logger.error("failed to stop recording")
        return False

    recording_path = resp.getOutputPath()
    if not recording_path or not os.path.exists(recording_path):
        logger.warning(f"recording stopped, but no output file was found at {recording_path}")
        return True

    # Short pause before touching the file -- presumably to let OBS finish
    # writing and release it.
    sleep(2.5)

    recording_ext = Path(recording_path).suffix
    part = 1
    output_path = generate_output_path(extension=recording_ext, part=part)
    while os.path.exists(output_path):
        part += 1
        output_path = generate_output_path(extension=recording_ext, part=part)

    try:
        # Create the destination directory on first use.
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        shutil.move(recording_path, output_path)
    except OSError:
        # The recording itself was stopped; the file just stays where OBS
        # wrote it, so report success to the caller instead of crashing.
        logger.exception(f"recording stopped, but moving {recording_path} to {output_path} failed")
        return True

    logger.info(f"recording stopped, saved to {output_path}")
    return True
def update_status():
    """Refresh the module-level ``status_reply`` from the game server.

    On failure a warning is logged and ``status_reply`` keeps whatever value
    it had before the call.
    """
    global status_reply
    try:
        status_reply = fetch_status(ss13_host, ss13_port)
    except Exception as exc:
        # Deliberately broad (network, decoding and JSON errors are all
        # treated as "server unavailable"), but unlike a bare ``except:`` it
        # lets KeyboardInterrupt / SystemExit through so Ctrl+C still works
        # while a query is in flight.
        logger.warning("failed to fetch new server status, server must be down! (%s)", exc)
def is_recording():
    """Return OBS's "output active" flag for recording, or False on any error."""
    try:
        status = obs.call(obsreq.GetRecordStatus())
        return status.getOutputActive()
    except Exception:
        # Best effort: any failure talking to OBS counts as "not recording".
        # KeyboardInterrupt / SystemExit are not swallowed.
        return False
def is_replay_active():
    """Return OBS's "output active" flag for the replay buffer, or False on any error."""
    try:
        status = obs.call(obsreq.GetReplayBufferStatus())
        return status.getOutputActive()
    except Exception:
        # Best effort: any failure talking to OBS counts as "not active".
        # KeyboardInterrupt / SystemExit are not swallowed.
        return False
def is_paused():
    """Return OBS's "output paused" flag for recording, or False on any error."""
    try:
        status = obs.call(obsreq.GetRecordStatus())
        return status.getOutputPaused()
    except Exception:
        # Best effort: any failure talking to OBS counts as "not paused".
        # KeyboardInterrupt / SystemExit are not swallowed.
        return False
# Main polling loop.  Each pass either handles "dreamseeker is closed"
# (pause, then stop after a grace period) or polls the game server and
# starts/stops the OBS recording according to the round ID and game state.
try:
    while True:
        if not is_dreamseeker_open():
            recording = is_recording()
            paused = is_paused()
            if not recording and not paused:
                # Nothing is being recorded, so there is nothing to wind down.
                sleep(5)
                continue
            if recording and not paused:
                resp = obs.call(obsreq.PauseRecord())
                if not resp.status:
                    logger.error("failed to pause recording")
                    sleep(5)
                    continue
                logger.info("recording paused due to dreamseeker being closed")
            if (paused or recording) and time() >= (last_seen_dreamseeker + stop_recording_after_ds_closes):
                closed_for = time() - last_seen_dreamseeker
                logger.info(f"dreamseeker has been closed for {closed_for} seconds")
                stop_recording()
                status_reply = None
                last_status = None
                # Forget the round as well: otherwise rejoining the same round
                # matches neither the "first detection" nor the "new round"
                # branch below and recording would never start again.
                current_round_id = -1
            sleep(5)
            continue

        last_seen_dreamseeker = time()
        update_status()
        if not status_reply:
            sleep(15)
            continue

        # dreamseeker is running again: continue a recording that was paused.
        if is_paused():
            resp = obs.call(obsreq.ResumeRecord())
            if not resp.status:
                logger.error("failed to unpause recording")
                sleep(5)
                continue

        old_round_id = current_round_id
        current_round_id = status_reply["round_id"]
        current_game_state = status_reply["gamestate"]

        if (is_recording() or is_paused()) and current_game_state == 4:
            # gamestate 4 is treated as "round finished".
            logger.info(f"round {current_round_id} has ended")
            if not stop_recording():
                sleep(5)
                continue
        elif current_game_state < 4:
            if old_round_id == -1:
                # First round seen since start-up (or since the last reset).
                logger.info(f"detected round ID {current_round_id}")
                resp = obs.call(obsreq.SetCurrentProgramScene(sceneName=obs_scene_name))
                if not resp.status:
                    logger.error(f"failed to set scene to '{obs_scene_name}'")
                    sleep(5)
                    continue
                if not is_recording():
                    resp = obs.call(obsreq.StartRecord())
                    if not resp.status:
                        logger.error("failed to start recording")
                        sleep(5)
                        continue
                    logger.info("recording started")
                if not is_replay_active():
                    resp = obs.call(obsreq.StartReplayBuffer())
                    if not resp.status:
                        logger.error("failed to start replay buffer")
                        sleep(5)
                        continue
                    logger.info("replay buffer started")
            elif old_round_id != current_round_id:
                # The server moved on to a new round: close the old recording
                # (named after the previous status) and begin a fresh one.
                logger.info(f"detected new round ID {current_round_id}, old round ID {old_round_id}")
                if is_recording() or is_paused():
                    if not stop_recording():
                        sleep(5)
                        continue
                if not is_recording():
                    resp = obs.call(obsreq.StartRecord())
                    if not resp.status:
                        logger.error("failed to start recording")
                        sleep(5)
                        continue

        # Only updated after a fully processed pass, so stop_recording() above
        # still sees the previous round's status when the round ID changes.
        last_status = status_reply
        sleep(45)
except KeyboardInterrupt:
    logger.info("interrupted, exiting")
    if last_status and (is_recording() or is_paused()):
        stop_recording()
    # SystemExit instead of quit(): quit() is injected by the ``site`` module
    # and is not guaranteed to exist (e.g. ``python -S`` or frozen builds).
    raise SystemExit(0)
finally:
    # Close the OBS websocket on every exit path.
    obs.disconnect()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment