@cofob
Created May 6, 2025 23:49
A Python script that converts Telegram HTML chat exports into a structured SQLite database.
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "beautifulsoup4>=4.13.4",
# "lxml>=5.4.0",
# "python-dateutil>=2.9.0.post0",
# ]
# ///
import glob
import json
import multiprocessing
import os
import re
import sqlite3
from concurrent.futures import ProcessPoolExecutor
from bs4 import BeautifulSoup
class TelegramExportParser:
def __init__(self, export_dir, db_path, batch_size=1000):
self.export_dir = export_dir
self.db_path = db_path
self.batch_size = batch_size
self.conn = None
self.cursor = None
self.user_cache = {} # Cache for user lookups
def initialize_db(self):
"""Create SQLite database and tables with indexes"""
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
        # Speed-oriented PRAGMAs: trade durability for import throughput (fine for a one-shot conversion)
self.cursor.execute("PRAGMA synchronous = OFF")
self.cursor.execute("PRAGMA journal_mode = MEMORY")
self.cursor.execute("PRAGMA temp_store = MEMORY")
self.cursor.execute("PRAGMA cache_size = 10000")
# Create tables
self.cursor.executescript('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE,
initials TEXT,
userpic_style TEXT,
userpic_class TEXT,
title TEXT
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY,
message_id TEXT,
user_id INTEGER,
timestamp TEXT,
date_text TEXT,
text TEXT,
is_service BOOLEAN,
is_joined BOOLEAN,
reply_to_message_id TEXT,
class_list TEXT,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS media (
id INTEGER PRIMARY KEY,
message_id INTEGER,
type TEXT,
title TEXT,
description TEXT,
file_info TEXT,
url TEXT,
class_list TEXT,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
CREATE TABLE IF NOT EXISTS reactions (
id INTEGER PRIMARY KEY,
message_id INTEGER,
emoji TEXT,
count INTEGER,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
CREATE TABLE IF NOT EXISTS reaction_users (
id INTEGER PRIMARY KEY,
reaction_id INTEGER,
user_id INTEGER,
FOREIGN KEY (reaction_id) REFERENCES reactions(id),
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS forwarded_messages (
id INTEGER PRIMARY KEY,
message_id INTEGER,
from_name TEXT,
timestamp TEXT,
text TEXT,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
CREATE TABLE IF NOT EXISTS polls (
id INTEGER PRIMARY KEY,
message_id INTEGER,
question TEXT,
is_anonymous BOOLEAN,
details TEXT,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
CREATE TABLE IF NOT EXISTS poll_options (
id INTEGER PRIMARY KEY,
poll_id INTEGER,
text TEXT,
FOREIGN KEY (poll_id) REFERENCES polls(id)
);
CREATE TABLE IF NOT EXISTS stickers (
id INTEGER PRIMARY KEY,
message_id INTEGER,
emoji TEXT,
file_path TEXT,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
CREATE TABLE IF NOT EXISTS service_actions (
id INTEGER PRIMARY KEY,
message_id INTEGER,
type TEXT,
target_message_id TEXT,
actor_user_id INTEGER,
target_user_id INTEGER,
action_text TEXT,
FOREIGN KEY (message_id) REFERENCES messages(id),
FOREIGN KEY (actor_user_id) REFERENCES users(id),
FOREIGN KEY (target_user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS links (
id INTEGER PRIMARY KEY,
message_id INTEGER,
url TEXT,
text TEXT,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
-- Create indexes for better query performance
CREATE INDEX IF NOT EXISTS idx_messages_message_id ON messages(message_id);
CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages(user_id);
CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp);
CREATE INDEX IF NOT EXISTS idx_media_message_id ON media(message_id);
CREATE INDEX IF NOT EXISTS idx_reactions_message_id ON reactions(message_id);
CREATE INDEX IF NOT EXISTS idx_forwarded_message_id ON forwarded_messages(message_id);
CREATE INDEX IF NOT EXISTS idx_polls_message_id ON polls(message_id);
CREATE INDEX IF NOT EXISTS idx_poll_options_poll_id ON poll_options(poll_id);
CREATE INDEX IF NOT EXISTS idx_stickers_message_id ON stickers(message_id);
CREATE INDEX IF NOT EXISTS idx_service_actions_message_id ON service_actions(message_id);
CREATE INDEX IF NOT EXISTS idx_links_message_id ON links(message_id);
''')
self.conn.commit()
def sort_html_files(self, files):
"""Sort HTML files in proper numerical order"""
def extract_number(filename):
# Extract number from 'messagesX.html' format, where X is optional
            match = re.search(r'messages(\d+)?\.html', os.path.basename(filename))
if match and match.group(1):
return int(match.group(1)) # Return as integer for proper sorting
# No number or it's the main messages.html file
return 0 # messages.html should come first
return sorted(files, key=extract_number)
def find_html_files(self):
"""Find all message HTML files in the export directory"""
pattern = os.path.join(self.export_dir, "messages*.html")
html_files = glob.glob(pattern)
return self.sort_html_files(html_files)
def get_or_create_user(self, name, initials="", userpic_style="", userpic_class="", title=""):
"""Get user ID or create new user if not exists, using cache"""
# Check cache first
if name in self.user_cache:
return self.user_cache[name]
self.cursor.execute("SELECT id FROM users WHERE name = ?", (name,))
result = self.cursor.fetchone()
if result:
user_id = result[0]
# Update user data if we have more info
if initials or userpic_style or userpic_class or title:
self.cursor.execute(
"""
UPDATE users
SET initials = COALESCE(?, initials),
userpic_style = COALESCE(?, userpic_style),
userpic_class = COALESCE(?, userpic_class),
title = COALESCE(?, title)
WHERE id = ?
""",
(
initials if initials else None,
userpic_style if userpic_style else None,
userpic_class if userpic_class else None,
title if title else None,
user_id
)
)
else:
self.cursor.execute(
"INSERT INTO users (name, initials, userpic_style, userpic_class, title) VALUES (?, ?, ?, ?, ?)",
(name, initials, userpic_style, userpic_class, title)
)
user_id = self.cursor.lastrowid
# Update cache
self.user_cache[name] = user_id
return user_id
def extract_text_with_newlines(self, element):
"""Optimized text extraction preserving newlines and formatting"""
if element is None:
return ""
        # Replace <br> tags with newlines so get_text() preserves line breaks
for br in element.find_all('br'):
br.replace_with('\n')
# Get the text efficiently
return element.get_text()
def extract_links(self, element):
"""Extract all links from an element"""
if element is None:
return []
links = []
for a_tag in element.find_all('a'):
href = a_tag.get('href', '')
text = a_tag.get_text()
if href:
links.append({'url': href, 'text': text})
return links
def parse_message(self, message_elem):
"""Parse a single message element and extract data"""
message_data = {}
# Get message ID
message_id = message_elem.get('id', '').replace('message', '')
message_data['message_id'] = message_id
# Get class list
class_list = message_elem.get('class', [])
message_data['class_list'] = json.dumps(class_list)
# Check if it's a service message
is_service = 'service' in class_list
message_data['is_service'] = is_service
# Check if it's a joined message (continuation from previous)
is_joined = 'joined' in class_list
message_data['is_joined'] = is_joined
if is_service:
# Service message (date headers, system messages, etc.)
body_elem = message_elem.select_one('.body.details')
if body_elem:
message_data['text'] = self.extract_text_with_newlines(body_elem)
message_data['date_text'] = self.extract_text_with_newlines(body_elem)
# Check for service actions (pinned message, new member, etc.)
action_type = None
# Check for pinned message
pinned_link = body_elem.select_one('a[onclick^="return GoToMessage"]')
if pinned_link:
action_type = 'pin'
target_id = pinned_link.get('href', '').replace('#go_to_message', '')
message_data['service_action'] = {
'type': action_type,
'target_message_id': target_id,
'action_text': self.extract_text_with_newlines(body_elem)
}
else:
# Regular message
# Get sender info
from_name_elem = message_elem.select_one('.from_name')
if from_name_elem:
message_data['from_name'] = from_name_elem.get_text(strip=True)
# Get userpic info
userpic_elem = message_elem.select_one('.userpic')
if userpic_elem:
userpic_style = userpic_elem.get('style', '')
userpic_class = ' '.join(userpic_elem.get('class', []))
initials_elem = userpic_elem.select_one('.initials')
initials = ""
title = ""
if initials_elem:
initials = initials_elem.get_text(strip=True)
title = initials_elem.get('title', '')
message_data['userpic_style'] = userpic_style
message_data['userpic_class'] = userpic_class
message_data['initials'] = initials
message_data['title'] = title
# Get timestamp
date_elem = message_elem.select_one('.date.details')
if date_elem:
time_text = date_elem.get_text(strip=True)
timestamp_title = date_elem.get('title', '')
message_data['time_text'] = time_text
message_data['timestamp'] = timestamp_title # Full timestamp with date
# Get message text
text_elem = message_elem.select_one('.text')
if text_elem:
message_data['text'] = self.extract_text_with_newlines(text_elem)
# Extract links
links = self.extract_links(text_elem)
if links:
message_data['links'] = links
# Check for reply
reply_elem = message_elem.select_one('.reply_to.details')
if reply_elem:
reply_link = reply_elem.select_one('a')
if reply_link:
reply_id = reply_link.get('href', '').replace('#go_to_message', '')
message_data['reply_to_message_id'] = reply_id
# Check for media
media_wrap = message_elem.select_one('.media_wrap')
if media_wrap:
media_data = []
media_elems = media_wrap.select('.media')
for media_elem in media_elems:
media_info = {}
# Determine media type and class list
class_list = media_elem.get('class', [])
media_info['class_list'] = json.dumps(class_list)
media_type = next((c for c in class_list if c.startswith('media_')), '')
media_type = media_type.replace('media_', '')
media_info['type'] = media_type
# Get title, description, file info
title_elem = media_elem.select_one('.title')
if title_elem:
media_info['title'] = title_elem.get_text(strip=True)
desc_elem = media_elem.select_one('.description')
if desc_elem:
media_info['description'] = desc_elem.get_text(strip=True)
status_elem = media_elem.select_one('.status.details')
if status_elem:
media_info['file_info'] = status_elem.get_text(strip=True)
# Check for links in media
a_elem = media_elem.select_one('a')
if a_elem:
media_info['url'] = a_elem.get('href', '')
media_data.append(media_info)
message_data['media'] = media_data
# Check for polls
media_poll = media_wrap.select_one('.media_poll')
if media_poll:
poll_data = {}
# Get poll question
question_elem = media_poll.select_one('.question.bold')
if question_elem:
poll_data['question'] = question_elem.get_text(strip=True)
# Check if anonymous
details_elem = media_poll.select_one('.details')
if details_elem:
details_text = details_elem.get_text(strip=True)
poll_data['details'] = details_text
poll_data['is_anonymous'] = 'Anonymous' in details_text
# Get poll options
options = []
option_elems = media_poll.select('.answer')
for option_elem in option_elems:
option_text = option_elem.get_text(strip=True)
options.append(option_text)
poll_data['options'] = options
message_data['poll'] = poll_data
# Check for reactions
reactions_elem = message_elem.select_one('.reactions')
if reactions_elem:
reactions_data = []
reaction_elems = reactions_elem.select('.reaction')
for reaction_elem in reaction_elems:
reaction_info = {}
# Get emoji
emoji_elem = reaction_elem.select_one('.emoji')
if emoji_elem:
reaction_info['emoji'] = emoji_elem.get_text(strip=True)
# Get count
count_elem = reaction_elem.select_one('.count')
if count_elem:
count_text = count_elem.get_text(strip=True)
try:
reaction_info['count'] = int(count_text)
except ValueError:
reaction_info['count'] = 0
else:
# If no count element, assume it's 1
reaction_info['count'] = 1
# Get users who reacted
userpics_elem = reaction_elem.select_one('.userpics')
if userpics_elem:
user_elems = userpics_elem.select('.userpic')
users = []
for user_elem in user_elems:
initials_elem = user_elem.select_one('.initials')
if initials_elem:
user_title = initials_elem.get('title', '')
if user_title:
users.append(user_title)
reaction_info['users'] = users
reactions_data.append(reaction_info)
message_data['reactions'] = reactions_data
# Check for stickers
sticker_elem = message_elem.select_one('a[href^="stickers/"]')
if sticker_elem:
                sticker_text = sticker_elem.get_text(strip=True)
                sticker_data = {
                    'file_path': sticker_elem.get('href', ''),
                    'emoji': sticker_text or None
                }
message_data['sticker'] = sticker_data
# Check for forwarded message
forwarded_body = message_elem.select_one('.forwarded.body')
if forwarded_body:
forwarded_data = {}
# Get original sender
from_name_elem = forwarded_body.select_one('.from_name')
if from_name_elem:
forwarded_data['from_name'] = from_name_elem.get_text(strip=True)
# Extract date from the text if present
date_span = from_name_elem.select_one('.date.details')
if date_span:
forwarded_data['timestamp'] = date_span.get_text(strip=True)
# Get forwarded text
text_elem = forwarded_body.select_one('.text')
if text_elem:
forwarded_data['text'] = self.extract_text_with_newlines(text_elem)
# Extract links
links = self.extract_links(text_elem)
if links:
forwarded_data['links'] = links
# Get forwarded media
media_wrap = forwarded_body.select_one('.media_wrap')
if media_wrap:
media_data = []
media_elems = media_wrap.select('.media')
for media_elem in media_elems:
media_info = {}
# Determine media type
class_list = media_elem.get('class', [])
media_type = next((c for c in class_list if c.startswith('media_')), '')
media_type = media_type.replace('media_', '')
media_info['type'] = media_type
# Get title, description, file info
title_elem = media_elem.select_one('.title')
if title_elem:
media_info['title'] = title_elem.get_text(strip=True)
desc_elem = media_elem.select_one('.description')
if desc_elem:
media_info['description'] = desc_elem.get_text(strip=True)
status_elem = media_elem.select_one('.status.details')
if status_elem:
media_info['file_info'] = status_elem.get_text(strip=True)
# Check for links in media
a_elem = media_elem.select_one('a')
if a_elem:
media_info['url'] = a_elem.get('href', '')
media_data.append(media_info)
forwarded_data['media'] = media_data
message_data['forwarded'] = forwarded_data
return message_data
def save_message_batch(self, message_batch):
"""Save a batch of messages with a single transaction"""
if not message_batch:
return
try:
# Begin transaction
self.conn.execute("BEGIN TRANSACTION")
for message_data in message_batch:
if message_data.get('is_service', False):
# Handle service message
self.cursor.execute(
"""
INSERT INTO messages
(message_id, text, is_service, date_text, is_joined, class_list)
VALUES
(?, ?, ?, ?, ?, ?)
""",
(
message_data.get('message_id', ''),
message_data.get('text', ''),
True,
message_data.get('date_text', ''),
message_data.get('is_joined', False),
message_data.get('class_list', '[]')
)
)
message_id = self.cursor.lastrowid
# Save service action if present
service_action = message_data.get('service_action')
if service_action:
self.cursor.execute(
"""
INSERT INTO service_actions
(message_id, type, target_message_id, action_text)
VALUES
(?, ?, ?, ?)
""",
(
message_id,
service_action.get('type', ''),
service_action.get('target_message_id', ''),
service_action.get('action_text', '')
)
)
else:
# Handle regular message
from_name = message_data.get('from_name', '')
initials = message_data.get('initials', '')
userpic_style = message_data.get('userpic_style', '')
userpic_class = message_data.get('userpic_class', '')
title = message_data.get('title', '')
# Get or create user
if from_name:
user_id = self.get_or_create_user(from_name, initials, userpic_style, userpic_class, title)
else:
user_id = None
# Save message
self.cursor.execute(
"""
INSERT INTO messages
(message_id, user_id, timestamp, text, is_service, reply_to_message_id, is_joined, class_list)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?)
""",
(
message_data.get('message_id', ''),
user_id,
message_data.get('timestamp', ''),
message_data.get('text', ''),
False,
message_data.get('reply_to_message_id', None),
message_data.get('is_joined', False),
message_data.get('class_list', '[]')
)
)
message_id = self.cursor.lastrowid
# Save links if present
links = message_data.get('links', [])
for link_item in links:
self.cursor.execute(
"""
INSERT INTO links
(message_id, url, text)
VALUES
(?, ?, ?)
""",
(
message_id,
link_item.get('url', ''),
link_item.get('text', '')
)
)
# Save media if present
media_list = message_data.get('media', [])
for media_item in media_list:
self.cursor.execute(
"""
INSERT INTO media
(message_id, type, title, description, file_info, url, class_list)
VALUES
(?, ?, ?, ?, ?, ?, ?)
""",
(
message_id,
media_item.get('type', ''),
media_item.get('title', ''),
media_item.get('description', ''),
media_item.get('file_info', ''),
media_item.get('url', ''),
media_item.get('class_list', '[]')
)
)
# Save poll if present
poll = message_data.get('poll')
if poll:
self.cursor.execute(
"""
INSERT INTO polls
(message_id, question, is_anonymous, details)
VALUES
(?, ?, ?, ?)
""",
(
message_id,
poll.get('question', ''),
poll.get('is_anonymous', False),
poll.get('details', '')
)
)
poll_id = self.cursor.lastrowid
# Save poll options
options = poll.get('options', [])
for option_text in options:
self.cursor.execute(
"""
INSERT INTO poll_options
(poll_id, text)
VALUES
(?, ?)
""",
(poll_id, option_text)
)
# Save sticker if present
sticker = message_data.get('sticker')
if sticker:
self.cursor.execute(
"""
INSERT INTO stickers
(message_id, emoji, file_path)
VALUES
(?, ?, ?)
""",
(
message_id,
sticker.get('emoji', ''),
sticker.get('file_path', '')
)
)
# Save reactions if present
reactions_list = message_data.get('reactions', [])
for reaction_item in reactions_list:
self.cursor.execute(
"""
INSERT INTO reactions
(message_id, emoji, count)
VALUES
(?, ?, ?)
""",
(
message_id,
reaction_item.get('emoji', ''),
reaction_item.get('count', 0)
)
)
reaction_id = self.cursor.lastrowid
# Save reaction users if present
users = reaction_item.get('users', [])
for user_name in users:
user_id = self.get_or_create_user(user_name)
self.cursor.execute(
"""
INSERT INTO reaction_users
(reaction_id, user_id)
VALUES
(?, ?)
""",
(reaction_id, user_id)
)
# Save forwarded message if present
forwarded = message_data.get('forwarded', None)
if forwarded:
self.cursor.execute(
"""
INSERT INTO forwarded_messages
(message_id, from_name, timestamp, text)
VALUES
(?, ?, ?, ?)
""",
(
message_id,
forwarded.get('from_name', ''),
forwarded.get('timestamp', ''),
forwarded.get('text', '')
)
)
                        # Save forwarded links if present. They are attached to the parent
                        # message row, since links.message_id references messages(id).
forwarded_links = forwarded.get('links', [])
for link_item in forwarded_links:
self.cursor.execute(
"""
INSERT INTO links
(message_id, url, text)
VALUES
(?, ?, ?)
""",
(
message_id,
link_item.get('url', ''),
link_item.get('text', '')
)
)
# Commit the transaction
self.conn.commit()
except Exception as e:
self.conn.rollback()
print(f"Error saving message batch: {e}")
def process_html_file(self, html_file):
"""Process a single HTML file and return message data"""
print(f"Parsing file: {html_file}")
message_batch = []
try:
with open(html_file, 'r', encoding='utf-8') as f:
html_content = f.read()
# Use lxml parser for better performance
soup = BeautifulSoup(html_content, 'lxml')
messages = soup.select('.message')
for message in messages:
message_data = self.parse_message(message)
message_batch.append(message_data)
return message_batch
except Exception as e:
print(f"Error processing file {html_file}: {e}")
return []
def parse_html_file_single_process(self, html_file):
"""Parse a single HTML file and extract all messages (used for single process mode)"""
message_batch = self.process_html_file(html_file)
self.save_message_batch(message_batch)
def process_files_parallel(self, html_files, max_workers=None):
"""Process HTML files in parallel"""
if max_workers is None:
max_workers = min(multiprocessing.cpu_count(), len(html_files))
print(f"Processing {len(html_files)} files using {max_workers} workers")
# Use ProcessPoolExecutor for parallel processing
with ProcessPoolExecutor(max_workers=max_workers) as executor:
# Map each HTML file to a worker process
results = list(executor.map(self._process_file_worker, html_files))
# Flatten the results
all_messages = []
for message_batch in results:
all_messages.extend(message_batch)
# Save all messages to the database
for i in range(0, len(all_messages), self.batch_size):
batch = all_messages[i:i + self.batch_size]
self.save_message_batch(batch)
print(f"Saved batch {i//self.batch_size + 1}/{(len(all_messages) + self.batch_size - 1)//self.batch_size}")
@staticmethod
def _process_file_worker(html_file):
"""Worker function for parallel processing"""
        parser = TelegramExportParser("", "")  # throwaway instance: workers only need the parsing logic, not a DB connection
return parser.process_html_file(html_file)
def process_all_files(self, use_parallel=True):
"""Process all HTML files and save data to database"""
self.initialize_db()
html_files = self.find_html_files()
print(f"Found {len(html_files)} HTML files to process")
print("Files will be processed in this order:")
for i, file in enumerate(html_files):
print(f"{i+1}. {os.path.basename(file)}")
if use_parallel and len(html_files) > 1:
# Process files in parallel
self.process_files_parallel(html_files)
else:
# Process files sequentially
for html_file in html_files:
self.parse_html_file_single_process(html_file)
print("Processing complete!")
print(f"Database saved to: {self.db_path}")
def close(self):
"""Close database connection"""
if self.conn:
# Final optimizations
self.cursor.execute("PRAGMA optimize")
self.conn.close()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Parse Telegram HTML export to SQLite database')
parser.add_argument('export_dir', help='Directory containing Telegram HTML export files')
parser.add_argument('--db', default='telegram_export.db', help='Output SQLite database path')
parser.add_argument('--batch-size', type=int, default=1000, help='Batch size for database operations')
parser.add_argument('--no-parallel', action='store_true', help='Disable parallel processing')
args = parser.parse_args()
    exporter = TelegramExportParser(args.export_dir, args.db, args.batch_size)
    try:
        exporter.process_all_files(not args.no_parallel)
    finally:
        exporter.close()
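Once an export has been parsed (the script takes the export directory as a positional
argument, plus optional --db, --batch-size and --no-parallel flags), the resulting
SQLite file can be inspected directly. A minimal sketch, assuming the default
telegram_export.db output path and the schema created by initialize_db above:

    import sqlite3

    conn = sqlite3.connect("telegram_export.db")
    rows = conn.execute(
        """
        SELECT u.name, COUNT(*) AS n
        FROM messages m
        JOIN users u ON u.id = m.user_id
        WHERE m.is_service = 0
        GROUP BY u.name
        ORDER BY n DESC
        LIMIT 10
        """
    ).fetchall()
    for name, n in rows:
        print(f"{n:6d}  {name}")
    conn.close()

The idx_messages_user_id index created by the script keeps this kind of aggregation
reasonably fast even on large chats.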