Last active
November 23, 2022 20:56
-
-
Save Alexhuszagh/298a1bcf5034cbdbc2074c18e737ba38 to your computer and use it in GitHub Desktop.
Script to export Twitter data (statuses, friends, followers).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
'''
    export_twitter
    ==============

    Export data information from Twitter, including friends, followers,
    statuses, and more. Note that the API JSON document should look like:

        {
            "consumer_key": "...",
            "consumer_secret": "...",
            "access_token": "...",
            "access_token_secret": "..."
        }

    Sample Usage:
        ./export_twitter.py \
            --friends \
            --api api.json \
            --user kardonice \
            --output kardonice.csv \
            --save-media \
            --format csv \
            --verbose

    Requirements:
        Python 3.6+ (the script uses f-strings)
        tweepy==3.10.0
        requests>=2.25
'''

# Package metadata; `__version__` is what the `--version` flag prints.
__version__ = '0.0.0-dev'
__author__ = 'Alex Huszagh <[email protected]>'
__license__ = 'Unlicense (Public Domain)'
import argparse | |
import csv | |
import json | |
import os | |
import requests | |
import tweepy | |
import urllib.parse | |
def print_verbose(message, verbose=True):
    '''
    Print `message` to stdout, unless `verbose` is falsy.

    :param message: Object to print.
    :param verbose: When falsy the call is a no-op. Defaults to True,
        so callers that omit it always print.
    '''

    if not verbose:
        return
    print(message)
def generate_api(path):
    '''
    Build an authenticated tweepy API client from a JSON credentials file.

    :param path: Path to a JSON document containing the keys
        "consumer_key", "consumer_secret", "access_token" and
        "access_token_secret".
    :return: A `tweepy.API` instance configured to wait (and notify)
        when the rate limit is hit.
    :raises KeyError: If one of the four credential keys is missing.
    '''

    # Use a context manager so the credentials file is closed right away
    # instead of lingering until garbage collection.
    with open(path, encoding='utf-8') as file:
        api_data = json.load(file)

    # Read every credential up front so a malformed file fails before
    # any tweepy object is created.
    consumer_key = api_data['consumer_key']
    consumer_secret = api_data['consumer_secret']
    access_token = api_data['access_token']
    access_token_secret = api_data['access_token_secret']

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    return tweepy.API(
        auth,
        timeout=5,
        wait_on_rate_limit=True,
        wait_on_rate_limit_notify=True,
        compression=True,
    )
def get_user(api, screen_name):
    '''
    Look up a single user by screen name.

    :param api: Client object exposing `lookup_users(screen_names=...)`.
    :param screen_name: Screen name of the user to fetch.
    :return: The first entry of the lookup result.
    :raises IndexError: If the lookup returns an empty sequence.
    '''

    matches = api.lookup_users(screen_names=[screen_name])
    return matches[0]
def get_cursor(cursor, default):
    '''
    Return `cursor`, falling back to `default` only when it is None.

    Falsy values other than None (for example 0) are returned as given.
    '''

    return default if cursor is None else cursor
def get_followers(api, user, args):
    '''
    Lazily yield the followers of `user`, one page at a time.

    :param api: tweepy API client; its `followers` method is paged.
    :param user: Object whose `id` identifies the account to query.
    :param args: Parsed arguments; `args.cursor` is the starting cursor
        (-1 when unset) and `args.verbose` toggles progress output.
    '''

    pager = tweepy.Cursor(
        api.followers,
        user_id=user.id,
        screen_name=None,
        cursor=get_cursor(args.cursor, -1),
    )
    for batch in pager.pages():
        # Report the cursor after each fetched page so an interrupted
        # run can be restarted from it.
        position = pager.iterator.next_cursor
        print_verbose(f'Current cursor at {position}', args.verbose)
        yield from batch
def get_friends(api, user, args):
    '''
    Lazily yield the friends of `user`, one page at a time.

    :param api: tweepy API client; its `friends` method is paged.
    :param user: Object whose `id` identifies the account to query.
    :param args: Parsed arguments; `args.cursor` is the starting cursor
        (-1 when unset) and `args.verbose` toggles progress output.
    '''

    pager = tweepy.Cursor(
        api.friends,
        user_id=user.id,
        screen_name=None,
        cursor=get_cursor(args.cursor, -1),
    )
    for batch in pager.pages():
        # Report the cursor after each fetched page so an interrupted
        # run can be restarted from it.
        position = pager.iterator.next_cursor
        print_verbose(f'Current cursor at {position}', args.verbose)
        yield from batch
def get_statuses(api, user, args):
    '''
    Lazily yield the statuses on the timeline of `user`, page by page.

    :param api: tweepy API client; its `user_timeline` method is paged.
    :param user: Object whose `id` identifies the account to query.
    :param args: Parsed arguments; `args.cursor` is passed as the
        starting `max_id` (None when unset) and `args.verbose` toggles
        progress output.
    '''

    pager = tweepy.Cursor(
        api.user_timeline,
        user_id=user.id,
        screen_name=None,
        max_id=get_cursor(args.cursor, None),
    )
    for batch in pager.pages():
        # Timelines are paged by status ID rather than by cursor, so the
        # iterator's `max_id` is the value reported for resuming.
        position = pager.iterator.max_id
        print_verbose(f'Current cursor at {position}', args.verbose)
        yield from batch
# Maps each export mode to a pair of (noun used in the verbose
# "Writing ..." message, attribute of the item printed as its identifier).
ITEM_MESSAGE = {
    'followers': ('user', 'screen_name'),
    'friends': ('user', 'screen_name'),
    'statuses': ('status', 'id_str'),
}

# Default columns exported for user objects (followers/friends) when no
# explicit field list is given. The order here is the CSV column order.
SIMPLE_USER_FIELDS = [
    'created_at',
    'default_profile',
    'default_profile_image',
    'description',
    'favourites_count',
    'followers_count',
    'friends_count',
    'id',
    'id_str',
    'listed_count',
    'location',
    'name',
    'profile_banner_url',
    'profile_image_url_https',
    'protected',
    'screen_name',
    'statuses_count',
    'url',
    'verified',
    'withheld_scope',
]

# Default columns exported for status objects when no explicit field list
# is given. The order here is the CSV column order.
SIMPLE_STATUS_FIELDS = [
    'contributors',
    'created_at',
    'favorite_count',
    'favorited',
    'filter_level',
    'geo',
    'id',
    'id_str',
    'in_reply_to_screen_name',
    'in_reply_to_status_id',
    'in_reply_to_status_id_str',
    'in_reply_to_user_id',
    'in_reply_to_user_id_str',
    'is_quote_status',
    'lang',
    'possibly_sensitive',
    'quote_count',
    'quoted_status_id',
    'quoted_status_id_str',
    'retweet_count',
    'retweeted',
    'source',
    'text',
    'truncated',
    'withheld_copyright',
    'withheld_scope',
]

# Maps each export mode to its default field list (used by the CSV writer).
SIMPLE_FIELDS = {
    'followers': SIMPLE_USER_FIELDS,
    'friends': SIMPLE_USER_FIELDS,
    'statuses': SIMPLE_STATUS_FIELDS,
}
def print_item_verbose(item, mode, verbose):
    '''
    Print a "Writing <kind> <identifier>" progress line for `item`.

    :param item: Exported object; the attribute named in
        `ITEM_MESSAGE[mode]` is read from it.
    :param mode: Export mode, a key of `ITEM_MESSAGE`.
    :param verbose: Forwarded to `print_verbose`.
    '''

    label, attribute = ITEM_MESSAGE[mode]
    identifier = getattr(item, attribute)
    print_verbose(f'Writing {label} {identifier}', verbose)
def extract_fields(item, fields):
    '''
    Select the requested keys from the raw JSON payload of `item`.

    :param item: Object exposing its raw payload as the dict `_json`.
    :param fields: Iterable of keys to keep. None or an empty sequence
        means "everything": `--fields` given with no values parses to
        an empty list, and its help text promises all fields then.
    :return: `item._json` itself when no fields are requested, otherwise
        a new dict with exactly the requested keys (missing keys map
        to None).
    '''

    if not fields:
        return item._json
    return {key: item._json.get(key) for key in fields}
def write_items(location, iterable, mode, args):
    '''
    Create the output directories and hand off to the format writer.

    :param location: Output location providing `parent` and
        `directory_path`.
    :param iterable: Items to export.
    :param mode: Export mode, forwarded to the writer.
    :param args: Parsed arguments; `args.format` selects the
        module-level `write_<format>` function and `args.save_media`
        decides whether the media directory is created.
    '''

    os.makedirs(location.parent, exist_ok=True)
    if args.save_media:
        os.makedirs(location.directory_path, exist_ok=True)

    # Writers are resolved by name at call time: write_json, write_csv.
    writer = globals()[f'write_{args.format}']
    writer(location, iterable, mode, args)
def write_csv(location, iterable, mode, args):
    '''
    Append the items to a tab-delimited file, one row per item.

    :param location: Output location providing `file_path`.
    :param iterable: Items to export.
    :param mode: Export mode; selects the default columns from
        `SIMPLE_FIELDS` when `args.fields` is empty.
    :param args: Parsed arguments (`fields`, `verbose`, `save_media`).
    '''

    fields = args.fields or SIMPLE_FIELDS[mode]
    path = location.file_path

    # The file is opened in append mode so an interrupted export can be
    # resumed. Only emit the header when starting a fresh (missing or
    # empty) file, otherwise a resumed run would inject a second header
    # row in the middle of the data.
    needs_header = not os.path.exists(path) or os.path.getsize(path) == 0

    # Explicit UTF-8: exported text routinely contains characters that
    # locale encodings such as cp1252 cannot represent.
    with open(path, 'a', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=fields, dialect='excel-tab')
        for index, item in enumerate(iterable):
            # The header is deferred until the first item so an empty
            # export does not leave a header-only file behind.
            if needs_header:
                writer.writeheader()
                needs_header = False
            print_item_verbose(item, mode, args.verbose)
            writer.writerow(extract_fields(item, fields))
            # Flush periodically so little is lost if the run dies.
            if index % 10 == 0:
                file.flush()
            if args.save_media:
                save_media_urls(location, item, args)
def write_json(location, iterable, mode, args):
    '''
    Append the items to a file as one JSON object per line.

    :param location: Output location providing `file_path`.
    :param iterable: Items to export.
    :param mode: Export mode, used for the progress message.
    :param args: Parsed arguments (`fields`, `verbose`, `save_media`).
    '''

    # `--fields` with no values parses to an empty list. Normalise that
    # to None so it means "all fields" (as the help text states) instead
    # of producing an empty object for every item.
    fields = args.fields or None

    # this writes it as a series of JSON objects, to avoid failing to write to disk
    with open(location.file_path, 'a', encoding='utf-8') as file:
        for index, item in enumerate(iterable):
            print_item_verbose(item, mode, args.verbose)
            data = extract_fields(item, fields)
            file.write(json.dumps(data) + '\n')
            # Flush periodically so little is lost if the run dies.
            if index % 10 == 0:
                file.flush()
            if args.save_media:
                save_media_urls(location, item, args)
def save_media_urls(location, item, args):
    '''
    Save every media attachment referenced by `item`.

    Media entries are read from both the "entities" and the
    "extended_entities" sections of `item._json`. Entries sharing an
    "id_str" are saved once: they would map to the same output filename,
    so a second download would only overwrite the first.

    :param location: Output location, forwarded to `save_media_item`.
    :param item: Object exposing its raw payload as the dict `_json`.
    :param args: Parsed arguments, forwarded to `save_media_item`.
    '''

    seen = set()
    for section in ('entities', 'extended_entities'):
        # Iterate each list separately instead of `+=`-ing them together,
        # which would extend the list stored inside `item._json` in place.
        section_media = (item._json.get(section) or {}).get('media') or []
        for media_item in section_media:
            media_id = media_item.get('id_str')
            if media_id is not None:
                if media_id in seen:
                    continue
                seen.add(media_id)
            save_media_item(location, item, media_item, args)
def save_media_item(location, item, media_item, args):
    '''
    Save one media entry, if it carries an HTTPS media URL.

    The file is named "<item id>-<media id>-<basename of the URL path>"
    so attachments from different items cannot collide.

    :param location: Output location, forwarded to `save_media`.
    :param item: Owning object; its `id_str` prefixes the filename.
    :param media_item: Media dict; "media_url_https" is optional,
        "id_str" is required when the URL is present.
    :param args: Parsed arguments, forwarded to `save_media`.
    '''

    url = media_item.get('media_url_https')
    if url is None:
        return

    url_path = urllib.parse.urlparse(url).path
    filename = os.path.basename(url_path)
    unique_filename = f'{item.id_str}-{media_item["id_str"]}-{filename}'
    save_media(location, url, unique_filename, args)
def save_media(location, url, unique_filename, args):
    '''
    Download `url` into the media directory of `location`.

    Saving media is best-effort: when the download fails an error is
    printed and nothing is written, so the export itself carries on.

    :param location: Output location providing `directory_path`.
    :param url: URL of the media attachment.
    :param unique_filename: File name to store the attachment under.
    :param args: Parsed arguments; `args.verbose` gates the progress
        message.
    '''

    # Honour --verbose: without the second argument print_verbose
    # defaults to printing unconditionally.
    print_verbose(
        f'Saving media at url "{url}" with unique ID {unique_filename}.',
        args.verbose,
    )

    try:
        # A timeout keeps one stalled download from hanging the export.
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        response = None

    if response is None or not response.ok:
        print(f'\033[31mError:\033[0m Unable to save media attachment at url "{url}".')
        # Do not write the body of a failed response to disk.
        return

    path = os.path.join(location.directory_path, unique_filename)
    with open(path, 'wb') as file:
        file.write(response.content)
def get_mode(args):
    '''
    Return the export mode selected on the command line.

    The modes are checked in the order friends, followers, statuses and
    the first one whose attribute on `args` is truthy wins.

    :raises ValueError: If none of the mode attributes is set.
    '''

    candidates = ('friends', 'followers', 'statuses')
    selected = next(
        (name for name in candidates if getattr(args, name, None)),
        None,
    )
    if selected is None:
        raise ValueError('Currently unknown export mode')
    return selected
class OutputLocation:
    '''
    Resolved output location for an export.

    Splits the requested output path into its parent directory, base
    file name and extension, and derives from them both the data file
    path and the sibling directory used for media attachments.
    '''

    # Spelled with double underscores so the slots actually take effect.
    __slots__ = ('parent', 'filename', 'extension')

    def __init__(self, path, file_format):
        '''
        :param path: Requested output path (relative or absolute).
        :param file_format: Export format; used as the extension when
            `path` does not already have one.
        '''

        realpath = os.path.realpath(path)
        self.parent = os.path.dirname(realpath)
        basename = os.path.basename(realpath)
        self.filename, self.extension = os.path.splitext(basename)
        if not self.extension:
            self.extension = f'.{file_format}'

    @property
    def file_path(self):
        '''Full path of the data file, including its extension.'''

        return os.path.join(self.parent, f'{self.filename}{self.extension}')

    @property
    def directory_path(self):
        '''Full path of the media directory (the file path minus extension).'''

        return os.path.join(self.parent, self.filename)
def main():
    '''
    Command-line entry point: parse the arguments and run the export.

    Exactly one of --friends, --followers or --statuses selects what is
    exported for --user. The result is written in the chosen --format to
    --output, which defaults to "<user>_<mode>.<format>".
    '''

    parser = argparse.ArgumentParser(description='Twitter API exporter parameters.')
    action_group = parser.add_mutually_exclusive_group(required=True)
    action_group.add_argument(
        '--friends',
        help='Export a list of friends (accounts you follow)',
        action='store_true',
    )
    action_group.add_argument(
        '--followers',
        help='Export a list of followers (accounts that follow you)',
        action='store_true',
    )
    action_group.add_argument(
        '--statuses',
        help='Export a list of statuses from a user account',
        action='store_true',
    )
    parser.add_argument(
        '-a',
        '--api',
        help='JSON document with the API credentials.',
        default='api.json',
    )
    parser.add_argument(
        '-u',
        '--user',
        help='Screen name of user to get data from.',
        required=True,
    )
    parser.add_argument(
        '-o',
        '--output',
        help='Output file name, the extension will be added if not provided.',
    )
    parser.add_argument(
        '-V',
        '--version',
        action='version',
        version=f'%(prog)s {__version__}',
    )
    parser.add_argument(
        '-sm',
        '--save-media',
        action='store_true',
        help='Save media attachments. The directory name defaults to the filename.',
    )
    parser.add_argument(
        '-f',
        '--fields',
        help='Fields to extract from each item. Leave empty for all',
        nargs='*',
    )
    parser.add_argument(
        '--format',
        help='Export format.',
        default='json',
        choices=['json', 'csv'],
    )
    parser.add_argument(
        '-c',
        '--cursor',
        help='Current cursor position (in case of interrupted run).',
        # Validate and convert here, so a malformed cursor is reported as
        # a usage error and the getters receive an integer.
        type=int,
    )
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        help='Print verbose debugging information.',
    )
    args = parser.parse_args()

    mode = get_mode(args)
    output = args.output
    if output is None:
        output = f'{args.user}_{mode}.{args.format}'
    location = OutputLocation(output, args.format)

    api = generate_api(args.api)
    user = get_user(api, args.user)

    # The getters are resolved by name: get_friends, get_followers,
    # get_statuses. They return lazy generators, so fetching happens
    # while the items are being written.
    iterable = globals()[f'get_{mode}'](api, user, args)
    write_items(location, iterable, mode, args)
# Run the exporter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment