Selenium-based Twitter scraper
#!/usr/bin/env python
'''
    selenium_twitter
    ================

    Uses selenium to extract Tweets from Twitter for a given user.
    Note that this breaks Twitter's ToS and you can easily get banned
    for doing this. I highly recommend you do not do this while logged
    in, or on the same IP address as your main account.

    Sample Usage:
        ./selenium_twitter.py \
            --user kardonice \
            --output kardonice.csv \
            --save-media \
            --format csv \
            --proxy socks5://127.0.0.1:9150 \
            --headless \
            --verbose

    Requirements:
        Python 3.9+
        beautifulsoup4>=4.9
        requests>=2.25
        python-dateutil>=2.8
        selenium>=4.1
        Compatible chromedriver and chrome/chromium versions

    Optional Requirements:
        undetected_chromedriver>=3.1.3
        pysocks>=1.7 (required when using a socks5 proxy, such as over Tor)
        youtube_dl>=2021.05.16 (required to download videos)

    `undetected_chromedriver` is highly recommended, as it makes
    anti-scraping detection much more difficult and is essentially a
    drop-in replacement for selenium.

    If converting video formats for Tweets, you may also need ffmpeg:
        https://ffmpeg.org/download.html

    You can download chromedriver here:
        https://chromedriver.chromium.org/downloads

    NOTE: We currently do not support Twitter Spaces. Any support
    would be greatly appreciated.
'''

__version__ = '0.0.0-dev'
__author__ = 'Alex Huszagh <[email protected]>'
__license__ = 'Unlicense (Public Domain)'
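
# The dependency list above, restated as a minimal `requirements.txt` sketch
# (a convenience only; versions are the documented minimums from the docstring,
# not tested pins):
#
#     beautifulsoup4>=4.9
#     requests>=2.25
#     python-dateutil>=2.8
#     selenium>=4.1
#     undetected-chromedriver>=3.1.3   # optional, strongly recommended
#     pysocks>=1.7                     # optional, for socks5 proxies (e.g. Tor)
#     youtube_dl>=2021.05.16           # optional, for downloading videos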

import argparse
import csv
import datetime
import dateutil.parser
import itertools
import json
import os
import random
import re
import requests
import subprocess
import sys
import time
import urllib.parse

try:
    import undetected_chromedriver as webdriver
except ImportError:
    from selenium import webdriver
try:
    import youtube_dl
except ImportError:
    pass

from bs4 import BeautifulSoup
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from requests.exceptions import RequestException
from urllib3.exceptions import (
    HTTPError,
    IncompleteRead,
    InvalidChunkLength,
    MaxRetryError,
    ProtocolError,
    ProxyError,
    TimeoutError,
)


class TwitterError(Exception):
    pass


class StopTwitterError(Exception):
    pass


LOAD_EXCEPTIONS = (NoSuchElementException, TimeoutException)
CONNECTION_EXCEPTIONS = (
    IncompleteRead,
    InvalidChunkLength,
    MaxRetryError,
    NoSuchElementException,
    ProtocolError,
    ProxyError,
    TimeoutError,
    TimeoutException,
    TwitterError,
    WebDriverException,
)


def print_verbose(message, verbose=True):
    if verbose:
        print(message)


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def current_date():
    now = datetime.datetime.now()
    return now.date()


def parse_date(string):
    year, month, day = string.split('-')
    return datetime.date(int(year), int(month), int(day))


def format_date(date):
    return f'{date.year}-{date.month:02d}-{date.day:02d}'


def format_datetime(date):
    return date.strftime('%a %b %d %H:%M:%S %z %Y')


def add_interval(date, interval):
    return datetime.date.fromordinal(date.toordinal() + interval)


def subtract_interval(date, interval):
    return datetime.date.fromordinal(date.toordinal() - interval)


def parse_joined_date(string):
    return datetime.datetime.strptime(string, '%B %Y').date()


def add_query(url, key, value):
    parsed = urllib.parse.urlparse(url)
    query = dict(urllib.parse.parse_qsl(parsed.query))
    query[key] = [value]
    query_string = urllib.parse.urlencode(query, doseq=True)
    return urllib.parse.urlunparse(parsed._replace(query=query_string))
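
# `add_query` is how the scraper switches a search results page over to the
# "Latest" tab (see `get_statuses_impl` below). A quick illustration, assuming
# a typical search URL:
#
#     >>> add_query('https://twitter.com/search?q=from%3Akardonice', 'f', 'live')
#     'https://twitter.com/search?q=from%3Akardonice&f=live'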


def parse_interaction_count(string):
    lower = string.lower()
    if 'b' in lower:
        multiplier = 1000000000
        lower = lower[:-1]
    elif 'm' in lower:
        multiplier = 1000000
        lower = lower[:-1]
    elif 'k' in lower:
        multiplier = 1000
        lower = lower[:-1]
    else:
        multiplier = 1
    float_str = lower.replace(',', '') or '0'
    return int(multiplier * float(float_str))
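
# Twitter renders engagement counts in abbreviated form, so the parser above
# normalizes them to integers. For example (illustrative values only):
#
#     parse_interaction_count('482')    # -> 482
#     parse_interaction_count('1.2K')   # -> 1200
#     parse_interaction_count('3.4M')   # -> 3400000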


def random_wait(lower, upper):
    '''Wait for a random amount of time to thwart anti-scrape detection.'''
    time.sleep(random.uniform(lower, upper))


def retry_scope(callback, count=3, exception=StaleElementReferenceException):
    '''Retry `callback` up to `count` times, re-raising the last error.'''
    error = None
    for _ in range(count):
        try:
            return callback()
        except exception as err:
            error = err
    raise error


def get_tweet_id(url):
    parsed = urllib.parse.urlparse(url)
    return os.path.basename(parsed.path)
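
# Tweet permalinks have the form https://twitter.com/<user>/status/<id>, so the
# status ID is simply the last path component. For example:
#
#     get_tweet_id('https://twitter.com/kardonice/status/1234567890')
#     # -> '1234567890'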


def extract_media_data(media_item, tweet_url):
    # can only post 1 video at a time, which includes GIFs.
    # you can post GIFs and photos at the same time, so you
    # can have multiple photos and 1 video.
    image = extract_photo_data(media_item, tweet_url)
    if image is not None:
        return image
    video = extract_video_data(media_item, tweet_url)
    if video is not None:
        return video


def extract_photo_data(element, tweet_url):
    image = try_find_element(element, By.TAG_NAME, 'img')
    if image is None:
        return
    parsed = urllib.parse.urlparse(image.get_attribute('src'))
    query = urllib.parse.parse_qs(parsed.query)
    # sometimes the extension is none and is already provided in the path
    extension = query.get('format')
    path = parsed.path
    if extension is not None:
        path = f'{path}.{extension[0]}'
    url = parsed._replace(path=path, query='')
    return {
        'type': 'photo',
        'media_url': urllib.parse.urlunparse(url._replace(scheme='http')),
        'media_url_https': urllib.parse.urlunparse(url),
    }
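
# Photo sources on Twitter typically look like
# https://pbs.twimg.com/media/<id>?format=jpg&name=small; the helper above
# folds the `format` query parameter back into the path so the saved URL ends
# in a real file extension, e.g. https://pbs.twimg.com/media/<id>.jpg
# (illustrative URL shape, not taken from a live Tweet).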


def extract_video_data(element, tweet_url):
    # don't search for the video directly, since it might not render.
    # if that's the case, the `<video>` tag won't appear.
    video_selector = '*[data-testid="videoPlayer"]'
    has_video = has_element(element, By.CSS_SELECTOR, video_selector)
    if not has_video:
        return
    parsed = urllib.parse.urlparse(tweet_url)
    return {
        'type': 'video',
        'media_url': urllib.parse.urlunparse(parsed._replace(scheme='http')),
        'media_url_https': tweet_url,
    }


def extract_tweet_data(session, url, user_info, args):
    timeout = int(args.timeout)
    short_timeout = int(args.short_timeout)
    open_in_tab(session, url)
    random_wait(0.1, 0.15)
    id_str = get_tweet_id(url)
    tweet_selector = f'article[data-testid="tweet"] a[href$="{id_str}"]'
    text_selector = '*[data-testid="tweetText"]'
    time_selector = 'a[role="link"] > time'
    retweet_selector = 'a[href$="/retweets"]'
    quote_selector = 'a[href$="/retweets/with_comments"]'
    favorite_selector = 'a[href$="/likes"]'
    transitive_text_selector = '*[data-testid="app-text-transition-container"]'
    sources_url = 'https://help.twitter.com/using-twitter/how-to-tweet#source-labels'
    source_selector = f'a[href="{sources_url}"][rel="noopener noreferrer nofollow"]'
    media_selector = '*[data-testid="tweetPhoto"]'

    data = {}
    data['user'] = user_info.copy()
    wait = WebDriverWait(session, timeout)
    short_wait = WebDriverWait(session, short_timeout)
    wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, tweet_selector)))
    tweet = session.find_element(By.CSS_SELECTOR, tweet_selector)
    # walk up from the permalink anchor to the enclosing `<article>` element
    while tweet is not None and tweet.tag_name != 'article':
        tweet = tweet.find_element(by=By.XPATH, value='..')
    if tweet is None:
        raise TwitterError(f'Unable to find Tweet body for url "{url}".')

    try:
        short_wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, text_selector)))
        text_elements = tweet.find_elements(By.CSS_SELECTOR, text_selector)
    except LOAD_EXCEPTIONS:
        text_elements = []
    # empty tweets can have a missing text element
    if text_elements:
        text = text_elements[0]
        data['text'] = text.text
    else:
        data['text'] = ''
    data['id_str'] = id_str
    data['id'] = int(id_str)

    time_element = tweet.find_element(By.CSS_SELECTOR, time_selector)
    timestamp = time_element.get_attribute('datetime')
    date = dateutil.parser.parse(timestamp)
    data['created_at'] = format_datetime(date)

    # these all may be missing (they're nullable)
    try:
        retweets = session.find_element(By.CSS_SELECTOR, retweet_selector)
        retweets = retweets.find_element(By.CSS_SELECTOR, transitive_text_selector)
        data['retweet_count'] = parse_interaction_count(retweets.text)
    except NoSuchElementException:
        pass
    try:
        quotes = session.find_element(By.CSS_SELECTOR, quote_selector)
        quotes = quotes.find_element(By.CSS_SELECTOR, transitive_text_selector)
        data['quote_count'] = parse_interaction_count(quotes.text)
    except NoSuchElementException:
        pass
    try:
        favorites = session.find_element(By.CSS_SELECTOR, favorite_selector)
        favorites = favorites.find_element(By.CSS_SELECTOR, transitive_text_selector)
        data['favorite_count'] = parse_interaction_count(favorites.text)
    except NoSuchElementException:
        pass

    data['is_quote_status'] = len(text_elements) > 1
    try:
        source = session.find_element(By.CSS_SELECTOR, source_selector)
        html = source.get_attribute('outerHTML')
        soup = BeautifulSoup(html, 'html.parser')
        for attr in ['target', 'class', 'role']:
            soup.a.attrs.pop(attr, None)
        data['source'] = str(soup.a)
    except NoSuchElementException:
        pass
    try:
        media = tweet.find_elements(By.CSS_SELECTOR, media_selector)
        data['entities'] = {}
        data['entities']['media'] = []
        for media_item in media:
            item = extract_media_data(media_item, url)
            if item is not None:
                data['entities']['media'].append(item)
    except NoSuchElementException:
        pass

    close_tab(session)
    return data
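
# The dict returned above loosely mirrors a Twitter API v1.1 status object.
# A sketch of its shape (some keys are omitted when the corresponding element
# is missing from the page):
#
#     {
#         'user': {...},              # copy of the profile info
#         'id': 1234567890,
#         'id_str': '1234567890',
#         'created_at': 'Fri Nov 25 12:00:00 +0000 2022',
#         'text': '...',
#         'retweet_count': 0,         # nullable
#         'quote_count': 0,           # nullable
#         'favorite_count': 0,        # nullable
#         'is_quote_status': False,
#         'source': '<a href="...">Twitter Web App</a>',  # nullable
#         'entities': {'media': [...]},
#     }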


def open_in_tab(session, url):
    # this requires `--disable-popup-blocking` to be set
    session.execute_script('window.open();')
    handle = session.window_handles[-1]
    session.switch_to.window(handle)
    session.get(url)


def close_tab(session):
    session.execute_script('window.close();')
    handle = session.window_handles[-1]
    session.switch_to.window(handle)


def parse_memo(location, args):
    memo = set()
    if os.path.exists(location.file_path):
        globals()[f'read_memo_{args.format}'](location, memo)
    return memo


def read_memo_csv(location, memo):
    with open(location.file_path, newline='') as file:
        reader = csv.DictReader(file, dialect='excel-tab')
        for row in reader:
            memo.add(row['id_str'])


def read_memo_json(location, memo):
    with open(location.file_path) as file:
        for line in file:
            data = json.loads(line)
            memo.add(data['id_str'])
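
# The memo is the set of Tweet IDs already present in the output file; it lets
# an interrupted run resume by skipping any status whose `id_str` has already
# been written (see `process_tweets` above).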


def check_cycle_proxy(proxies, session, force, args):
    proxy = None
    if hasattr(proxies, '__next__') and session is None:
        proxy = next(proxies)
        session = Session(get_session(proxy, args))
    elif session is None:
        proxy = proxies
        session = Session(get_session(proxy, args))
    elif hasattr(proxies, '__next__'):
        proxy = next(proxies)
        print_verbose(f'Cycling proxy to {proxy}.', args.verbose)
        session.close()
        session._session = get_session(proxy, args)
    elif force:
        proxy = proxies
        session.close()
        session._session = get_session(proxy, args)
    return (session, proxy)
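
# `check_cycle_proxy` is called once per outer loop in `get_statuses`: when
# several proxies are supplied they arrive here as an `itertools.cycle`
# iterator and the driver is rebuilt on the next proxy; with a single proxy
# (or none) the session is only rebuilt when `force` is set after a failure.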


def get_options(proxy, args):
    options = webdriver.ChromeOptions()
    if proxy:
        print_verbose(f'Connecting to proxy at {proxy}.', args.verbose)
        options.add_argument(f'--proxy-server={proxy}')
    # Run in a headless session (useful when we know this logic works).
    if getattr(args, 'headless', None):
        options.add_argument('--headless')
    # ensure we disable popup blocking for our tabs
    options.add_argument('--disable-popup-blocking')
    return options


def get_session(proxy, args):
    '''Get a pre-configured Selenium driver session.'''
    options = get_options(proxy, args)
    # If the page is slow to load or never does, use a custom strategy.
    capabilities = DesiredCapabilities().CHROME
    capabilities['pageLoadStrategy'] = getattr(args, 'page_load_strategy', 'normal')
    # ensure we're using the correct major version.
    # this only works with the undetected chromedriver.
    kwds = {}
    if 'undetected_chromedriver' in sys.modules:
        output = subprocess.check_output(['chromedriver', '--version']).decode('utf-8')
        version = int(re.match(r'^ChromeDriver (\d+)\.\d+', output).group(1))
        kwds['version_main'] = version
    print_verbose('Getting Chrome browser session', args.verbose)
    return webdriver.Chrome(
        options=options,
        desired_capabilities=capabilities,
        **kwds,
    )


def try_find_element(parent, by, selector):
    try:
        return parent.find_element(by, selector)
    except NoSuchElementException:
        pass


def has_element(parent, by, selector):
    return try_find_element(parent, by, selector) is not None


def get_user_info(session, screen_name):
    script_selector = 'script[data-testid="UserProfileSchema-test"]'
    verified_selector = 'svg[aria-label="Verified account"]'
    script = session.find_element(By.CSS_SELECTOR, script_selector)
    data = script.get_attribute('innerText')
    user = json.loads(data)
    author = user['author']
    profile_image_url_https = urllib.parse.urlparse(author['image']['contentUrl'])
    profile_image_url = profile_image_url_https._replace(scheme='http')
    result = {
        'id': int(author['identifier']),
        'id_str': author['identifier'],
        'description': author['description'],
        'name': author['givenName'],
        'screen_name': screen_name,
        'created_at': user['dateCreated'],
        'profile_image_url': urllib.parse.urlunparse(profile_image_url),
        'profile_image_url_https': urllib.parse.urlunparse(profile_image_url_https),
    }
    for item in author['interactionStatistic']:
        if item['name'] == 'Friends':
            result['friends_count'] = item['userInteractionCount']
        elif item['name'] == 'Follows':
            result['followers_count'] = item['userInteractionCount']
    is_verified = has_element(session, By.CSS_SELECTOR, verified_selector)
    result['is_verified'] = is_verified
    return result
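
# The profile data comes from the structured-data blob Twitter embeds in a
# `UserProfileSchema-test` <script> tag on the profile page; the dict above is
# a flattened subset of it (IDs, names, description, join date, avatar URLs,
# friend/follower counts, and a best-effort verified flag).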


def get_tweets(session, query, args):
    tweet_selector = 'article[data-testid="tweet"]'
    no_tweets_selector = '*[data-testid="empty_state_header_text"]'
    short_timeout = int(args.short_timeout)
    short_wait = WebDriverWait(session, short_timeout)
    tweets = None
    try:
        short_wait.until(EC.any_of(
            EC.presence_of_element_located((By.CSS_SELECTOR, tweet_selector)),
            EC.presence_of_element_located((By.CSS_SELECTOR, no_tweets_selector)),
        ))
        if has_element(session, By.CSS_SELECTOR, tweet_selector):
            tweets = session.find_elements(By.CSS_SELECTOR, tweet_selector)
        elif has_element(session, By.CSS_SELECTOR, no_tweets_selector):
            tweets = []
    except NoSuchElementException:
        pass
    except TimeoutException:
        print(f'Unable to fetch Tweets within timeout for query "{query}", maybe increase the timeout?')
    return tweets


def process_tweets(session, tweets, user_info, memo, args):
    link_selector = 'a[role="link"] > time'
    processed = 0
    for tweet in tweets:
        # find the link via the date element
        link_time = tweet.find_element(By.CSS_SELECTOR, link_selector)
        link = link_time.find_element(by=By.XPATH, value='..')
        tweet_url = link.get_attribute('href')
        id_str = get_tweet_id(tweet_url)
        if id_str in memo:
            print_verbose(f'Got duplicate Tweet with ID "{id_str}".', args.verbose)
            continue
        # now, process all the tweet data
        print_verbose(f'Processing tweet at URL "{tweet_url}".', args.verbose)
        processed += 1
        memo.add(id_str)
        cb = lambda: extract_tweet_data(session, tweet_url, user_info, args)
        yield retry_scope(cb)
    if processed == 0:
        # ensure we mark there were no tweets processed
        tweets.clear()


def get_statuses_impl(location, memo, session, until, args):
    interval = int(args.interval)
    session.get(f'https://twitter.com/{args.user}')

    # first, check if our page actually loaded
    try:
        timeline_selector = '*[aria-label="Home timeline"]'
        home_timeline = session.find_element(By.CSS_SELECTOR, timeline_selector)
        if 'Something went wrong. Try reloading.' in home_timeline.text:
            raise TwitterError('Unable to load tweets, Twitter cannot be accessed.')
    except NoSuchElementException as error:
        raise TwitterError('Unable to get home timeline') from error

    # now, get the start: when the account was created.
    # change this into YYYY-MM-DD format.
    user_info = get_user_info(session, args.user)
    joined = dateutil.parser.parse(user_info['created_at']).date()

    # now, we need to process tweets iteratively,
    # and log the current date range for each
    search_selector = 'input[data-testid="SearchBox_Search_Input"]'
    wait = WebDriverWait(session, int(args.timeout))
    while until > joined:
        # generate our search query
        since = max(subtract_interval(until, interval), joined)
        query = f'from:{args.user} until:{format_date(until)} since:{format_date(since)}'
        until = since
        print_verbose(f'Running search query of "{query}".', args.verbose)

        # enter our search terms: must clear the field. using `clear` no longer
        # works, so we must send keys manually
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, search_selector)))
        element = session.find_element(By.CSS_SELECTOR, search_selector)
        element.send_keys(Keys.CONTROL + 'a')
        element.send_keys(Keys.DELETE)
        element.send_keys(query)
        element.send_keys(Keys.RETURN)

        # now, shift to the latest tab. the easiest way for this
        # is to get the URL, and add f=live for the parameters
        url = session.current_url
        live_url = add_query(url, 'f', 'live')
        print_verbose(f'Going to live url "{live_url}".', args.verbose)
        session.get(live_url)

        # now, we need to iterate over all tweets.
        # twitter dynamically loads tweets, so we
        # need to scroll into view and store which
        # tweets we've processed.
        got_tweets = True
        while got_tweets:
            # wrap this in a short retry loop so if we fail, we can reload
            for _ in range(3):
                tweets = get_tweets(session, query, args)
                if tweets:
                    break
                elif tweets is None:
                    continue
                got_tweets = False
            if tweets is not None:
                yield from process_tweets(session, tweets, user_info, memo, args)
                got_tweets = len(tweets) != 0
            # now, we need to scroll towards the end
            if got_tweets:
                session.execute_script('arguments[0].scrollIntoView();', tweets[-1])
                time.sleep(0.4)

        # now need to yield since it's a safe time to cycle proxies
        yield True

    # NOTE: cannot use `StopIteration` since those raise a
    # runtime error when used within a generator.
    raise StopTwitterError('Completed all Tweets through account creation date.')
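
# The scrape walks backwards in time in `--interval`-day windows using
# Twitter's advanced search syntax, e.g. (illustrative dates, with the
# default 7-day interval):
#
#     from:kardonice until:2022-11-27 since:2022-11-20
#     from:kardonice until:2022-11-20 since:2022-11-13
#     ...
#
# and stops once the window reaches the account's join date.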


def get_statuses(location, args):
    memo = parse_memo(location, args)
    processed_total = 0
    consecutive_tries = 0
    until = parse_date(args.until)
    cycle_count = int(args.cycle_count)
    restart_timeout = int(args.restart_timeout)
    tweet = None
    session = None
    force = False
    proxy = None
    proxies = getattr(args, 'proxy', None)
    if isinstance(proxies, list) and len(proxies) == 1:
        proxies = proxies[0]
    elif isinstance(proxies, list) and len(proxies) > 1:
        proxies = itertools.cycle(proxies)

    while True:
        try:
            session, proxy = check_cycle_proxy(proxies, session, force, args)
            force = False
            processed_total = 0
            for tweet in get_statuses_impl(location, memo, session, until, args):
                is_bool = isinstance(tweet, bool)
                if is_bool and processed_total >= cycle_count:
                    # will cycle proxy on the next loop
                    break
                elif not is_bool:
                    parsed_date = dateutil.parser.parse(tweet['created_at']).date()
                    until = add_interval(parsed_date, 1)
                    yield (tweet, proxy)
                    processed_total += 1
                    consecutive_tries = 0
        except CONNECTION_EXCEPTIONS:
            consecutive_tries += 1
            print_verbose(f'Got connection error, retrying, done {consecutive_tries} consecutive tries.', args.verbose)
            if consecutive_tries > 2 and args.restart_on_failure:
                eprint(f'\033[31mError:\033[0m Restarting on failure, sleeping for {restart_timeout} seconds.')
                time.sleep(restart_timeout)
                consecutive_tries = 0
                force = True
            elif consecutive_tries > 2:
                raise
            elif tweet is not None:
                pass
            force = True
        except StopTwitterError:
            print_verbose('Completed all Tweets.', args.verbose)
            break
        except (HTTPError, WebDriverException):
            if not args.restart_on_failure:
                raise
            eprint(f'\033[31mError:\033[0m Restarting on failure, sleeping for {restart_timeout} seconds.')
            time.sleep(restart_timeout)
            consecutive_tries = 0
            force = True


SIMPLE_USER_FIELDS = [
    'created_at',
    'description',
    'entities',
    'followers_count',
    'friends_count',
    'id',
    'id_str',
    'name',
    'screen_name',
    'url',
    'verified',
]
SIMPLE_STATUS_FIELDS = [
    'created_at',
    'favorite_count',
    'id',
    'id_str',
    'is_quote_status',
    'quote_count',
    'retweet_count',
    'retweeted',
    'source',
    'text',
    # NOTE: we currently do not support finding the reply IDs or
    # quote IDs, since it's not very well marked in the UI.
]


def extract_fields(item, fields):
    if fields is None:
        return item
    return {k: item.get(k) for k in fields}


def write_items(location, iterable, args):
    restart_timeout = int(args.restart_timeout)
    os.makedirs(location.parent, exist_ok=True)
    if args.save_media:
        os.makedirs(location.directory_path, exist_ok=True)
    consecutive_tries = 0
    while True:
        try:
            globals()[f'write_{args.format}'](location, iterable, args)
            return
        except RequestException:
            consecutive_tries += 1
            if consecutive_tries > 2 and args.restart_on_failure:
                eprint(f'\033[31mError:\033[0m Restarting on failure, sleeping for {restart_timeout} seconds.')
                time.sleep(restart_timeout)
                consecutive_tries = 0
            elif consecutive_tries > 2:
                raise


def write_csv(location, iterable, args):
    with open(location.file_path, 'a', newline='') as file:
        writer = None
        fields = args.fields or SIMPLE_STATUS_FIELDS
        for index, (item, proxy) in enumerate(iterable):
            if writer is None:
                writer = csv.DictWriter(file, fieldnames=fields, dialect='excel-tab')
                writer.writeheader()
            # save media prior to the item, so we ensure it gets written
            if args.save_media:
                save_media_urls(location, item, proxy, args)
            print_verbose(f'Writing status {item["id_str"]}', args.verbose)
            data = extract_fields(item, fields)
            writer.writerow(data)
            if index % 10 == 0:
                file.flush()


def write_json(location, iterable, args):
    # this writes a series of JSON objects, one per line, so a partial failure
    # while writing to disk does not lose the whole file
    with open(location.file_path, 'a') as file:
        fields = args.fields
        for index, (item, proxy) in enumerate(iterable):
            # save media prior to the item, so we ensure it gets written
            if args.save_media:
                save_media_urls(location, item, proxy, args)
            print_verbose(f'Writing status {item["id_str"]}', args.verbose)
            data = extract_fields(item, fields)
            file.write(json.dumps(data) + '\n')
            if index % 10 == 0:
                file.flush()


def save_media_urls(location, item, proxy, args):
    media = item.get('entities', {}).get('media', [])
    media += item.get('extended_entities', {}).get('media', [])
    for media_item in media:
        save_media_item(location, item, media_item, proxy, args)


def save_media_item(location, item, media_item, proxy, args):
    url = media_item.get('media_url_https')
    filetype = media_item.get('type')
    if url is not None:
        globals()[f'save_media_{filetype}'](location, url, item, proxy, args)


def save_media_photo(location, url, item, proxy, args):
    parsed = urllib.parse.urlparse(url)
    filename = os.path.basename(parsed.path)
    unique_filename = f'{item["id_str"]}-{filename}'
    print_verbose(f'Saving photo at url "{url}" with unique ID {unique_filename}.', args.verbose)
    kwds = {}
    if proxy:
        kwds['proxies'] = {
            'http': proxy,
            'https': proxy,
        }
    response = requests.get(url, **kwds)
    if not response.ok:
        eprint(f'\033[31mError:\033[0m Unable to save media attachment at url "{url}".')
        return
    path = os.path.join(location.directory_path, unique_filename)
    with open(path, 'wb') as file:
        file.write(response.content)


def save_media_video(location, url, item, proxy, args):
    if 'youtube_dl' not in sys.modules:
        eprint('\033[1;33mWarning:\033[0m Unable to save video: youtube-dl is not installed.')
        return
    # videos can either be `mp4` or `m3u8_native` (a plain-text playlist
    # of native media files), but we always convert to an `mp4`.
    unique_filename = f'{item["id_str"]}.mp4'
    path = os.path.join(location.directory_path, unique_filename)
    print_verbose(f'Saving video at url "{url}" with unique ID {unique_filename}.', args.verbose)
    ytdl_args = ['-f', 'best', '--format', 'mp4', '--output', path, url]
    if proxy:
        ytdl_args = ['--proxy', proxy] + ytdl_args
    try:
        youtube_dl.main(ytdl_args)
    except SystemExit as exit_error:
        # youtube-dl calls `sys.exit`, which we need to catch.
        # re-raise if we have an unsuccessful exit
        if exit_error.code != 0:
            raise exit_error


class Session:
    '''Shallow wrapper so we can modify sessions in-place.'''

    def __init__(self, session):
        self._session = session

    def __getattr__(self, attr):
        if attr in self.__dict__:
            return getattr(self, attr)
        return getattr(self._session, attr)


class OutputLocation:
    __slots__ = ('parent', 'filename', 'extension')

    def __init__(self, path, file_format):
        realpath = os.path.realpath(path)
        self.parent = os.path.dirname(realpath)
        basename = os.path.basename(realpath)
        self.filename, self.extension = os.path.splitext(basename)
        if not self.extension:
            self.extension = f'.{file_format}'

    @property
    def file_path(self):
        return os.path.join(self.parent, f'{self.filename}{self.extension}')

    @property
    def directory_path(self):
        return os.path.join(self.parent, self.filename)
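
# For example, `OutputLocation('kardonice.csv', 'csv')` resolves (relative to
# the current directory) to a `file_path` of `.../kardonice.csv` for the
# status data and a `directory_path` of `.../kardonice` for saved media.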


def main():
    '''Parse the command-line arguments and run the exporter.'''
    today = current_date()
    tomorrow = add_interval(today, 1)
    parser = argparse.ArgumentParser(description='Twitter Selenium exporter parameters.')
    parser.add_argument(
        '-u',
        '--user',
        help='Screen name of the user to get Tweets from.',
        required=True,
    )
    parser.add_argument(
        '-o',
        '--output',
        help='Output file name; the extension will be added if not provided.',
    )
    parser.add_argument(
        '-V',
        '--version',
        action='version',
        version=f'%(prog)s {__version__}',
    )
    parser.add_argument(
        '-sm',
        '--save-media',
        action='store_true',
        help='Save media attachments. The directory name defaults to the filename.',
    )
    parser.add_argument(
        '-f',
        '--fields',
        help='Fields to extract from each item. Leave empty for all fields.',
        nargs='*',
    )
    parser.add_argument(
        '--format',
        help='Export format.',
        default='json',
        choices=['json', 'csv'],
    )
    parser.add_argument(
        '-p',
        '--proxy',
        help='Proxy server(s) to connect to Twitter over.',
        nargs='*',
    )
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        help='Print verbose debugging information.',
    )
    parser.add_argument(
        '--headless',
        action='store_true',
        help='Run in headless (no UI) mode.',
    )
    parser.add_argument(
        '--until',
        help='The date to start scraping backwards from, in YYYY-MM-DD format.',
        default=format_date(tomorrow),
    )
    parser.add_argument(
        '--interval',
        help='The number of days used to generate the since/until date ranges.',
        default='7',
    )
    parser.add_argument(
        '--timeout',
        help='Timeout (in seconds) for an element to load after page load.',
        default='240',
    )
    parser.add_argument(
        '--short-timeout',
        help='Short timeout (in seconds) waiting for elements to dynamically load on an already-loaded page.',
        default='10',
    )
    parser.add_argument(
        '--cycle-count',
        help='Number of Tweets to process before cycling proxies.',
        default='240',
    )
    parser.add_argument(
        '--restart-on-failure',
        action='store_true',
        help='Restart automatically, indefinitely, if the command fails.',
    )
    parser.add_argument(
        '--restart-timeout',
        help='Timeout, in seconds, to sleep before retrying on failure.',
        default='600',
    )
    args = parser.parse_args()

    if not args.proxy:
        eprint('\033[1;33mWarning:\033[0m It is highly recommended to use a proxy server.')
    output = args.output
    if output is None:
        output = f'{args.user}_statuses.{args.format}'
    location = OutputLocation(output, args.format)
    iterable = get_statuses(location, args)
    write_items(location, iterable, args)


if __name__ == '__main__':
    main()