Created
November 5, 2020 03:07
-
-
Save nwithan8/de7343565935cd43c057556cf88f892e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
"""
Scrape tweets from multiple users' timelines and write them to a CSV file.

The output CSV filename is a required positional argument.
Indicate usernames with -u <username1> <username2> <username3> ... or -f <text_file_of_usernames>
Indicate start and end dates with -s <start_date in YYYY-MM-DD> and -e <end_date in YYYY-MM-DD>
Indicate tweets from the past X days with -a <past_X_days> (overrides the start date and clears the end date)
Indicate the past X number of tweets (between start and end dates if indicated) with -n <number>
"""
import argparse
import csv
from datetime import datetime, timedelta, timezone

import tweepy  # https://github.com/tweepy/tweepy
# Twitter API credentials (fill these in before running the script)
consumer_key = ""
consumer_secret = ""
access_key = ""
access_secret = ""


def _parse_date(value):
    """Parse a ``YYYY-MM-DD`` command-line value into a naive datetime at midnight.

    Raises ``argparse.ArgumentTypeError`` so argparse reports a readable
    message instead of "invalid <lambda> value".
    """
    try:
        return datetime.strptime(value, '%Y-%m-%d')
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}, expected YYYY-MM-DD") from None


parser = argparse.ArgumentParser()
parser.add_argument('-u', '--users', nargs='+', default=None, type=str,
                    help='list of usernames to collect from')
parser.add_argument('-f', '--file', type=str, default=None,
                    help="File with list of usernames")
parser.add_argument('output', type=str, help="Output filename")
parser.add_argument('-n', '--number', type=int,
                    help='Only collect past X number of tweets (up to 3240)')
parser.add_argument('-a', '--age', type=int, default=None,
                    help='Only collect tweets from the past X days')
parser.add_argument('-s', '--start', type=_parse_date, default=None,
                    help="Start date (YYYY-mm-dd)")
parser.add_argument('-e', '--end', type=_parse_date, default=None,
                    help="End date (YYYY-mm-dd)")
args = parser.parse_args()

if args.age:
    # Tweet timestamps are compared as naive UTC values, so the cutoff must be
    # computed from the current UTC time rather than local wall-clock time.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    args.start = utc_now - timedelta(days=args.age)
    # --age replaces any explicit date window, including the end date.
    args.end = None
def make_url(status):
    """Return the permalink of a tweet.

    ``status`` must expose ``user.screen_name`` and ``id``. The scheme is
    included so the value written to the "Tweet Link" column is a complete,
    clickable URL.
    """
    return f'https://twitter.com/{status.user.screen_name}/status/{status.id}'
def after_check(status, start=None):
    """Return True when the tweet was created at or after the start cutoff.

    ``start`` is a naive datetime interpreted as UTC; when omitted, the
    ``--start`` command-line value (``args.start``) is used. With no cutoff
    configured every tweet passes.
    """
    if start is None:
        start = args.start
    if start is None:
        return True
    created = status.created_at
    # Some tweepy releases return timezone-aware datetimes; comparing those
    # with a naive cutoff raises TypeError, so normalise to naive UTC first.
    if created.tzinfo is not None:
        created = created.astimezone(timezone.utc).replace(tzinfo=None)
    return created >= start
def before_check(status, end=None):
    """Return True when the tweet was created on or before the end date.

    ``end`` is a naive datetime at midnight interpreted as UTC; when omitted,
    the ``--end`` command-line value (``args.end``) is used. The end date is
    inclusive: every tweet created during that calendar day passes, while a
    tweet at exactly midnight of the following day does not. With no end date
    configured every tweet passes.
    """
    if end is None:
        end = args.end
    if end is None:
        return True
    created = status.created_at
    # Some tweepy releases return timezone-aware datetimes; comparing those
    # with a naive cutoff raises TypeError, so normalise to naive UTC first.
    if created.tzinfo is not None:
        created = created.astimezone(timezone.utc).replace(tzinfo=None)
    return created < end + timedelta(days=1)
def get_all_tweets(screen_name):
    """Download the tweets of ``screen_name`` that match the command-line filters.

    Pages backwards through the user's timeline (newest first), 200 tweets per
    request, and stops when the timeline is exhausted, when ``args.number``
    tweets have been kept, or when a tweet older than the start cutoff is
    reached. Tweets newer than the end date are skipped but paging continues.

    Returns a list of ``[created_at, text, url]`` rows, newest first.
    """
    # Twitter only allows access to a user's most recent 3240 tweets with this method
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_key, access_secret)
    api = tweepy.API(auth)

    all_tweets = []
    # 200 is the maximum page size allowed by the endpoint
    request = {"screen_name": screen_name, "count": 200}

    while True:
        page = api.user_timeline(**request)
        if not page:
            # Timeline exhausted: stop instead of requesting the same empty
            # page forever.
            break

        finished = False
        for tweet in page:
            if args.number and len(all_tweets) >= args.number:
                finished = True
                break
            if not after_check(tweet):
                # The timeline is newest-first, so everything after this
                # tweet is older than the start cutoff as well.
                finished = True
                break
            if before_check(tweet):
                all_tweets.append(tweet)
        if finished:
            break

        # Continue below the oldest tweet *fetched* (not the oldest kept), so
        # paging still advances when a whole page was filtered out.
        request["max_id"] = page[-1].id - 1
        print(f"...{len(all_tweets)} tweets downloaded so far")

    print(f"{len(all_tweets)} tweets downloaded from {screen_name}")
    # transform the tweepy tweets into a 2D array that will populate the csv
    return [[tweet.created_at, tweet.text, make_url(tweet)] for tweet in all_tweets]
def read_names_from_file(filename):
    """Return the usernames listed in ``filename``, one per line.

    Surrounding whitespace is stripped and blank lines are ignored, so a
    trailing newline or empty line never produces an empty username.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f if line.strip()]
if __name__ == '__main__':
    # Script entry point: resolve the list of usernames, then write one CSV
    # row per collected tweet to the requested output file.
    if not args.users and not args.file:
        # Report a usage error through argparse instead of a raw traceback.
        parser.error("Must provide either a list of usernames or a path to a file with usernames")

    # A username file takes precedence over names given with -u.
    if args.file:
        names = read_names_from_file(filename=args.file)
    else:
        names = args.users

    # newline='' is required by the csv module to avoid blank rows on Windows;
    # UTF-8 keeps emoji and non-Latin tweet text writable on any locale.
    with open(args.output, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            "Username",
            "Tweet Date",
            "Tweet Text",
            "Tweet Link",
        ])
        for name in names:
            print(f"Getting tweets from {name}...")
            for row in get_all_tweets(name):
                writer.writerow([name] + row)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment