Skip to content

Instantly share code, notes, and snippets.

@rotolonico
Last active February 9, 2022 21:58
Show Gist options
  • Save rotolonico/e733f89c31646c4d7717b3598c7acdf1 to your computer and use it in GitHub Desktop.
Save rotolonico/e733f89c31646c4d7717b3598c7acdf1 to your computer and use it in GitHub Desktop.
Python Interactive Livestream Tool
import random
from pytube import YouTube
import os
import pytchat
import asyncio
import yt_dlp
initialVideoId = "7iaIK_86W3s"
liveId = "d1rGAtMR5XU"
mainPath = os.path.dirname(os.path.abspath(__file__))
defaultVidPath = mainPath + '/1.mp4'
infoPath = mainPath + '/info.txt'
textPath = mainPath + '/text.txt'
currentChoices = []
votes = []
voters = []
lastVoter = ""
def is_int(s):
try:
int(s)
return True
except ValueError:
return False
def string_cropper(s, c):
return (s[:c] + '...') if len(s) > c else \
s
def get_video_from_id(id, attempt):
try:
yt = YouTube("https://www.youtube.com/watch?v=" + id)
endScreens = []
endScreensRaw = yt.vid_info["endscreen"]["endscreenRenderer"]["elements"]
for e in endScreensRaw:
if e["endscreenElementRenderer"]["style"] != 'VIDEO':
continue
duration = e["endscreenElementRenderer"]["videoDuration"]["runs"][0]["text"].split(':')
durationParsed = 0
if len(duration) == 2:
durationParsed = int(duration[0]) * 60 + int(duration[1])
if len(duration) == 3:
durationParsed = int(duration[0]) * 60 * 60 + int(duration[1]) * 60 + int(duration[2])
endScreens.append({
'title': e["endscreenElementRenderer"]["title"]["runs"][0]["text"],
'id': e["endscreenElementRenderer"]["endpoint"]["watchEndpoint"]["videoId"],
'duration': durationParsed
})
v = {
'title': yt.vid_info["videoDetails"]["title"],
'id': yt.vid_info["videoDetails"]["videoId"],
'duration': int(yt.vid_info["videoDetails"]["lengthSeconds"]),
'endscreens': endScreens
}
return v
except:
attempt = attempt + 1
if attempt > 5:
return get_video_from_id(initialVideoId, 0)
return get_video_from_id(id, attempt)
def play_video(d):
if d['status'] == 'finished':
os.rename(d['filename'], defaultVidPath)
with open(infoPath, 'w') as f:
f.write('fetch')
f.close()
def download_and_play_video(video_id):
ydl_opts = {
'progress_hooks': [play_video]
}
url = "https://www.youtube.com/watch?v=" + video_id
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download(url)
def make_choice():
global votes
if len(voters) == 0:
return random.choice(currentChoices)["id"]
m = max(votes)
return currentChoices[random.choice([i for i, j in enumerate(votes) if j == m])]["id"]
def add_vote(vote, voter):
try:
global votes
global voters
global lastVoter
votes[vote] += 1
voters.append(voter.channelId)
lastVoter = string_cropper(voter.name, 30)
except:
print("Something went wrong adding vote!")
def clear_votes():
global votes
global voters
global lastVoter
votes = [0] * 4
voters = []
lastVoter = ""
def update_text_file():
global lastVoter
global currentChoices
try:
text = ""
with open(textPath, 'w') as f:
for i in range(len(currentChoices)):
croppedTitle = string_cropper(currentChoices[i]["title"], 20)
text += ('Type "' + str(i + 1) + '" for "' + croppedTitle + '" | Votes: ' + str(votes[i]))
if len(voters) != 0:
text += (" (" + str(int(votes[i] / len(voters) * 100)) + "%)")
text += '\n'
if len(voters) != 0:
text += "Total votes: " + str(len(voters)) + "\n"
if lastVoter != "":
text += "Last voter: " + lastVoter + "\n"
f.write(text)
f.close()
except:
print("Something went wrong updating text file, retrying without lastVoter")
lastVoter = ""
update_text_file()
def initialize_choices(endscreens):
global currentChoices
currentChoices = endscreens
clear_votes()
update_text_file()
async def playLoop(videoId):
while True:
video = get_video_from_id(videoId, 0)
initialize_choices(video["endscreens"])
download_and_play_video(video["id"])
await asyncio.sleep(video["duration"])
videoId = make_choice()
async def chatLoop():
chat = pytchat.create(video_id=liveId)
while chat.is_alive():
for c in chat.get().sync_items():
if len(c.message) == 1 and c.message.isdigit() and is_int(c.message) and 0 < int(c.message) <= len(
currentChoices) and c.author.channelId is not None and c.author.channelId not in voters:
add_vote((int(c.message) - 1), c.author)
update_text_file()
await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
cors = asyncio.wait([playLoop(initialVideoId), chatLoop()])
loop.run_until_complete(cors)
@chocolateimage
Copy link

cool!

@rotolonico
Copy link
Author

Added try/except in add_vote because someone in chat changed their name to something weird and broke it somehow 🤷

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment