Last active
October 25, 2021 01:54
-
-
Save kfur/68d58b9c69b80eeae3968c0065e6f07e to your computer and use it in GitHub Desktop.
YouTube live chat replay to SRT subtitles
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pysrt | |
import json | |
import sys | |
import functools | |
import emoji | |
from urllib import request | |
import re | |
class LiveChat():
    """Download a YouTube live chat replay and turn it into SRT subtitles.

    The replay is fetched chunk by chunk from ``replay_url_format``; each
    request carries the continuation token and the player offset (in
    milliseconds) from which the next chunk should start.
    """

    replay_url_format = 'https://www.youtube.com/live_chat_replay/get_live_chat_replay?commandMetadata=%5Bobject%20Object%5D&continuation={}%253D%253D&playerOffsetMs={}&hidden=false&pbj=1'
    # Browser-like headers sent with every chunk request.
    headers = {
        'authority': 'www.youtube.com',
        'pragma': 'no-cache',
        'cache-control': 'no-cache',
        'x-youtube-device': 'cbr=Chrome&cosver=10.0&cos=Windows&cbrver=71.0.3578.80',
        'x-youtube-page-label': 'youtube.ytfe.desktop_20200116_5_RC0',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.80 Safari/537.36',
        'x-youtube-client-name': '1',
        'accept': '*/*',
        'sec-fetch-site': 'same-origin',
        'sec-fetch-mode': 'cors',
        'accept-language': 'en-US,en;q=0.9'
    }
    # Consecutive failed fetches tolerated before the last error is re-raised.
    # Without a bound, a permanent failure would be retried forever.
    max_retries = 10

    def __init__(self, continuation, max_comments_view_len=160, max_comments_per_view=4):
        """Store the continuation token and the subtitle layout limits.

        Args:
            continuation: token interpolated into ``replay_url_format``.
            max_comments_view_len: forwarded to ``comments_to_subs``.
            max_comments_per_view: forwarded to ``comments_to_subs``.
        """
        self.continuation = continuation
        self.max_comments_view_len = max_comments_view_len
        self.max_comments_per_view = max_comments_per_view

    def _get_chat_chunk(self, playerOffsetMs):
        """Fetch the raw response body of the chunk starting at *playerOffsetMs*."""
        url = self.replay_url_format.format(self.continuation, playerOffsetMs)
        r = request.Request(url, headers=self.headers)
        with request.urlopen(r) as resp:
            return resp.read()

    @staticmethod
    def _chunk_actions(chat_chunk):
        """Return the list of actions in a decoded chunk, or ``[]`` if absent."""
        try:
            actions = chat_chunk['response']['continuationContents']['liveChatContinuation']['actions']
        except (KeyError, IndexError, TypeError):
            return []
        return actions or []

    def live_chat_to_srt(self):
        """Download the whole replay and return it as a ``pysrt.SubRipFile``.

        Chunks are requested until one contains no actions or its last
        action's offset equals the offset that was just requested.

        Raises:
            Exception: the last fetch/decode error, once ``max_retries``
                consecutive attempts for the same chunk have failed.
        """
        lastTimeStamp = 1
        comments = []
        failures = 0
        while True:
            try:
                chat_chunk = json.loads(self._get_chat_chunk(lastTimeStamp))
            except Exception as e:
                print(e)
                failures += 1
                if failures >= self.max_retries:
                    raise
                continue
            failures = 0

            actions = self._chunk_actions(chat_chunk)
            if not actions:
                # A chunk without actions means there is nothing left to read;
                # indexing into it would raise KeyError.
                break

            comments += parse_comments(chat_chunk)
            newTimeStamp = int(actions[-1]['replayChatItemAction']['videoOffsetTimeMsec'])
            if newTimeStamp == lastTimeStamp:
                break
            lastTimeStamp = newTimeStamp

        # Successive chunks may repeat comments; drop the duplicates.
        comments = functools.reduce(uniq_comments, comments, [])
        # Honour the limits given to the constructor (previously ignored).
        subs = comments_to_subs(comments,
                                self.max_comments_view_len,
                                self.max_comments_per_view)
        return pysrt.SubRipFile(subs)
def json_file_key(name):
    """Return the integer that precedes the first '.' in *name*.

    Raises ValueError when *name* contains no '.' or when the prefix is
    not a valid integer literal.
    """
    numeric_prefix = name[0:name.index('.')]
    return int(numeric_prefix)
class Date():
    """Timestamp of a chat comment as hour / minute / second / millisecond.

    Instances are mutable and compare by the total number of milliseconds
    they represent, so fields that overflow their usual range (for example
    ``msec=1500``) still order correctly.
    """

    def __init__(self, sec, minute, hour=0, msec=0):
        self.hour = hour
        self.minute = minute
        self.sec = sec
        # Previously hard-coded to 0, which silently dropped the argument.
        self.msec = msec

    def _total_msec(self):
        """Return the timestamp flattened to milliseconds."""
        return ((self.hour * 60 + self.minute) * 60 + self.sec) * 1000 + self.msec

    def __eq__(self, other):
        try:
            return self._total_msec() == Date._total_msec(other)
        except AttributeError:
            # Not a timestamp-like object: let Python fall back to its default.
            return NotImplemented

    # Mutable objects that define __eq__ must stay unhashable.
    __hash__ = None

    def __gt__(self, other):
        # The former field-by-field chain returned False as soon as any less
        # significant field was smaller (1:00:00 > 0:59:00 was False).
        try:
            return self._total_msec() > Date._total_msec(other)
        except AttributeError:
            return NotImplemented

    def __repr__(self):
        return 'Date(sec={!r}, minute={!r}, hour={!r}, msec={!r})'.format(
            self.sec, self.minute, self.hour, self.msec)
class Comment():
    """One chat message: who wrote it, when, what, and the moderator flag."""

    def __init__(self, author, date, text, isModer=False):
        # Plain record; the values are stored exactly as given.
        self.author, self.date = author, date
        self.text, self.isModer = text, isModer
def _message_text(runs):
    """Collect the textual pieces of a message's ``runs`` list, in order."""
    pieces = []
    for run in runs:
        if not isinstance(run, dict):
            continue
        if 'text' in run:
            pieces.append(run['text'])
        elif 'emoji' in run:
            # NOTE(review): assumes emoji runs look like
            # {'emoji': {'shortcuts': [...], 'emojiId': ...}} - confirm
            # against a real replay payload.
            emoji_info = run['emoji'] if isinstance(run['emoji'], dict) else {}
            shortcuts = emoji_info.get('shortcuts') or []
            pieces.append(shortcuts[0] if shortcuts else emoji_info.get('emojiId', ''))
    return pieces


def parse_comments(js_com):
    """Extract the text chat messages from one decoded replay chunk.

    Args:
        js_com: decoded JSON of a chunk; actions are read from
            ``response.continuationContents.liveChatContinuation.actions``.

    Returns:
        A list of ``Comment`` objects in the order the actions appear.
        Actions that are not text messages are skipped, and a chunk without
        an ``actions`` list yields ``[]``.

    Raises:
        KeyError: a text message lacks its author name or timestamp.
        ValueError: the timestamp text is not ``m:ss`` or ``h:mm:ss``.
    """
    try:
        actions = js_com['response']['continuationContents']['liveChatContinuation']['actions']
    except (KeyError, TypeError):
        return []

    comments = []
    for act in actions:
        try:
            renderer = act['replayChatItemAction']['actions'][0]['addChatItemAction']['item']['liveChatTextMessageRenderer']
            runs = renderer['message']['runs']
        except (KeyError, IndexError, TypeError):
            # Not a plain text message (or an unexpected shape): ignore it.
            continue
        if not isinstance(runs, list):
            continue

        # Join every run; reading only runs[0] truncated multi-part messages
        # and dropped messages whose first run was not text.
        pieces = _message_text(runs)
        if not pieces:
            continue
        text = emoji.demojize(''.join(pieces))

        author = renderer['authorName']['simpleText']

        # Look at every badge rather than only the first one.
        isModer = False
        for badge in renderer.get('authorBadges') or []:
            try:
                icon_type = badge['liveChatAuthorBadgeRenderer']['icon']['iconType']
            except (KeyError, TypeError):
                continue
            if icon_type == "MODERATOR":
                isModer = True
                break

        date_raw = renderer['timestampText']['simpleText']
        hms = date_raw.split(':')
        if len(hms) == 2:
            m, s = hms
            date = Date(int(s), int(m))
        elif len(hms) == 3:
            h, m, s = hms
            date = Date(int(s), int(m), int(h))
        else:
            raise ValueError("date array wrong")
        comments.append(Comment(author, date, text, isModer))
    return comments
def uniq_comments(old, new):
    """Reducer step: add *new* to *old* unless an equal comment is present.

    Two comments are equal when author, date and text all match. *old* is
    modified in place and is also the return value.
    """
    already_seen = any(
        seen.author == new.author and seen.date == new.date and seen.text == new.text
        for seen in old
    )
    if not already_seen:
        old.append(new)
    return old
def _date_to_msec(date):
    """Flatten an hour/minute/sec/msec timestamp object into milliseconds."""
    return ((date.hour * 60 + date.minute) * 60 + date.sec) * 1000 + date.msec


def _spread_start_times(raw_times, step=500):
    """Return non-decreasing start times for already ordered timestamps.

    Comments sharing one timestamp are spaced *step* ms apart, or closer
    when that would run past the next distinct timestamp.
    """
    starts = []
    total = len(raw_times)
    i = 0
    while i < total:
        j = i
        while j < total and raw_times[j] == raw_times[i]:
            j += 1
        run_len = j - i
        offset = step
        if j < total:
            gap = raw_times[j] - raw_times[i]
            offset = min(step, gap // run_len)
        starts.extend(raw_times[i] + k * offset for k in range(run_len))
        i = j
    return starts


def _keep_recent(lines, count, reverse):
    """Keep the *count* newest lines (the head of the list when *reverse*)."""
    if reverse:
        return lines[:count]
    return lines[max(0, len(lines) - count):]


def _msec_to_srt_time(msec):
    """Build a ``pysrt.SubRipTime`` from a millisecond offset."""
    seconds, ms = divmod(msec, 1000)
    minutes, sec = divmod(seconds, 60)
    hours, minute = divmod(minutes, 60)
    return pysrt.SubRipTime(hours, minute, sec, ms)


def comments_to_subs(comments, max_comments_view_len=160, max_comments_per_view=4, reverse=False):
    """Convert ordered chat comments into a list of ``pysrt.SubRipItem``.

    Each comment opens a new subtitle that shows the most recent comments
    and lasts until the next comment starts; the last one lasts 5 seconds.
    The input comments and their dates are not modified.

    Args:
        comments: objects with ``author``, ``date``, ``text`` and ``isModer``;
            ``date`` must expose ``hour``, ``minute``, ``sec`` and ``msec``.
        max_comments_view_len: once the joined text reaches this length the
            view is cut to ``max_comments_per_view - 1`` comments.
        max_comments_per_view: maximum number of comments shown at once.
        reverse: put the newest comment first instead of last.

    Returns:
        A list of ``pysrt.SubRipItem`` indexed from 0, one per comment.

    Raises:
        Exception: ``('wrong time range ', i)`` when comment ``i`` is later
            than comment ``i + 1``.
    """
    raw_times = [_date_to_msec(com.date) for com in comments]
    for i in range(len(raw_times) - 1):
        if raw_times[i] > raw_times[i + 1]:
            raise Exception('wrong time range ', i)

    # Comment timestamps often collide. Spreading whole runs keeps the order
    # for any number of collisions; the old pairwise +500 ms fix-up broke
    # down from four same-second comments onwards.
    start_times = _spread_start_times(raw_times)

    # Derived from the parameter instead of the former hard-coded 4 and 3.
    full_view = max(1, max_comments_per_view)
    reduced_view = max(1, max_comments_per_view - 1)

    subrip_items = []
    item_comments = []
    last_index = len(comments) - 1
    for item_index, com in enumerate(comments):
        line = "[" + com.author + "]" + ("*" if com.isModer else "") + ": " + com.text
        if reverse:
            item_comments = [line] + item_comments
        else:
            item_comments.append(line)

        if len(item_comments) > max_comments_per_view:
            item_comments = _keep_recent(item_comments, full_view, reverse)
        if len('\n'.join(item_comments)) >= max_comments_view_len:
            item_comments = _keep_recent(item_comments, reduced_view, reverse)

        start_ms = start_times[item_index]
        if item_index < last_index:
            end_ms = start_times[item_index + 1]
        else:
            end_ms = start_ms + 5000
        subrip_items.append(pysrt.SubRipItem(item_index,
                                             _msec_to_srt_time(start_ms),
                                             _msec_to_srt_time(end_ms),
                                             '\n'.join(item_comments)))
    return subrip_items
def main(youtube_link, path=None):
    """Fetch a video page, find its chat continuation token and save the SRT.

    Args:
        youtube_link: URL of the video page to download.
        path: destination passed to the ``save`` call of the subtitle file.

    Raises:
        IndexError: the page contains no ``continuation=<token>`` string
            (same exception type as before, now with an explanatory message).
    """
    # Close the HTTP response deterministically instead of leaking it.
    with request.urlopen(youtube_link) as resp:
        # Only an ASCII token is searched for, so undecodable bytes elsewhere
        # in the page must not abort the run.
        vid_html = resp.read().decode('utf-8', errors='replace')

    found = re.search('continuation=([a-zA-Z0-9]+)', vid_html)
    if found is None:
        raise IndexError('no live chat continuation token found at ' + str(youtube_link))

    lc = LiveChat(found.group(1))
    lc.live_chat_to_srt().save(path)
def print_usage():
    """Write the command-line synopsis to standard output."""
    usage_lines = (
        'Usage:',
        'yreplay2srt.py https://youtu.be/qy_qonT38DY subtitles.srt',
        '',
    )
    print('\n'.join(usage_lines))
if __name__ == '__main__':
    # Exactly two arguments are expected after the script name:
    # the video URL and the output subtitle path.
    if len(sys.argv) == 3:
        main(sys.argv[1], sys.argv[2])
    else:
        print_usage()
        sys.exit(-1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment