-
-
Save yutsuku/86e0d70793aa80d31e13e6f69d388c99 to your computer and use it in GitHub Desktop.
Download chat from a Twitch VOD and print it to a terminal.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# A script to download chat from a Twitch VOD in SubRip (*.srt) format | |
# Chat will be downloaded all the way until it ends. | |
# | |
# Usage: TWITCH_CLIENT_ID=0123456789abcdef0123456789abcde twitch-vod-chat.py [video_id] [start] | |
# | |
# This script could break at any time, because Twitch's chat API is | |
# undocumented and likes to change at any time; in fact, this script was | |
# created because Twitch got rid of rechat.twitch.tv. | |
# | |
import json | |
import os | |
import requests | |
import sys | |
import time | |
import types | |
import math | |
def time_to_hhmmss(t): | |
hours = int(t // 3600) | |
minutes = int((t - hours * 3600) // 60) | |
seconds = int(t - hours * 3600 - minutes * 60) | |
milliseconds = int((t - hours * 3600 - minutes * 60 - seconds) * 1000) | |
return "{0}:{1:02}:{2:02}.{3:<03}".format(hours, minutes, seconds, milliseconds) | |
def time_to_srt(t): | |
hours = int(t // 3600) | |
minutes = int((t - hours * 3600) // 60) | |
seconds = int(t - hours * 3600 - minutes * 60) | |
milliseconds = int((t - hours * 3600 - minutes * 60 - seconds) * 1000) | |
return "{0:02}:{1:02}:{2:02},{3:<03}".format(hours, minutes, seconds, milliseconds) | |
def lighten_color(color): | |
r = color.r * 3 // 4 + 63 | |
g = color.g * 3 // 4 + 63 | |
b = color.b * 3 // 4 + 63 | |
return types.SimpleNamespace(r=r, g=g, b=b) | |
def clamp(x): | |
return max(0, min(x, 255)) | |
def rgb_to_hex(color): | |
return "#{0:02x}{1:02x}{2:02x}".format(clamp(color.r), clamp(color.g), clamp(color.b)) | |
def message_color(comment): | |
color_by_name = { | |
'white': (255, 255, 255), | |
'black': (0, 0, 0), | |
'red': (255, 0, 0), | |
'green': (0, 255, 0), | |
'blue': (0, 0, 255), | |
'yellow': (255, 255, 0), | |
'gray': (128, 128, 128), | |
'magenta': (255, 0, 255), | |
'cyan': (0, 255, 255) | |
} | |
r = 128 | |
g = 128 | |
b = 128 | |
if 'user_color' in comment['message']: | |
user_color = comment['message']['user_color'] | |
if len(user_color) == 7 and user_color[0] == '#': | |
r = int(user_color[1:3], 16) | |
g = int(user_color[3:5], 16) | |
b = int(user_color[5:7], 16) | |
elif user_color in color_by_name: | |
c = color_by_name[user_color] | |
r = c[0] | |
g = c[1] | |
b = c[2] | |
return types.SimpleNamespace(r=r, g=g, b=b) | |
def badge_icons(message): | |
b = '' | |
if 'user_badges' in message: | |
for badge in message['user_badges']: | |
if badge['_id'] == 'broadcaster': | |
b += '🎥' | |
elif badge['_id'] == 'moderator': | |
b += '⚔' | |
elif badge['_id'] == 'subscriber': | |
b += '★' | |
elif badge['_id'] == 'staff': | |
b += '⛨' | |
return b | |
def simple_name(commenter): | |
name = commenter['name'] | |
display_name = commenter['display_name'] | |
if display_name: | |
c = display_name[0].lower() | |
if (c >= 'a' and c <= 'z') or (c >= '0' and c <= '9') or c == '_': | |
name = display_name | |
return name | |
def format_message(comment): | |
time = comment['content_offset_seconds'] | |
time_to = comment['msg_time_to'] | |
badges = badge_icons(comment['message']) | |
name = simple_name(comment['commenter']) | |
color = lighten_color(message_color(comment)) | |
counter = comment['seq_counter'] | |
message = comment['message']['body'] | |
if comment['message']['is_action']: | |
#message = '\033[38;2;' + str(color.r) + ';' + str(color.g) + ';' + str(color.b) + 'm' + message + '\033[0m' | |
message = '<font color="{color_hex}">{message}</font>'.format(message=message, color_hex=rgb_to_hex(color)) | |
#nick = '\033[38;2;{c.r};{c.g};{c.b}m<{badges}{name}>\033[0m'.format(badges=badges, name=name, c=color) | |
#if 'user_badges' in comment['message']: | |
# is_broadcaster = False | |
# for badge in comment['message']['user_badges']: | |
# if badge['_id'] == 'broadcaster': | |
# is_broadcaster = True | |
# break | |
# if is_broadcaster: | |
# nick = '\033[7m' + nick | |
nick = '<font color="{color_hex}">{name}</font>'.format(name=name, color_hex=rgb_to_hex(color)) | |
#return "\033[94m{counter} {time} -> {time_to}\033[0m {nick} {message}".format(counter=counter, time=time_to_srt(time), time_to=time_to_srt(time_to), nick=nick, message=message) | |
return """{counter} | |
{time} --> {time_to} | |
{nick}: {message} | |
""".format(counter=counter, time=time_to_srt(time), time_to=time_to_srt(time_to), nick=nick, message=message) | |
def print_response_messages(data, start): | |
for comment in data['comments']: | |
if comment['content_offset_seconds'] < start: | |
continue | |
if comment['source'] != 'chat': | |
continue | |
print(format_message(comment)) | |
def process_response_section(data, start, duration_multiplier, max_duration): | |
comments = data['comments'] | |
global seq_counter | |
last_msg_offset = comments[-1]['content_offset_seconds'] | |
segment_duration = last_msg_offset - comments[0]['content_offset_seconds'] | |
per_msg_duration = min(segment_duration * duration_multiplier / len(comments), max_duration) | |
for comment in comments: | |
if comment['content_offset_seconds'] < start: | |
continue | |
if comment['source'] != 'chat': | |
continue | |
comment['msg_time_from'] = comment['content_offset_seconds'] | |
msg_time_from = comment['content_offset_seconds'] | |
msg_time_from_ms = math.floor(msg_time_from * 1000) % 1000 | |
msg_time_from_sec = math.floor(msg_time_from) % 60 | |
msg_time_from_min = math.floor(msg_time_from / 60) % 60 | |
msg_time_from_hour = math.floor(msg_time_from / 3600) | |
comment['msg_time_to'] = msg_time_from + per_msg_duration | |
comment['seq_counter'] = seq_counter | |
msg_time_to = msg_time_from + per_msg_duration | |
msg_time_to_ms = math.floor(msg_time_to * 1000) % 1000 | |
msg_time_to_sec = math.floor(msg_time_to) % 60 | |
msg_time_to_min = math.floor(msg_time_to / 60) % 60 | |
msg_time_to_hour = math.floor(msg_time_to / 3600) | |
print(format_message(comment)) | |
seq_counter = seq_counter + 1 | |
################################################################################ | |
if len(sys.argv) < 3 or 'TWITCH_CLIENT_ID' not in os.environ: | |
print('Usage: TWITCH_CLIENT_ID=[client_id] {0} <video_id> <start>'.format(sys.argv[0]), file=sys.stderr) | |
sys.exit(1) | |
video_id = sys.argv[1] | |
start = int(sys.argv[2]) | |
duration_multiplier = 10 | |
max_duration = 10 | |
seq_counter = 1 | |
session = requests.Session() | |
session.headers = { 'Client-ID': os.environ['TWITCH_CLIENT_ID'], 'Accept': 'application/vnd.twitchtv.v5+json' } | |
response = session.get('https://api.twitch.tv/v5/videos/' + video_id + '/comments?content_offset_seconds=' + str(start), timeout=10) | |
response.raise_for_status() | |
data = response.json() | |
process_response_section(data, start, duration_multiplier, max_duration) | |
cursor = None | |
if '_next' in data: | |
cursor = data['_next'] | |
time.sleep(0.1) | |
while cursor: | |
response = session.get('https://api.twitch.tv/v5/videos/' + video_id + '/comments?cursor=' + cursor, timeout=10) | |
response.raise_for_status() | |
data = response.json() | |
process_response_section(data, start, duration_multiplier, max_duration) | |
if '_next' in data: | |
cursor = data['_next'] | |
time.sleep(0.1) | |
else: | |
cursor = None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment