Last active
April 16, 2016 01:42
-
-
Save stefansundin/6261442 to your computer and use it in GitHub Desktop.
Python script to hax YouTube Live segments that are no longer accessible via the playlist. See http://stefansundin.com/blog/452#comments
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://gist.github.com/stefansundin/6261442
# Python 3.x
# Binary search for the correct clen parameter for YouTube live streams, which is useful when a segment is no longer listed in the playlist file (check EXT-X-EARLIEST-MEDIA-SEQUENCE).
# It's quick and dirty. clen is always a multiple of 188 (since the segments are MPEG-TS, which uses 188-byte packets). lmt and dur do not seem to have any impact.
# Usually finds the correct url in about 15 tries.
# See http://stefansundin.com/blog/452#comments
# TODO: add threading to process multiple sequence numbers at the same time.

r"""
Regexps to extract clen and dur from a playlist:
Search: #.+\n
Replace with empty string
Search : .+clen=(\d+).+
Replace: \1
Search : .+clen=(\d+).+dur=(\d).+
Replace: \1\t\2

Some dur variations:
dur=8.333: 30 fps, 249 frames, 396800 audio samples
same for dur=8.334
dur=6.007: 30 fps, 179 frames, 284800 audio samples
dur=3.003: 30 fps, 89 frames, 140800 audio samples
dur=6: 25 fps, 148 frames, 284160 audio samples
dur=4: 25 fps, 73 frames, 140160 audio samples
The audio is always 48000 Hz.
"""
import re
import sys
import time
import urllib.request
def verb(text):
    """Print *text* only when the module-level ``verbose`` flag is truthy.

    ``verbose`` is looked up as a global at call time, so it must have been
    assigned before the first call; otherwise a NameError is raised.
    """
    if verbose:
        print(text)
# I get '403 Forbidden' for some streams, but adding a cookie fixes that. Happens randomly.
opener = urllib.request.build_opener()
opener.addheaders = [("Cookie", "VISITOR_INFO1_LIVE=ktfPrjH8eoc;")]

# Template segment URL. Its /sq/<n>/ and clen=<n> parts are rewritten later for
# every sequence number and every size guess.
url = "http://www.youtube.com/videoplayback/id/cE00Me8FOo0.1/itag/94/source/yt_live_broadcast/sq/4760/file/seg.ts?ratebypass=yes&cmbypass=yes&newshard=yes&hls_chunk_host=www.youtube.com&gir=yes&dg_shard=cE00Me8FOo0.1_94&playlist_type=DVR&maudio=1&pmbypass=yes&cp=U0hWS1hQT19HUENONl9QSlNKOm1xZVN0bkk3am1l&upn=aaTidjy5_gs&fexp=900064,910825,906000,909546,906397,929117,929121,929906,929907,929922,929127,929129,929131,929930,925720,925722,925718,925714,929917,929919,929933,912521,932306,913428,904830,919373,930803,908536,904122,938701,911423,909549,900816,912711,935802,904494&sver=3&cpn=uD8wf5_9faz8nSDl&ip=130.240.207.250&ipbits=8&expire=1376851926&sparams=ip,ipbits,expire,id,itag,source,ratebypass,live,cmbypass,newshard,hls_chunk_host,gir,dg_shard,playlist_type,maudio,pmbypass,cp&signature=4DF70AF061A666E47201AA68C65CD1313CEEE54C.979D6A004FE7B53E28C2E1FF271ACFE86ED02FEC&key=dg_yt0&live=1&lmt=1376829143883697&clen=647848&dur=6.007"

# First sequence number to fetch, and how many consecutive segments to fetch.
start = 1100
num = 5

# Note: you can't change the itag value since the signature will be incorrect, get a new url
# Report which quality the URL refers to (informational only).
itag = re.search(r"itag/(\d+)/", url)
if itag is None:
    print("Warning: Could not find itag value (quality)")
else:
    # Replace the match object with the captured itag digits.
    itag = itag.group(1)
    itags = {"92":"240p", "93":"360p", "94":"480p", "95":"720p", "96":"1080p"}
    if itag in itags:
        print("Note: itag="+itag+" ("+itags[itag]+")")
    else:
        print("Warning: Unknown itag="+itag)
# Main loop: for every requested sequence number, binary-search the clen value
# and save the downloaded segment as "<segid>.ts".
# Relies on verb(), opener, url, start and num defined earlier in this file.

# Make verb() print progress details while searching.
verbose = True

for segid in range(start, start+num):
    # Point the URL at this sequence number.
    url = re.sub(r"/sq/\d+/", "/sq/"+str(segid)+"/", url)

    # Search bounds; both will be multiplied with 188 to get a clen in bytes.
    guess_min = 10
    guess_max = 22310  # for full HD a segment can be about 4 MB

    # sanity checks
    # these checks do not always work for some reason (especially in HD),
    # so they are kept here disabled:
    #
    # clen = 188*guess_min
    # verb("sanity check, guess clen="+str(clen))
    # url = re.sub(r"clen=\d+", "clen="+str(clen), url)
    # f = opener.open(url)
    # data = f.read(100)
    # f.close()
    # if len(data) == 0:
    #     print("segid="+str(segid)+": clen="+str(clen)+" did not work, this segment is probably no longer accessible.")
    #     sys.exit(1)
    #
    # clen = 188*guess_max
    # verb("sanity check, guess clen="+str(clen))
    # url = re.sub(r"clen=\d+", "clen="+str(clen), url)
    # f = opener.open(url)
    # data = f.read(100)
    # f.close()
    # if len(data) != 0:
    #     print("segid="+str(segid)+": clen="+str(clen)+" worked, please increase guess_max.")
    #     sys.exit(1)

    # Data of the most recent successful download; None until one succeeds.
    chunks = None
    for i in range(100):
        verb("guess_min: "+str(guess_min))
        verb("guess_max: "+str(guess_max))
        if guess_min+1 == guess_max:
            # The bounds are adjacent, so the search is over; the lower bound
            # is the value that `chunks` (if any) was downloaded with.
            clen = 188*guess_min
            break

        # Try the midpoint of the current bounds.
        guess = guess_min + (guess_max-guess_min)//2
        clen = 188*guess
        verb("guess clen="+str(clen))
        url = re.sub(r"clen=\d+", "clen="+str(clen), url)
        print(url)

        # Download the whole response; the context manager closes it even if
        # a read fails part-way.
        newchunks = []
        recvsize = 0
        with opener.open(url) as response:
            for data in iter(lambda: response.read(100000), b""):
                newchunks.append(data)
                recvsize += len(data)

        # A guess counts as a success only when exactly clen bytes came back.
        if recvsize == clen:
            verb("success, clen might be larger")
            guess_min = guess
            chunks = newchunks
        else:
            verb("fail, clen is smaller")
            guess_max = guess
        verb("")
    else:
        # The loop ran out of tries without the bounds becoming adjacent.
        print("segid="+str(segid)+": Sorry, couldn't find it after "+str(i+1)+" tries. Something is wrong.")
        sys.exit(1)
    verb("")

    if chunks is None:
        # No guess ever succeeded, so there is nothing to save.
        print("Could not find a valid url, none worked.")
        sys.exit(1)

    verb("clen="+str(clen))
    verb("Found the correct url after "+str(i+1)+" tries:")
    url = re.sub(r"clen=\d+", "clen="+str(clen), url)
    print("wget -O "+str(segid)+".ts \""+url+"\"")
    verb("")

    # write file
    with open(str(segid)+".ts", "wb") as out:
        out.writelines(chunks)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment