Created
September 4, 2021 20:15
-
-
Save marknca/e6d713cd381f963f39a1d27f74352ce7 to your computer and use it in GitHub Desktop.
Generate an HTML presentation of a Twitter thread given a specific tweet ID
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3
# Standard library
import datetime
import os
import re
import sys
import time

# 3rd party library
import requests
# Prefer a token supplied through the TWITTER_BEARER_TOKEN environment variable
# so the secret does not have to be written into this file. The placeholder is
# kept as the fallback so editing the value in place still works.
bearer_token = os.environ.get('TWITTER_BEARER_TOKEN') or 'YOUR_BEARER_TOKEN_HERE'

# Authorization header sent with every Twitter API request made by this script.
headers = {'Authorization': 'Bearer {}'.format(bearer_token)}
def get_html_for_thread(tweet_thread, tweets, includes, users):
    """Render every tweet of a thread as HTML, in thread order.

    Args:
        tweet_thread: Mapping of tweet ID to tweet; iteration order is the
            order in which the tweets are rendered.
        tweets: Passed through to ``get_html_for_tweet``.
        includes: Passed through to ``get_html_for_tweet``.
        users: Passed through to ``get_html_for_tweet``.

    Returns:
        The "start_of_thread" anchor followed by the HTML of each tweet, every
        section terminated by a blank line.
    """
    total = len(tweet_thread)
    sections = [
        "{}\n\n".format(get_html_for_tweet(entry, tweets, includes, users, position, total))
        for position, entry in enumerate(tweet_thread.values(), start=1)
    ]
    return '<a name="start_of_thread"></a>\n\n' + "".join(sections)
# Matches a t.co short link and captures the single space before it, if any.
_TCO_LINK = re.compile(r'( ?)(https://t\.co/\w+)')

# Matches the expanded URL of a photo or video attached to a tweet. The
# alternation is grouped so that "/<n>" applies to both photo and video.
_ATTACHMENT_URL = re.compile(r'https://(?:twitter|x)\.com/.+/status/\d+/(?:photo|video)/\d+')


def _resolve_short_url(url, timeout=10):
    """Follow *url* and return the final URL, or None when it cannot be resolved."""
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        print("Unable to resolve URL included in tweet. Threw exception:\n\t{}".format(err))
        return None
    if response.status_code != 200:
        return None
    return response.url


def get_html_for_tweet(tweet, tweets, includes, users, thread_index=None, thread_len=None):
    """Render a single tweet as an HTML fragment.

    Args:
        tweet: A tweet dict, or an API response wrapping one under 'data'.
            Must provide 'id', 'text', 'created_at' and 'author_id'.
        tweets: Unused; kept so existing callers keep working.
        includes: Mapping of media key to media dict ('type', 'url').
        users: Mapping of user ID to user dict ('username',
            'profile_image_url').
        thread_index: 1-based position of the tweet in its thread, if any.
        thread_len: Total number of tweets in the thread, if any.

    Returns:
        The HTML for the tweet: an optional position/navigation paragraph
        followed by a ``<div class="tweet">`` element.

    Raises:
        KeyError: If the tweet's author is not present in *users*.
    """
    from html import escape  # local import: html is not imported at file level

    if 'data' in tweet:
        tweet = tweet['data']

    created_at_obj = datetime.datetime.strptime(
        tweet['created_at'].replace('.000Z', ''), '%Y-%m-%dT%H:%M:%S')
    created_at_str = created_at_obj.strftime("%d-%b-%Y, %H:%M")

    # Each distinct short URL is resolved once, even when it appears repeatedly.
    resolved = {}

    def _rewrite_link(match):
        leading_space, short_url = match.groups()
        if short_url not in resolved:
            resolved[short_url] = _resolve_short_url(short_url)
        final_url = resolved[short_url]
        if final_url is None:
            # Could not be resolved: leave the text exactly as it was.
            return match.group(0)
        if _ATTACHMENT_URL.match(final_url):
            # The link only points at attached media, which is rendered
            # separately below, so drop it (and the space before it).
            return ''
        return '{}<a href="{}">{}</a>'.format(leading_space, short_url, escape(final_url))

    # Substituting in a single pass means a generated href is never re-matched.
    body = _TCO_LINK.sub(_rewrite_link, tweet['text'].replace('\n\n', "<br /><br />"))

    # add images
    for media_key in tweet.get('attachments', {}).get('media_keys', []):
        media = includes.get(media_key)
        if media and media.get('type') == 'photo' and media.get('url'):
            body += '<br /><img src="{}" />'.format(escape(media['url'], quote=True))

    # tweet link (built from the author's handle, not the numeric user ID)
    user = users[tweet['author_id']]
    handle = escape(user['username'], quote=True)
    body += (
        '<p class="tweet-link"><img src="{}" alt="{}" /><b>@{}</b> tweeted at '
        '<a href="https://twitter.com/{}/status/{}">{}</a></p>'
    ).format(escape(user['profile_image_url'], quote=True), handle, handle,
             handle, tweet['id'], created_at_str)

    # NOTE(review): the "π" glyphs below look like mis-decoded emoji; confirm
    # the intended characters before changing them.
    position = ""
    nav = ""
    if thread_index and thread_len:
        position = '<p class="tweet-position"><a name="{0}">Tweet {0}/{1}</a>'.format(
            thread_index, thread_len)
        if thread_index < thread_len:
            # Only link forward when there is a following tweet to land on.
            position += '<span> </span><a href="#{}">π Next tweet</a>'.format(thread_index + 1)
        if thread_index > 1:
            position += '<span> </span><a href="#start_of_thread">π Start</a>'
        position += "</p>"
        if thread_index == thread_len:
            nav = '<p class="tweet-link"><a href="#start_of_thread">π Start</a></p>'

    return '{}<div class="tweet" data-tweet-id="{}">{}{}</div>'.format(
        position, tweet['id'], body, nav)
def organize_tweets(tweets):
    """Index an iterable of tweet dicts by their 'id'.

    When the same ID occurs more than once, the last occurrence is kept.
    """
    return {entry['id']: entry for entry in tweets}
def organize_includes(includes):
    """Index an iterable of media dicts by their 'media_key'.

    When the same key occurs more than once, the last occurrence is kept.
    """
    return {media['media_key']: media for media in includes}
def organize_users(users):
    """Index an iterable of user dicts by their 'id'.

    When the same ID occurs more than once, the last occurrence is kept.
    """
    return {account['id']: account for account in users}
def get_tweet(tweet_id):
    """Fetch a single tweet, with its expansions, from the Twitter v2 API.

    Args:
        tweet_id: ID of the tweet to look up.

    Returns:
        The decoded JSON response, or None when the request fails or the API
        answers with a status other than 200 (the problem is printed).
    """
    url = (
        'https://api.twitter.com/2/tweets/{}'
        '?expansions=attachments.media_keys,author_id,referenced_tweets.id'
        '&media.fields=url'
        '&tweet.fields=created_at,public_metrics,source,text,author_id,in_reply_to_user_id'
        '&user.fields=profile_image_url,url,username,verified,name,created_at'
    ).format(tweet_id)

    try:
        # Without a timeout a stalled connection would hang the script forever.
        r = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as err:
        print("Could not get tweet {}. Threw err:\n\t{}".format(tweet_id, err))
        return None

    if r.status_code != 200:
        print("Could not get tweet {}. Threw err:\n\t{}".format(tweet_id, r.text))
        return None
    return r.json()
def get_tweets_for_user(user_id, max_results=100, token=None):
    """Fetch one page of a user's tweets from the Twitter v2 API.

    Args:
        user_id: ID of the user whose timeline is requested.
        max_results: Page size to request. Clamped to 5..100, the range the
            user Tweet timeline endpoint accepts.
        token: Pagination token of the page to fetch; None for the first page.

    Returns:
        The decoded JSON response, or None when the request fails or the API
        answers with a status other than 200 (the problem is printed).
    """
    # Honour the requested page size instead of always asking for 100.
    max_results = max(5, min(100, max_results))
    url = (
        'https://api.twitter.com/2/users/{}/tweets'
        '?expansions=attachments.media_keys,author_id,referenced_tweets.id'
        '&media.fields=url'
        '&max_results={}'
        '&tweet.fields=created_at,public_metrics,source,author_id,text,in_reply_to_user_id'
        '&user.fields=profile_image_url,url,username,verified,name,created_at'
    ).format(user_id, max_results)
    if token:
        url += '&pagination_token={}'.format(token)

    try:
        # Without a timeout a stalled connection would hang the script forever.
        r = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as err:
        print("Could not get tweets for user {}. Threw err:\n\t{}".format(user_id, err))
        return None

    if r.status_code != 200:
        print("Could not get tweets for user {}. Threw err:\n\t{}".format(user_id, r.text))
        return None
    return r.json()
def get_last_tweets_for_user(user_id, max_pages=5):
    """Collect the most recent tweets of a user across several result pages.

    Args:
        user_id: ID of the user whose tweets are collected.
        max_pages: Maximum number of pages to request (default 5).

    Returns:
        A dict with three lists: 'tweets', 'includes' (media objects) and
        'users', accumulated over every page that was fetched.
    """
    tweets = []
    includes = []
    users = []
    token = None
    for _ in range(max_pages):
        response = get_tweets_for_user(user_id, token=token)
        if not response:
            # The fetch failed (already reported); keep what was collected.
            break

        # Any of these keys may be absent, e.g. a page whose tweets carry no
        # media has no 'media' entry under 'includes'.
        tweets += response.get('data', [])
        expansions = response.get('includes', {})
        includes += expansions.get('media', [])
        users += expansions.get('users', [])

        token = response.get('meta', {}).get('next_token')
        if not token:
            # No further page: stop instead of requesting the same page again.
            break
    return {'tweets': tweets, 'includes': includes, 'users': users}
def get_tweet_in_reply_to(replied_to_tweet_id, tweets):
    """Find the tweet in *tweets* that replies to the given tweet ID.

    Args:
        replied_to_tweet_id: ID of the tweet being replied to; compared as a
            string.
        tweets: Mapping of tweet ID to tweet dict.

    Returns:
        The matching tweet dict, or None when nothing replies to the ID. When
        several tweets match, the one latest in the mapping's iteration order
        is returned.
    """
    wanted = str(replied_to_tweet_id)
    found = None
    for candidate in tweets.values():
        references = candidate.get('referenced_tweets', ())
        if any(ref['type'] == 'replied_to' and ref['id'] == wanted for ref in references):
            # Keep scanning: a later match replaces an earlier one.
            found = candidate
    return found
def get_thread(root_tweet, tweets):
    """Build the reply chain that starts at *root_tweet*.

    Args:
        root_tweet: API response for the first tweet; its ID is read from
            root_tweet['data']['id'].
        tweets: Mapping of tweet ID to tweet dict searched for replies.

    Returns:
        A dict mapping tweet ID to tweet, inserted in thread order. The first
        entry is *root_tweet* itself (the whole response); the following
        entries are tweet dicts taken from *tweets*.
    """
    root_id = root_tweet['data']['id']
    thread = {root_id: root_tweet}

    current_id = root_id
    while True:
        following = get_tweet_in_reply_to(current_id, tweets)
        if not following:
            return thread
        current_id = following['id']
        thread[current_id] = following
def main(root_tweet_id=None):
    """Print the HTML for the thread that starts at a given tweet.

    Call as:
        get_twitter_thread.py TWEET_ID

    Args:
        root_tweet_id: ID of the first tweet of the thread. When omitted, the
            last command-line argument is used.

    Returns:
        A tuple (tweets, includes, users, tweet_thread) holding the indexed
        data the HTML was built from.

    Raises:
        SystemExit: If no tweet ID is available, or the root tweet (and thus
            its author) cannot be retrieved.
    """
    if not root_tweet_id:
        if len(sys.argv) < 2:
            # sys.argv[-1] would otherwise be the script name itself.
            raise SystemExit("Usage: get_twitter_thread.py TWEET_ID")
        root_tweet_id = sys.argv[-1]

    print("Finding thread for tweet: {}".format(root_tweet_id))

    # Get the root tweet
    root_tweet = get_tweet(root_tweet_id)
    if not (root_tweet and 'data' in root_tweet and 'author_id' in root_tweet['data']):
        raise SystemExit("Could not determine the author of tweet {}".format(root_tweet_id))
    root_user_id = root_tweet['data']['author_id']

    print("Re-building thread by user id #{}...".format(root_user_id))

    # Get the most recent tweets from that user to build the thread from
    collected = get_last_tweets_for_user(root_user_id)
    includes = organize_includes(collected['includes'])
    tweets = organize_tweets(collected['tweets'])
    users = organize_users(collected['users'])

    print("Retrieved {} tweets to sort through...".format(len(collected['tweets'])))

    # dicts preserve insertion order, so the thread stays in reply order
    tweet_thread = get_thread(root_tweet, tweets)

    print(get_html_for_thread(tweet_thread, tweets, includes, users))

    return tweets, includes, users, tweet_thread


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
An example of the output is available on this page, https://markn.ca/2021/how-a-new-user-can-run-just-one-container-in-the-aws-cloud-maybe/