Last active
May 10, 2021 01:09
-
-
Save skypenguins/509e6c66e246ae44bcb08953db2841ef to your computer and use it in GitHub Desktop.
Download media that an authorized user has RTed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import datetime
import json
import os
import re
import shutil

import requests
from requests_oauthlib import OAuth1Session

import config
def get_user_timeline(max_id, screen_name=config.SCREEN_NAME):
    """Fetch one page of a user's timeline from the Twitter REST API.

    Args:
        max_id: Status id sent as the ``max_id`` query parameter. Any falsy
            value (e.g. ``None``) omits the parameter from the request.
        screen_name: Account whose timeline is requested. Defaults to
            ``config.SCREEN_NAME``.

    Returns:
        The decoded JSON body when the response status is 200, otherwise
        ``False`` (after printing the HTTP status code).
    """
    # tweet_mode=extended asks for the untruncated text of long tweets.
    endpoint = 'https://api.twitter.com/1.1/statuses/user_timeline.json?tweet_mode=extended'

    query = {'screen_name': screen_name, 'count': '200', 'include_rts': 1}
    if max_id:
        query['max_id'] = max_id

    session = OAuth1Session(config.CONSUMER_KEY, config.CONSUMER_SECRET,
                            config.ACCESS_TOKEN, config.ACCESS_TOKEN_SECRET)
    response = session.get(endpoint, params=query)

    if response.status_code != 200:
        print('HTTP error: %d' % response.status_code)
        return False
    return json.loads(response.text)
def get_retweet_list(year, month, day):
    """Collect the user's retweets made on or after the given date (JST).

    Pages backwards through the timeline via ``get_user_timeline`` and keeps
    every status that carries a ``retweeted_status`` and whose own
    ``created_at`` (shifted by +9 hours to JST) falls on or after the cutoff
    date. Paging stops once a retweet older than the cutoff is seen, when the
    timeline runs out, or when a request fails.

    Args:
        year: Cutoff year.
        month: Cutoff month.
        day: Cutoff day of month.

    Returns:
        A list of status dicts (newest first) for the matching retweets.
        If a request fails, the retweets collected so far are returned.

    Raises:
        ValueError: If ``year``/``month``/``day`` do not form a valid date.
            This is raised before any request is made.
    """
    # Build the cutoff once; this also rejects an invalid date up front
    # instead of in the middle of paging.
    cutoff = datetime.date(year, month, day)
    jst_offset = datetime.timedelta(hours=9)

    rts = []
    max_id = None
    finished = False
    print('starting to get retweets...')
    while not finished:
        user_timeline = get_user_timeline(max_id=max_id)
        if not user_timeline:
            if isinstance(user_timeline, list):
                # An empty page means the timeline is exhausted before the
                # cutoff was reached: that is a normal end, not an error.
                finished = True
                continue
            print('getting error: no timelime')
            break

        # The API's max_id is inclusive. Asking for "last id - 1" keeps the
        # last status of this page from being returned again on the next
        # page, which would duplicate it and never let the paging terminate
        # once the oldest status has been reached.
        max_id = int(user_timeline[-1]['id_str']) - 1

        for status in user_timeline:
            if 'retweeted_status' not in status:
                continue
            dt = datetime.datetime.strptime(status['created_at'],
                                            '%a %b %d %H:%M:%S +0000 %Y')
            created_at = dt + jst_offset  # JST
            if cutoff <= created_at.date():
                rts.append(status)
                print(created_at)
            else:
                finished = True
    else:
        # Reached only when the loop ended without a request failure.
        print('RTs:', len(rts))
        print('getting RTs finished')
    return rts
def get_media_urls_from_tl(tweets):
    """Print a summary of each retweet and collect its media URLs.

    For every media entry of the retweeted status the ``media_url_https``
    is collected. If the entry also has ``video_info``, the URL of the
    variant with the highest bitrate is collected as well.

    Args:
        tweets: Iterable of status dicts that each contain a
            ``retweeted_status`` (as returned by ``get_retweet_list``).

    Returns:
        A list of URL strings in the order they were encountered.
    """
    time_format = '%a %b %d %H:%M:%S +0000 %Y'
    jst_offset = datetime.timedelta(hours=9)

    urls = []
    for tweet in tweets:
        original = tweet['retweeted_status']
        created_at = datetime.datetime.strptime(
            original['created_at'], time_format) + jst_offset  # JST
        user_status_created_at = datetime.datetime.strptime(
            tweet['created_at'], time_format) + jst_offset  # JST

        print('----------------------------------------------------')
        print('{name} @{screen_name}\n'.format(
            name=original['user']['name'],
            screen_name=original['user']['screen_name']))
        print(original['full_text'])
        print('\noriginal status id:', original['id_str'])
        print('user\'s status id:', tweet['id_str'])
        print('original date:', created_at)
        print('retweeted date:', user_status_created_at)

        if 'extended_entities' not in original:
            print('\nno media')
            continue

        for media in original['extended_entities']['media']:
            # Every media entry has a still image (for videos, a preview).
            urls.append(media['media_url_https'])
            print('image url:', media['media_url_https'])

            if 'video_info' not in media:
                continue
            # Only variants that declare a bitrate are comparable; others
            # (e.g. playlist variants) are ignored.
            candidates = [
                variant for variant in media['video_info']['variants']
                if 'bitrate' in variant
            ]
            if not candidates:
                # Nothing to rank: skip instead of crashing on max([]).
                continue
            # Pick exactly one best-quality variant; ties yield the first.
            best = max(candidates, key=lambda variant: int(variant['bitrate']))
            urls.append(best['url'])
            print('video url:', best['url'])

    print('\nRTs:', len(tweets))
    print('extracted media:', len(urls))
    print('getting media urls finished')
    return urls
def dl_media(media_urls):
    """Download every URL in ``media_urls`` into the ``./media/`` directory.

    The local file name is the last path segment of the URL with any query
    string removed. URLs whose file name does not contain ``.mp4`` get
    ``?name=large`` appended before the request. A failed download is
    reported and skipped so the remaining URLs are still processed.

    Args:
        media_urls: Sequence of URL strings to download.
    """
    save_dir = './media/'
    # open() does not create missing directories, so make sure it exists.
    os.makedirs(save_dir, exist_ok=True)

    headers_dic = {
        "User-Agent":
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36"
    }
    total = len(media_urls)

    # enumerate() gives the true position even when the same URL occurs
    # more than once (list.index() would always report the first one).
    for index, media_url in enumerate(media_urls, start=1):
        file_name = str(media_url.split("/")[-1])
        file_name = re.sub(r'\?.*', '', file_name)  # delete parameter
        full_path = save_dir + file_name
        print('downloading {file_name} ... ({index} of {length})'.format(
            file_name=file_name, index=index, length=total))

        if '.mp4' not in file_name:
            media_url = media_url + '?name=large'

        try:
            with requests.get(media_url, stream=True,
                              headers=headers_dic) as res:
                # requests does not raise on 4xx/5xx by itself; without this
                # an error page would be written to disk as if it were media.
                res.raise_for_status()
                with open(full_path, 'wb') as f:
                    # Let urllib3 undo gzip/deflate while streaming the body.
                    res.raw.decode_content = True
                    shutil.copyfileobj(res.raw, f)
            print('download finished')
        except requests.exceptions.RequestException as err:
            # Best effort: report this failure and carry on with the rest.
            print(str(err))
            print('止まるんじゃねぇぞ…')
    print('all download finished')
if __name__ == '__main__':
    # Command line: three positional integers that together give the
    # date passed to get_retweet_list.
    cli = argparse.ArgumentParser(
        description='download media that an authorized user has RTed')
    date_arguments = (
        ('year',
         'the year of media RTed by the authorized user that you want to get'),
        ('month',
         'the month of media RTed by the authorized user that you want to get'),
        ('day',
         'the day of media RTed by the authorized user that you want to get'),
    )
    for arg_name, arg_help in date_arguments:
        cli.add_argument(arg_name, type=int, help=arg_help)
    options = cli.parse_args()

    # Pipeline: collect retweets -> extract media URLs -> download them.
    retweets = get_retweet_list(options.year, options.month, options.day)
    found_urls = get_media_urls_from_tl(retweets)
    dl_media(found_urls)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment