Skip to content

Instantly share code, notes, and snippets.

@nwithan8
Created November 5, 2020 03:07
Show Gist options
  • Select an option

  • Save nwithan8/de7343565935cd43c057556cf88f892e to your computer and use it in GitHub Desktop.

Select an option

Save nwithan8/de7343565935cd43c057556cf88f892e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
Scrape tweets from multiple users' timelines and write them to a CSV file
The output filename is a required positional argument: <output_filename>
Indicate usernames with -u <username1> <username2> <username3> ... or -f <text_file_of_usernames> (one username per line)
Indicate start and end dates with -s <start_date in YYYY-MM-DD> and -e <end_date in YYYY-MM-DD>
Indicate tweets from past X days with -a <past_X_days> (overrides both the start and end dates)
Indicate past X number of tweets (between start and end dates if indicated) with -n <number>
"""
import tweepy # https://github.com/tweepy/tweepy
import csv
import argparse
from datetime import datetime, timedelta
# Twitter API credentials
# Left blank in this file. get_all_tweets() passes the consumer pair to
# tweepy.OAuthHandler and the access pair to auth.set_access_token.
consumer_key = ""
consumer_secret = ""
access_key = ""
access_secret = ""
def _parse_date(value):
    """Convert a ``YYYY-MM-DD`` command-line string into a ``datetime``.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a valid date in that
            format, so argparse prints a readable message instead of
            "invalid <lambda> value".
    """
    try:
        return datetime.strptime(value, '%Y-%m-%d')
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}, expected YYYY-MM-DD") from None


# Command-line interface. ``args`` is read as a module-level global by
# after_check(), before_check(), get_all_tweets() and the __main__ block.
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--users', nargs='+', default=None, type=str,
                    help='list of usernames to collect from')
parser.add_argument('-f', '--file', type=str, default=None,
                    help="File with list of usernames")
parser.add_argument('output', type=str, help="Output filename")
# The user timeline endpoint caps history at 3,200 tweets per account.
parser.add_argument('-n', '--number', type=int,
                    help='Only collect past X number of tweets (up to 3200)')
parser.add_argument('-a', '--age', type=int, default=None,
                    help='Only collect past tweets from past X days')
parser.add_argument('-s', '--start', type=_parse_date, default=None,
                    help="Start date (YYYY-mm-dd)")
parser.add_argument('-e', '--end', type=_parse_date, default=None,
                    help="End date (YYYY-mm-dd)")
args = parser.parse_args()

# --age replaces any explicit --start and clears --end. An age of 0 is falsy
# and is therefore treated the same as "not given".
if args.age:
    # NOTE(review): this is naive local time; it is later compared against
    # tweet.created_at -- confirm both are expressed in the same timezone.
    args.start = datetime.now() - timedelta(days=args.age)
    args.end = None
def make_url(status):
    """Build the permalink URL for a tweet.

    Args:
        status: Object exposing ``user.screen_name`` and ``id``.

    Returns:
        An absolute ``https://twitter.com/<screen_name>/status/<id>`` URL.
        The scheme is included so the value is usable as a link wherever
        the CSV is opened.
    """
    return f'https://twitter.com/{status.user.screen_name}/status/{status.id}'
def after_check(status):
    """Tell whether a tweet is at or after the configured start date.

    Reads the module-level ``args.start``. Returns True when no start date
    is set, or when ``status.created_at`` is not earlier than it; returns
    False otherwise.
    """
    cutoff = args.start
    too_old = bool(cutoff) and status.created_at < cutoff
    return not too_old
def before_check(status):
    """Tell whether a tweet falls on or before the configured end date.

    Reads the module-level ``args.end``, which is midnight at the start of
    the end date. The whole end day is included, so the upper bound is the
    following midnight and is exclusive: a tweet stamped exactly 00:00:00 on
    the next day is rejected.

    Returns:
        True when no end date is set or the tweet was created before the
        day after ``args.end``; False otherwise.
    """
    if not args.end:
        return True
    day_after_end = args.end + timedelta(days=1)
    return status.created_at < day_after_end
def _keep_matching(page, collected):
    """Append the tweets from ``page`` that pass the filters to ``collected``.

    ``collected`` is mutated in place. Returns False as soon as a stop
    condition is hit (the --number limit is reached, or a tweet is older
    than the start date), and True if the whole page was consumed.
    """
    for tweet in page:
        if args.number and len(collected) >= args.number:
            return False
        if not after_check(tweet):
            return False
        # Tweets newer than the end date are skipped, not a reason to stop.
        if before_check(tweet):
            collected.append(tweet)
    return True


def get_all_tweets(screen_name):
    """Download one user's timeline, filtered by the command-line options.

    Pages through ``api.user_timeline`` 200 tweets at a time (the maximum
    allowed count), newest first, until the timeline is exhausted, the
    --number limit is reached, or a tweet older than the start date is seen.

    Args:
        screen_name: Twitter username whose timeline is fetched.

    Returns:
        A list of ``[created_at, text, url]`` rows, one per kept tweet.
    """
    # Twitter only allows access to a user's most recent ~3,200 tweets with
    # this method.
    # authorize twitter, initialize tweepy
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_key, access_secret)
    api = tweepy.API(auth)

    all_tweets = []
    max_id = None  # None means "start from the newest tweet"
    while True:
        if max_id is None:
            page = api.user_timeline(screen_name=screen_name, count=200)
        else:
            print(f"...{len(all_tweets)} tweets downloaded so far")
            page = api.user_timeline(screen_name=screen_name, count=200, max_id=max_id)
        # An empty page means the timeline is exhausted; without this check
        # the loop would keep requesting the same empty page forever.
        if not page:
            break
        if not _keep_matching(page, all_tweets):
            break
        # Continue below the last tweet *fetched* (not the last one kept), so
        # paging still advances when a whole page was filtered out and
        # nothing has been collected yet.
        max_id = page[-1].id - 1

    print(f"{len(all_tweets)} tweets downloaded from {screen_name}")
    # transform the tweepy tweets into a 2D array that will populate the csv
    return [[tweet.created_at, tweet.text, make_url(tweet)] for tweet in all_tweets]
def read_names_from_file(filename):
    """Read usernames from a text file, one per line.

    Surrounding whitespace is stripped and blank lines are skipped, so a
    stray empty line does not become an empty username.

    Args:
        filename: Path of the text file to read.

    Returns:
        A list of non-empty username strings in file order.
    """
    with open(filename, 'r') as f:
        stripped = (line.strip() for line in f)
        return [name for name in stripped if name]
if __name__ == '__main__':
    # At least one source of usernames is required. Report it as a usage
    # error (usage text + exit status 2) rather than a raw traceback.
    if not args.users and not args.file:
        parser.error("Must provide either a list of usernames or a path to a file with usernames")

    # A usernames file takes precedence over -u when both are given.
    if args.file:
        names = read_names_from_file(filename=args.file)
    else:
        names = args.users

    # newline='' is required by the csv module so it controls line endings
    # itself (otherwise blank rows appear on Windows). UTF-8 is set
    # explicitly because tweet text routinely contains characters that the
    # platform default encoding may not be able to represent.
    with open(args.output, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            "Username",
            "Tweet Date",
            "Tweet Text",
            "Tweet Link",
        ])
        for name in names:
            print(f"Getting tweets from {name}...")
            # write the csv: each row from get_all_tweets is prefixed with
            # the username it was fetched for
            writer.writerows([name] + row for row in get_all_tweets(name))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment