Skip to content

Instantly share code, notes, and snippets.

@qrtt1
Last active December 25, 2015 08:29
Show Gist options
  • Save qrtt1/6946660 to your computer and use it in GitHub Desktop.
Save qrtt1/6946660 to your computer and use it in GitHub Desktop.
An example ydl-signature server
# Python 2 only: reload(sys) followed by sys.setdefaultencoding() forces the
# process-wide default string encoding to UTF-8.
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import logging
import os
from twisted.internet import reactor
from twisted.web.resource import Resource
from twisted.web.server import Site
from multiprocessing import Process
import urllib2, urllib
from types import StringType, UnicodeType
import re
# NOTE(review): presumably supplies FakeDownloader and
# ManInMiddleYoutubeInfoExtractor, which this module uses -- confirm.
from ydl_utils import *
# Log everything from DEBUG upwards through the root logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Holds the real urllib2.urlopen once change_urlopen_behavior() has run;
# None until then.
origin_urlopen = None
class YoutubeSignaturePage(Resource):
    """Twisted leaf resource that decrypts YouTube signatures for a client.

    GET and POST are handled identically.  The path ``/count`` returns the
    number of signature requests that may still be served; every other path
    is treated as a signature request with these parameters:

    * ``url``        -- the video URL handed to the info extractor
    * ``signatures`` -- the encrypted signatures, joined with ``sep``
    * ``sep``        -- the separator used inside ``signatures``
    * ``video_url``, ``embedded_url``, ``detail_url`` -- optional page
      contents fetched by the client, stored on disk so the server can use
      them instead of downloading the pages itself (this works around
      server-side geo-blocking)
    """

    isLeaf = True

    # Names of the client-uploaded pages; each one is used both as a request
    # parameter name and as the suffix of the file the page is stored in.
    _PREFETCH_FIELDS = ('video_url', 'embedded_url', 'detail_url')

    def __init__(self):
        # Called directly instead of through super() so that it also works
        # if Resource is an old-style class.
        Resource.__init__(self)
        # Number of signature requests left before count_down_for_dead()
        # tries to shut the machine down.
        self.count = 500

    def render_GET(self, request):
        """Serve GET requests exactly like POST requests."""
        return self.render_POST(request)

    def render_POST(self, request):
        """Return the remaining request count for ``/count``, otherwise
        handle the request as a signature request."""
        if "/count" == request.path:
            return "%d" % self.count
        return self.handle_ydl_signature(request)

    def count_down_for_dead(self):
        """Consume one request from the budget and shut the host down when
        the budget is used up.

        The shutdown is only attempted when ``/home/ec2-user`` exists, so
        nothing happens on machines without that directory.
        """
        self.count -= 1
        if self.count > 0:
            return
        if os.path.isdir('/home/ec2-user'):
            import time
            print("will shutdown")
            # NOTE(review): time.sleep() blocks the calling thread for 25
            # seconds; presumably this is the reactor thread, so no other
            # request is served in the meantime -- confirm this is intended.
            time.sleep(25)
            os.system('sudo shutdown -h now')

    def prepare_user_data(self, url, request):
        """Store the pages uploaded by the client for the video in ``url``.

        The video id is taken from the ``.../v/<video_id>`` form of ``url``.
        Nothing is written unless all three prefetch parameters
        (``video_url``, ``embedded_url`` and ``detail_url``) are present.
        Each page is written to ``<video_id>_<parameter name>`` in the
        current working directory.
        """
        match = re.search(r".*/v/(.*)$", url)
        if match is None:
            logger.debug("cannot find the video_id for url: %s" % url)
            return
        video_id = match.group(1)

        # The video id comes straight from the client and becomes part of a
        # file name: refuse anything that could leave the working directory.
        if not video_id or any(c in video_id for c in ("/", "\\", "\0")):
            logger.debug("refuse to store data for unsafe video_id: %r" % video_id)
            return

        # Checked in order, stopping at the first missing parameter.
        for field in self._PREFETCH_FIELDS:
            if field not in request.args:
                logger.debug("prefetch data is not good for using")
                return
            logger.debug("found data for %s" % field)

        for field in self._PREFETCH_FIELDS:
            with open("%s_%s" % (video_id, field), "wb") as f:
                f.write(request.args[field][0])

    def handle_ydl_signature(self, request):
        """Decrypt the signatures of a request and return them joined with
        the separator the client used.

        Returns ``"no signature required"`` when the extractor never asked
        for a signature to be decrypted.  A signature that cannot be
        decrypted is returned unchanged.  Requests without ``url``, ``sep``
        or ``signatures``, or with an empty ``sep``, are answered with
        HTTP 400 and do not consume the request budget.
        """
        try:
            url = request.args['url'][0]
            sep = request.args['sep'][0]
            signatures = request.args['signatures'][0]
        except (KeyError, IndexError):
            request.setResponseCode(400)
            return "missing parameter: url, sep and signatures are required"
        if not sep:
            # str.split() rejects an empty separator.
            request.setResponseCode(400)
            return "bad parameter: sep must not be empty"

        # Prepare the user's pre-fetched data for the server: the client may
        # fetch video_url, embedded_url and detail_url on the server's behalf
        # in order to bypass server-side geo-blocking.
        self.prepare_user_data(url, request)

        # Requests that access YouTube are limited: count them and try to
        # shut the server down if and only if the limit is reached and the
        # path /home/ec2-user exists on this machine.
        self.count_down_for_dead()

        ie = ManInMiddleYoutubeInfoExtractor(FakeDownloader())
        ie.extract(url)
        if not ie.found_signature:
            return "no signature required"

        decrypted_signatures = []
        for s in signatures.split(sep):
            try:
                decrypted_signatures.append(ie.do_decrypt_signature(s))
            except Exception:
                # Best effort: keep the signature as it was sent, but leave
                # a trace instead of failing silently.
                logger.exception("cannot decrypt a signature, returning it unchanged")
                decrypted_signatures.append(s)

        ret = sep.join(decrypted_signatures)
        return ret.encode('utf-8')
def find_video_id(url):
    """Return the YouTube video id contained in *url*, or None.

    Three URL shapes are recognised, tried in this order:

    * ``/v/<id>``        e.g. http://www.youtube.com/v/8PvebsWcpto
    * ``v=<id>``         e.g. https://www.youtube.com/watch?v=8PvebsWcpto&gl=US&hl=en&has_verified=1
    * ``video_id=<id>``  e.g. https://www.youtube.com/get_video_info?&video_id=8PvebsWcpto&el=...

    The id ends at the next ``&``, ``#`` (and ``/`` or ``?`` for the path
    form) or at the end of the URL, so it is also found when no further
    parameter follows it.
    """
    patterns = (
        r"/v/([^/?&#]+)",
        # Anchored on '?' or '&' so that only a real parameter name matches.
        r"[?&]v=([^&#]+)",
        r"[?&]video_id=([^&#]+)",
    )
    for pattern in patterns:
        m = re.search(pattern, url)
        if m:
            return m.group(1)
    return None
def peep_urlopen(request, *args, **kwargs):
    """Replacement for urllib2.urlopen that prefers client-uploaded pages.

    ``request`` is either a URL string or an object offering
    ``get_full_url()``.  When the URL belongs to a video whose embedded,
    detail or watch page is stored locally as ``<video_id>_<suffix>``, the
    stored content is returned instead of downloading the page.  In every
    other case -- including a missing local file -- the call is forwarded,
    with any extra arguments, to the original ``urlopen`` kept in
    ``origin_urlopen``.
    """
    if isinstance(request, (StringType, UnicodeType)):
        url = request
    else:
        url = request.get_full_url()
    logger.info("[peep] url: %s" % url)

    video_id = find_video_id(url)
    logger.info("[peep] find video_id %s" % video_id)

    # Decide which stored page, if any, corresponds to this URL.
    suffix = None
    if video_id and video_id in url:
        if "embedded" in url:
            suffix = "embedded_url"
        elif "detailpage" in url:
            suffix = "detail_url"
        elif "https://www.youtube.com/watch?v=" in url:
            suffix = "video_url"

    if suffix is not None:
        filename = "%s_%s" % (video_id, suffix)
        try:
            cached = open(filename, "rb")
        except IOError:
            # The client did not upload this page: download it instead of
            # failing the whole extraction.
            logger.debug("[peep] no local content[%s] for url %s" % (filename, url))
        else:
            logger.debug("[peep] use local content[%s] for url %s" % (filename, url))
            return urllib.addinfourl(cached, {}, url)

    logger.debug("no cache data, download url: %s" % url)
    return origin_urlopen(request, *args, **kwargs)
def change_urlopen_behavior():
    """Replace ``urllib2.urlopen`` with ``peep_urlopen``.

    The original function is kept in the module-level ``origin_urlopen`` so
    that ``peep_urlopen`` can still download pages.  Calling this more than
    once is harmless: a second call would otherwise store ``peep_urlopen``
    itself as the "original" and make it call itself forever.
    """
    global origin_urlopen
    if urllib2.urlopen is peep_urlopen:
        logger.debug("urlopen is already replaced, nothing to do")
        return
    origin_urlopen = urllib2.urlopen
    logger.debug("before to replace urlopen %s" % urllib2.urlopen)
    urllib2.urlopen = peep_urlopen
    logger.debug("after to replace urlopen %s" % urllib2.urlopen)
if __name__ == "__main__":
    # Usage: <script> [port] -- the HTTP port defaults to 8000.
    args = sys.argv
    port = 8000
    if len(args) == 2:
        try:
            port = int(args[1])
        except ValueError:
            # int() raises ValueError for a non-numeric argument; keep the
            # default port in that case.
            logger.debug("bad port number: %s" % args[1])
    logger.debug("open http server with port: %d" % port)

    # Hack urlopen so that the data uploaded by the user is served instead
    # of being downloaded again.
    change_urlopen_behavior()

    factory = Site(YoutubeSignaturePage())
    reactor.listenTCP(port, factory)
    reactor.run()
# NOTE(review): the imports restart here, so this looks like the beginning of
# a second module -- presumably ydl_utils.py, which is star-imported above.
# Confirm before treating both parts as one file.
import logging
from youtube_dl.extractor import YoutubeIE
from youtube_dl.utils import write_string
# Log everything from DEBUG upwards through the root logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class FakeDownloader(object):
    """Minimal stand-in for the downloader object given to an extractor.

    It carries an empty ``params`` dict and sends the extractor's messages
    to this module's logger instead of the screen.
    """

    def __init__(self):
        # No options are configured.
        self.params = {}

    def report_warning(self, msg):
        """Log *msg* at WARNING level."""
        # Logger.warn() is a deprecated alias of Logger.warning().
        logger.warning(msg)

    def to_screen(self, msg):
        """Log *msg* at DEBUG level."""
        logger.debug(msg)
class ManInMiddleYoutubeInfoExtractor(YoutubeIE):
    """YoutubeIE variant that remembers how a signature was decrypted.

    Every call to ``_decrypt_signature`` (presumably made by YoutubeIE while
    it extracts a video -- confirm) records its ``video_id``, ``player_url``
    and ``age_gate`` arguments and sets ``found_signature``.  Afterwards
    ``do_decrypt_signature`` can decrypt further signatures with the same
    recorded arguments.
    """

    def __init__(self, *args, **kwargs):
        super(ManInMiddleYoutubeInfoExtractor, self).__init__(*args, **kwargs)
        # Becomes True as soon as a signature decryption has been requested.
        self.found_signature = False

    def _decrypt_signature(self, s, video_id, player_url, age_gate=False):
        """Record the decryption arguments, then decrypt *s* as the parent
        class does."""
        self.found_signature = True
        self.info = dict(video_id=video_id, player_url=player_url, age_gate=age_gate)
        parent = super(ManInMiddleYoutubeInfoExtractor, self)
        return parent._decrypt_signature(s, video_id, player_url, age_gate)

    def do_decrypt_signature(self, s):
        """Decrypt *s* with the arguments recorded by the last
        ``_decrypt_signature`` call.

        ``self.info`` only exists after such a call, so check
        ``found_signature`` first.
        """
        recorded = self.info
        parent = super(ManInMiddleYoutubeInfoExtractor, self)
        return parent._decrypt_signature(
            s,
            recorded['video_id'],
            recorded['player_url'],
            recorded['age_gate'],
        )
if __name__ == "__main__":
    # Manual check: extract a sample video and decrypt a sample signature
    # with whatever the extractor recorded.
    sample_url = "http://www.youtube.com/v/TWHNr0BrNgo"
    sample_signature = "557685D7DAFFE2255E27A9C04B56A90BA15D9E88.838872FC20960C110126A500B5D55899F5F45B7D7D"

    extractor = ManInMiddleYoutubeInfoExtractor(FakeDownloader())
    extractor.extract(sample_url)

    if not extractor.found_signature:
        print("no signature")
    else:
        print(extractor.do_decrypt_signature(sample_signature))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment