Last active
February 9, 2022 21:58
-
-
Save rotolonico/e733f89c31646c4d7717b3598c7acdf1 to your computer and use it in GitHub Desktop.
Python Interactive Livestream Tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from pytube import YouTube | |
import os | |
import pytchat | |
import asyncio | |
import yt_dlp | |
# --- Stream configuration ---------------------------------------------------
# Video the play loop starts with; also the fallback when a video cannot be
# fetched.
initialVideoId = "7iaIK_86W3s"
# ID of the live broadcast whose chat is read for votes.
liveId = "d1rGAtMR5XU"

# --- Working files (all located next to this script) -------------------------
mainPath = os.path.dirname(os.path.abspath(__file__))
defaultVidPath = f"{mainPath}/1.mp4"    # downloaded video is moved here
infoPath = f"{mainPath}/info.txt"       # receives 'fetch' after each download
textPath = f"{mainPath}/text.txt"       # receives the rendered vote summary

# --- Mutable voting state -----------------------------------------------------
currentChoices = []   # endscreen entries (dicts with title/id/duration) on offer
votes = []            # vote tally, indexed like currentChoices
voters = []           # channel IDs that already voted in this round
lastVoter = ""        # cropped display name of the most recent voter
def is_int(s):
    """Report whether ``int(s)`` succeeds.

    Returns True when the conversion works and False when it raises
    ValueError. Any other exception (for example TypeError for ``None``)
    propagates to the caller.
    """
    try:
        int(s)
    except ValueError:
        return False
    else:
        return True
def string_cropper(s, c):
    """Shorten ``s`` to at most ``c`` characters, marking the cut with '...'.

    Strings of length ``c`` or less are returned unchanged; longer ones are
    cut to their first ``c`` characters and get '...' appended.
    """
    if len(s) <= c:
        return s
    return s[:c] + '...'
def _parse_clock_duration(text):
    """Convert an 'M:SS' or 'H:MM:SS' string to seconds (0 for other shapes)."""
    parts = text.split(':')
    if len(parts) == 2:
        return int(parts[0]) * 60 + int(parts[1])
    if len(parts) == 3:
        return int(parts[0]) * 60 * 60 + int(parts[1]) * 60 + int(parts[2])
    return 0


def _fetch_video(video_id):
    """Fetch one video's metadata via pytube; raises if any field is missing."""
    yt = YouTube("https://www.youtube.com/watch?v=" + video_id)
    info = yt.vid_info

    endscreens = []
    for element in info["endscreen"]["endscreenRenderer"]["elements"]:
        renderer = element["endscreenElementRenderer"]
        # Only endscreen entries that point at another video are votable.
        if renderer["style"] != 'VIDEO':
            continue
        endscreens.append({
            'title': renderer["title"]["runs"][0]["text"],
            'id': renderer["endpoint"]["watchEndpoint"]["videoId"],
            'duration': _parse_clock_duration(
                renderer["videoDuration"]["runs"][0]["text"]),
        })

    details = info["videoDetails"]
    return {
        'title': details["title"],
        'id': details["videoId"],
        'duration': int(details["lengthSeconds"]),
        'endscreens': endscreens,
    }


def get_video_from_id(id, attempt):
    """Return metadata for a video, retrying and finally falling back.

    Parameters:
        id: YouTube video ID to look up. (The name shadows the builtin; it is
            kept so existing callers keep working.)
        attempt: number of attempts already made; callers pass 0.

    Returns:
        A dict with 'title', 'id', 'duration' (seconds) and 'endscreens', a
        list of dicts with 'title', 'id' and 'duration' for every endscreen
        element whose style is 'VIDEO'.

    Any failure (network error, missing endscreen data, ...) is retried. Once
    the attempt counter exceeds 5 the lookup switches to ``initialVideoId``
    and the counter restarts, so the function keeps trying until some lookup
    succeeds. It does not return while ``initialVideoId`` itself cannot be
    fetched.
    """
    video_id = id
    while True:
        try:
            return _fetch_video(video_id)
        except Exception as err:
            # ``Exception`` (not a bare except) so Ctrl-C / SystemExit still
            # stop the program. Iterating instead of recursing keeps the stack
            # flat however long the outage lasts.
            print("Something went wrong fetching video " + str(video_id)
                  + ", retrying: " + repr(err))
            attempt = attempt + 1
            if attempt > 5:
                video_id = initialVideoId
                attempt = 0
def play_video(d):
    """Progress hook: publish a finished download as the current video.

    Parameters:
        d: status dict passed by the downloader; only 'status' and
           'filename' are read.

    When the status is 'finished', the downloaded file is moved to
    ``defaultVidPath`` and the literal text 'fetch' is written to
    ``infoPath``. Every other status is ignored.
    NOTE(review): 'fetch' is presumably a signal for whatever plays the
    video - confirm against the consumer of info.txt.
    """
    if d['status'] != 'finished':
        return
    # os.replace overwrites an existing target on every platform; os.rename
    # raises on Windows once a previous video already occupies the path.
    os.replace(d['filename'], defaultVidPath)
    with open(infoPath, 'w') as f:
        f.write('fetch')
def download_and_play_video(video_id):
    """Download a YouTube video with yt-dlp and hand it to ``play_video``.

    Parameters:
        video_id: YouTube video ID; the watch URL is built from it.

    ``play_video`` is registered as a progress hook, so it runs when yt-dlp
    reports the download as finished. The call blocks until yt-dlp returns.
    """
    ydl_opts = {
        'progress_hooks': [play_video],
    }
    url = "https://www.youtube.com/watch?v=" + video_id
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # download() is documented to take a list of URLs; pass one explicitly
        # rather than relying on the library tolerating a bare string.
        ydl.download([url])
def make_choice():
    """Pick the ID of the next video to play.

    Returns:
        - ``initialVideoId`` when there are no choices at all (for example the
          previous video had no 'VIDEO' endscreen elements), so playback
          restarts instead of raising IndexError;
        - a random choice's ID when nobody voted;
        - otherwise the ID of the choice with the most votes, with ties
          broken at random.
    """
    if not currentChoices:
        return initialVideoId

    # Only tallies that belong to an actual choice can win.
    tallies = votes[:len(currentChoices)]
    if not voters or not tallies:
        return random.choice(currentChoices)["id"]

    top = max(tallies)
    leaders = [index for index, count in enumerate(tallies) if count == top]
    return currentChoices[random.choice(leaders)]["id"]
def add_vote(vote, voter):
    """Record one vote and remember who cast it.

    Parameters:
        vote: zero-based index into ``votes``.
        voter: chat author object; ``channelId`` and ``name`` are read.

    The index and the voter's channel ID are checked before anything is
    changed, so a rejected vote leaves the tally and the voter list untouched.
    Negative indexes are rejected rather than counting for the last option.
    Updating ``lastVoter`` is best effort: a problem with the display name is
    reported but the vote itself still counts. Errors are printed, never
    raised.
    """
    global lastVoter

    try:
        if not 0 <= vote < len(votes):
            raise IndexError("vote index out of range: " + repr(vote))
        channel_id = voter.channelId
    except Exception:
        print("Something went wrong adding vote!")
        return

    votes[vote] += 1
    voters.append(channel_id)

    try:
        lastVoter = string_cropper(voter.name, 30)
    except Exception:
        print("Something went wrong adding vote!")
def clear_votes():
    """Reset the voting state for a new round.

    ``votes`` gets one zero per entry in ``currentChoices`` (instead of a
    fixed four), so every offered choice has a tally slot. ``voters`` is
    emptied and ``lastVoter`` cleared. Call this after ``currentChoices`` has
    been set for the round.
    """
    global votes
    global voters
    global lastVoter
    votes = [0] * len(currentChoices)
    voters = []
    lastVoter = ""
def _render_vote_text(last_voter):
    """Build the vote summary: one line per choice, then totals and last voter."""
    total = len(voters)
    lines = []
    for index, choice in enumerate(currentChoices):
        # Tolerate a tally list shorter than the choice list instead of raising.
        count = votes[index] if index < len(votes) else 0
        line = ('Type "' + str(index + 1) + '" for "'
                + string_cropper(choice["title"], 20)
                + '" | Votes: ' + str(count))
        if total != 0:
            line += " (" + str(int(count / total * 100)) + "%)"
        lines.append(line)
    if total != 0:
        lines.append("Total votes: " + str(total))
    if last_voter != "":
        lines.append("Last voter: " + last_voter)
    return "".join(line + "\n" for line in lines)


def _write_vote_text(text):
    """Overwrite the text file with ``text``."""
    # Explicit UTF-8 so titles and names outside the locale code page can be
    # written. NOTE(review): confirm the reader of text.txt expects UTF-8.
    with open(textPath, 'w', encoding='utf-8') as f:
        f.write(text)


def update_text_file():
    """Write the current choices and vote counts to ``textPath``.

    The text is built completely before the file is opened, so a failure
    while formatting no longer leaves an empty file behind. If the first
    attempt fails, ``lastVoter`` is cleared and the write is retried exactly
    once without it. A second failure is reported and the function returns,
    rather than recursing without limit.
    """
    global lastVoter
    try:
        _write_vote_text(_render_vote_text(lastVoter))
        return
    except Exception:
        print("Something went wrong updating text file, retrying without lastVoter")

    lastVoter = ""
    try:
        _write_vote_text(_render_vote_text(lastVoter))
    except Exception as err:
        print("Something went wrong updating text file: " + repr(err))
def initialize_choices(endscreens):
    """Start a new voting round for the given endscreen entries.

    Stores ``endscreens`` as the current choices, then resets the vote state
    and rewrites the text file, in that order.
    """
    global currentChoices

    currentChoices = endscreens
    for step in (clear_votes, update_text_file):
        step()
async def playLoop(videoId):
    """Play videos forever, letting the vote decide each follow-up.

    Each iteration looks up the video, offers its endscreen entries as the
    new choices, downloads and publishes it, sleeps for the video's duration
    in seconds, then asks ``make_choice`` for the next video ID.
    """
    next_id = videoId
    while True:
        current = get_video_from_id(next_id, 0)
        initialize_choices(current["endscreens"])
        download_and_play_video(current["id"])
        # Voting stays open while the video runs.
        await asyncio.sleep(current["duration"])
        next_id = make_choice()
async def chatLoop():
    """Poll the live chat and turn single-digit messages into votes.

    A message counts when it is exactly one digit character that converts to
    an int between 1 and the number of current choices, and its author has a
    channel ID that has not voted yet this round. Each accepted vote is
    recorded and the text file is refreshed. The loop polls every half
    second for as long as the chat reports itself alive.
    """
    chat = pytchat.create(video_id=liveId)
    while chat.is_alive():
        for item in chat.get().sync_items():
            # isdigit() alone is not enough: it accepts characters that
            # int() rejects, hence the extra is_int() check.
            if len(item.message) != 1 or not item.message.isdigit():
                continue
            if not is_int(item.message):
                continue
            if not 0 < int(item.message) <= len(currentChoices):
                continue
            if item.author.channelId is None:
                continue
            if item.author.channelId in voters:
                continue
            add_vote((int(item.message) - 1), item.author)
            update_text_file()
        await asyncio.sleep(0.5)
async def _run_loops():
    """Run the playback and chat loops side by side until both have stopped."""
    # asyncio.wait() no longer accepts bare coroutines (removed in Python
    # 3.11), so each loop is wrapped in a task first. wait() is kept so that
    # one loop ending does not cancel the other, as before.
    tasks = [
        asyncio.ensure_future(playLoop(initialVideoId)),
        asyncio.ensure_future(chatLoop()),
    ]
    await asyncio.wait(tasks)


# Script entry point: start both loops. asyncio.run() creates and closes the
# event loop itself, replacing the deprecated get_event_loop() pattern.
asyncio.run(_run_loops())
Added try/except in add_vote because someone in chat changed their name to something weird and broke it somehow 🤷
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
cool!