Last active
May 19, 2026 19:03
-
-
Save yucer/34daf10a631e0aed39ad729c943ced18 to your computer and use it in GitHub Desktop.
Export telegram links
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Export shared links from a Telegram group topic to a markdown file. | |
| Usage: | |
| python3 export_telegram_links.py "https://t.me/+InviteHash" --topic 13287 | |
| python3 export_telegram_links.py list | |
| Authentication: | |
| Get API credentials at https://my.telegram.org/apps then set: | |
| TELEGRAM_API_ID=12345 | |
| TELEGRAM_API_HASH=abc123... | |
| On first run you will be prompted for your phone number and the login | |
| code Telegram sends you. A session file is written so subsequent runs | |
| skip that step. | |
| Requires: | |
| pip install telethon | |
| """ | |
| import sys | |
| import os | |
| import re | |
| import asyncio | |
| import argparse | |
| import urllib.request | |
| import urllib.error | |
| import html | |
| from datetime import datetime, timedelta, timezone | |
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
def _parse_invite(raw):
    """Normalize a Telegram invite link to the ``+Hash`` form.

    Both the current ``t.me/+Hash`` links and the legacy
    ``t.me/joinchat/Hash`` links are recognized, on the ``t.me``,
    ``telegram.me`` and ``telegram.dog`` domains.

    Args:
        raw: The channel argument as typed by the user.

    Returns:
        ``"+<hash>"`` when *raw* contains an invite link, otherwise *raw*
        unchanged (for example ``@username`` or a numeric id).
    """
    m = re.search(
        r"(?:t\.me|telegram\.(?:me|dog))/(?:\+|joinchat/)([A-Za-z0-9_-]+)",
        raw,
    )
    return f"+{m.group(1)}" if m else raw
# Command-line interface. Arguments are parsed at import time and the result
# is kept in ``_args`` for the configuration constants defined below.
_parser = argparse.ArgumentParser(
    prog="export_telegram_links.py",
    description="Export shared links from a Telegram group topic to markdown.",
    # RawDescriptionHelpFormatter keeps the line break inside the epilog.
    formatter_class=argparse.RawDescriptionHelpFormatter,
    epilog=(
        "Env vars: TELEGRAM_API_ID, TELEGRAM_API_HASH (from https://my.telegram.org/apps)\n"
        " FETCH_TITLES=0 to skip fetching page titles"
    ),
)

# Optional positional: an invite URL, or the literal word "list".
_parser.add_argument(
    "channel",
    nargs="?",
    help="Invite URL (https://t.me/+Hash) or 'list'",
)
_parser.add_argument(
    "--topic", "-t",
    type=int,
    default=None,
    help="Forum topic ID",
)
_parser.add_argument(
    "--since", "-s",
    type=int,
    default=180,
    metavar="DAYS",
    help="Only scan messages from the last N days (default: 180)",
)
_parser.add_argument(
    "--output", "-o",
    default="./telegram_links",
    help="Output directory (default: ./telegram_links)",
)

_args = _parser.parse_args()
# Without a positional argument there is nothing to do: show usage and stop.
if not _args.channel:
    _parser.print_help()
    sys.exit(0)

# The literal word "list" selects listing mode; any other value is treated as
# the target channel, in which case --topic applies.
LIST_MODE = _args.channel == "list"
CHANNEL = None if LIST_MODE else _parse_invite(_args.channel)
TOPIC_ID = None if LIST_MODE else _args.topic

OUTPUT_DIR = _args.output
SINCE_DAYS = _args.since

# Credentials and switches read from the environment; unset values become "".
API_ID = os.environ.get("TELEGRAM_API_ID", "")
API_HASH = os.environ.get("TELEGRAM_API_HASH", "")
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")

# Title fetching is on unless FETCH_TITLES is one of 0 / false / no.
FETCH_TITLES = os.environ.get("FETCH_TITLES", "1").lower() not in {"0", "false", "no"}

TITLE_TIMEOUT = 5  # seconds
UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/124.0 Safari/537.36"
| # --------------------------------------------------------------------------- | |
| # Optional: fetch page <title> for richer markdown output | |
| # --------------------------------------------------------------------------- | |
def _page_title(url):
    """Return the HTML ``<title>`` of *url*, or ``""`` when unavailable.

    This is a best-effort lookup: any failure (unsupported scheme, network
    error, non-HTML response, missing title) yields an empty string so the
    caller can fall back to showing the bare URL.

    Args:
        url: The link to fetch.

    Returns:
        The unescaped title with runs of whitespace collapsed to single
        spaces, or ``""``.
    """
    # Links come from chat messages, so only plain HTTP(S) is fetched;
    # urllib would otherwise also open file:// and ftp:// URLs.
    if not url.lower().startswith(("http://", "https://")):
        return ""
    try:
        req = urllib.request.Request(url, headers={"User-Agent": UA})
        with urllib.request.urlopen(req, timeout=TITLE_TIMEOUT) as r:
            if "html" not in (r.headers.get_content_type() or ""):
                return ""
            charset = r.headers.get_content_charset() or "utf-8"
            # The title is expected near the top; read only the first 8 KiB.
            raw = r.read(8192)
    except Exception:
        # Deliberately broad: a title is optional and must never abort a run.
        return ""
    try:
        chunk = raw.decode(charset, errors="replace")
    except LookupError:
        # The server declared a charset Python does not know.
        chunk = raw.decode("utf-8", errors="replace")
    m = re.search(r"<title[^>]*>([^<]{1,200})</title>", chunk, re.IGNORECASE)
    if not m:
        return ""
    # Collapse newlines/tabs so the title stays on one markdown line.
    return " ".join(html.unescape(m.group(1)).split())
| # --------------------------------------------------------------------------- | |
| # Link extraction from Telethon message objects | |
| # --------------------------------------------------------------------------- | |
def _extract_urls(message):
    """Return the unique URLs found in a message, in first-seen order.

    Sources, in order: URL entities of the message, the URL of an attached
    web preview, and -- only when neither produced anything -- a regex scan
    of the plain text.

    Args:
        message: A Telethon message (reads ``entities``, ``raw_text`` and
            ``web_preview``).

    Returns:
        A list of URL strings with trailing sentence punctuation removed
        and duplicates dropped.
    """
    urls = []
    # Prefer structured entities — more reliable than regex on raw text
    if message.entities:
        from telethon.tl.types import (
            MessageEntityUrl,
            MessageEntityTextUrl,
        )
        text = message.raw_text or ""
        # Telegram expresses entity offset/length in UTF-16 code units, so
        # slice the UTF-16 encoding rather than the str: otherwise every
        # emoji (two code units) before the link shifts the slice.
        utf16 = text.encode("utf-16-le", errors="surrogatepass")
        for ent in message.entities:
            if isinstance(ent, MessageEntityTextUrl):
                urls.append(ent.url)
            elif isinstance(ent, MessageEntityUrl):
                start = 2 * ent.offset
                end = start + 2 * ent.length
                urls.append(
                    utf16[start:end].decode("utf-16-le", errors="surrogatepass")
                )
    # Web previews attached to the message
    if message.web_preview and message.web_preview.url:
        urls.append(message.web_preview.url)
    # Fallback: regex scan on plain text
    if not urls and message.raw_text:
        urls = re.findall(r"https?://[^\s\"'<>)\]]+", message.raw_text)
    # Deduplicate while preserving order
    seen, unique = set(), []
    for u in urls:
        # Trim trailing sentence punctuation. A closing parenthesis is only
        # trimmed when it is unbalanced, so links such as
        # ".../wiki/Foo_(bar)" keep their final ")".
        while u and u[-1] in ".,;:!?)":
            if u[-1] == ")" and u.count("(") >= u.count(")"):
                break
            u = u[:-1]
        if u and u not in seen:
            seen.add(u)
            unique.append(u)
    return unique
| # --------------------------------------------------------------------------- | |
| # List available channels / groups | |
| # --------------------------------------------------------------------------- | |
async def list_channels(client):
    """Print a table of the channels and groups among the client's dialogs.

    Dialogs whose entity is neither a ``Channel`` nor a ``Chat`` are skipped.
    Each row shows an id, the kind ("channel" or "group"), the dialog title
    and, in parentheses, ``@username`` when one exists or the id otherwise.
    """
    from telethon.tl.types import Channel, Chat

    print(f"{'ID':<15} {'Type':<8} Title")
    print("-" * 60)

    async for dialog in client.iter_dialogs():
        peer = dialog.entity
        if not isinstance(peer, (Channel, Chat)):
            continue

        if isinstance(peer, Channel):
            # Channel entities cover both broadcast channels and groups.
            kind = "channel" if peer.broadcast else "group"
            cid = f"-100{peer.id}"
        else:
            kind = "group"
            cid = str(-peer.id)

        handle = getattr(peer, "username", "") or ""
        label = f"@{handle}" if handle else cid
        print(f"{cid:<15} {kind:<8} {dialog.title} ({label})")
| # --------------------------------------------------------------------------- | |
| # Fetch all messages and collect links | |
| # --------------------------------------------------------------------------- | |
async def _resolve_entity(client, channel):
    """Turn a channel string into a Telethon entity.

    A string starting with ``+`` is treated as an invite hash: the invite is
    checked first and, if the account is not already a member, the group is
    joined through the invite. Any other string goes through
    ``client.get_entity``; when that raises ``ValueError`` the dialogs are
    fetched once and the lookup is retried.
    """
    if not channel.startswith("+"):
        try:
            return await client.get_entity(channel)
        except ValueError:
            print(" Entity not cached — fetching dialogs to resolve access hash ...")
            await client.get_dialogs()
            return await client.get_entity(channel)

    # Private invite link — check if already a member, else join.
    from telethon.tl.functions.messages import CheckChatInviteRequest, ImportChatInviteRequest
    from telethon.tl.types import ChatInviteAlready

    invite_hash = channel[1:]
    invite = await client(CheckChatInviteRequest(invite_hash))
    if isinstance(invite, ChatInviteAlready):
        return invite.chat

    # Not a member — join automatically so we can read history
    print(" Joining group via invite link ...")
    joined = await client(ImportChatInviteRequest(invite_hash))
    return joined.chats[0]
async def get_topic_title(client, entity, topic_id):
    """Return a topic's title, read from the message whose id is *topic_id*.

    The title is taken from that message when it is a service message with a
    topic-creation action; in every other case ``str(topic_id)`` is returned.
    """
    from telethon.tl.types import MessageService, MessageActionTopicCreate

    fallback = str(topic_id)
    root = await client.get_messages(entity, ids=topic_id)
    if not root or not isinstance(root, MessageService):
        return fallback

    action = getattr(root, "action", None)
    if not isinstance(action, MessageActionTopicCreate):
        return fallback
    return action.title
async def get_forum_topics(client, entity):
    """Return list of (topic_id, topic_title) for a forum group.

    Topics are fetched in pages of 100. Entries without a title (deleted
    topics) are left out of the result.

    Args:
        client: A connected Telethon client.
        entity: The forum group to query.

    Returns:
        A list of ``(id, title)`` tuples in the order the server returned
        them, without duplicates.
    """
    from telethon.tl.functions.messages import GetForumTopicsRequest

    page_size = 100
    topics, seen_ids = [], set()
    offset_date, offset_id, offset_topic = None, 0, 0
    while True:
        result = await client(GetForumTopicsRequest(
            peer=entity, q="",
            offset_date=offset_date, offset_id=offset_id,
            offset_topic=offset_topic, limit=page_size,
        ))
        page = result.topics
        fresh = [t for t in page if t.id not in seen_ids]
        seen_ids.update(t.id for t in fresh)
        topics.extend(fresh)
        # Stop on a short page, and also when a page brings nothing new:
        # without that guard a repeated page would loop forever.
        if len(page) < page_size or not fresh:
            break
        # The next page is addressed by the last topic found: its id, the id
        # of its last message and that message's date.
        last = page[-1]
        message_dates = {
            m.id: getattr(m, "date", None)
            for m in (getattr(result, "messages", None) or [])
        }
        offset_topic = last.id
        offset_id = getattr(last, "top_message", 0) or 0
        offset_date = message_dates.get(offset_id)
    # Deleted topics carry an id but no title; skip them instead of raising.
    return [
        (t.id, t.title)
        for t in topics
        if getattr(t, "title", None) is not None
    ]
async def collect_links(client, entity, topic_id=None, topic_title=None):
    """Scan messages newest-first and collect every URL they contain.

    When *topic_id* is given, the iteration is restricted with ``reply_to``
    and each message is additionally checked against its reply header.
    Scanning stops at the first message older than ``SINCE_DAYS`` days when
    that setting is non-zero.

    Returns:
        A list of dicts with the keys ``url``, ``msg_id``, ``date`` and
        ``sender`` -- one entry per URL found.
    """
    if topic_title:
        label = f"topic '{topic_title}'"
    elif not topic_id:
        label = "all topics"
    else:
        label = f"topic {topic_id}"

    since_date = None
    if SINCE_DAYS:
        since_date = datetime.now(timezone.utc) - timedelta(days=SINCE_DAYS)

    if since_date:
        print(f" Scanning {label} (since {since_date.date()}) ...")
    else:
        print(f" Scanning {label} ...")

    iter_kwargs = dict(reverse=False)
    if topic_id:
        # Forum topics are reply threads whose root is the topic-creation
        # service message, so ask for that thread directly.
        iter_kwargs["reply_to"] = topic_id

    links, total = [], 0
    async for message in client.iter_messages(entity, **iter_kwargs):
        # Newest-first order: the first too-old message ends the scan.
        if since_date and message.date and message.date < since_date:
            break

        if topic_id:
            header = getattr(message, "reply_to", None)
            thread_root = getattr(header, "reply_to_top_id", None)
            if not thread_root:
                thread_root = getattr(header, "reply_to_msg_id", None)
            if thread_root != topic_id:
                continue

        total += 1
        if total % 500 == 0:
            print(f" ... scanned {total} messages")

        urls = _extract_urls(message)
        if not urls:
            continue

        # Prefer the username, then the first name, else leave it blank.
        sender = ""
        if message.sender:
            sender = (
                getattr(message.sender, "username", "")
                or getattr(message.sender, "first_name", "")
                or ""
            )

        links.extend(
            {
                "url": url,
                "msg_id": message.id,
                "date": message.date,
                "sender": sender,
            }
            for url in urls
        )

    print(f" found {len(links)} link(s) in {total} messages.")
    return links
| # --------------------------------------------------------------------------- | |
| # Markdown writer | |
| # --------------------------------------------------------------------------- | |
def slugify(name):
    """Lower-case and trim *name*, replacing each character that is not a
    word character or hyphen with an underscore."""
    normalized = name.lower().strip()
    return re.sub(r"[^\w\-]", "_", normalized)
def write_markdown(channel_title, links, topic_title=None):
    """Write the collected links as a markdown bullet list.

    The file is created in ``OUTPUT_DIR`` and named after the slugified
    channel title, followed by the slugified topic title when one is given.
    When ``FETCH_TITLES`` is enabled each link's page title is fetched and
    used as the link text; otherwise (or when no title is found) the URL
    itself is used.

    Args:
        channel_title: Title of the group, used for the heading and file name.
        links: Dicts with the keys ``url``, ``date`` and ``sender``.
        topic_title: Optional topic title appended to heading and file name.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    parts = [slugify(channel_title)]
    if topic_title:
        parts.append(slugify(topic_title))
    path = os.path.join(OUTPUT_DIR, "_".join(parts) + ".md")
    heading = f"{channel_title} — {topic_title}" if topic_title else channel_title
    # Explicit UTF-8: the heading contains an em dash and titles are
    # arbitrary Unicode, which the locale default encoding may not cover.
    with open(path, "w", encoding="utf-8") as f:
        f.write(f"# {heading}\n\n")
        f.write(f"*{len(links)} link(s) exported*\n\n")
        for item in links:
            url = item["url"]
            date = item["date"].strftime("%Y-%m-%d") if item["date"] else ""
            sender = item["sender"]
            title = _page_title(url) if FETCH_TITLES else ""
            label = title if title else url
            # Keep the link text on one line and escape square brackets so
            # it cannot terminate the markdown link early.
            label = " ".join(label.split())
            label = label.replace("[", "\\[").replace("]", "\\]")
            line = f"- [{label}]({url})"
            meta = [x for x in [date, f"@{sender}" if sender else ""] if x]
            if meta:
                line += f" `{' | '.join(meta)}`"
            f.write(line + "\n")
    print(f" -> {path}")
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
async def _run():
    """Run the exporter: check prerequisites, log in, then list or export.

    Exits with status 1 when Telethon cannot be imported or when the API
    credentials are missing. In list mode the dialogs are printed and the
    coroutine returns. Otherwise the target is resolved and either the
    requested topic or every forum topic is scanned and written out.
    """
    try:
        from telethon import TelegramClient
        from telethon.sessions import StringSession
    except ImportError:
        print("ERROR: telethon is not installed. Run: pip install telethon")
        sys.exit(1)

    if not (API_ID and API_HASH):
        print("ERROR: Set TELEGRAM_API_ID and TELEGRAM_API_HASH.")
        print(" Get credentials at https://my.telegram.org/apps")
        sys.exit(1)

    # The session name is derived from the target; list mode uses "session".
    session_name = "session"
    if CHANNEL:
        session_name = slugify(CHANNEL.lstrip("@"))
    client = TelegramClient(session_name, int(API_ID), API_HASH)

    # Log in as a bot when a token is configured, else as a user.
    login_kwargs = {"bot_token": BOT_TOKEN} if BOT_TOKEN else {}
    await client.start(**login_kwargs)

    async with client:
        if LIST_MODE:
            await list_channels(client)
            return

        entity = await _resolve_entity(client, CHANNEL)
        channel_title = getattr(entity, "title", CHANNEL)
        print(f" Group: {channel_title}")

        if not TOPIC_ID:
            print(" No --topic given — processing all forum topics ...")
            topics = await get_forum_topics(client, entity)
            print(f" Found {len(topics)} topic(s).\n")
            for topic_id, topic_title in topics:
                links = await collect_links(
                    client, entity, topic_id=topic_id, topic_title=topic_title
                )
                if links:
                    write_markdown(channel_title, links, topic_title=topic_title)
        else:
            topics = await get_forum_topics(client, entity)
            # Use the topic's real title when listed, else the numeric id.
            topic_title = str(TOPIC_ID)
            for tid, title in topics:
                if tid == TOPIC_ID:
                    topic_title = title
                    break
            links = await collect_links(
                client, entity, topic_id=TOPIC_ID, topic_title=topic_title
            )
            if not links:
                print(" No links found.")
            else:
                write_markdown(channel_title, links, topic_title=topic_title)

        print("\nDone.")
def main():
    """Synchronous entry point: drive the async exporter to completion."""
    exporter = _run()
    asyncio.run(exporter)


# Only run the exporter when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment