Last active
June 20, 2018 09:16
-
-
Save larryhou/e7890f5b83c16819857c3d4c296a3653 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import argparse
import json
import re
import sys
import time
import urllib.parse  # binds `urllib` too; unquote lives in urllib.parse on Python 3

import requests
# NOTE(review): hard-coded API key committed to source — anyone reading this
# file can consume the quota. Rotate the key and load it from an environment
# variable or config file instead.
YOUTUBE_API_KEY = 'AIzaSyCyLSmcEDJt3HaLFK0_LdJYPkq0RFAVzKA'
# YouTube channel ids selectable from the command line via --channel-index.
CHANNEL_SETTING = [
    'UCQT2Ai7hQMnnvVTGd6GdrOQ',
    'UCO3pO3ykAUybrjv3RBbXEHw',
    'UCkWfzJTG5j-V8gTJQgEOexA',
    'UCdRKafyb--geO9ySg6CbhYA',
    'UCtAIPjABiQD3qjlEl1T5VpA'
]
def tojson(data):
    """Serialize *data* as pretty-printed JSON, keeping non-ASCII text readable."""
    return json.dumps(data, indent=4, ensure_ascii=False)
def parse_multiple_param(value):
    """Join a list/tuple of values into the comma-separated string the
    YouTube API expects for multi-value parameters; scalars pass through
    unchanged.
    """
    # idiom: single isinstance with a tuple of types, generator instead of list
    if isinstance(value, (tuple, list)):
        return ','.join(str(x) for x in value)
    return value
def query_api_channel(id, part='snippet,contentDetails,statistics'):
    """Fetch channel metadata from the YouTube Data API v3 and print it as JSON."""
    # NOTE: `id` shadows the builtin, kept for caller keyword compatibility.
    response = requests.get(
        'https://www.googleapis.com/youtube/v3/channels',
        params={'id': id, 'part': part, 'key': YOUTUBE_API_KEY},
    )
    assert response.status_code == 200, tojson(response.json())
    print(tojson(response.json()))
def query_api_video(id, part='snippet,contentDetails,statistics'):
    """Fetch video metadata from the YouTube Data API v3 and print it as JSON."""
    # NOTE: `id` shadows the builtin, kept for caller keyword compatibility.
    endpoint = 'https://www.googleapis.com/youtube/v3/videos'
    response = requests.get(endpoint, params={'id': id, 'part': part, 'key': YOUTUBE_API_KEY})
    assert response.status_code == 200, tojson(response.json())
    print(tojson(response.json()))
def query_api_search(channel, part='snippet', max_result=50):
    """Search a channel's videos (newest first) and return the decoded JSON payload."""
    query = {
        'channelId': channel,
        'part': part,
        'maxResults': max_result,
        'order': 'date',
        'safeSearch': 'none',
        'type': 'video',
        'key': YOUTUBE_API_KEY,
    }
    response = requests.get('https://www.googleapis.com/youtube/v3/search', params=query)
    payload = response.json()
    assert response.status_code == 200, tojson(payload)
    return payload
def query_api_playlist(channel, part='snippet,contentDetails,status', max_result=50):
    """Return the decoded JSON list of a channel's playlists."""
    query = {
        'channelId': channel,
        'part': part,
        'maxResults': max_result,
        'key': YOUTUBE_API_KEY,
    }
    response = requests.get('https://www.googleapis.com/youtube/v3/playlists', params=query)
    payload = response.json()
    assert response.status_code == 200, tojson(payload)
    return payload
def query_api_playlist_items(id, part='snippet,contentDetails', max_result=10):
    """Return the decoded JSON items of the playlist identified by *id*."""
    # NOTE: `id` shadows the builtin, kept for caller keyword compatibility.
    query = {
        'playlistId': id,
        'part': part,
        'maxResults': max_result,
        'key': YOUTUBE_API_KEY,
    }
    response = requests.get('https://www.googleapis.com/youtube/v3/playlistItems', params=query)
    payload = response.json()
    assert response.status_code == 200, tojson(payload)
    return payload
def query_api_category(region_code, part='snippet'):
    """Fetch the video categories available in *region_code* and print them as JSON."""
    response = requests.get(
        'https://www.googleapis.com/youtube/v3/videoCategories',
        params={'regionCode': region_code, 'part': part, 'key': YOUTUBE_API_KEY},
    )
    assert response.status_code == 200, tojson(response.json())
    print(tojson(response.json()))
def decode_parameters(query_string):
    """Decode an application/x-www-form-urlencoded query string into a dict.

    Percent-escapes in values are unquoted.  Parameters without an '=' are
    skipped; a value may itself contain '=' (the string is split only on the
    first one), which the original len==2 check silently dropped.
    """
    # bug fix: on Python 3 `unquote` lives in urllib.parse; the original
    # called urllib.request.unquote with only `import urllib` in scope,
    # which raises AttributeError.
    from urllib.parse import unquote
    result = {}
    for param in query_string.split('&'):
        key, sep, value = param.partition('=')
        if sep:
            result[key] = unquote(value)
    return result
def get_download_links(video_id):
    """Collect stream descriptors for *video_id* from youtube's get_video_info
    endpoint (adaptive formats plus legacy muxed formats).

    Returns a list of dicts, one per stream; each is printed as JSON.
    NOTE(review): this is a legacy endpoint and either format field may be
    absent from the response — missing/empty fields are now skipped instead
    of raising AttributeError on None.split(',').
    """
    params = decode_parameters('el=embedded&ps=default&eurl=&gl=US&hl=en')
    params['video_id'] = video_id
    response = requests.get('http://www.youtube.com/get_video_info', params=params)
    video_info = decode_parameters(response.text)
    download_list = []
    # The two original loops were identical except for the field name.
    for field in ('adaptive_fmts', 'url_encoded_fmt_stream_map'):
        encoded = video_info.get(field)
        if not encoded:
            continue  # robustness: field missing for some videos
        for item in encoded.split(','):
            download_info = decode_parameters(item)
            download_list.append(download_info)
            print(tojson(download_info))
    return download_list
def search_recent_videos(channel, max_result):
    """Print a channel header, then the channel's most recent videos
    (via the search API) together with their download stream info."""
    header_shown = False
    for item in query_api_search(channel=channel, max_result=max_result).get('items'):
        snippet = item['snippet']
        if not header_shown:
            header_shown = True
            print('%s[%s]' % (snippet['channelId'], snippet['channelTitle']))
        video_id = item['id']['videoId']
        get_download_links(video_id)
        published = re.sub(r'\.\d+Z$', '', snippet['publishedAt'])
        print('[%s]' % published,
              'https://www.youtube.com/watch?v=%s' % video_id,
              snippet['title'])
def query_recent_videos(channel, time_span, max_result):
    """Print every video published within the last *time_span* seconds across
    all playlists of *channel*, newest first.

    time_span: look-back window in seconds (see parse_time_span).
    max_result: maximum playlist items fetched per playlist.
    """
    start_time = time.localtime(time.mktime(time.localtime()) - time_span)
    video_list = []
    for playlist in query_api_playlist(channel=channel).get('items'):
        playlist_id = playlist.get('id')
        print('[PLAY]', playlist_id, playlist['snippet']['title'])
        items = query_api_playlist_items(id=playlist_id, max_result=max_result).get('items')
        # bug fix: list.sort(cmp=...) was removed in Python 3 — use key/reverse.
        # `or ''` avoids None-vs-str comparison for entries with no publish date
        # (the original cmp crashed on those too).
        items.sort(key=lambda it: it['contentDetails'].get('videoPublishedAt') or '',
                   reverse=True)
        for item in items:
            detail = item.get('contentDetails')
            if 'videoPublishedAt' not in detail:
                continue  # private/deleted videos carry no publish date
            title = item['snippet']['title']
            print(' [VIDEO]', detail['videoPublishedAt'], detail['videoId'], title)
            # Strip fractional seconds so the fixed strptime format matches.
            time_string = re.sub(r'\.\d+Z$', '', detail['videoPublishedAt'])
            publish_time = time.strptime(time_string, '%Y-%m-%dT%H:%M:%S')
            if publish_time >= start_time:
                video_list.append((detail['videoId'], publish_time, title))
    # bug fix: second cmp= sort replaced the same way; newest first.
    video_list.sort(key=lambda v: v[1], reverse=True)
    for video_id, publish_time, title in video_list:
        print(time.strftime('[%Y-%m-%d %H:%M:%S]', publish_time),
              'https://www.youtube.com/watch?v=%s' % video_id,
              '\'%s\'' % title)
def parse_time_span(value, unit):
    """Convert *value* expressed in *unit* ('s', 'm', 'h', 'd') to seconds.

    Any unrecognised unit falls back to treating the value as seconds.
    """
    seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
    return int(value) * seconds_per_unit.get(unit, 1)
def main():
    """Parse CLI options and print the recent videos of the chosen channel."""
    arguments = argparse.ArgumentParser()
    arguments.add_argument('--channel', '-c', default='UCQT2Ai7hQMnnvVTGd6GdrOQ',
                           help='YouTube channel id to query')
    # bug fix: without type=int these options arrive as str from the command
    # line while their defaults are int, making downstream behavior depend on
    # whether the default was used.
    arguments.add_argument('--time-span', '-t', type=int, default=7,
                           help='look-back window, expressed in --time-unit units')
    arguments.add_argument('--time-unit', '-u', default='d', choices=['s', 'm', 'h', 'd'])
    arguments.add_argument('--max-result', '-m', type=int, default=10)
    arguments.add_argument('--channel-index', '-i', type=int,
                           choices=range(len(CHANNEL_SETTING)),
                           help='pick a channel from CHANNEL_SETTING instead of --channel')
    options = arguments.parse_args(sys.argv[1:])
    channel = options.channel
    if options.channel_index is not None:
        channel = CHANNEL_SETTING[options.channel_index]
    search_recent_videos(channel=channel, max_result=options.max_result)
# Run only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment