Last active
August 29, 2015 14:07
-
-
Save nucular/a21a7b0c05486c21d5a5 to your computer and use it in GitHub Desktop.
Python Youtube tools
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The MIT License (MIT)
Copyright (c) 2014 nucular
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
# python3 {{f}} | |
import urllib.request | |
import urllib.parse | |
import json | |
import time | |
def search(query, order_by="relevance", max_results=None, start_index=None, key=None, verbose=False):
    """Use the YQL service to search for YouTube videos.

    Builds a ``SELECT * FROM youtube.search`` YQL statement, sends it to the
    public YQL endpoint and returns the ``video`` entries of the response.
    When YQL reports no results and its diagnostics show an HTTP 403 from an
    upstream URL, the request is retried (up to 5 attempts) with a doubling
    delay starting at 0.1 seconds.

    Args:
        query: Search terms; double quotes are backslash-escaped.
        order_by: Accepted for interface compatibility; not used when
            building the statement.
        max_results: Optional integer, added both as ``max_results`` filter
            and as ``limit`` clause.
        start_index: Optional integer offset, added as ``start_index`` filter.
        key: Optional API key string, added as ``key`` filter.
        verbose: Print progress messages to stdout when true.

    Returns:
        A list of video dictionaries; an empty list when the search simply
        had no results.

    Raises:
        RuntimeError: if YQL answers with a status other than 200, or if
            every attempt ended in an upstream 403.
    """
    def escape(value):
        # YQL string literals are double-quoted; escape embedded quotes.
        return value.replace("\"", "\\\"")

    # build the YQL query
    yql = "SELECT * FROM youtube.search WHERE query=\"{}\"".format(escape(query))
    if max_results:
        yql += " AND max_results={:d}".format(max_results)
    if start_index:
        yql += " AND start_index={:d}".format(start_index)
    if key:
        # Escape the key itself (the query text used to be inserted here).
        yql += " AND key=\"{}\"".format(escape(key))
    if max_results:
        yql += " limit {0:d}".format(max_results)
    if verbose:
        print("[videosearch] Query: " + yql)

    # build the URL query
    q = urllib.parse.urlencode([
        ("q", yql),
        ("format", "json"),
        ("env", "store://datatables.org/alltableswithkeys"),
        ("diagnostics", "true"),
        ("debug", "true"),
    ])
    url = "https://query.yahooapis.com/v1/public/yql?" + q

    max_tries = 5
    wait = 0.1
    for attempt in range(1, max_tries + 1):
        if verbose:
            print("[videosearch] Try {} of {}".format(attempt, max_tries))
        # Close the connection deterministically once the body is read.
        with urllib.request.urlopen(url) as res:
            if res.status != 200:
                raise RuntimeError("YQL responded with code {}".format(res.status))
            text = res.read().decode()
        if verbose:
            print("[videosearch] Got response")

        parsed = json.loads(text)
        results = parsed["query"]["results"]
        if results is not None:
            videos = results["video"]
            # A single hit may arrive as one object instead of a list.
            return videos if isinstance(videos, list) else [videos]

        # No results: find out whether an upstream request was rejected.
        diagnostics = parsed["query"].get("diagnostics") or {}
        entries = diagnostics.get("url") or []
        if isinstance(entries, dict):
            entries = [entries]
        failed = any(
            isinstance(entry, dict) and entry.get("http-status-code") == "403"
            for entry in entries
        )
        if not failed:
            # Youtube actually gave us no results
            return []

        if attempt < max_tries:
            if verbose:
                print("[videosearch] Fail on Youtube side, waiting {} seconds".format(wait))
            time.sleep(wait)
            wait = wait * 2

    raise RuntimeError("Youtube has a bad day")
if __name__ == "__main__":
    # Demo: run a sample search and print "<id>: <title>" for every hit.
    for video in search("charlie bit my finger", verbose=True):
        line = "{}: {}".format(video["id"], video["title"])
        # Drop non-ASCII characters so printing works on any console.
        # Decoding the bytes (instead of slicing their repr) keeps
        # backslashes and quotes in titles unescaped.
        print(line.encode("ascii", "ignore").decode("ascii"))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# coding=utf-8 | |
# python3 {{f}} | |
import re | |
import http.cookiejar, urllib.request | |
from urllib.parse import unquote | |
class VideoInfo(object):
    """Metadata of one YouTube video as returned by ``get_video_info``.

    After :meth:`fetch` or :meth:`populate` the parsed keys can be iterated
    over and are also available as attributes of the instance.
    """

    def __init__(self, id):
        """Initiate the instance and set a video ID"""
        self.id = id
        self._infos = {}

    def __iter__(self, *args, **kwargs):
        """Iterate over the keys of the stored video info.

        Raises:
            Exception: if the instance is not populated yet.
        """
        if not self.ispopulated():
            raise Exception("VideoInfo is not populated yet")
        return self._infos.__iter__(*args, **kwargs)

    # NOTE(review): this shadows the attribute-dictionary descriptor, so
    # ``vars(instance)`` yields this bound method. Kept as a method because
    # existing callers may invoke ``instance.__dict__()``.
    def __dict__(self):
        """Return the stored video info dictionary.

        Raises:
            Exception: if the instance is not populated yet.
        """
        if not self.ispopulated():
            raise Exception("VideoInfo is not populated yet")
        return self._infos

    def fetch(self, cookiejar=None, useragent=None):
        """
        Fetch the video data for the ID associated with this instance from
        YouTube, parse it and populate the instance with the result.

        Args:
            cookiejar: Optional ``http.cookiejar.CookieJar`` to use for the
                request; a fresh one is created when omitted.
            useragent: Optional User-agent header value for the request.

        Returns:
            The cookie jar that was used for the request.

        Raises:
            RuntimeError: if YouTube answers with a status other than 200.
        """
        # An empty CookieJar is falsy, so compare against None explicitly;
        # otherwise a caller-supplied empty jar would be silently replaced.
        if cookiejar is None:
            cookiejar = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
        # Headers must be configured before the request is sent.
        if useragent:
            opener.addheaders = [("User-agent", useragent)]
        url = "https://youtube.com/get_video_info?video_id={}".format(self.id)
        with opener.open(url) as res:
            if res.status != 200:
                raise RuntimeError("YouTube responded with %s" % res.status)
            body = res.read()
        self.populate(self.parse(body.decode("utf-8")))
        return cookiejar

    def ispopulated(self):
        """Check if the instance actually contains video info data."""
        return self._infos != {}

    def populate(self, d):
        """Add a dictionary of video info data to the instance.

        Any previously stored data is removed first. Every key of ``d`` is
        also set as an attribute on the instance.
        """
        self.depopulate()
        self._infos = d
        for k, v in d.items():
            setattr(self, k, v)

    def depopulate(self):
        """Remove the current video info data from the instance."""
        for k in self._infos:
            # Attributes are named after the keys, not the values.
            try:
                delattr(self, k)
            except AttributeError:
                # Key was added to the dict after populate(); nothing to undo.
                pass
        self._infos = {}

    def parse(self, t):
        """Parse video infos in the query format used by YouTube to a dict.

        ``adaptive_fmts`` and ``url_encoded_fmt_stream_map`` are expanded to
        lists of per-format dictionaries (the latter is stored under the key
        ``fmts``); both are empty lists when the input lacks them. Known
        keys are converted to Python types; a value that does not fit its
        expected type is left as the raw string.
        """
        def split_pairs(text, sep="&"):
            # Kinda like urllib.parse.parse_qsl, but can use multiple
            # separator characters. NOTE(review): ":" is always treated as
            # a separator too; kept from the original pattern.
            pairs = []
            for item in filter(None, re.split("[:" + re.escape(sep) + "]+", text)):
                k, eq, v = item.partition("=")
                if not eq:
                    # Fragment without "=" carries no key/value pair.
                    continue
                pairs.append((k, unquote(v)))
            return pairs

        def to_bool(text):
            # bool("0") and bool("False") are True; read the flag's meaning.
            return text.strip().lower() not in ("", "0", "false")

        def int_list(text):
            return [int(i) for i in text.split(",")]

        def plus_list(text):
            return [i.replace("+", " ") for i in text.split(",")]

        def parse_mime(v):
            # 'mime;+codecs="a,+b"' -> (mime, [a, b]); [9:-1] strips the
            # '+codecs="' prefix and the closing quote.
            split = v.split(";", 1)
            if len(split) == 2:
                return (split[0], [i.strip("+") for i in split[1][9:-1].split(",")])
            return tuple(split)

        def convert_adaptive(k, v):
            try:
                if k in ("bitrate", "clen", "fps", "itag", "lmt"):
                    return int(v)
                if k in ("index", "init"):
                    return [int(i) for i in v.split("-")]
                if k == "size":
                    dims = v.split("x")
                    return {"w": int(dims[0]), "h": int(dims[1])}
                if k == "type":
                    return parse_mime(v)
            except (ValueError, IndexError):
                # Unexpected shape: keep the raw string.
                pass
            return v

        def convert_fmt(k, v):
            try:
                if k == "itag":
                    return int(v)
                if k == "type":
                    return parse_mime(v)
            except ValueError:
                pass
            return v

        def group(pairs, first_key, convert):
            # A new format starts at every occurrence of first_key; only
            # formats that carry a URL are kept.
            groups = []
            current = {}
            for k, v in pairs:
                if k == first_key:
                    if "url" in current:
                        groups.append(current)
                    current = {k: convert(k, v)}
                else:
                    current[k] = convert(k, v)
            # Flush the final format, which no later first_key closes.
            if "url" in current:
                groups.append(current)
            return groups

        parsed = dict(split_pairs(t))

        parsed["adaptive_fmts"] = group(
            split_pairs(parsed.get("adaptive_fmts", ""), "&,"), "url", convert_adaptive)

        # I really hate this key name, rename it to "fmts"
        fmtmap = parsed.pop("url_encoded_fmt_stream_map", "")
        parsed["fmts"] = group(split_pairs(fmtmap, "&,"), "itag", convert_fmt)

        # convert most keys to a more pythonic format (I've estimated these)
        types = {
            "ad_device": int,
            "ad_flags": int,
            "ad_logging_flag": int,
            "ad_slots": int_list,
            "afv": to_bool,
            "allow_embed": to_bool,
            "allow_html5_ads": to_bool,
            "allow_ratings": to_bool,
            "as_launched_in_country": to_bool,
            "avg_rating": float,
            "cc3_module": int,
            "cc_asr": int,
            "cc_font": plus_list,
            "cid": int,
            "cl": int,
            "cut_ad_for_ypc": to_bool,
            "dash": to_bool,
            "enablecsi": to_bool,
            "fexp": int_list,
            "fmt_list": lambda text: text.split(","),
            "focEnabled": to_bool,
            "has_cc": to_bool,
            "idpj": int,
            "instream_long": to_bool,
            "iv3_module": int,
            "iv_allow_in_place_switch": to_bool,
            "iv_load_policy": int,
            "keywords": plus_list,
            "ldpj": int,
            "length_seconds": int,
            "loeid": int_list,
            "logwatch": to_bool,
            "midroll_freqcap": float,
            "midroll_prefetch_size": int,
            "loudness": float,
            "muted": to_bool,
            "no_get_video_log": to_bool,
            "rmktEnabled": to_bool,
            "sentiment": int,
            "sffb": to_bool,
            "show_content_thumbnail": to_bool,
            "subscribed": int,
            "tag_for_child_directed": to_bool,
            "timestamp": int,
            "title": lambda text: text.replace("+", " "),
            "tmi": int,
            "use_cipher_signature": to_bool,
            "video_verticals": lambda text: [int(i) for i in text[1:-1].split(",")],
            "view_count": int,
            "watermark": lambda text: [i for i in text.split(",") if i],
            "ytfocEnabled": to_bool,
        }
        for k, convert in types.items():
            if k in parsed:
                try:
                    parsed[k] = convert(parsed[k])
                except ValueError:
                    # The expected types are estimates; keep the raw string
                    # rather than failing the whole parse on one odd value.
                    pass
        return parsed
if __name__ == "__main__":
    # Demo: download the info for a sample video and pretty-print the
    # parsed dictionary.
    from pprint import pprint

    info = VideoInfo("_OBlgSz8sSM")
    info.fetch()
    pprint(info._infos)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment