Created
August 16, 2024 15:43
-
-
Save justfoolingaround/fbb21c7e9cee4acadb75781551f7305a to your computer and use it in GitHub Desktop.
A Twitch streaming client that tries to stay as real-time as possible.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
In short, the goal of this project is to | |
stay ahead of your friends during a live | |
watch party to avoid any spoiling reactions | |
when the stakes are real high. | |
This means that you'll be the one spoiling | |
your friends though, so keep up the act! | |
--- | |
HANGING PRINT STATEMENTS ARE INTENDED. THEY | |
ARE TO BE REMOVED WHEN I DISCOVER A FIX FOR | |
THOSE. | |
I NEVER ENCOUNTERED ADS OR A DISCONTINUOUS | |
STREAM, IT'S NOT THAT I'M LAZY. | |
--- | |
Piper.py by @justfoolingaround (devkr) | |
Requirements: python-requests, mpv in PATH | |
A Twitch streaming client that tries to stay | |
as real-time as possible. | |
Even Twitch's fastest client has a live edge | |
of 2, `piper` tries to keep it at about 1. | |
A to-do may be to potentially add certain | |
level of dynamic edge management so that we | |
can handle unexpectancies easily. | |
Handling `low_latency` case could potentially | |
be done differently in order to enforce any | |
live edges but this may sacrifice the future | |
dynamicity of `piper`'s live edge. | |
""" | |
import logging | |
import subprocess | |
import time | |
import urllib.parse | |
import requests | |
TAG_NAME_MAPPING = { | |
"#EXT-X-STREAM-INF": "stream_information", | |
"#EXT-X-MEDIA": "media", | |
"#EXT-X-TARGETDURATION": "target_duration", | |
"#EXT-X-VERSION": "version", | |
"#EXT-X-MEDIA-SEQUENCE": "media_sequence", | |
"#EXTINF": "information", | |
"#EXT-X-TWITCH-PREFETCH": "low_latency", | |
} | |
def parse_stream_attrs(expression: str): | |
attrs = {} | |
args = [] | |
if ":" not in expression: | |
return (expression, attrs, args) | |
tag, attributes = expression.split(":", 1) | |
i = 0 | |
n = len(attributes) | |
while i < n: | |
key = "" | |
value = "" | |
while i < n and attributes[i] != "=": | |
key += attributes[i] | |
i += 1 | |
i += 1 | |
if i < n and attributes[i] == '"': | |
i += 1 | |
while i < n and attributes[i] != '"': | |
value += attributes[i] | |
i += 1 | |
i += 1 | |
else: | |
while i < n and attributes[i] != ",": | |
value += attributes[i] | |
i += 1 | |
if not value: | |
args.append(key) | |
else: | |
attrs.update({key.lower(): value}) | |
i += 1 | |
if tag in TAG_NAME_MAPPING: | |
tag = TAG_NAME_MAPPING[tag] | |
return (tag, attrs, args) | |
class Twitch: | |
CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko" | |
GQL_URL = "https://gql.twitch.tv/gql" | |
def __init__(self, session: requests.Session): | |
self.session = session | |
self.logger = logging.getLogger("twitch") | |
def iter_channels(self, query: str, *, must_be_live=True, must_not_be_dj=True): | |
response = self.session.post( | |
Twitch.GQL_URL, | |
headers={ | |
"Client-Id": self.CLIENT_ID, | |
}, | |
json=[ | |
{ | |
"operationName": "SearchTray_SearchSuggestions", | |
"variables": { | |
"queryFragment": query, | |
"withOfflineChannelContent": not must_be_live, | |
"includeIsDJ": not must_not_be_dj, | |
}, | |
"extensions": { | |
"persistedQuery": { | |
"version": 1, | |
"sha256Hash": "2749d8bc89a2ddd37518e23742a4287becd3064c40465d8b57317cabd0efe096", | |
} | |
}, | |
} | |
], | |
) | |
for edge in response.json()[0]["data"]["searchSuggestions"]["edges"]: | |
if edge is None or edge["node"]["content"] is None: | |
continue | |
yield edge["node"]["content"]["login"] | |
def iter_stream_urls(self, username: str): | |
response = self.session.post( | |
Twitch.GQL_URL, | |
headers={ | |
"Client-Id": self.CLIENT_ID, | |
}, | |
json={ | |
"operationName": "PlaybackAccessToken_Template", | |
"query": 'query PlaybackAccessToken_Template($channel: String!) { streamPlaybackAccessToken(channelName: $channel, params: {platform: "ios", playerType: "site" }) { value signature authorization { isForbidden forbiddenReasonCode } __typename }}', | |
"variables": { | |
"channel": username, | |
}, | |
}, | |
) | |
access_token = response.json()["data"]["streamPlaybackAccessToken"] | |
token = access_token["value"] | |
signature = access_token["signature"] | |
hls_playlist_url = urllib.parse.urlunparse( | |
( | |
"https", | |
"usher.ttvnw.net", | |
f"/api/channel/hls/{username.lower()}.m3u8", | |
"", | |
urllib.parse.urlencode( | |
{ | |
"allow_source": "true", | |
"sig": signature, | |
"token": token, | |
"fast_bread": "true", | |
} | |
), | |
"", | |
) | |
) | |
self.logger.debug("%r: %s", username, hls_playlist_url) | |
stream_attrs = None | |
for media in self.session.get(hls_playlist_url).text.split("\n"): | |
if media[:12] == "#EXT-X-MEDIA": | |
stream_attrs = {} | |
if stream_attrs is None: | |
continue | |
if media[:1] == "#": | |
type_of, attrs, _ = parse_stream_attrs(media) | |
stream_attrs.update({type_of: attrs}) | |
else: | |
stream_attrs.update({"url": media}) | |
yield stream_attrs | |
stream_attrs = None | |
class TwitchStreamIterator: | |
def __init__(self, session: requests.Session, stream: str): | |
self.session = session | |
self.stream = stream | |
self.current_sequence = None | |
def iter_from_sequence(self, our_sequence=None): | |
try: | |
response = self.session.get( | |
self.stream, | |
headers={ | |
"Origin": "https://www.twitch.tv", | |
"Referer": "https://www.twitch.tv/", | |
}, | |
) | |
response.raise_for_status() | |
except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError): | |
return | |
next_has_stream = False | |
for media in response.text.split("\n"): | |
if media[:1] == "#": | |
tag, _, args = parse_stream_attrs(media) | |
if tag == "media_sequence": | |
self.current_sequence = int(args[0]) | |
if our_sequence is None: | |
our_sequence = self.current_sequence | |
if tag == "information": | |
if args[0][-4:] == "live": | |
next_has_stream = True | |
else: | |
print("Encountering ads, trying to skip.") | |
if tag == "low_latency": | |
yield (our_sequence, args[0]) | |
our_sequence += 1 | |
self.current_sequence += 1 | |
else: | |
if next_has_stream: | |
if self.current_sequence >= our_sequence: | |
our_sequence = max(our_sequence, self.current_sequence) | |
yield (our_sequence, media) | |
our_sequence += 1 | |
self.current_sequence += 1 | |
next_has_stream = False | |
def iter_indefinitely(self, last: int = None, *, max_poll: int = None): | |
delta = time.perf_counter() | |
sequence = list(self.iter_from_sequence()) | |
if last is not None: | |
sequence = sequence[-last:] | |
n = 0 | |
for n, stream in sequence: | |
yield (n, stream) | |
our_sequence = n + 1 | |
while True: | |
if max_poll is not None: | |
sleep_for = max(0, max_poll - (time.perf_counter() - delta)) | |
time.sleep(sleep_for) | |
delta = time.perf_counter() | |
sequence = list(self.iter_from_sequence(our_sequence)) | |
if sequence: | |
last_n = sequence[-1][0] | |
diff = last_n - our_sequence | |
if last is not None and diff > last: | |
print( | |
f"Exceeded delta ({diff}>{last}), seeking near {last} with respect to the end." | |
) | |
sequence = sequence[-last:] | |
if our_sequence > last_n + 1 + len(sequence): | |
print( | |
f"Discontinuity detected (if persistent, hike the live edge to {len(sequence)=}):", | |
our_sequence, | |
last_n + 1, | |
) | |
our_sequence = last_n + 1 | |
for n, stream in sequence: | |
yield (n, stream) | |
if __name__ == "__main__": | |
import sys | |
if len(sys.argv) < 2: | |
print(f"Usage: {sys.argv[0]} [LIVE_USER]", file=sys.__stderr__) | |
exit(1) | |
user = sys.argv[1] | |
session = requests.Session() | |
twitch = Twitch(session) | |
options = list(twitch.iter_channels(user)) | |
if not options: | |
print( | |
"Either that user does not exist or is not live at the moment.", | |
file=sys.__stderr__, | |
) | |
exit(1) | |
stream_user = options[0] | |
streams = list(twitch.iter_stream_urls(stream_user)) | |
if not streams: | |
print( | |
"No streams found. The code may be broken.", | |
file=sys.__stderr__, | |
) | |
exit(1) | |
best = streams[0] | |
print(f"Playing a {best['media']['name']} stream.") | |
mpv = subprocess.Popen( | |
[ | |
"mpv", | |
"-", | |
"--cache=no", | |
"--profile=low-latency", | |
"--untimed", | |
f"--force-media-title=You are watching: {stream_user}", | |
], | |
stdin=subprocess.PIPE, | |
) | |
streamer = TwitchStreamIterator( | |
session, | |
best["url"], | |
) | |
chunk_size = 8192 | |
for n, stream in streamer.iter_indefinitely(last=1): | |
playing = True | |
while playing: | |
try: | |
response = session.get( | |
stream, | |
stream=True, | |
) | |
response.raise_for_status() | |
for chunk in response.iter_content(chunk_size): | |
mpv.stdin.write(chunk) | |
mpv.stdin.flush() | |
playing = False | |
except requests.exceptions.ConnectionError: | |
print("Twitch is being naughty, retrying for stream:", n) | |
except BrokenPipeError: | |
print("Cannot stream any data anymore :(.") | |
exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment