Skip to content

Instantly share code, notes, and snippets.

@cmj
Created April 25, 2026 11:32
Show Gist options
  • Select an option

  • Save cmj/7ea610bd99a73c2bb2ab95dadeb6fbe9 to your computer and use it in GitHub Desktop.

Select an option

Save cmj/7ea610bd99a73c2bb2ab95dadeb6fbe9 to your computer and use it in GitHub Desktop.
Standalone Twitter guest RSS generator
#!/usr/bin/env python3
# Twitter guest - standalone RSS generator
# Works with some news/govt accounts, ex:
# NASA KIRO7Seattle AJEnglish cnn abc nbcnews ap NBCNewYork ABC7NY (AOC potus whitehouse etc)
import html
import json
import logging
import os
import random
import re
import time
import urllib.parse
from datetime import datetime

import requests
# config
# Base URL used to build the tweet and profile links that appear in the feed.
TWEET_URL = 'http://nitter' # Nitter instance (or x.com)
# Seconds a rendered feed stays valid in `cache` before it is rebuilt.
CACHE_TTL = 60
# Number of tweets requested when the caller gives no `limit`.
DEFAULT_LIMIT = 20
# GraphQL endpoints queried by TwitterRSS.get_user / TwitterRSS.get_tweets.
USER_URL = 'https://x.com/i/api/graphql/-oaLodhGbbnzJBACb1kk2Q/UserByScreenName'
TWEETS_URL = 'https://api.x.com/graphql/naBcZ4al-iTCFBYGOAMzBQ/UserTweets'
# Bearer token sent with every request; the BEARER_TOKEN environment variable
# overrides the built-in fallback value.
BEARER_TOKEN = os.environ.get(
'BEARER_TOKEN',
'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
)
# Default headers installed on the requests session created by TwitterRSS.
HEADERS = {
'authorization': f'Bearer {BEARER_TOKEN}',
'User-Agent': 'Mozilla/5.0',
'x-twitter-active-user': 'yes',
'x-twitter-client-language': 'en',
}
# JSON-encoded feature flags sent as the `features` query parameter of the
# user lookup and the timeline request respectively.
FEATURES_USER = '{"hidden_profile_likes_enabled":false,"hidden_profile_subscriptions_enabled":true,"responsive_web_graphql_exclude_directive_enabled":true,"verified_phone_label_enabled":false,"subscriptions_verification_info_is_identity_verified_enabled":false,"subscriptions_verification_info_verified_since_enabled":true,"highlights_tweets_tab_ui_enabled":true,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"responsive_web_graphql_timeline_navigation_enabled":true}'
FEATURES_TWEETS = '{"creator_subscriptions_tweet_preview_api_enabled":false,"communities_web_enable_tweet_community_results_fetch":false,"c9s_tweet_anatomy_moderator_badge_enabled":false,"articles_preview_enabled":true,"tweetypie_unmention_optimization_enabled":false,"responsive_web_edit_tweet_api_enabled":false,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":false,"view_counts_everywhere_api_enabled":false,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":false,"standardized_nudges_misinfo":false,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":false,"tweet_with_visibility_results_prefer_gql_media_interstitial_enabled":false,"rweb_video_timestamps_enabled":false,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"rweb_tipjar_consumption_enabled":false,"responsive_web_graphql_exclude_directive_enabled":false,"verified_phone_label_enabled":false,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"responsive_web_graphql_timeline_navigation_enabled":false,"responsive_web_enhance_cards_enabled":false,"rweb_lists_timeline_redesign_enabled":false,"responsive_web_media_download_video_enabled":false}'
# In-process response cache used by `app`: maps "<user>:<limit>" to a
# (time.time() timestamp, encoded RSS bytes) tuple.
cache = {}
# A URL runs until whitespace or a character that cannot appear unescaped in
# an HTML attribute value / tag context (`"`, `<`, `>`).
URL_RE = re.compile(r'(https?://[^\s<>"]+)')
# Sentence punctuation that commonly follows a URL but is not part of it.
_URL_TRAILING_PUNCT = '.,;:!?'


def linkify(text: str) -> str:
    """Wrap every http(s) URL found in *text* in an ``<a href>`` anchor.

    Punctuation that directly follows a URL ("see https://t.co/x.") is left
    outside the anchor so the generated link still resolves. ``None`` or an
    empty string yields an empty string.
    """
    def _anchor(match):
        url = match.group(0)
        target = url.rstrip(_URL_TRAILING_PUNCT)
        trailing = url[len(target):]
        return f'<a href="{target}">{target}</a>{trailing}'

    return URL_RE.sub(_anchor, text or '')
def to_rfc822(dt_str: str) -> str:
    """Reformat a ``created_at``-style timestamp for an RSS ``pubDate``.

    Input has the form ``"Wed Oct 10 20:19:24 +0000 2018"``; the result has
    the form ``"Wed, 10 Oct 2018 20:19:24 +0000"``. Raises ``ValueError`` when
    *dt_str* does not match the input form.
    """
    source_format = "%a %b %d %H:%M:%S %z %Y"
    rss_format = "%a, %d %b %Y %H:%M:%S %z"
    moment = datetime.strptime(dt_str, source_format)
    return moment.strftime(rss_format)
def render_media(media_list):
    """Render a tweet's media entities as an HTML fragment.

    Photos become ``<img>`` tags. Videos and animated GIFs become a
    ``<video>`` tag pointing at the highest-bitrate MP4 variant; the
    ``poster`` attribute is only emitted when a thumbnail URL exists.
    Entries of any other type, photos without a URL, and videos without an
    MP4 variant are skipped.

    Args:
        media_list: iterable of media dicts (``type``, ``media_url_https``,
            ``video_info``); ``None`` is treated as empty.

    Returns:
        The concatenated HTML, or ``''`` when nothing is renderable.
    """
    fragments = []
    for item in media_list or []:
        kind = item.get('type')
        thumbnail = item.get('media_url_https')
        if kind == 'photo':
            if thumbnail:
                fragments.append(f'<br/><img src="{thumbnail}" loading="lazy" />')
        elif kind in ('video', 'animated_gif'):
            variants = (item.get('video_info') or {}).get('variants') or []
            mp4s = [v for v in variants if v.get('content_type') == 'video/mp4']
            if not mp4s:
                continue
            # A missing or null bitrate sorts lowest instead of breaking max().
            best = max(mp4s, key=lambda v: v.get('bitrate') or 0)
            video_url = best.get('url')
            if video_url:
                # Without this guard a missing thumbnail rendered poster="None".
                poster_attr = f' poster="{thumbnail}"' if thumbnail else ''
                fragments.append(
                    f'<br/><video controls{poster_attr} src="{video_url}"></video>'
                )
    return ''.join(fragments)
class TwitterRSS:
    """Guest-token client for the X/Twitter GraphQL API that renders RSS.

    The constructor opens a ``requests`` session carrying ``HEADERS`` and
    immediately requests a guest token, so creating an instance performs a
    network call.
    """

    # Seconds allowed for each HTTP request before ``requests`` gives up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update(HEADERS)
        # Fetch and attach guest token
        self.session.headers['x-guest-token'] = self.get_guest()

    # ------------------------------------------------------------------ HTTP

    def _get(self, url, params):
        """GET *url* and return the decoded JSON body.

        A 401/403 answer is taken to mean the guest token expired: a fresh
        token is fetched and the request is retried exactly once.

        Raises:
            requests.HTTPError: if the (possibly retried) response is an error.
        """
        response = self.session.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        if response.status_code in (401, 403):
            # Refresh guest token and retry once on auth failure
            self.session.headers['x-guest-token'] = self.get_guest()
            response = self.session.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    def get_guest(self) -> str:
        """Fetch a fresh guest token from the Twitter API.

        Raises:
            requests.HTTPError: if the activation endpoint answers with an error.
            RuntimeError: if the answer carries no ``guest_token``.
        """
        response = self.session.post(
            'https://api.twitter.com/1.1/guest/activate.json',
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        token = response.json().get('guest_token')
        if not token:
            raise RuntimeError("Failed to obtain guest_token from Twitter API")
        return token

    # ------------------------------------------------------------- API calls

    def get_user(self, username):
        """Look up *username* and return ``{"id", "username", "full_name"}``.

        Raises:
            RuntimeError: if the API reports errors without data, or the
                response lacks the user result, screen name or ``rest_id``.
        """
        params = {
            'variables': json.dumps({"screen_name": username, "withSafetyModeUserFields": True}),
            'features': FEATURES_USER,
        }
        json_response = self._get(USER_URL, params)
        if 'errors' in json_response and 'data' not in json_response:
            msgs = '; '.join(e.get('message', str(e)) for e in json_response['errors'])
            raise RuntimeError(f"Twitter API error for @{username}: {msgs}")
        try:
            result = json_response["data"]["user"]["result"]
            # The name fields are read from `core` first, then `legacy`.
            core = result.get("core", {})
            legacy = result.get("legacy", {})
            screen_name = core.get("screen_name") or legacy.get("screen_name")
            full_name = core.get("name") or legacy.get("name")
            if not screen_name:
                raise KeyError(f"screen_name not found in core={core} or legacy={legacy}")
            # Read inside the try so a result without rest_id is reported
            # like every other shape problem instead of a bare KeyError.
            user_id = result["rest_id"]
        except (KeyError, TypeError) as exc:
            raise RuntimeError(
                f"Unexpected response shape for @{username} — "
                f"got keys {list(json_response.keys())}: {exc}"
            ) from exc
        return {
            "id": user_id,
            "username": screen_name,
            "full_name": full_name,
        }

    def get_tweets(self, username, limit=DEFAULT_LIMIT):
        """Return up to *limit* parsed tweets from *username*'s timeline.

        A single timeline request is made and it asks for at most 20 entries,
        so fewer than *limit* tweets may come back. Entries that cannot be
        parsed are skipped and counted; the count is logged as a warning.

        Returns:
            A list of tweet dicts as produced by ``_parse_tweet_result``. For
            conversation modules the last tweet is returned with the earlier
            ones attached under ``reply_to``.
        """
        user_id = self.get_user(username).get("id")
        params = {
            'variables': json.dumps({
                'userId': user_id,
                'count': min(limit, 20),
                'includePromotedContent': False,
                'withQuickPromoteEligibilityTweetFields': False,
                'withVoice': True,
                'withV2Timeline': True,
            }),
            'features': FEATURES_TWEETS,
        }
        data = self._get(TWEETS_URL, params)

        user_result = ((data.get('data') or {}).get('user') or {}).get('result') or {}
        # Use the first of the two known timeline locations that has instructions.
        instructions = []
        for timeline_key in ('timeline_v2', 'timeline'):
            timeline = (user_result.get(timeline_key) or {}).get('timeline') or {}
            instructions = timeline.get('instructions') or []
            if instructions:
                break

        tweets = []
        skipped = 0
        entries = (e for ins in instructions for e in ins.get('entries', []))
        for entry in entries:
            if 'cursor' in entry.get('entryId', '').lower():
                continue
            try:
                parsed = self._parse_entry(entry)
            except (LookupError, TypeError, AttributeError):
                # Best effort: one malformed entry must not sink the feed.
                skipped += 1
                continue
            if parsed:
                tweets.append(parsed)
                if len(tweets) >= limit:
                    break
        if skipped:
            logging.getLogger(__name__).warning(
                "Skipped %d unparseable timeline entries for @%s", skipped, username
            )
        return tweets

    # --------------------------------------------------------------- parsing

    def _parse_entry(self, entry):
        """Turn one timeline entry into a tweet dict, or ``None`` if not a tweet."""
        content = entry.get('content', {})
        entry_type = content.get('entryType', '')
        if entry_type == 'TimelineTimelineItem':
            item = content.get('itemContent', {})
            if item.get('itemType') != 'TimelineTweet':
                return None
            return self._parse_tweet_result(item, entry.get('entryId', ''))
        if entry_type == 'TimelineTimelineModule':
            thread = []
            for module_item in content.get('items', []):
                item = module_item.get('item', {}).get('itemContent', {})
                if item.get('itemType') != 'TimelineTweet':
                    continue
                parsed = self._parse_tweet_result(item, module_item.get('entryId', ''))
                if parsed:
                    thread.append(parsed)
            if not thread:
                return None
            # The last tweet of the module is the item; earlier ones are context.
            *parents, latest = thread
            if parents:
                latest['reply_to'] = parents
            return latest
        return None

    @staticmethod
    def _unwrap(result):
        """Return the tweet inside a visibility wrapper, or *result* itself."""
        if not result:
            return None
        return result.get('tweet', result)

    @staticmethod
    def _author(tweet_result):
        """Return ``(screen_name, name)`` of a tweet result's author.

        Both the `core` and the `legacy` section of the user result are
        consulted, mirroring what ``get_user`` does.
        """
        user_result = ((tweet_result.get('core') or {}).get('user_results') or {}).get('result') or {}
        core = user_result.get('core') or {}
        legacy = user_result.get('legacy') or {}
        return (
            core.get('screen_name') or legacy.get('screen_name'),
            core.get('name') or legacy.get('name'),
        )

    @staticmethod
    def _media_of(tweet_result):
        """Return the media entity list of a tweet result (possibly empty)."""
        legacy = (tweet_result or {}).get('legacy') or {}
        return (legacy.get('extended_entities') or {}).get('media', [])

    def _summarize(self, tweet_result):
        """Build the dict describing an embedded (quoted/retweeted) tweet.

        Returns ``None`` when *tweet_result* is empty or has no ``legacy`` data.
        """
        legacy = (tweet_result or {}).get('legacy')
        if not legacy:
            return None
        screen_name, name = self._author(tweet_result)
        tid = legacy.get('id_str')
        return {
            'id': tid,
            'username': screen_name,
            'name': name,
            'content': legacy.get('full_text'),
            'media': self._media_of(tweet_result),
            'tweet_url': f"{TWEET_URL}/{screen_name}/status/{tid}",
        }

    def _parse_tweet_result(self, item_content, entry_id=''):
        """Parse a ``TimelineTweet`` item into the feed's tweet dict.

        Args:
            item_content: the entry's ``itemContent`` mapping.
            entry_id: accepted for call compatibility; not used.

        Returns:
            A dict with ``id``, ``username``, ``name``, ``published_at``,
            ``content``, ``media``, ``tweet_url``, ``retweeted``, ``quoted``
            and ``reply_to``; or ``None`` when the item holds no result or
            the result is neither a tweet nor a visibility-wrapped tweet.
        """
        result = (item_content.get('tweet_results') or {}).get('result')
        if not result:
            return None
        typename = result.get('__typename', '')
        if typename == 'TweetWithVisibilityResults':
            result = result['tweet']
        elif typename != 'Tweet':
            return None

        legacy = result['legacy']
        screen_name, name = self._author(result)
        tid = legacy.get('id_str')

        retweeted = None
        rt_result = self._unwrap((legacy.get('retweeted_status_result') or {}).get('result'))
        if rt_result:
            rt_quoted_result = self._unwrap(
                (rt_result.get('quoted_status_result') or {}).get('result')
            )
            retweeted = self._summarize(rt_result)
            if retweeted is not None:
                retweeted['quoted'] = self._summarize(rt_quoted_result)
            # A retweet exposes the original's media, falling back to the
            # media of the tweet that the original quotes.
            media = self._media_of(rt_result) or self._media_of(rt_quoted_result)
        else:
            media = self._media_of(result)

        quoted = self._summarize(
            self._unwrap((result.get('quoted_status_result') or {}).get('result'))
        )

        return {
            'id': tid,
            'username': screen_name,
            'name': name,
            'published_at': legacy.get('created_at'),
            'content': legacy.get('full_text'),
            'media': media,
            'tweet_url': f"{TWEET_URL}/{screen_name}/status/{tid}",
            'retweeted': retweeted,
            'quoted': quoted,
            'reply_to': [],
        }

    # ------------------------------------------------------------- rendering

    @staticmethod
    def _esc(value):
        """Escape *value* for use in XML/HTML text or attribute values.

        The value is unescaped first so text that already carries entities
        is not escaped twice. ``None`` becomes an empty string.
        """
        if value is None:
            return ''
        return html.escape(html.unescape(str(value)), quote=True)

    @staticmethod
    def _cdata(text):
        """Make *text* safe inside ``<![CDATA[...]]>`` by splitting any ``]]>``."""
        return (text or '').replace(']]>', ']]]]><![CDATA[>')

    def _quote_html(self, tweet):
        """Render an embedded tweet (and the tweet it quotes) as a blockquote."""
        if not tweet:
            return ''
        return (
            f'<blockquote>'
            f'<b><a href="{self._esc(tweet["tweet_url"])}">'
            f'{self._esc(tweet["name"])} @{self._esc(tweet["username"])}</a></b><br/>'
            f'{linkify(tweet["content"])}{render_media(tweet.get("media", []))}'
            f'{self._quote_html(tweet.get("quoted"))}'
            f'</blockquote>'
        )

    def _render_item(self, tweet):
        """Render one tweet dict as an RSS ``<item>`` element."""
        rt = tweet.get('retweeted')
        if rt:
            # Full retweet: show retweeter credit, then original tweet in full
            title_text = f"RT @{rt['username']}: {rt.get('content') or ''}"
            description_html = (
                f'<p><b>{self._esc(tweet["name"])} @{self._esc(tweet["username"])}</b> retweeted '
                f'<a href="{self._esc(rt["tweet_url"])}">'
                f'<b>{self._esc(rt["name"])} @{self._esc(rt["username"])}</b></a>:</p>'
                f'<blockquote>{linkify(rt["content"])}{render_media(rt.get("media", []))}'
                f'{self._quote_html(rt.get("quoted"))}</blockquote>'
            )
        else:
            title_text = tweet.get('content') or ''
            # Reply-to context (conversation thread parents) comes first.
            parents_html = ''.join(self._quote_html(p) for p in tweet.get('reply_to', []))
            description_html = (
                f'{parents_html}{linkify(tweet["content"])}'
                f'{render_media(tweet.get("media", []))}'
                f'{self._quote_html(tweet.get("quoted"))}'
            )

        lines = [
            '<item>',
            f'<title><![CDATA[{self._cdata(title_text)}]]></title>',
            f'<description><![CDATA[{self._cdata(description_html)}]]></description>',
            f'<link>{self._esc(tweet["tweet_url"])}</link>',
            f'<guid isPermaLink="false">{self._esc(tweet["id"])}</guid>',
        ]
        published = tweet.get('published_at')
        if published:
            try:
                lines.append(f'<pubDate>{to_rfc822(published)}</pubDate>')
            except ValueError:
                # An unparseable date drops only the pubDate, not the feed.
                pass
        lines.append('</item>')
        return '\n'.join(lines) + '\n'

    def build_rss(self, username, items):
        """Return an RSS 2.0 document (``str``) for *username*'s *items*.

        *username* and all tweet-derived names and URLs are escaped before
        being placed in the markup; tweet text is carried in CDATA sections.
        """
        channel_user = self._esc(username)
        rendered_items = ''.join(self._render_item(tweet) for tweet in items)
        return '\n'.join([
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<rss version="2.0">',
            '<channel>',
            f'<title>@{channel_user}</title>',
            f'<link>{TWEET_URL}/{channel_user}</link>',
            f'<description>Twitter feed for @{channel_user}</description>',
            rendered_items,
            '</channel>',
            '</rss>',
        ])
# Shared client used by `app`. Constructing it requests a guest token, so
# importing this module performs a network call.
twitter = TwitterRSS()
def parse_qs(qs):
    """Parse a URL query string into a flat ``{name: value}`` dict.

    Blank values are kept; when a name is repeated the last value wins.
    """
    pairs = urllib.parse.parse_qsl(qs, keep_blank_values=True)
    return {name: value for name, value in pairs}
# Twitter handles are 1-15 characters of letters, digits and underscore.
_SCREEN_NAME_RE = re.compile(r'[A-Za-z0-9_]{1,15}')
_RSS_CONTENT_TYPE = ('Content-Type', 'application/rss+xml; charset=UTF-8')
_TEXT_CONTENT_TYPE = ('Content-Type', 'text/plain; charset=UTF-8')


def app(environ, start_response):
    """WSGI entry point: serve the RSS feed for ``?user=<handle>&limit=<n>``.

    ``user`` defaults to ``NASA`` and ``limit`` to ``DEFAULT_LIMIT`` (clamped
    to 1..100). Rendered feeds are cached per ``user:limit`` for
    ``CACHE_TTL`` seconds.

    Responses:
        200 with the RSS document (fresh or cached),
        400 when ``user`` is not a valid handle or ``limit`` is not an integer,
        500 with the exception text when fetching or rendering fails.
    """
    params = parse_qs(environ.get('QUERY_STRING', ''))
    username = params.get('user', 'NASA')
    # The handle ends up in the upstream query, the cache key and the feed
    # markup, so reject anything that cannot be a real handle.
    if not _SCREEN_NAME_RE.fullmatch(username):
        start_response('400 Bad Request', [_TEXT_CONTENT_TYPE])
        return [b"'user' must be a valid Twitter handle"]
    try:
        limit = max(1, min(int(params.get('limit', DEFAULT_LIMIT)), 100))
    except ValueError:
        start_response('400 Bad Request', [_TEXT_CONTENT_TYPE])
        return [b"'limit' must be an integer"]

    key = f"{username}:{limit}"
    now = time.time()
    cached = cache.get(key)
    if cached is not None and now - cached[0] < CACHE_TTL:
        start_response('200 OK', [_RSS_CONTENT_TYPE])
        return [cached[1]]

    # Drop every expired entry (including this key's) so the cache cannot
    # grow without bound when many different feeds are requested once.
    for stale_key in [k for k, (ts, _) in cache.items() if now - ts >= CACHE_TTL]:
        del cache[stale_key]

    try:
        items = twitter.get_tweets(username, limit)
        rss = twitter.build_rss(username, items).encode()
    except Exception as e:
        # Top-level boundary: keep the traceback in the log, answer with 500.
        logging.getLogger(__name__).exception("Failed to build feed for @%s", username)
        start_response('500 Internal Server Error', [_TEXT_CONTENT_TYPE])
        return [str(e).encode()]

    cache[key] = (now, rss)
    start_response('200 OK', [_RSS_CONTENT_TYPE])
    return [rss]
if __name__ == '__main__':
    # Debug entry point: serve `app` with the standard library's WSGI server.
    import argparse
    from wsgiref.simple_server import make_server

    parser = argparse.ArgumentParser(description='twitter2rss debug server')
    parser.add_argument('--host', default='127.0.0.1')
    parser.add_argument('--port', type=int, default=8080)
    parser.add_argument('--user', default='NASA', help='Default Twitter username to fetch')
    args = parser.parse_args()

    def debug_app(environ, start_response):
        """Serve `app`, filling in --user when the request names no user."""
        query = environ.get('QUERY_STRING', '')
        if 'user' not in parse_qs(query):
            fallback = urllib.parse.urlencode({'user': args.user})
            environ['QUERY_STRING'] = f'{query}&{fallback}' if query else fallback
        return app(environ, start_response)

    print(f'Serving on http://{args.host}:{args.port}/?user={args.user}')
    print('Press Ctrl+C to stop.')
    with make_server(args.host, args.port, debug_app) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the advertised way to stop; exit without a traceback.
            print('\nStopped.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment