Created
April 25, 2026 11:32
-
-
Save cmj/7ea610bd99a73c2bb2ab95dadeb6fbe9 to your computer and use it in GitHub Desktop.
Standalone Twitter guest RSS generator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Twitter guest - standalone RSS generator | |
| # Works with some news/govt accounts, ex: | |
| # NASA KIRO7Seattle AJEnglish cnn abc nbcnews ap NBCNewYork ABC7NY (AOC potus whitehouse etc) | |
import email.utils
import json
import logging
import os
import random
import re
import time
import urllib.parse
import xml.sax.saxutils
from datetime import datetime

import requests
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Base URL used to build tweet permalinks ("<TWEET_URL>/<screen_name>/status/<id>")
# and the channel <link>. Point it at a Nitter instance or at x.com.
TWEET_URL = 'http://nitter'  # Nitter instance (or x.com)

# Maximum age of a cached feed; compared against time.time() deltas, so seconds.
CACHE_TTL = 60

# Default number of tweets when the caller / query string gives no limit.
DEFAULT_LIMIT = 20

# GraphQL endpoints: USER_URL is queried by get_user(), TWEETS_URL by get_tweets().
USER_URL = 'https://x.com/i/api/graphql/-oaLodhGbbnzJBACb1kk2Q/UserByScreenName'
TWEETS_URL = 'https://api.x.com/graphql/naBcZ4al-iTCFBYGOAMzBQ/UserTweets'

# Bearer token sent in the "authorization" header. The BEARER_TOKEN environment
# variable overrides the built-in default below.
BEARER_TOKEN = os.environ.get(
    'BEARER_TOKEN',
    'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
)

# Headers installed on the shared requests session; the per-session
# 'x-guest-token' header is added separately once a guest token is obtained.
HEADERS = {
    'authorization': f'Bearer {BEARER_TOKEN}',
    'User-Agent': 'Mozilla/5.0',
    'x-twitter-active-user': 'yes',
    'x-twitter-client-language': 'en',
}

# Pre-serialised JSON passed as the 'features' query parameter of the
# UserByScreenName and UserTweets requests respectively.
FEATURES_USER = '{"hidden_profile_likes_enabled":false,"hidden_profile_subscriptions_enabled":true,"responsive_web_graphql_exclude_directive_enabled":true,"verified_phone_label_enabled":false,"subscriptions_verification_info_is_identity_verified_enabled":false,"subscriptions_verification_info_verified_since_enabled":true,"highlights_tweets_tab_ui_enabled":true,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"responsive_web_graphql_timeline_navigation_enabled":true}'
FEATURES_TWEETS = '{"creator_subscriptions_tweet_preview_api_enabled":false,"communities_web_enable_tweet_community_results_fetch":false,"c9s_tweet_anatomy_moderator_badge_enabled":false,"articles_preview_enabled":true,"tweetypie_unmention_optimization_enabled":false,"responsive_web_edit_tweet_api_enabled":false,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":false,"view_counts_everywhere_api_enabled":false,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":false,"standardized_nudges_misinfo":false,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":false,"tweet_with_visibility_results_prefer_gql_media_interstitial_enabled":false,"rweb_video_timestamps_enabled":false,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"rweb_tipjar_consumption_enabled":false,"responsive_web_graphql_exclude_directive_enabled":false,"verified_phone_label_enabled":false,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"responsive_web_graphql_timeline_navigation_enabled":false,"responsive_web_enhance_cards_enabled":false,"rweb_lists_timeline_redesign_enabled":false,"responsive_web_media_download_video_enabled":false}'

# In-process feed cache: "<username>:<limit>" -> (time.time() timestamp, encoded RSS bytes).
cache = {}
# Matches an http(s) URL up to the next whitespace character.
URL_RE = re.compile(r'(https?://[^\s]+)')


def linkify(text: str) -> str:
    """Wrap every http(s) URL found in *text* in an HTML ``<a>`` element.

    Args:
        text: Tweet text; ``None`` or an empty string is treated as ``''``.

    Returns:
        *text* with each URL replaced by ``<a href="URL">URL</a>``. Text
        outside the URLs is returned unchanged.
    """
    def _anchor(match):
        url = match.group(0)
        # The pattern only stops at whitespace, so a match may contain a
        # double quote. Escape it in the attribute value so the URL cannot
        # close href="..." and inject further attributes.
        href = url.replace('"', '&quot;')
        return f'<a href="{href}">{url}</a>'

    return URL_RE.sub(_anchor, text or '')
def to_rfc822(dt_str: str) -> str:
    """Convert a Twitter ``created_at`` timestamp to an RFC 822 date string.

    Args:
        dt_str: Timestamp such as ``"Wed Oct 10 20:19:24 +0000 2018"``.

    Returns:
        The same instant formatted like ``"Wed, 10 Oct 2018 20:19:24 +0000"``.

    Raises:
        ValueError: If *dt_str* does not match the expected layout.
    """
    dt = datetime.strptime(dt_str, "%a %b %d %H:%M:%S %z %Y")
    # format_datetime() always emits English day/month names, whereas
    # strftime('%a'/'%b') follows the process locale and could produce
    # names that feed readers cannot parse.
    return email.utils.format_datetime(dt)
def render_media(media_list):
    """Render tweet media entries as an HTML fragment.

    Photos become ``<img>`` elements. Videos and animated GIFs become a
    ``<video>`` element pointing at the highest-bitrate ``video/mp4`` variant.
    Entries of any other type, photos without a URL and videos without an MP4
    variant are skipped.

    Args:
        media_list: Iterable of media dicts (``type``, ``media_url_https``,
            ``video_info``); ``None`` is treated as empty.

    Returns:
        The concatenated HTML, or ``''`` when nothing could be rendered.
    """
    fragments = []
    for item in media_list or []:
        kind = item.get('type')
        poster = item.get('media_url_https')
        if kind == 'photo':
            if poster:
                fragments.append(f'<br/><img src="{poster}" loading="lazy" />')
            continue
        if kind not in ('video', 'animated_gif'):
            continue

        variants = item.get('video_info', {}).get('variants', [])
        mp4s = [v for v in variants if v.get('content_type') == 'video/mp4']
        if not mp4s:
            continue
        # "or 0" also covers an explicit null bitrate, which would otherwise
        # make max() compare None with int.
        best = max(mp4s, key=lambda v: v.get('bitrate') or 0)
        video_url = best.get('url')
        if not video_url:
            continue
        # Leave the poster attribute out when there is no thumbnail instead of
        # emitting the literal poster="None".
        poster_attr = f' poster="{poster}"' if poster else ''
        fragments.append(f'<br/><video controls{poster_attr} src="{video_url}"></video>')
    return ''.join(fragments)
class TwitterRSS:
    """Fetch a user's timeline via Twitter's guest GraphQL API and render RSS 2.0.

    Constructing an instance performs a network request: a guest token is
    fetched immediately and attached to the shared ``requests`` session.
    """

    def __init__(self):
        """Create the HTTP session and attach a freshly fetched guest token.

        Raises:
            requests.RequestException, RuntimeError: Propagated from
                :meth:`get_guest`, which is called during construction.
        """
        self.session = requests.Session()
        self.session.headers.update(HEADERS)
        # Fetch and attach guest token
        self.session.headers['x-guest-token'] = self.get_guest()

    def _send(self, url, params):
        """Issue a single GET through the session with a 10 second timeout."""
        request = requests.Request('GET', url, params=params)
        # prepare_request() merges the session headers (bearer and guest
        # token) into the request, so they need not be passed explicitly.
        return self.session.send(self.session.prepare_request(request), timeout=10)

    def _get(self, url, params):
        """GET *url* and return the decoded JSON body.

        On a 401/403 response the guest token is refreshed and the request is
        retried exactly once.

        Raises:
            requests.HTTPError: If the (possibly retried) response is an error.
        """
        response = self._send(url, params)
        if response.status_code in (401, 403):
            # Refresh guest token and retry once on auth failure
            self.session.headers['x-guest-token'] = self.get_guest()
            response = self._send(url, params)
        response.raise_for_status()
        return response.json()

    def get_guest(self) -> str:
        """Fetch a fresh guest token from the Twitter API.

        Raises:
            requests.HTTPError: If the activation endpoint returns an error.
            RuntimeError: If the response carries no ``guest_token``.
        """
        response = self.session.post(
            'https://api.twitter.com/1.1/guest/activate.json',
            timeout=10,
        )
        response.raise_for_status()
        token = response.json().get('guest_token')
        if not token:
            raise RuntimeError("Failed to obtain guest_token from Twitter API")
        return token

    def get_user(self, username):
        """Look up *username* and return its id and display names.

        Returns:
            ``{"id": rest_id, "username": screen_name, "full_name": name}``.

        Raises:
            RuntimeError: If the API reports errors without data, or the
                response lacks the expected fields.
        """
        params = {
            'variables': json.dumps({"screen_name": username, "withSafetyModeUserFields": True}),
            'features': FEATURES_USER,
        }
        json_response = self._get(USER_URL, params)

        if 'errors' in json_response and 'data' not in json_response:
            msgs = '; '.join(e.get('message', str(e)) for e in json_response['errors'])
            raise RuntimeError(f"Twitter API error for @{username}: {msgs}")

        try:
            result = json_response["data"]["user"]["result"]
            # Names are looked up in "core" first and "legacy" second, so
            # either response layout is accepted.
            core = result.get("core", {})
            legacy = result.get("legacy", {})
            screen_name = core.get("screen_name") or legacy.get("screen_name")
            full_name = core.get("name") or legacy.get("name")
            if not screen_name:
                raise KeyError(f"screen_name not found in core={core} or legacy={legacy}")
            # Read inside the try so a missing id is reported as the same
            # RuntimeError as any other malformed response.
            user_id = result["rest_id"]
        except (KeyError, TypeError, AttributeError) as exc:
            raise RuntimeError(
                f"Unexpected response shape for @{username} — "
                f"got keys {list(json_response.keys())}: {exc}"
            ) from exc

        return {
            "id": user_id,
            "username": screen_name,
            "full_name": full_name,
        }

    def get_tweets(self, username, limit=DEFAULT_LIMIT):
        """Return up to *limit* parsed tweets from *username*'s timeline.

        Only one page is requested and the requested ``count`` is capped at
        20, so fewer than *limit* items may be returned. Entries that cannot
        be parsed are skipped and logged at debug level.

        Returns:
            A list of tweet dicts as produced by :meth:`_parse_tweet_result`.
            For conversation modules the last tweet is returned, with the
            earlier tweets of the module under ``'reply_to'``.
        """
        user_id = self.get_user(username).get("id")
        params = {
            'variables': json.dumps({
                'userId': user_id,
                'count': min(limit, 20),
                'includePromotedContent': False,
                'withQuickPromoteEligibilityTweetFields': False,
                'withVoice': True,
                'withV2Timeline': True,
            }),
            'features': FEATURES_TWEETS,
        }
        data = self._get(TWEETS_URL, params)

        # The instructions live under "timeline_v2" or, failing that, "timeline".
        user_result = data.get('data', {}).get('user', {}).get('result', {})
        instructions = (
            user_result.get('timeline_v2', {}).get('timeline', {}).get('instructions', [])
            or user_result.get('timeline', {}).get('timeline', {}).get('instructions', [])
        )

        out = []
        for ins in instructions:
            for entry in ins.get('entries', []):
                entry_id = entry.get('entryId', '')
                if 'cursor' in entry_id.lower():
                    continue
                try:
                    parsed = self._parse_entry(entry.get('content', {}), entry_id)
                except (KeyError, TypeError, AttributeError) as exc:
                    # Best effort: one malformed entry must not lose the feed,
                    # but leave a trace instead of swallowing it silently.
                    logging.getLogger(__name__).debug(
                        "Skipping timeline entry %r: %r", entry_id, exc
                    )
                    continue
                if parsed:
                    out.append(parsed)
                    if len(out) >= limit:
                        return out
        return out

    def _parse_entry(self, content_block, entry_id):
        """Parse one timeline entry into a tweet dict, or ``None`` if not a tweet."""
        entry_type = content_block.get('entryType', '')

        if entry_type == 'TimelineTimelineItem':
            item_content = content_block.get('itemContent', {})
            if item_content.get('itemType') != 'TimelineTweet':
                return None
            return self._parse_tweet_result(item_content, entry_id)

        if entry_type == 'TimelineTimelineModule':
            parsed_items = []
            for module_item in content_block.get('items', []):
                item_content = module_item.get('item', {}).get('itemContent', {})
                if item_content.get('itemType') != 'TimelineTweet':
                    continue
                parsed = self._parse_tweet_result(item_content, module_item.get('entryId', ''))
                if parsed:
                    parsed_items.append(parsed)
            if not parsed_items:
                return None
            # The last tweet of the module is the feed item; the preceding
            # ones are attached as its reply context.
            reply_tweet = parsed_items[-1]
            if len(parsed_items) > 1:
                reply_tweet['reply_to'] = parsed_items[:-1]
            return reply_tweet

        return None

    @staticmethod
    def _unwrap(result):
        """Return the nested ``'tweet'`` of a wrapper result, else *result* itself."""
        return result.get('tweet', result)

    @staticmethod
    def _user_names(user_result):
        """Return ``(screen_name, name)``, reading "core" first, then "legacy"."""
        core = user_result.get('core', {})
        legacy = user_result.get('legacy', {})
        return (
            core.get('screen_name') or legacy.get('screen_name'),
            core.get('name') or legacy.get('name'),
        )

    def _summarize(self, raw_result):
        """Build the summary dict for a nested (retweeted or quoted) tweet.

        Returns ``None`` when *raw_result* is empty or has no ``legacy`` data.
        """
        if not raw_result:
            return None
        tweet = self._unwrap(raw_result)
        legacy = tweet.get('legacy')
        if not legacy:
            return None
        screen_name, name = self._user_names(
            tweet.get('core', {}).get('user_results', {}).get('result', {})
        )
        tid = legacy.get('id_str')
        return {
            'id': tid,
            'username': screen_name,
            'name': name,
            'content': legacy.get('full_text'),
            'media': legacy.get('extended_entities', {}).get('media', []),
            'tweet_url': f"{TWEET_URL}/{screen_name}/status/{tid}",
        }

    def _parse_tweet_result(self, item_content, entry_id=''):
        """Turn a ``TimelineTweet`` item into a flat tweet dict.

        Args:
            item_content: The entry's ``itemContent`` mapping.
            entry_id: Unused; kept so existing callers keep working.

        Returns:
            A dict with keys ``id``, ``username``, ``name``, ``published_at``,
            ``content``, ``media``, ``tweet_url``, ``retweeted``, ``quoted``
            and ``reply_to``; or ``None`` when the result is neither a
            ``Tweet`` nor a ``TweetWithVisibilityResults``.

        Raises:
            KeyError: If mandatory parts of the payload are missing.
        """
        result = item_content['tweet_results']['result']
        result_type = result.get('__typename', '')
        if result_type == 'TweetWithVisibilityResults':
            result = result['tweet']
        elif result_type != 'Tweet':
            return None

        t = result['legacy']
        # Same core-then-legacy lookup as get_user(), so names are found in
        # either response layout.
        screen_name, name = self._user_names(result['core']['user_results']['result'])
        tid = t.get('id_str')

        rt_raw = t.get('retweeted_status_result', {}).get('result')
        retweeted = self._summarize(rt_raw)
        if rt_raw:
            rt_quoted = self._summarize(
                self._unwrap(rt_raw).get('quoted_status_result', {}).get('result')
            )
            if retweeted is not None:
                retweeted['quoted'] = rt_quoted
            # For a retweet, show the original tweet's media, falling back to
            # the media of the tweet it quotes.
            media = (retweeted or {}).get('media') or (rt_quoted or {}).get('media') or []
        else:
            media = t.get('extended_entities', {}).get('media', [])

        quoted = self._summarize(result.get('quoted_status_result', {}).get('result'))

        return {
            'id': tid,
            'username': screen_name,
            'name': name,
            'published_at': t.get('created_at'),
            'content': t.get('full_text'),
            'media': media,
            'tweet_url': f"{TWEET_URL}/{screen_name}/status/{tid}",
            'retweeted': retweeted,
            'quoted': quoted,
            'reply_to': [],
        }

    @staticmethod
    def _cdata(text):
        """Wrap *text* in a CDATA section, splitting any embedded ``]]>``."""
        # A literal "]]>" would end the section early and corrupt the feed.
        return '<![CDATA[' + str(text).replace(']]>', ']]]]><![CDATA[>') + ']]>'

    def _blockquote(self, tweet):
        """Render *tweet* (and the tweet it quotes, if any) as a ``<blockquote>``."""
        if not tweet:
            return ''
        return (
            f'<blockquote>'
            f'<b><a href="{tweet["tweet_url"]}">{tweet["name"]} @{tweet["username"]}</a></b><br/>'
            f'{linkify(tweet["content"])}{render_media(tweet.get("media", []))}'
            f'{self._blockquote(tweet.get("quoted"))}'
            f'</blockquote>'
        )

    def build_rss(self, username, items):
        """Render *items* as an RSS 2.0 document for *username*.

        Args:
            username: Handle shown in the channel title, link and description.
            items: Tweet dicts as returned by :meth:`get_tweets`.

        Returns:
            The feed as a ``str``.
        """
        xml_escape = xml.sax.saxutils.escape
        parts = []
        for t in items:
            rt = t.get('retweeted')
            if rt:
                # Full retweet: show retweeter credit, then original tweet in full
                title_text = f"RT @{rt['username']}: {rt['content']}"
                description_html = (
                    f'<p><b>{t["name"]} @{t["username"]}</b> retweeted '
                    f'<a href="{rt["tweet_url"]}"><b>{rt["name"]} @{rt["username"]}</b></a>:</p>'
                    f'<blockquote>{linkify(rt["content"])}{render_media(rt.get("media", []))}'
                    f'{self._blockquote(rt.get("quoted"))}</blockquote>'
                )
            else:
                title_text = t['content']
                # Reply-to context (conversation thread parents) comes first.
                reply_to_html = ''.join(
                    self._blockquote(parent) for parent in t.get('reply_to', [])
                )
                description_html = (
                    f'{reply_to_html}{linkify(t["content"])}'
                    f'{render_media(t.get("media", []))}{self._blockquote(t.get("quoted"))}'
                )

            published = t.get('published_at')
            pub_date_xml = f'<pubDate>{to_rfc822(published)}</pubDate>' if published else ''
            parts.append(f"""
    <item>
      <title>{self._cdata(title_text)}</title>
      <description>{self._cdata(description_html)}</description>
      <link>{xml_escape(t['tweet_url'])}</link>
      <guid isPermaLink="false">{t['id']}</guid>
      {pub_date_xml}
    </item>
""")

        # username originates from the request query string, so it must be
        # escaped before being placed into XML text nodes.
        safe_user = xml_escape(username)
        return f"""<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
  <channel>
    <title>@{safe_user}</title>
    <link>{xml_escape(f'{TWEET_URL}/{username}')}</link>
    <description>Twitter feed for @{safe_user}</description>
    {''.join(parts)}
  </channel>
</rss>"""
# Shared client used by app(). NOTE: constructing TwitterRSS fetches a guest
# token over the network, so this line performs I/O when the module is imported.
twitter = TwitterRSS()
def parse_qs(qs):
    """Parse a URL query string into a flat ``{name: value}`` dict.

    Blank values are kept. When a name occurs more than once, the last
    occurrence wins.
    """
    pairs = urllib.parse.parse_qsl(qs, keep_blank_values=True)
    return {name: value for name, value in pairs}
def app(environ, start_response):
    """WSGI entry point: serve a user's timeline as an RSS feed.

    Query parameters:
        user:  Twitter handle to fetch (default ``NASA``).
        limit: Number of tweets, clamped to the range 1..100
            (default ``DEFAULT_LIMIT``).

    Responses:
        200 with the RSS document, served from the in-process cache when an
        entry younger than ``CACHE_TTL`` seconds exists for the same
        user/limit pair; 400 when ``limit`` is not an integer; 500 with the
        error text when fetching or rendering fails.
    """
    # One header list for cached and fresh responses so both declare the
    # charset the payload is encoded in.
    rss_headers = [('Content-Type', 'application/rss+xml; charset=UTF-8')]

    params = parse_qs(environ.get('QUERY_STRING', ''))
    username = params.get('user', 'NASA')
    try:
        limit = max(1, min(int(params.get('limit', DEFAULT_LIMIT)), 100))
    except ValueError:
        start_response('400 Bad Request', [('Content-Type', 'text/plain')])
        return [b"'limit' must be an integer"]

    key = f"{username}:{limit}"
    now = time.time()

    cached = cache.get(key)
    if cached is not None:
        ts, payload = cached
        if now - ts < CACHE_TTL:
            start_response('200 OK', rss_headers)
            return [payload]

    # Drop every expired entry, not only the requested one: keys come from
    # the query string, so stale entries would otherwise pile up forever.
    for stale_key, (ts, _) in list(cache.items()):
        if now - ts >= CACHE_TTL:
            cache.pop(stale_key, None)

    try:
        items = twitter.get_tweets(username, limit)
        rss = twitter.build_rss(username, items).encode()
    except Exception as e:
        # Top-level boundary: report the failure to the client instead of
        # letting the WSGI server produce a bare traceback.
        start_response('500 Internal Server Error', [('Content-Type', 'text/plain')])
        return [str(e).encode()]

    cache[key] = (now, rss)
    start_response('200 OK', rss_headers)
    return [rss]
if __name__ == '__main__':
    # Debug entry point: serve `app` with the stdlib reference WSGI server.
    from wsgiref.simple_server import make_server
    import argparse

    cli = argparse.ArgumentParser(description='twitter2rss debug server')
    cli.add_argument('--host', default='127.0.0.1')
    cli.add_argument('--port', type=int, default=8080)
    # --user only affects the example URL printed below; the app itself reads
    # the handle from the request's query string.
    cli.add_argument('--user', default='NASA', help='Default Twitter username to fetch')
    opts = cli.parse_args()

    banner = f'Serving on http://{opts.host}:{opts.port}/?user={opts.user}'
    print(banner)
    print('Press Ctrl+C to stop.')

    with make_server(opts.host, opts.port, app) as server:
        server.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment