Last active
August 21, 2016 18:57
-
-
Save alxrz/de3253432ccf09144db0e550b09597af to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-q "query" - Search term | |
-p 1 - <optional> number of pages to scrape (50 results per page) | |
-r playlist,channel - <optional> recursively extract videos from channels or playlists, default is off (be careful some channels / playlists may have thousands of videos, api is slow to process all) | |
--headers true - <optional> print column headers; pass "false" to omit them (default is true) | |
Examples: | |
prints to console itself: | |
python youtube_search.py -q "async await" -p 1 -r playlist --headers false | |
in order to print to file (csv is the best, file doesn't have to exist) put at the end: | |
python youtube_search.py -q "async await" -p 1 -r playlist --headers false > some_file.csv | |
Get to file (windows): | |
Win + R > type cmd and click enter > use "cd folder_name" to navigate to desired folder with script | |
Requirements: | |
install Python 2.x (the script uses Python 2-only syntax such as print statements and reload(sys), so it will not run on Python 3) | |
install package manager (http://stackoverflow.com/questions/4750806/how-do-i-install-pip-on-windows) | |
install the YouTube API client by running "pip install --upgrade google-api-python-client" (this provides the apiclient module imported below) | |
if you get an error install also: "pip install --upgrade argparse" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from apiclient.discovery import build | |
from apiclient.errors import HttpError | |
import argparse | |
from math import ceil | |
import csv | |
import sys | |
# Force UTF-8 as the process-wide default encoding so csv/stdout writes of
# non-ASCII video titles do not raise UnicodeEncodeError.  Python 2-only
# hack: reload() restores the setdefaultencoding attribute that site.py
# deletes at interpreter startup.
reload(sys)
sys.setdefaultencoding('utf-8')
# NOTE(review): API key is hard-coded and checked into source -- anyone who
# reads this file can burn this project's quota.  Should be rotated and
# loaded from the environment instead.
DEVELOPER_KEY = "AIzaSyCM2TfAq7A7u8bGiNtpBquVJR1QZtyrmNQ"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
# Module-level YouTube Data API v3 client shared by every function below.
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
    developerKey=DEVELOPER_KEY)
def youtube_search(options):
    """Search YouTube for ``options.query`` and print the results as CSV.

    Fetches up to ``options.pages`` pages (50 results each) of mixed
    video/channel/playlist results and writes one ``[type, title, id]``
    row per result to stdout.  When ``options.recursive`` mentions
    ``'channel'`` or ``'playlist'``, the videos contained in those
    resources are printed afterwards.  Returns nothing.
    """
    # Maps a search-result kind to the key under sr["id"] that holds
    # that resource's id.
    type_mapping = {
        "youtube#video": "videoId",
        "youtube#channel": "channelId",
        "youtube#playlist": "playlistId",
    }
    search_request = youtube.search().list(
        q=options.query,
        part="id,snippet",
        maxResults=50
    )
    videos = []
    channels = []
    playlists = []
    playlist_titles = []
    page = 0
    while search_request and page < options.pages:
        search_response = search_request.execute()
        for sr in search_response.get("items", []):
            kind = sr["id"]["kind"]
            id_key = type_mapping.get(kind)
            if id_key is None:
                # Unknown result kind (e.g. a future API addition): skip it
                # instead of raising KeyError on sr["id"][None] as the old
                # unguarded lookup did.
                continue
            rid = sr["id"][id_key]
            videos.append([kind, sr["snippet"]["title"], rid])
            if kind == "youtube#playlist":
                playlists.append(rid)
                playlist_titles.append(sr["snippet"]["title"])
            elif kind == "youtube#channel":
                channels.append(rid)
        search_request = youtube.search().list_next(search_request, search_response)
        page += 1
    wr = csv.writer(sys.stdout, delimiter=',', lineterminator='\n')
    if options.headers:
        headers = ['type', 'title', 'id']
        if options.recursive:
            # Recursive playlist rows carry two extra columns
            # (channel title, channel id), so widen the header to match.
            headers.extend(['channel title', 'channel id'])
        wr.writerow(headers)
    wr.writerows(videos)
    if options.recursive and 'channel' in options.recursive:
        print_videos_from_channels(channels)
    if options.recursive and 'playlist' in options.recursive:
        print_videos_from_playlists(playlists, playlist_titles)
### gets all playlists from a channel and prints out ###
# returns nothing
# could be faster by using search with channelId param and type youtube#video
def print_videos_from_channels(channels=None):
    """Print every video of every playlist owned by the given channels.

    :param channels: iterable of channel id strings; defaults to none.
        (The original signature used a mutable default ``[]`` -- replaced
        with the ``None`` sentinel; calling with no argument behaves
        exactly as before.)
    """
    for channel in (channels or []):
        # Single-string print keeps the banner byte-identical to the old
        # Python 2 statement `print '\n', 'Channel Id (%s)' % c, '\n'`.
        print('\n Channel Id (%s) \n' % channel)
        playlists = get_playlists_from_channel(channel)
        playlist_ids = [p_id for p_id, _title in playlists]
        titles = [title for _p_id, title in playlists]
        print_videos_from_playlists(playlist_ids, titles)
### get list of playlists from a channel ###
# Returns id, title
def get_playlists_from_channel(channel):
    """Return ``[playlist_id, title]`` pairs for every playlist of *channel*.

    Pages through the ``playlists.list`` endpoint 50 items at a time
    until the API stops returning a next page.
    """
    search_request = youtube.playlists().list(
        channelId=channel,
        part="id,snippet",
        maxResults=50
    )
    playlists = []
    while search_request:
        search_results = search_request.execute()
        for sr in search_results.get("items", []):
            if sr["kind"] == "youtube#playlist":
                playlists.append([sr['id'], sr['snippet']['title']])
        # Bug fix: continue pagination on the same resource that created
        # the request -- the original called playlistItems().list_next()
        # on a playlists().list() request.
        search_request = youtube.playlists().list_next(search_request, search_results)
    return playlists
##### print videos from all playlists #####
def print_videos_from_playlists(playlist_ids, titles=None):
    """Print the videos of each playlist as CSV rows on stdout.

    :param playlist_ids: list of playlist id strings.
    :param titles: optional list of display titles parallel to
        ``playlist_ids``; ignored when the lengths differ.
    """
    if titles is None:
        # Bug fix: the declared default was None but the old code called
        # len(None) and crashed with TypeError when it was actually used.
        titles = []
    with_titles = (len(titles) == len(playlist_ids))
    wr = csv.writer(sys.stdout, delimiter=',', lineterminator='\n')
    for i, playlist in enumerate(playlist_ids):
        title = titles[i] if with_titles else ""
        # Single-string print keeps the banner byte-identical to the old
        # Python 2 `print '\n', "Playlist: ...", '\n'` statement.
        print('\n Playlist: %s (%s) \n' % (title, playlist))
        rows = []
        for vid in get_videos_from_playlist(playlist):
            snippet = vid["snippet"]
            rows.append([
                "youtube#video", snippet["title"], snippet["resourceId"]["videoId"],
                snippet["channelTitle"], snippet["channelId"]
            ])
        wr.writerows(rows)
##### get videos from a playlist #####
## returns item object
def get_videos_from_playlist(playlist):
    """Collect the raw playlistItem resources of *playlist*.

    Pages through ``playlistItems.list`` 50 items at a time, stopping
    after four pages (at most 200 videos), and keeps only entries that
    wrap an actual video.
    """
    MAX_PAGES = 4  # 4 pages x 50 results = at most 200 videos per playlist
    request = youtube.playlistItems().list(
        part="id,snippet",
        playlistId=playlist,
        maxResults=50
    )
    items = []
    pages_fetched = 0
    while request and pages_fetched < MAX_PAGES:
        response = request.execute()
        items.extend(
            entry for entry in response.get("items", [])
            if entry["kind"] == "youtube#playlistItem"
            and entry["snippet"]["resourceId"]["kind"] == "youtube#video"
        )
        request = youtube.playlistItems().list_next(request, response)
        pages_fetched += 1
    return items
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-q", "--query", help="Search term")
    parser.add_argument("-p", "--pages", help="Optional: Number of returned pages (50 per page)", type=int, default=1)
    parser.add_argument("-r", "--recursive", help="Optional: Recursively extract videos from playlist|channel, use comma. Default is off", default=None)
    parser.add_argument("--headers", help="Print out with headers true|false")
    args = parser.parse_args()
    # Any value other than the literal string 'false' (including the unset
    # default None) enables headers.
    args.headers = False if args.headers == 'false' else True
    try:
        # Bug fix: the search call used to sit *before* a `try: pass`
        # block, so the HttpError handler below was dead code.
        youtube_search(args)
    except HttpError as e:
        # `except X as e` replaces the Python-2-only `except X, e` form.
        print("An HTTP error %d occurred:\n%s" % (e.resp.status, e.content))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment