Skip to content

Instantly share code, notes, and snippets.

@racerxdl
Created June 10, 2024 12:28
Show Gist options
  • Save racerxdl/80fc84b27ef5600d60afbce0875b3b92 to your computer and use it in GitHub Desktop.
Save racerxdl/80fc84b27ef5600d60afbce0875b3b92 to your computer and use it in GitHub Desktop.
Space Engineers Torch DS restarter for cases where the MSVC debug window prevents the server from restarting automatically. It should be run on the same (Linux) machine as the DS.
import asyncio
import time
import os, signal
import opengsq
from opengsq.responses.source import SourceInfo, GoldSourceInfo, Visibility
async def query(info):
    """Query a game server through ``opengsq.Source`` and summarise the reply.

    Args:
        info: Mapping with a ``"host"`` and a ``"port"`` entry. The host is
            passed through ``str`` and the port through ``int(str(...))``
            before use.

    Returns:
        A dict with the keys ``"name"``, ``"map"``, ``"password"``,
        ``"numplayers"``, ``"maxplayers"``, ``"game_id"``, ``"players"``,
        ``"connect"``, ``"ping"`` (milliseconds spent waiting for the
        replies) and ``"raw"`` (a copy of the response object's attributes,
        plus a ``"tags"`` list when the response carries keywords).

    Raises:
        TypeError: If the response is neither a ``SourceInfo`` nor a
            ``GoldSourceInfo``.
        Exception: Whatever ``source.get_info()`` raises is propagated
            unchanged; only the player lookup is best-effort.
    """
    host, port = str(info["host"]), int(str(info["port"]))
    # NOTE(review): the third positional argument is presumably the timeout
    # in seconds -- confirm against the installed opengsq version.
    source = opengsq.Source(host, port, 15)

    async def get_players():
        # The player list is optional information: a failure here must not
        # make the whole health check fail, so fall back to an empty list.
        try:
            return await source.get_players()
        except Exception:
            return []

    start = time.time()
    server_info, players = await asyncio.gather(source.get_info(), get_players())
    # Take the timing right away so it only covers the network round trips.
    ping = int((time.time() - start) * 1000)

    if isinstance(server_info, SourceInfo):
        connect = f"{host}:{server_info.port}"
        game_id = server_info.game_id
        keywords = server_info.keywords
    elif isinstance(server_info, GoldSourceInfo):
        connect = server_info.address
        game_id = None
        keywords = None
    else:
        # Without this branch the code below would fail with an opaque
        # UnboundLocalError on `connect` / `game_id`.
        raise TypeError(
            f"Unexpected server info type: {type(server_info).__name__}"
        )

    # Longest-connected entries first; the first `bots` entries are then
    # dropped (presumably bots are the longest-connected "players").
    ranked = sorted(players, key=lambda entry: entry.duration, reverse=True)
    humans = ranked[server_info.bots:]

    # Copy the attribute dict: storing `__dict__` itself would make the
    # "tags" assignment below add an attribute to the response object.
    raw = dict(server_info.__dict__)
    if keywords:
        raw["tags"] = str(keywords).split(",")

    return {
        "name": server_info.name,
        "map": server_info.map,
        "password": server_info.visibility == Visibility.Private,
        "numplayers": server_info.players,
        "maxplayers": server_info.max_players,
        "game_id": game_id,
        "players": [
            {
                "name": player.name,
                "raw": {"score": player.score, "time": player.duration},
            }
            for player in humans
        ],
        "connect": connect,
        "ping": ping,
        "raw": raw,
    }
def close_pid(pid):
    """Forcefully terminate the process ``pid`` with ``SIGKILL``.

    This is best-effort: any failure (process already gone, insufficient
    permissions, invalid pid, ...) is reported on stdout together with its
    reason and never raised, so one failed kill cannot abort the caller.

    Args:
        pid: Process id handed to ``os.kill``.

    Returns:
        None.
    """
    try:
        os.kill(pid, signal.SIGKILL)
    except Exception as exc:
        # Include the reason so "no such process" and "permission denied"
        # can be told apart in the log.
        print(f"Failed to kill pid {pid}: {exc}")
def _find_torch_pids():
    """Return the PIDs of processes whose command line contains "Torch".

    Uses ``pgrep -f`` instead of a ``ps | grep`` shell pipeline: pgrep never
    reports itself, no shell is involved, and no pipe is left open. The
    current process is excluded so the watchdog cannot kill itself when its
    own command line happens to contain "Torch".

    Returns:
        A list of integer PIDs; empty when nothing matches or when
        ``pgrep`` cannot be executed.
    """
    # Function-scope import: subprocess is not among the file-level imports.
    import subprocess

    try:
        listing = subprocess.run(
            ["pgrep", "-f", "Torch"],
            capture_output=True,
            text=True,
            check=False,  # pgrep exits with 1 when nothing matches
        )
    except OSError as exc:
        # Raising here would escape main()'s except handler and stop the
        # watchdog, so report the problem and carry on.
        print(f"Could not list Torch processes: {exc}")
        return []

    own_pid = os.getpid()
    return [
        int(token)
        for token in listing.stdout.split()
        if token.isdigit() and int(token) != own_pid
    ]


async def main(host="192.168.52.100", port=27016):
    """Poll the game server forever and kill Torch when it stops answering.

    Every 5 seconds the server is queried. If the query raises, every
    process matching "Torch" is killed with ``close_pid`` and the loop waits
    5 minutes before polling again. Nothing here starts the server again;
    presumably an external mechanism does -- TODO confirm.

    Args:
        host: Address of the server to query (your server IP).
        port: Query port of the server.
    """
    while True:
        try:
            print("Checking server")
            data = await query({"host": host, "port": port})
            name = data["name"]
            map_name = data["map"]
            numplayers = data["numplayers"]
            maxplayers = data["maxplayers"]
            print(f"Server OK: {name} {map_name} {numplayers}/{maxplayers}")
        except Exception as e:
            # Deliberately broad: any failure to obtain a valid reply is
            # treated as "server is hung".
            print(e)
            print("Restaring server")
            for pid in _find_torch_pids():
                print(f"Killing pid {pid}")
                close_pid(pid)
            print("Waiting for 5 minutes before trying again")
            await asyncio.sleep(300)
        await asyncio.sleep(5)
# Script entry point. asyncio.run() creates a fresh event loop, runs main()
# and closes the loop afterwards; calling asyncio.get_event_loop() with no
# running loop is deprecated in current Python versions. The guard keeps an
# import of this module from starting the endless watchdog loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment