Skip to content

Instantly share code, notes, and snippets.

@Absolucy
Last active March 6, 2023 06:01
Show Gist options
  • Save Absolucy/96da39e181f63cd9a8337240bf7a2d19 to your computer and use it in GitHub Desktop.
Save Absolucy/96da39e181f63cd9a8337240bf7a2d19 to your computer and use it in GitHub Desktop.
SS13 OBS automatic recorder
import socket
import struct
import json
import psutil
import os
import shutil
from colorlog import ColoredFormatter, StreamHandler
from logging import INFO, getLogger
from obswebsocket import obsws, requests as obsreq
from time import sleep, time
from pathlib import Path
source = "Lucy's Automatic SS13 Recorder (uses status query to check for round ID change, which toggles OBS)"
querystr = json.dumps({"query": "status", "source": source, "auth": "anonymous"})
obs_host = "localhost"
obs_port = 4455
obs_password = "[insert OBS websocket password here]"
obs_scene_name = "Space Station 13"
ss13_host = "sage.beestation13.com"
ss13_port = 7878
output_dir = "C:/Users/Lucy/Videos/SS13"
stop_recording_after_ds_closes = 10
current_round_id = -1
last_status = None
status_reply = None
last_seen_dreamseeker = 95
logger = getLogger("ss13-recorder")
logger.setLevel(INFO)
handler = StreamHandler()
handler.setFormatter(ColoredFormatter("%(asctime)s %(log_color)s%(levelname)s%(reset)s %(name)s: %(message)s"))
logger.addHandler(handler)
def fetch_status(addr, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
query = b"\x00\x83" + struct.pack('>H', len(querystr) + 6) + b"\x00\x00\x00\x00\x00" + querystr.encode() + b"\x00"
sock.settimeout(10)
sock.connect((addr, port))
sock.sendall(query)
data = sock.recv(4096)
return json.loads(data[5:-1].decode())["data"]
obs = obsws(obs_host, obs_port, obs_password)
obs.connect()
def is_dreamseeker_open():
return "dreamseeker.exe" in (p.name() for p in psutil.process_iter())
def generate_output_path(extension, part=1):
server_name = last_status["version"]
round_id = last_status["round_id"]
map_name = last_status["map_name"]
output_file = f"{server_name} - Round {round_id} on {map_name}"
if part and part > 1:
output_file = output_file + f" (Part {part})"
if extension:
output_file = output_file + extension
return os.path.join(output_dir, output_file)
def stop_recording():
if not last_status:
return
resp = obs.call(obsreq.StopRecord())
if not resp.status:
logger.error("failed to stop recording")
return False
recording_path = resp.getOutputPath()
recording_ext = Path(recording_path).suffix
output_path = None
if os.path.exists(recording_path):
sleep(2.5)
part = 1
while not output_path or os.path.exists(output_path):
output_path = generate_output_path(extension=recording_ext, part=part)
part = part + 1
shutil.move(recording_path, output_path)
logger.info(f"recording stopped, saved to {output_path}")
return True
def update_status():
global status_reply
try:
status_reply = fetch_status(ss13_host, ss13_port)
except:
logger.warn("failed to fetch new server status, server must be down!")
def is_recording():
try:
status = obs.call(obsreq.GetRecordStatus())
return status.getOutputActive()
except:
return False
def is_replay_active():
try:
status = obs.call(obsreq.GetReplayBufferStatus())
return status.getOutputActive()
except:
return False
def is_paused():
try:
status = obs.call(obsreq.GetRecordStatus())
return status.getOutputPaused()
except:
return False
try:
while True:
if not is_dreamseeker_open():
recording = is_recording()
paused = is_paused()
if not recording and not paused:
sleep(5)
continue
if recording and not paused:
resp = obs.call(obsreq.PauseRecord())
if not resp.status:
logger.error(f"failed to pause recording")
sleep(5)
continue
logger.info("recording paused due to dreamseeker being closed")
if (paused or recording) and time() >= (last_seen_dreamseeker + stop_recording_after_ds_closes):
closed_for = time() - last_seen_dreamseeker
logger.info(f"dreamseeker has been closed for {closed_for} seconds")
stop_recording()
status_reply = None
last_status = None
sleep(5)
continue
last_seen_dreamseeker = time()
update_status()
if not status_reply:
sleep(15)
continue
if is_paused():
resp = obs.call(obsreq.ResumeRecord())
if not resp.status:
logger.error(f"failed to unpause recording")
sleep(5)
continue
old_round_id = current_round_id
current_round_id = status_reply["round_id"]
current_game_state = status_reply["gamestate"]
if (is_recording() or is_paused()) and current_game_state == 4:
logger.info(f"round {current_round_id} has ended")
if not stop_recording():
sleep(5)
continue
elif current_game_state < 4:
if old_round_id == -1:
logger.info(f"detected round ID {current_round_id}")
resp = obs.call(obsreq.SetCurrentProgramScene(sceneName=obs_scene_name))
if not resp.status:
logger.error(f"failed to set scene to '{obs_scene_name}'")
sleep(5)
continue
if not is_recording():
resp = obs.call(obsreq.StartRecord())
if not resp.status:
logger.error("failed to start recording")
sleep(5)
continue
logger.info("recording started")
if not is_replay_active():
resp = obs.call(obsreq.StartReplayBuffer())
if not resp.status:
logger.error("failed to start replay buffer")
sleep(5)
continue
logger.info("replay buffer started")
elif old_round_id != current_round_id:
logger.info(f"detected new round ID {current_round_id}, old round ID {old_round_id}")
if is_recording() or is_paused():
if not stop_recording():
sleep(5)
continue
if not is_recording():
resp = obs.call(obsreq.StartRecord())
if not resp.status:
logger.error("failed to start recording")
sleep(5)
continue
last_status = status_reply
sleep(45)
except KeyboardInterrupt:
logger.info("interrupted, exiting")
if last_status and (is_recording() or is_paused()):
stop_recording()
quit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment