Skip to content

Instantly share code, notes, and snippets.

@tung
Last active February 27, 2023 02:33
Show Gist options
  • Save tung/4a7c2f74e5a9448f4c4f8a150e88fed3 to your computer and use it in GitHub Desktop.
Save tung/4a7c2f74e5a9448f4c4f8a150e88fed3 to your computer and use it in GitHub Desktop.
Play Twitch VODs in mpv with chat replay in terminal
#!/usr/bin/env python3
#
# Twitch VOD viewer with terminal chat replay.
#
# Usage: TWITCH_CLIENT_ID=0123456789abcdef0123456789abcde twitch-vod.py https://www.twitch.tv/videos/1234567890 [360p]
#
# Requires mpv and twitch-vod-chat.py - https://gist.github.com/tung/20de3e992ca3a6629843e8169dc0398e
#
import json
import re
import socket
import subprocess
import sys
import time
if len(sys.argv) < 2:
print('At least one argument required.', file=sys.stderr)
sys.exit(1)
MPV_PATH = 'mpv'
TWITCH_CHAT_PATH = '/home/tung/Install/twitch-vod-chat.py'
CHAT_DELAY_S = 5.0 # in seconds
CHAT_GAP_JUMP_S = 20.0 # in seconds
url = sys.argv[1]
twitch_vod_id = re.search('\d+$', url).group()
socket_path = '/tmp/mpv-twitch-' + twitch_vod_id
mpv_args = [MPV_PATH, '--msg-level=all=error', '--input-ipc-server=' + socket_path, url]
if len(sys.argv) > 2:
mpv_args.insert(1, '--ytdl-format=' + sys.argv[2])
mpv_process = subprocess.Popen(mpv_args, stdout=sys.stdout, stderr=subprocess.STDOUT)
chat_process = None
chat_process_start_s = 0.0
# 1: hours - 2: minutes - 3: seconds - 4: milliseconds - 5: rest of message
message_re = re.compile('(\d+):(\d{2}):(\d{2})\.(\d{3})(.+)$')
messages = [b'']
try:
with socket.socket(socket.AF_UNIX) as client:
print('Waiting for {0}...'.format(socket_path), end='', flush=True)
while True:
try:
client.connect(socket_path)
print()
break
except (OSError, ConnectionRefusedError):
try:
time.sleep(1)
print('.', end='', flush=True)
except KeyboardInterrupt:
print('Cancelled.')
raise
try:
old_video_time_s = None
video_time_s = None
while True:
# Get the video time.
old_video_time_s = video_time_s
client.sendall((json.dumps({ 'command': ['get_property', 'playback-time'], 'request_id': 123 }) + '\n').encode('utf-8'))
msg_bytes = client.recv(1024)
for msg_bytes_line in msg_bytes.decode('utf-8').splitlines():
msg_json = json.loads(msg_bytes_line)
if 'request_id' in msg_json and msg_json['error'] == 'success':
video_time_s = float(msg_json['data'])
# Don't bother loading chat at zero seconds if we couldn't get an actual video time.
if video_time_s == None or old_video_time_s == None:
time.sleep(0.25)
continue
# Launch a new chat process if it doesn't yet exist or time has changed by enough.
if chat_process == None or video_time_s < old_video_time_s or video_time_s > old_video_time_s + CHAT_GAP_JUMP_S:
if chat_process != None:
chat_process.terminate()
chat_process.wait()
chat_process_start_s = int(video_time_s - CHAT_DELAY_S) if video_time_s > CHAT_DELAY_S else 0
display_start = chat_process_start_s + CHAT_DELAY_S
print('\r\033[K\033[7m-- {0}:{1:02}:{2:02} --\033[0m'.format(int(display_start / (60 * 60)), int((display_start % (60 * 60) / 60)), int(display_start % 60)))
chat_process = subprocess.Popen(['stdbuf', '-o0', 'python3', '-u', TWITCH_CHAT_PATH, twitch_vod_id, str(chat_process_start_s)], bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
messages.clear()
messages.append(b'')
# Read in chat data if message queue is drained.
if len(messages) <= 1:
data = chat_process.stdout.read(200)
if data == b'':
break
if data != None:
data_split = data.split(b'\n')
messages[-1] = messages[-1] + data_split.pop(0)
messages.extend(data_split)
# Show chat messages from queue up to current video time.
while len(messages) > 1:
line = messages[0].decode('utf-8')
line_match = message_re.search(line)
if line_match != None:
line_time_s = int(line_match.group(1)) * 60 * 60 + int(line_match.group(2)) * 60 + int(line_match.group(3)) + int(line_match.group(4)) / 1000.0
if line_time_s > video_time_s - CHAT_DELAY_S:
break
line_time_s += CHAT_DELAY_S
print('\r\033[K\033[90m{0}:{1:02}:{2:02}{3}'.format(int(line_time_s / (60 * 60)), int((line_time_s % (60 * 60)) / 60), int(line_time_s % 60), line_match.group(5)))
else:
print('\r\033[K' + line)
messages.pop(0)
print('\r\033[KVideo time: {0}:{1:02}:{2:02} '.format(int(video_time_s / (60 * 60)), int((video_time_s % (60 * 60)) / 60), int(video_time_s % 60)), end='', flush=True)
time.sleep(0.25)
except (KeyboardInterrupt, BrokenPipeError):
print('Done.')
finally:
if chat_process != None:
chat_process.terminate()
chat_process.wait()
mpv_process.terminate()
mpv_process.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment