Last active
December 25, 2015 08:29
-
-
Save qrtt1/6946660 to your computer and use it in GitHub Desktop.
An example ydl-signature server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
reload(sys) | |
sys.setdefaultencoding('utf8') | |
import logging | |
import os | |
from twisted.internet import reactor | |
from twisted.web.resource import Resource | |
from twisted.web.server import Site | |
from multiprocessing import Process | |
import urllib2, urllib | |
from types import StringType, UnicodeType | |
import re | |
from ydl_utils import * | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger(__name__) | |
origin_urlopen = None | |
class YoutubeSignaturePage(Resource):
    """Leaf resource that decrypts YouTube signatures sent by a client.

    GET and POST are handled identically.  A request whose path is
    ``/count`` returns the number of signature requests still allowed; any
    other path is treated as a signature request carrying the ``url``,
    ``sep`` and ``signatures`` arguments.
    """

    isLeaf = True

    # Optional request arguments holding pages the client fetched on the
    # server's behalf; each one is stored in a file "<video_id>_<field>".
    _PREFETCH_FIELDS = ('video_url', 'embedded_url', 'detail_url')

    def __init__(self):
        # Let the base class initialise itself before adding our own state.
        Resource.__init__(self)
        # Signature requests left before the shutdown attempt is made.
        self.count = 500

    def render_GET(self, request):
        """Handle GET exactly like POST."""
        return self.render_POST(request)

    def render_POST(self, request):
        """Return the remaining count for ``/count``, else decrypt signatures."""
        if "/count" == request.path:
            return "%d" % self.count
        return self.handle_ydl_signature(request)

    def count_down_for_dead(self):
        """Consume one request and power the host off once the budget is gone.

        The shutdown is attempted only when ``/home/ec2-user`` exists on the
        machine.
        """
        self.count -= 1
        if self.count > 0:
            return
        if not os.path.isdir('/home/ec2-user'):
            return
        import time
        print("will shutdown")
        # NOTE(review): this sleep blocks the reactor thread for 25 seconds;
        # kept as in the original, consider reactor.callLater instead.
        time.sleep(25)
        os.system('sudo shutdown -h now')

    def prepare_user_data(self, url, request):
        """Store the client's prefetched pages as files in the working directory.

        Nothing is written unless ``url`` contains ``/v/<video_id>`` and the
        request carries all of ``video_url``, ``embedded_url`` and
        ``detail_url``.
        """
        # Only characters valid in a video id are accepted so that the id,
        # which comes from the client, can never escape the working
        # directory when it is used in a file name below.
        matcher_for_video_id = re.search(r"/v/([A-Za-z0-9_-]+)", url)
        if matcher_for_video_id is None:
            logger.debug("cannot find the video_id for url: %s" % url)
            return
        video_id = matcher_for_video_id.group(1)

        # Every field is required; stop at the first one that is missing.
        for field in self._PREFETCH_FIELDS:
            if not request.args.get(field):
                logger.debug("prefetch data is not good for using")
                return
            logger.debug("found data for %s" % field)

        for field in self._PREFETCH_FIELDS:
            with open("%s_%s" % (video_id, field), "wb") as f:
                f.write(request.args[field][0])

    def handle_ydl_signature(self, request):
        """Decrypt the ``sep``-separated ``signatures`` for the video at ``url``.

        Returns the decrypted signatures joined by ``sep``, the text
        "no signature required" when the extractor never needed to decrypt
        anything, or a short error text with status 400 when a required
        argument is missing or empty.
        """
        values = {}
        for name in ('url', 'sep', 'signatures'):
            supplied = request.args.get(name)
            if not supplied:
                request.setResponseCode(400)
                return "missing parameter: %s" % name
            values[name] = supplied[0]
        url = values['url']
        sep = values['sep']
        signatures = values['signatures']
        if not sep:
            # str.split refuses an empty separator.
            request.setResponseCode(400)
            return "empty parameter: sep"

        # prepare the user pre-fetch data for server
        # the client could fetch video_url, embedded_url and detail_url
        # for server in order to bypass the server side geo-blocking problem
        self.prepare_user_data(url, request)

        # we define a request limitation for accessing youtube:
        # simply count the requests and try to shut the server down
        # if and only if the path /home/ec2-user exists on the server
        # and the limitation has been reached
        self.count_down_for_dead()

        ie = ManInMiddleYoutubeInfoExtractor(FakeDownloader())
        ie.extract(url)
        if not ie.found_signature:
            return "no signature required"

        decrypt_signatures = []
        for s in signatures.split(sep):
            decrypted = s
            try:
                decrypted = ie.do_decrypt_signature(s)
            except Exception:
                # Best effort: keep the signature unchanged, but leave a trace.
                logger.exception("cannot decrypt signature, returning it unchanged")
            decrypt_signatures.append(decrypted)

        ret = sep.join(decrypt_signatures)
        return ret.encode('utf-8')
def find_video_id(url):
    """Return the video id found in ``url``, or None when there is none.

    Three URL shapes are tried in order: ``/v/<id>``, ``v=<id>`` and
    ``video_id=<id>``.  The id ends at the next ``&`` or ``#`` (also at
    ``?`` or ``/`` for the ``/v/`` form) or at the end of the URL, so a URL
    without further parameters is recognised as well.
    """
    patterns = (
        # /v/ pattern
        # example: http://www.youtube.com/v/8PvebsWcpto
        r"/v/([^&?#/]+)",
        # v= pattern
        # example: https://www.youtube.com/watch?v=8PvebsWcpto&gl=US&hl=en&has_verified=1
        r"[?&]v=([^&#]+)",
        # video_id= pattern
        # example: https://www.youtube.com/get_video_info?&video_id=8PvebsWcpto&el=...
        r"[?&]video_id=([^&#]+)",
    )
    for pattern in patterns:
        m = re.search(pattern, url)
        if m:
            return m.group(1)
    return None
def peep_urlopen(request):
    """Stand-in for ``urllib2.urlopen`` that prefers client-uploaded pages.

    ``request`` is either a URL string or an object offering
    ``get_full_url()``.  When the URL names a video and looks like its
    embedded, detail or watch page, and the matching local file
    ``<video_id>_<suffix>`` exists, that file is served.  In every other
    case the request is passed to the original ``urlopen`` saved in
    ``origin_urlopen``.
    """
    if isinstance(request, (StringType, UnicodeType)):
        url = request
    else:
        url = request.get_full_url()
    logger.info("[peep] url: %s" % url)

    video_id = find_video_id(url)
    logger.info("[peep] find video_id %s" % video_id)

    # Work out which uploaded page, if any, corresponds to this URL.
    suffix = None
    if video_id and video_id in url:
        if "embedded" in url:
            suffix = "embedded_url"
        elif "detailpage" in url:
            suffix = "detail_url"
        elif "https://www.youtube.com/watch?v=" in url:
            suffix = "video_url"

    if suffix is not None:
        filename = "%s_%s" % (video_id, suffix)
        if os.path.isfile(filename):
            logger.debug("[peep] use local content[%s] for url %s" % (filename, url))
            # The response object takes ownership of the open file.
            return urllib.addinfourl(open(filename, "rb"), {}, url)
        # The client uploaded nothing for this page: download it instead of
        # failing on the missing file.
        logger.debug("[peep] local content[%s] is missing for url %s" % (filename, url))

    response = origin_urlopen(request)
    logger.debug("no cache data, download url: %s" % url)
    return response
def change_urlopen_behavior():
    """Replace ``urllib2.urlopen`` with ``peep_urlopen``.

    The function being replaced is kept in the module-level
    ``origin_urlopen``.  Calling this again after the patch is in place
    does nothing.
    """
    global origin_urlopen
    if urllib2.urlopen is peep_urlopen:
        # Already patched: saving urlopen again would make peep_urlopen
        # call itself forever.
        return
    origin_urlopen = urllib2.urlopen
    logger.debug("before to replace urlopen %s" % urllib2.urlopen)
    urllib2.urlopen = peep_urlopen
    logger.debug("after to replace urlopen %s" % urllib2.urlopen)
if __name__ == "__main__":
    # Usage: <script> [port] -- the port defaults to 8000.
    args = sys.argv
    port = 8000
    if len(args) == 2:
        try:
            port = int(args[1])
        except ValueError:
            # int() raises ValueError for a non-numeric argument; keep the
            # default port in that case.
            logger.debug("bad port number: %s" % args[1])
    logger.debug("open http server with port: %d" % port)

    # hacking urlopen to provide user upload's data
    change_urlopen_behavior()

    factory = Site(YoutubeSignaturePage())
    reactor.listenTCP(port, factory)
    reactor.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from youtube_dl.extractor import YoutubeIE | |
from youtube_dl.utils import write_string | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger(__name__) | |
class FakeDownloader(object):
    """Minimal downloader stand-in that sends its messages to the logger.

    It offers an empty ``params`` dict plus ``report_warning`` and
    ``to_screen``.
    NOTE(review): presumably this is the subset of the downloader interface
    the extractor needs -- confirm against the youtube_dl version in use.
    """

    def __init__(self):
        # No downloader options are configured.
        self.params = {}

    def report_warning(self, msg):
        """Log ``msg`` at WARNING level."""
        logger.warning(msg)

    def to_screen(self, msg):
        """Log ``msg`` at DEBUG level."""
        logger.debug(msg)
class ManInMiddleYoutubeInfoExtractor(YoutubeIE):
    """YoutubeIE that remembers how its last signature was decrypted.

    Each ``_decrypt_signature`` call records its video id, player URL and
    age-gate flag, so ``do_decrypt_signature`` can later decrypt further
    signatures with the same arguments.
    """

    def __init__(self, *args, **kwargs):
        super(ManInMiddleYoutubeInfoExtractor, self).__init__(*args, **kwargs)
        # True once _decrypt_signature has been called at least once.
        self.found_signature = False
        # Arguments of the latest _decrypt_signature call; None until then.
        self.info = None

    def _decrypt_signature(self, s, video_id, player_url, age_gate=False):
        """Record the call's arguments, then decrypt ``s`` via the parent."""
        self.info = {'video_id': video_id, 'player_url': player_url, 'age_gate': age_gate}
        self.found_signature = True
        return super(ManInMiddleYoutubeInfoExtractor, self)._decrypt_signature(
            s, video_id, player_url, age_gate)

    def do_decrypt_signature(self, s):
        """Decrypt ``s`` with the arguments recorded by ``_decrypt_signature``.

        Raises RuntimeError when nothing has been recorded yet, i.e. while
        ``found_signature`` is still False.
        """
        if self.info is None:
            raise RuntimeError("no signature information recorded yet")
        return super(ManInMiddleYoutubeInfoExtractor, self)._decrypt_signature(
            s, self.info['video_id'], self.info['player_url'], self.info['age_gate'])
if __name__ == "__main__":
    # Manual check: run the extractor on one video and print the decrypted
    # form of a sample signature, or a note when none was needed.
    sample_url = "http://www.youtube.com/v/TWHNr0BrNgo"
    sample_signature = "557685D7DAFFE2255E27A9C04B56A90BA15D9E88.838872FC20960C110126A500B5D55899F5F45B7D7D"

    extractor = ManInMiddleYoutubeInfoExtractor(FakeDownloader())
    extractor.extract(sample_url)

    if not extractor.found_signature:
        print("no signature")
    else:
        print(extractor.do_decrypt_signature(sample_signature))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment