Skip to content

Instantly share code, notes, and snippets.

@anarcie
Created January 30, 2025 17:27
Show Gist options
  • Select an option

  • Save anarcie/3b62435af4121ae086664b3822059912 to your computer and use it in GitHub Desktop.

Select an option

Save anarcie/3b62435af4121ae086664b3822059912 to your computer and use it in GitHub Desktop.
import requests
import yaml
import os
import json
import sys
import subprocess
import psutil
import re
import time
from ntfy import ntfySend as ntfySend
#----------------------------#
# debug
#----------------------------#
#
# Output Debug Info
#
#----------------------------#
def debug(message):
    """Print ``message`` when the built-in debug switch is on.

    The switch is a hard-coded local that is currently off, so as written
    this function prints nothing and returns None.
    """
    enabled = False  # flip to True to get debug output
    if enabled:
        print(message)
#----------------------------#
# loadConf
#----------------------------#
#
# load yaml config files
#
#----------------------------#
def loadConf(confFile):
    """Load a YAML config file and return its parsed content.

    Args:
        confFile: Path of the YAML file to read.

    Returns:
        The parsed document, or an empty dict when the file is empty or does
        not exist. A missing file is reported on stdout. Returning ``{}``
        instead of ``None`` lets callers run membership tests and lookups on
        the result on a first run, before any state file has been written.
    """
    if not os.path.isfile(confFile):
        print(f"Conf File Not Found: {confFile}")
        return {}
    with open(confFile, 'r') as file:
        streamConf = yaml.safe_load(file)
    # safe_load yields None for an empty document.
    return {} if streamConf is None else streamConf
#----------------------------#
# writeConf
#----------------------------#
#
# write yaml config files
#
#----------------------------#
def writeConf(confFile, yaml_dict):
    """Serialize ``yaml_dict`` as YAML into ``confFile``, replacing its content."""
    with open(confFile, 'w') as handle:
        yaml.dump(yaml_dict, handle)
#----------------------------#
# twitch_login_verify
#----------------------------#
#
# verify the configured twitch oauth token
#
#----------------------------#
def twitch_login_verify(twitchConf):
    """Check the configured OAuth token against Twitch's validate endpoint.

    Args:
        twitchConf: Mapping that holds the token under the ``'token'`` key.

    Returns:
        True when the endpoint answers with HTTP 200, False otherwise. On a
        non-200 answer the token is printed and an ntfy notification is sent.

    Any exception (missing key, network error, timeout) is printed and the
    process exits with status 1.
    """
    token_valid_url = "https://id.twitch.tv/oauth2/validate"
    try:
        token = twitchConf['token']
        token_valid_headers = {'Authorization': f"OAuth {token}"}
        # Without a timeout a stalled connection would hang the run forever.
        res = requests.get(token_valid_url, headers=token_valid_headers, timeout=30)
        isValid = res.status_code == 200
        if not isValid:
            print(f"Twitch Token is NOT Valid: {token}\n")
            ntfySend("Twitch", f"Twitch Token is Invalid:\n{token}", priority="5", tags="x", title="Refresh Twitch Token")
        return isValid
    except Exception as e:
        print(e)
        sys.exit(1)
#----------------------------#
# twitch_login
#----------------------------#
#
# login to twitch via dev api
#
#----------------------------#
def twitch_login(twitchConf):
    """Obtain an app access token via the client-credentials grant.

    Args:
        twitchConf: Mapping with ``'client_id'`` and ``'client_secret'``.

    Returns:
        The token prefixed with ``"Bearer "``, ready to be used as an
        Authorization header value.

    This function never returns on failure: a non-200 answer, or any
    exception (missing key, network error, timeout), prints a message and
    exits the process with status 1.
    """
    token_url = "https://id.twitch.tv/oauth2/token"
    token_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    try:
        token_data = {
            'client_id': twitchConf['client_id'],
            'client_secret': twitchConf['client_secret'],
            'grant_type': 'client_credentials',
        }
        # Without a timeout a stalled connection would hang the run forever.
        r = requests.post(token_url, headers=token_headers, data=token_data, timeout=30)
        if r.status_code == 200:
            jsonArr = r.json()
            return "Bearer " + str(jsonArr['access_token'])
        print(f"Twitch Login - Failed\n")
        # SystemExit is not an Exception subclass, so it passes the handler below.
        sys.exit(1)
    except Exception as e:
        print(e)
        sys.exit(1)
#----------------------------#
# getStreamStates
#----------------------------#
#
# get state/game of streams
#
#----------------------------#
def getStreamStates(streams, client, token):
    """Fetch the live state and current game of the given channels.

    Args:
        streams: Iterable of channel login names.
        client: Twitch client id, sent as the ``Client-Id`` header.
        token: Authorization header value (e.g. ``"Bearer ..."``).

    Returns:
        Dict keyed by lower-cased channel login. Each value is
        ``{'state': <stream type>, 'game': <lower-cased game name>}``.
        Channels missing from the API answer are absent from the dict.
        An empty ``streams`` returns ``{}`` without contacting the API.

    Any exception (network error, HTTP error status, unexpected payload) is
    printed and the process exits with status 1.
    """
    streamList = {}
    check_url = "https://api.twitch.tv/helix/streams"
    check_headers = {'Authorization': token, 'Client-Id': client}
    batchSize = 100  # the endpoint accepts at most 100 logins per request
    try:
        logins = list(streams)
        for start in range(0, len(logins), batchSize):
            batch = logins[start:start + batchSize]
            params = [('user_login', login) for login in batch]
            # The default page size is 20; ask for a full page so that no
            # live channel of this batch is cut off by pagination.
            params.append(('first', batchSize))
            r = requests.get(check_url, headers=check_headers, params=params, timeout=30)
            r.raise_for_status()
            for streamer in r.json()['data']:
                # Key by login, not display name: display names can be
                # localized and then differ from the configured login.
                login = streamer.get('user_login') or streamer['user_name']
                streamList[login.lower()] = {
                    'state': streamer['type'],
                    'game': streamer['game_name'].lower(),
                }
        return streamList
    except Exception as e:
        print(e)
        sys.exit(1)
#----------------------------#
# checkOutputDir
#----------------------------#
#
# ensure dir tree exists
#
#----------------------------#
def checkOutputDir(outputPath):
    """Create ``outputPath`` (with parents) unless it already is a directory."""
    if os.path.isdir(outputPath):
        return
    print(f"Creating Output Dir: {outputPath}")
    os.makedirs(outputPath)
#----------------------------#
# recordingState
#----------------------------#
#
# check if stream is being recorded
#
#----------------------------#
def recordingState(streamer):
    """Tell whether some running process looks like a recorder for ``streamer``.

    A process counts when its joined command line contains ``streamer`` and
    does not contain ``"ffmpeg"``.

    Args:
        streamer: Channel name to look for in process command lines.

    Returns:
        True on the first matching process, False when none matches.
    """
    for p in psutil.process_iter():
        try:
            procCMD = " ".join(p.cmdline())
        except psutil.Error:
            # The process exited, is a zombie, or is not readable by us;
            # it cannot be our recorder, so skip it instead of crashing.
            continue
        if streamer in procCMD and "ffmpeg" not in procCMD:
            return True
    return False
#----------------------------#
# du
#----------------------------#
#
# calculate the size of a dir
#
#----------------------------#
def du(path):
    """Return the first field printed by ``du -s <path>`` as a string."""
    raw = subprocess.check_output(['du', '-s', path])
    sizeField = raw.split()[0]
    return sizeField.decode('utf-8')
#----------------------------#
# isGrowing
#----------------------------#
#
# is a dir growing?
#
#----------------------------#
def isGrowing(prevSize, currSize, *, enforce=False, slack=8):
    """Decide whether a directory grew between two size samples.

    The growth check is switched off by default: with ``enforce=False`` the
    function reports True for any input, which is what existing callers get.
    Pass ``enforce=True`` to actually compare the samples.

    Args:
        prevSize: Earlier size sample (int or numeric string).
        currSize: Current size sample (int or numeric string).
        enforce: When True, perform the comparison instead of always
            answering True.
        slack: Growth that must be exceeded to count as growing.

    Returns:
        True when the check is disabled, or when
        ``int(currSize) > int(prevSize) + slack``; False otherwise.
    """
    if not enforce:
        return True
    return int(currSize) > int(prevSize) + slack
#----------------------------#
# killRecording
#----------------------------#
#
# kill the stream recorder
#
#----------------------------#
def killRecording(streamConf, stream, logLocation):
    """Run the configured kill command for ``stream`` and wait for it.

    The command comes from ``streamConf['record']['kill']`` with
    ``%streamer%`` replaced by ``stream``, split on single spaces. Its
    stdout and stderr are written to ``stream-kill-out.log`` and
    ``stream-kill-err.log`` under ``<logLocation>/<stream>/``.

    Args:
        streamConf: Parsed configuration holding ``record.kill``.
        stream: Channel name substituted into the command and log path.
        logLocation: Base directory for the per-stream log folder.
    """
    killcmd = streamConf['record']['kill'].replace('%streamer%', stream).split(' ')
    print(f"Killing Stream ({killcmd})")
    logDir = f"{logLocation}/{stream}"
    # Not every caller creates the log folder first; opening the log files
    # would otherwise fail with FileNotFoundError.
    os.makedirs(logDir, exist_ok=True)
    with open(f"{logDir}/stream-kill-out.log", "wb") as outPipe, \
            open(f"{logDir}/stream-kill-err.log", "wb") as errPipe:
        process = subprocess.Popen(killcmd, stdout=outPipe, stderr=errPipe)
        process.wait()
#----------------------------#
# ignoreGame
#----------------------------#
#
# check if a game is ignored
#
#----------------------------#
def ignoreGame(currentGame, ignoredList):
    """Return True when any ignored entry occurs in the game name.

    Matching is a case-insensitive substring test of each entry of
    ``ignoredList`` against ``currentGame``.
    """
    return any(
        entry.lower() in currentGame.lower()
        for entry in ignoredList
    )
#----------------------------#
# recordStream
#----------------------------#
#
# launch streamlink to record a channel
#
#----------------------------#
def recordStream(twitchChannel, streamConf, streamlinkOpt, quality='best'):
    """Start streamlink for a channel in the background.

    The channel URL is ``streamConf['twitch']['base_url']`` joined with
    ``twitchChannel`` by a slash. ``streamlinkOpt`` is handed to streamlink
    as one single argument, followed by the URL and ``quality``. The process
    is started and not waited for.
    """
    channelURL = "{}/{}".format(streamConf['twitch']['base_url'], twitchChannel)
    subprocess.Popen([
        "streamlink",
        "--twitch-disable-hosting",
        streamlinkOpt,
        channelURL,
        quality,
    ])
#----------------------------#
# main
#----------------------------#
#
# main program entry
#
#----------------------------#
def _cleanGameTitle(game):
    """Turn a game name into a dash-separated title safe for file names."""
    title = game.title()
    # Colons caused trouble in output names, so they become dashes first.
    title = title.replace(": ", " - ")
    title = title.replace(":", "-")
    title = re.sub(r'[^0-9a-zA-Z \-]', '', title)
    title = title.replace(" ", "-")
    return title.replace("--", "-").replace("--", "-")


def _startRecording(cmd):
    """Launch the recorder command in the background without waiting."""
    # Nobody reads the recorder's output and this script exits right after
    # launching it, so discard the output instead of leaving dangling pipes.
    subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def main(argv):
    """Check every configured streamer once and start/stop recorders.

    Steps:
      1. Load the stream config and the per-streamer state file.
      2. Log into Twitch and verify the configured token.
      3. Query the state of all configured channels in one call.
      4. Per streamer: build the recorder command, then start, restart or
         kill the recorder depending on live state, current game and
         ignore lists.
      5. Write the updated state file.

    Args:
        argv: Command line arguments; currently unused.
    """
    streamConf = loadConf('/media/data/scripts/streams.yaml')
    if not streamConf:
        print("Stream config is missing or empty, Exiting")
        sys.exit(1)
    # A missing or empty state file just means nothing was recorded yet.
    duConf = loadConf('/media/data/scripts/streams.du') or {}

    twitch_token = twitch_login(streamConf['twitch'])
    if twitch_token is False:
        print("Cannot Log Into Twitch, Exiting")
        sys.exit(1)
    # Called for its side effects (console message / notification).
    twitch_login_verify(streamConf['twitch'])

    print("Adding Disable Ads to Config")
    streamConf['twitch']['push_to_config'].append("twitch-disable-ads")

    bareToken = twitch_token.replace("Bearer ", "")
    oauthToken = twitch_token.replace("Bearer ", "OAuth ")

    # Get all our channels states at once
    checkAll = getStreamStates(streamConf['streamers'], streamConf['twitch']['client_id'], twitch_token)

    for stream in streamConf['streamers']:
        # Build the command to record the stream
        outputFolder = os.path.join(streamConf['output']['out_folder'], stream)
        if not os.path.exists(outputFolder):
            os.makedirs(outputFolder)

        base_cmd = (
            streamConf['record']['cmd']
            .replace('%streamer%', stream)
            .replace('%script%', streamConf['general']['script'])
            .split(" ")
        )
        streamlinkOpt = (
            ' '.join(streamConf['record']['streamlink_options'])
            .replace('%out_folder%', outputFolder)
            .replace('%streamer%', stream)
            .replace('%token%', "'" + oauthToken + "'")
        )
        # All streamlink options travel as one single trailing argument.
        base_cmd.append(streamlinkOpt)

        print(f"Checking stream: {stream}")
        logLocation = streamConf['output']['log_output']
        # The state lookup is keyed by lower-cased names.
        liveInfo = checkAll.get(stream.lower())

        if liveInfo is not None and liveInfo['state'] == "live":
            print(f" - Stream is live")

            # Read the previous sample before it is overwritten below.
            lastCheck = duConf.setdefault(stream, {'game': None, 'du': 0})
            lastGame = lastCheck['game']
            prevDU = lastCheck['du']

            # Check if they are playing an ignored game / changed games
            currentGame = liveInfo['game']
            duConf[stream]['game'] = currentGame
            isNewGame = lastGame != currentGame

            # Replace the game name with a safer version
            cleanedTitle = _cleanGameTitle(currentGame)
            base_cmd = [part.replace("%game%", cleanedTitle) for part in base_cmd]

            ignore_game_list = list(streamConf.get('ignore_games') or [])
            streamerIgnores = streamConf.get('streamer_ignore_games') or {}
            if stream in streamerIgnores:
                ignore_game_list = ignore_game_list + list(streamerIgnores[stream] or [])
            print(f" - Ignored Games: {ignore_game_list}")
            ignoreCurrent = ignoreGame(currentGame, ignore_game_list)
            print(f" - Playing: {currentGame.title()}")

            # Check if the folder is growing / actually recording
            currDU = du(outputFolder)
            duConf[stream]['du'] = int(currDU)
            growingState = isGrowing(prevDU, currDU)

            checkOutputDir(f"{logLocation}/{stream}/")

            # Write to conf file
            with open("/media/data/scripts/config.twitch", "w") as wc:
                for pushLine in streamConf['twitch']['push_to_config']:
                    wc.write(pushLine.replace('%token%', bareToken) + "\n")

            # Decide what to do with all our information
            recording = recordingState(stream)
            if not recording and not ignoreCurrent:
                print(f" - Starting recording session\n")
                checkOutputDir(outputFolder)
                ntfySend("Twitch", f"{stream}: {currentGame}", title="Starting Recording Session")
                _startRecording(base_cmd)
            elif isNewGame and not ignoreCurrent:
                # A recorder is running here; restart it for the new game.
                print(" - Killing recording (New Game)")
                killRecording(streamConf, stream, logLocation)
                duConf[stream]['du'] = 0
                duConf[stream]['game'] = currentGame
                print(f" - Starting recording session\n")
                checkOutputDir(outputFolder)
                ntfySend("Twitch", f"{stream}: {currentGame}", title=f"Starting Recording (Stopped Playing {lastGame})")
                _startRecording(base_cmd)
            elif not growingState and recording:
                print(" - Killing orphaned recording (not growing)")
                killRecording(streamConf, stream, logLocation)
                duConf[stream]['du'] = 0
                duConf[stream]['game'] = currentGame
            elif ignoreCurrent and recording:
                print(f" - Killing ignored game recording ({currentGame})")
                ntfySend("Twitch", f"{stream}: {currentGame}", title=f"Stopping Recording Session (Playing: {lastGame})")
                killRecording(streamConf, stream, logLocation)
                duConf[stream]['du'] = 0
                duConf[stream]['game'] = currentGame
            elif ignoreCurrent and not recording:
                print(f" - Ignoring Game ({currentGame})")
            else:
                print(f" - Already recording")
        else:
            # Stream is not live
            print(f" - Stream is not live")
            if recordingState(stream):
                print(" - Killing orphaned recording")
                # The kill command logs into this folder, so make sure it exists.
                checkOutputDir(f"{logLocation}/{stream}/")
                killRecording(streamConf, stream, logLocation)
                # NOTE(review): state is reset only after an orphan was
                # killed; confirm an idle offline streamer should keep its
                # last recorded game.
                entry = duConf.setdefault(stream, {})
                entry['du'] = 0
                entry['game'] = None
        print()

    writeConf('/media/data/scripts/streams.du', duConf)
#----------------------------#
# __main__
#----------------------------#
#
# main program entry if ran as script
#
#----------------------------#
# Run one check pass only when executed as a script, not when imported.
if __name__ == "__main__":
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment