Skip to content

Instantly share code, notes, and snippets.

@larryhou
Last active June 20, 2018 09:16
Show Gist options
  • Save larryhou/e7890f5b83c16819857c3d4c296a3653 to your computer and use it in GitHub Desktop.
Save larryhou/e7890f5b83c16819857c3d4c296a3653 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import requests, json, argparse, sys, time, re, urllib
YOUTUBE_API_KEY = 'AIzaSyCyLSmcEDJt3HaLFK0_LdJYPkq0RFAVzKA'
CHANNEL_SETTING = [
'UCQT2Ai7hQMnnvVTGd6GdrOQ',
'UCO3pO3ykAUybrjv3RBbXEHw',
'UCkWfzJTG5j-V8gTJQgEOexA',
'UCdRKafyb--geO9ySg6CbhYA',
'UCtAIPjABiQD3qjlEl1T5VpA'
]
def tojson(data):
return json.dumps(data, ensure_ascii=False, indent=4)
def parse_multiple_param(value):
if isinstance(value, tuple) or isinstance(value, list):
return ','.join([str(x) for x in value])
return value
def query_api_channel(id, part = 'snippet,contentDetails,statistics'):
params = {
'id':id,
'part':part,
'key':YOUTUBE_API_KEY
}
response = requests.get('https://www.googleapis.com/youtube/v3/channels', params=params)
assert response.status_code == 200, tojson(response.json())
print(tojson(response.json()))
def query_api_video(id, part = 'snippet,contentDetails,statistics'):
params = {
'id': id,
'part': part,
'key': YOUTUBE_API_KEY
}
response = requests.get('https://www.googleapis.com/youtube/v3/videos', params=params)
assert response.status_code == 200, tojson(response.json())
print(tojson(response.json()))
def query_api_search(channel, part = 'snippet', max_result = 50):
params = {
'channelId': channel,
'part': part,
'maxResults':max_result,
'order':'date',
'safeSearch':'none',
'type':'video',
'key': YOUTUBE_API_KEY
}
response = requests.get('https://www.googleapis.com/youtube/v3/search', params=params)
assert response.status_code == 200, tojson(response.json())
return response.json()
# print(tojson(response.json()))
def query_api_playlist(channel, part = 'snippet,contentDetails,status', max_result = 50):
params = {
'channelId': channel,
'part': part,
'maxResults':max_result,
'key': YOUTUBE_API_KEY
}
response = requests.get('https://www.googleapis.com/youtube/v3/playlists', params=params)
assert response.status_code == 200, tojson(response.json())
return response.json()
# print(tojson(response.json()))
def query_api_playlist_items(id, part = 'snippet,contentDetails', max_result = 10):
params = {
'playlistId': id,
'part': part,
'maxResults':max_result,
'key': YOUTUBE_API_KEY
}
response = requests.get('https://www.googleapis.com/youtube/v3/playlistItems', params=params)
assert response.status_code == 200, tojson(response.json())
return response.json()
# print(tojson(response.json()))
def query_api_category(region_code, part = 'snippet'):
params = {
'regionCode': region_code,
'part': part,
'key': YOUTUBE_API_KEY
}
response = requests.get('https://www.googleapis.com/youtube/v3/videoCategories', params=params)
assert response.status_code == 200, tojson(response.json())
print(tojson(response.json()))
def decode_parameters(query_string):
result = {}
for param in query_string.split('&'):
kvpair = param.split('=')
if len(kvpair) == 2:
result[kvpair[0]] = urllib.request.unquote(kvpair[1])
return result
def get_download_links(video_id):
params = decode_parameters('el=embedded&ps=default&eurl=&gl=US&hl=en')
params['video_id'] = video_id
response = requests.get('http://www.youtube.com/get_video_info', params=params)
video_info = decode_parameters(response.text)
download_list = []
for item in video_info.get('adaptive_fmts').split(','):
download_info = decode_parameters(item)
download_list.append(download_info)
print(tojson(download_info))
for item in video_info.get('url_encoded_fmt_stream_map').split(','):
download_info = decode_parameters(item)
download_list.append(download_info)
print(tojson(download_info))
return download_list
def search_recent_videos(channel, max_result):
description_printed = False
result = query_api_search(channel=channel, max_result=max_result)
for item in result.get('items'):
snippet = item['snippet']
title = snippet['title']
if not description_printed:
description_printed = True
print('%s[%s]'%(snippet['channelId'], snippet['channelTitle']))
get_download_links(item['id']['videoId'])
print('[%s]'%re.sub(r'\.\d+Z$', '', snippet['publishedAt']), 'https://www.youtube.com/watch?v=%s'%item['id']['videoId'], title)
def query_recent_videos(channel, time_span, max_result):
start_time = time.localtime(time.mktime(time.localtime()) - time_span)
video_list = []
for playlist in query_api_playlist(channel=channel).get('items'):
playlist_id = playlist.get('id')
print('[PLAY]', playlist_id, playlist['snippet']['title'])
playlist_list = query_api_playlist_items(id=playlist_id, max_result=max_result).get('items')
playlist_list.sort(cmp=lambda a, b: -1 if a['contentDetails'].get('videoPublishedAt') > b['contentDetails'].get('videoPublishedAt') else 1)
for item in playlist_list:
detail = item.get('contentDetails')
if 'videoPublishedAt' not in detail: continue
title = item['snippet']['title']
print(' [VIDEO]', detail['videoPublishedAt'], detail['videoId'], title)
time_string = re.sub(r'\.\d+Z$','', detail.get('videoPublishedAt'))
publish_time = time.strptime(time_string, '%Y-%m-%dT%H:%M:%S')
if publish_time >= start_time:
video_item = (detail.get('videoId'), publish_time, title)
video_list.append(video_item)
video_list.sort(cmp=lambda a,b: -1 if a[1] > b[1] else 1)
for n in range(len(video_list)):
video_item = video_list[n]
print(time.strftime('[%Y-%m-%d %H:%M:%S]', video_item[1]), 'https://www.youtube.com/watch?v=%s'%video_item[0], '\'%s\''%(video_item[-1]))
def parse_time_span(value, unit):
if unit == 's':return int(value)
if unit == 'm':return int(value) * 60
if unit == 'h':return int(value) * 3600
if unit == 'd':return int(value) * 3600 * 24
return int(value)
def main():
arguments = argparse.ArgumentParser()
arguments.add_argument('--channel', '-c', default='UCQT2Ai7hQMnnvVTGd6GdrOQ')
arguments.add_argument('--time-span', '-t', default=7, help='')
arguments.add_argument('--time-unit', '-u', default='d', choices=['s','m','h','d'])
arguments.add_argument('--max-result', '-m', default=10)
arguments.add_argument('--channel-index', '-i', type=int, choices=range(len(CHANNEL_SETTING)))
options = arguments.parse_args(sys.argv[1:])
channel = options.channel
if options.channel_index is not None: channel = CHANNEL_SETTING[options.channel_index]
search_recent_videos(channel=channel, max_result=options.max_result)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment