Created
May 6, 2025 23:49
-
-
Save cofob/671622650dadf4d9afb4434a50a936d8 to your computer and use it in GitHub Desktop.
A Python script that converts Telegram HTML chat exports into a structured SQLite database.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run --script | |
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "beautifulsoup4>=4.13.4", | |
# "lxml>=5.4.0", | |
# "python-dateutil>=2.9.0.post0", | |
# ] | |
# /// | |
import os | |
import re | |
import sqlite3 | |
import datetime | |
import multiprocessing | |
from concurrent.futures import ProcessPoolExecutor | |
from bs4 import BeautifulSoup, NavigableString | |
from dateutil import parser | |
import glob | |
import json | |
class TelegramExportParser:
    """Converts Telegram HTML chat export pages into a structured SQLite database."""

    def __init__(self, export_dir, db_path, batch_size=1000):
        # Directory holding the exported messages*.html pages.
        self.export_dir = export_dir
        # Path of the SQLite file to create/write.
        self.db_path = db_path
        # How many parsed messages are written per transaction batch.
        self.batch_size = batch_size
        # Connection/cursor are opened lazily in initialize_db().
        self.conn = None
        self.cursor = None
        self.user_cache = {}  # Cache for user lookups (display name -> users.id)
    def initialize_db(self):
        """Create SQLite database and tables with indexes"""
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()
        # Enable faster transactions.
        # NOTE: synchronous=OFF and journal_mode=MEMORY trade crash-safety for
        # import speed; presumably acceptable because the database can always
        # be rebuilt from the HTML export — confirm before reusing elsewhere.
        self.cursor.execute("PRAGMA synchronous = OFF")
        self.cursor.execute("PRAGMA journal_mode = MEMORY")
        self.cursor.execute("PRAGMA temp_store = MEMORY")
        self.cursor.execute("PRAGMA cache_size = 10000")
        # Create tables (one table per message facet: media, reactions, polls,
        # stickers, service actions, links, forwards).
        self.cursor.executescript('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT UNIQUE,
                initials TEXT,
                userpic_style TEXT,
                userpic_class TEXT,
                title TEXT
            );
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY,
                message_id TEXT,
                user_id INTEGER,
                timestamp TEXT,
                date_text TEXT,
                text TEXT,
                is_service BOOLEAN,
                is_joined BOOLEAN,
                reply_to_message_id TEXT,
                class_list TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id)
            );
            CREATE TABLE IF NOT EXISTS media (
                id INTEGER PRIMARY KEY,
                message_id INTEGER,
                type TEXT,
                title TEXT,
                description TEXT,
                file_info TEXT,
                url TEXT,
                class_list TEXT,
                FOREIGN KEY (message_id) REFERENCES messages(id)
            );
            CREATE TABLE IF NOT EXISTS reactions (
                id INTEGER PRIMARY KEY,
                message_id INTEGER,
                emoji TEXT,
                count INTEGER,
                FOREIGN KEY (message_id) REFERENCES messages(id)
            );
            CREATE TABLE IF NOT EXISTS reaction_users (
                id INTEGER PRIMARY KEY,
                reaction_id INTEGER,
                user_id INTEGER,
                FOREIGN KEY (reaction_id) REFERENCES reactions(id),
                FOREIGN KEY (user_id) REFERENCES users(id)
            );
            CREATE TABLE IF NOT EXISTS forwarded_messages (
                id INTEGER PRIMARY KEY,
                message_id INTEGER,
                from_name TEXT,
                timestamp TEXT,
                text TEXT,
                FOREIGN KEY (message_id) REFERENCES messages(id)
            );
            CREATE TABLE IF NOT EXISTS polls (
                id INTEGER PRIMARY KEY,
                message_id INTEGER,
                question TEXT,
                is_anonymous BOOLEAN,
                details TEXT,
                FOREIGN KEY (message_id) REFERENCES messages(id)
            );
            CREATE TABLE IF NOT EXISTS poll_options (
                id INTEGER PRIMARY KEY,
                poll_id INTEGER,
                text TEXT,
                FOREIGN KEY (poll_id) REFERENCES polls(id)
            );
            CREATE TABLE IF NOT EXISTS stickers (
                id INTEGER PRIMARY KEY,
                message_id INTEGER,
                emoji TEXT,
                file_path TEXT,
                FOREIGN KEY (message_id) REFERENCES messages(id)
            );
            CREATE TABLE IF NOT EXISTS service_actions (
                id INTEGER PRIMARY KEY,
                message_id INTEGER,
                type TEXT,
                target_message_id TEXT,
                actor_user_id INTEGER,
                target_user_id INTEGER,
                action_text TEXT,
                FOREIGN KEY (message_id) REFERENCES messages(id),
                FOREIGN KEY (actor_user_id) REFERENCES users(id),
                FOREIGN KEY (target_user_id) REFERENCES users(id)
            );
            CREATE TABLE IF NOT EXISTS links (
                id INTEGER PRIMARY KEY,
                message_id INTEGER,
                url TEXT,
                text TEXT,
                FOREIGN KEY (message_id) REFERENCES messages(id)
            );
            -- Create indexes for better query performance
            CREATE INDEX IF NOT EXISTS idx_messages_message_id ON messages(message_id);
            CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages(user_id);
            CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp);
            CREATE INDEX IF NOT EXISTS idx_media_message_id ON media(message_id);
            CREATE INDEX IF NOT EXISTS idx_reactions_message_id ON reactions(message_id);
            CREATE INDEX IF NOT EXISTS idx_forwarded_message_id ON forwarded_messages(message_id);
            CREATE INDEX IF NOT EXISTS idx_polls_message_id ON polls(message_id);
            CREATE INDEX IF NOT EXISTS idx_poll_options_poll_id ON poll_options(poll_id);
            CREATE INDEX IF NOT EXISTS idx_stickers_message_id ON stickers(message_id);
            CREATE INDEX IF NOT EXISTS idx_service_actions_message_id ON service_actions(message_id);
            CREATE INDEX IF NOT EXISTS idx_links_message_id ON links(message_id);
        ''')
        self.conn.commit()
def sort_html_files(self, files): | |
"""Sort HTML files in proper numerical order""" | |
def extract_number(filename): | |
# Extract number from 'messagesX.html' format, where X is optional | |
match = re.search(r'messages(\d+)?.html', os.path.basename(filename)) | |
if match and match.group(1): | |
return int(match.group(1)) # Return as integer for proper sorting | |
# No number or it's the main messages.html file | |
return 0 # messages.html should come first | |
return sorted(files, key=extract_number) | |
def find_html_files(self): | |
"""Find all message HTML files in the export directory""" | |
pattern = os.path.join(self.export_dir, "messages*.html") | |
html_files = glob.glob(pattern) | |
return self.sort_html_files(html_files) | |
def get_or_create_user(self, name, initials="", userpic_style="", userpic_class="", title=""): | |
"""Get user ID or create new user if not exists, using cache""" | |
# Check cache first | |
if name in self.user_cache: | |
return self.user_cache[name] | |
self.cursor.execute("SELECT id FROM users WHERE name = ?", (name,)) | |
result = self.cursor.fetchone() | |
if result: | |
user_id = result[0] | |
# Update user data if we have more info | |
if initials or userpic_style or userpic_class or title: | |
self.cursor.execute( | |
""" | |
UPDATE users | |
SET initials = COALESCE(?, initials), | |
userpic_style = COALESCE(?, userpic_style), | |
userpic_class = COALESCE(?, userpic_class), | |
title = COALESCE(?, title) | |
WHERE id = ? | |
""", | |
( | |
initials if initials else None, | |
userpic_style if userpic_style else None, | |
userpic_class if userpic_class else None, | |
title if title else None, | |
user_id | |
) | |
) | |
else: | |
self.cursor.execute( | |
"INSERT INTO users (name, initials, userpic_style, userpic_class, title) VALUES (?, ?, ?, ?, ?)", | |
(name, initials, userpic_style, userpic_class, title) | |
) | |
user_id = self.cursor.lastrowid | |
# Update cache | |
self.user_cache[name] = user_id | |
return user_id | |
def extract_text_with_newlines(self, element): | |
"""Optimized text extraction preserving newlines and formatting""" | |
if element is None: | |
return "" | |
# Replace <br> tags with newlines - all at once for better performance | |
for br in element.find_all('br'): | |
br.replace_with('\n') | |
# Get the text efficiently | |
return element.get_text() | |
def extract_links(self, element): | |
"""Extract all links from an element""" | |
if element is None: | |
return [] | |
links = [] | |
for a_tag in element.find_all('a'): | |
href = a_tag.get('href', '') | |
text = a_tag.get_text() | |
if href: | |
links.append({'url': href, 'text': text}) | |
return links | |
    def parse_message(self, message_elem):
        """Parse a single message element and extract data.

        Returns a dict whose keys depend on the message kind.
        Always present: 'message_id', 'class_list' (JSON list of CSS classes),
        'is_service', 'is_joined'.
        Service messages add 'text'/'date_text' and, for pins, a
        'service_action' dict.  Regular messages may add 'from_name', userpic
        fields ('userpic_style', 'userpic_class', 'initials', 'title'),
        'time_text'/'timestamp', 'text', 'links', 'reply_to_message_id',
        'media', 'poll', 'reactions', 'sticker' and 'forwarded'.
        """
        message_data = {}
        # Get message ID (export ids look like "messageNNN"; keep only NNN)
        message_id = message_elem.get('id', '').replace('message', '')
        message_data['message_id'] = message_id
        # Get class list
        class_list = message_elem.get('class', [])
        message_data['class_list'] = json.dumps(class_list)
        # Check if it's a service message
        is_service = 'service' in class_list
        message_data['is_service'] = is_service
        # Check if it's a joined message (continuation from previous)
        is_joined = 'joined' in class_list
        message_data['is_joined'] = is_joined
        if is_service:
            # Service message (date headers, system messages, etc.)
            body_elem = message_elem.select_one('.body.details')
            if body_elem:
                # Same body text is stored under both keys for service rows.
                message_data['text'] = self.extract_text_with_newlines(body_elem)
                message_data['date_text'] = self.extract_text_with_newlines(body_elem)
                # Check for service actions (pinned message, new member, etc.)
                action_type = None
                # Check for pinned message
                pinned_link = body_elem.select_one('a[onclick^="return GoToMessage"]')
                if pinned_link:
                    action_type = 'pin'
                    target_id = pinned_link.get('href', '').replace('#go_to_message', '')
                    message_data['service_action'] = {
                        'type': action_type,
                        'target_message_id': target_id,
                        'action_text': self.extract_text_with_newlines(body_elem)
                    }
        else:
            # Regular message
            # Get sender info
            from_name_elem = message_elem.select_one('.from_name')
            if from_name_elem:
                message_data['from_name'] = from_name_elem.get_text(strip=True)
            # Get userpic info
            userpic_elem = message_elem.select_one('.userpic')
            if userpic_elem:
                userpic_style = userpic_elem.get('style', '')
                userpic_class = ' '.join(userpic_elem.get('class', []))
                initials_elem = userpic_elem.select_one('.initials')
                initials = ""
                title = ""
                if initials_elem:
                    initials = initials_elem.get_text(strip=True)
                    # 'title' carries the full display name in the export HTML
                    title = initials_elem.get('title', '')
                message_data['userpic_style'] = userpic_style
                message_data['userpic_class'] = userpic_class
                message_data['initials'] = initials
                message_data['title'] = title
            # Get timestamp
            date_elem = message_elem.select_one('.date.details')
            if date_elem:
                time_text = date_elem.get_text(strip=True)
                timestamp_title = date_elem.get('title', '')
                message_data['time_text'] = time_text
                message_data['timestamp'] = timestamp_title  # Full timestamp with date
            # Get message text
            text_elem = message_elem.select_one('.text')
            if text_elem:
                message_data['text'] = self.extract_text_with_newlines(text_elem)
                # Extract links
                links = self.extract_links(text_elem)
                if links:
                    message_data['links'] = links
            # Check for reply
            reply_elem = message_elem.select_one('.reply_to.details')
            if reply_elem:
                reply_link = reply_elem.select_one('a')
                if reply_link:
                    reply_id = reply_link.get('href', '').replace('#go_to_message', '')
                    message_data['reply_to_message_id'] = reply_id
            # Check for media
            media_wrap = message_elem.select_one('.media_wrap')
            if media_wrap:
                media_data = []
                media_elems = media_wrap.select('.media')
                for media_elem in media_elems:
                    media_info = {}
                    # Determine media type and class list
                    # NOTE: this reuses (shadows) the outer class_list variable
                    class_list = media_elem.get('class', [])
                    media_info['class_list'] = json.dumps(class_list)
                    # Type is derived from the first 'media_*' CSS class
                    media_type = next((c for c in class_list if c.startswith('media_')), '')
                    media_type = media_type.replace('media_', '')
                    media_info['type'] = media_type
                    # Get title, description, file info
                    title_elem = media_elem.select_one('.title')
                    if title_elem:
                        media_info['title'] = title_elem.get_text(strip=True)
                    desc_elem = media_elem.select_one('.description')
                    if desc_elem:
                        media_info['description'] = desc_elem.get_text(strip=True)
                    status_elem = media_elem.select_one('.status.details')
                    if status_elem:
                        media_info['file_info'] = status_elem.get_text(strip=True)
                    # Check for links in media
                    a_elem = media_elem.select_one('a')
                    if a_elem:
                        media_info['url'] = a_elem.get('href', '')
                    media_data.append(media_info)
                message_data['media'] = media_data
                # Check for polls
                media_poll = media_wrap.select_one('.media_poll')
                if media_poll:
                    poll_data = {}
                    # Get poll question
                    question_elem = media_poll.select_one('.question.bold')
                    if question_elem:
                        poll_data['question'] = question_elem.get_text(strip=True)
                    # Check if anonymous
                    details_elem = media_poll.select_one('.details')
                    if details_elem:
                        details_text = details_elem.get_text(strip=True)
                        poll_data['details'] = details_text
                        # NOTE(review): relies on English-language export text
                        poll_data['is_anonymous'] = 'Anonymous' in details_text
                    # Get poll options
                    options = []
                    option_elems = media_poll.select('.answer')
                    for option_elem in option_elems:
                        option_text = option_elem.get_text(strip=True)
                        options.append(option_text)
                    poll_data['options'] = options
                    message_data['poll'] = poll_data
            # Check for reactions
            reactions_elem = message_elem.select_one('.reactions')
            if reactions_elem:
                reactions_data = []
                reaction_elems = reactions_elem.select('.reaction')
                for reaction_elem in reaction_elems:
                    reaction_info = {}
                    # Get emoji
                    emoji_elem = reaction_elem.select_one('.emoji')
                    if emoji_elem:
                        reaction_info['emoji'] = emoji_elem.get_text(strip=True)
                    # Get count
                    count_elem = reaction_elem.select_one('.count')
                    if count_elem:
                        count_text = count_elem.get_text(strip=True)
                        try:
                            reaction_info['count'] = int(count_text)
                        except ValueError:
                            reaction_info['count'] = 0
                    else:
                        # If no count element, assume it's 1
                        reaction_info['count'] = 1
                    # Get users who reacted
                    userpics_elem = reaction_elem.select_one('.userpics')
                    if userpics_elem:
                        user_elems = userpics_elem.select('.userpic')
                        users = []
                        for user_elem in user_elems:
                            initials_elem = user_elem.select_one('.initials')
                            if initials_elem:
                                user_title = initials_elem.get('title', '')
                                if user_title:
                                    users.append(user_title)
                        reaction_info['users'] = users
                    reactions_data.append(reaction_info)
                message_data['reactions'] = reactions_data
            # Check for stickers
            sticker_elem = message_elem.select_one('a[href^="stickers/"]')
            if sticker_elem:
                sticker_data = {
                    'file_path': sticker_elem.get('href', ''),
                    'emoji': sticker_elem.get_text(strip=True) if sticker_elem.get_text(strip=True) else None
                }
                message_data['sticker'] = sticker_data
            # Check for forwarded message
            forwarded_body = message_elem.select_one('.forwarded.body')
            if forwarded_body:
                forwarded_data = {}
                # Get original sender
                from_name_elem = forwarded_body.select_one('.from_name')
                if from_name_elem:
                    forwarded_data['from_name'] = from_name_elem.get_text(strip=True)
                    # Extract date from the text if present
                    date_span = from_name_elem.select_one('.date.details')
                    if date_span:
                        forwarded_data['timestamp'] = date_span.get_text(strip=True)
                # Get forwarded text
                text_elem = forwarded_body.select_one('.text')
                if text_elem:
                    forwarded_data['text'] = self.extract_text_with_newlines(text_elem)
                    # Extract links
                    links = self.extract_links(text_elem)
                    if links:
                        forwarded_data['links'] = links
                # Get forwarded media
                media_wrap = forwarded_body.select_one('.media_wrap')
                if media_wrap:
                    media_data = []
                    media_elems = media_wrap.select('.media')
                    for media_elem in media_elems:
                        media_info = {}
                        # Determine media type
                        class_list = media_elem.get('class', [])
                        media_type = next((c for c in class_list if c.startswith('media_')), '')
                        media_type = media_type.replace('media_', '')
                        media_info['type'] = media_type
                        # Get title, description, file info
                        title_elem = media_elem.select_one('.title')
                        if title_elem:
                            media_info['title'] = title_elem.get_text(strip=True)
                        desc_elem = media_elem.select_one('.description')
                        if desc_elem:
                            media_info['description'] = desc_elem.get_text(strip=True)
                        status_elem = media_elem.select_one('.status.details')
                        if status_elem:
                            media_info['file_info'] = status_elem.get_text(strip=True)
                        # Check for links in media
                        a_elem = media_elem.select_one('a')
                        if a_elem:
                            media_info['url'] = a_elem.get('href', '')
                        media_data.append(media_info)
                    forwarded_data['media'] = media_data
                message_data['forwarded'] = forwarded_data
        return message_data
    def save_message_batch(self, message_batch):
        """Save a batch of messages with a single transaction.

        Each dict in *message_batch* is the output of parse_message().  All
        rows for the batch (messages plus their media/poll/sticker/reaction/
        forward/link children) are written inside one transaction; on any
        error the whole batch is rolled back and the error is only printed.
        """
        if not message_batch:
            return
        try:
            # Begin transaction
            self.conn.execute("BEGIN TRANSACTION")
            for message_data in message_batch:
                if message_data.get('is_service', False):
                    # Handle service message
                    self.cursor.execute(
                        """
                        INSERT INTO messages
                        (message_id, text, is_service, date_text, is_joined, class_list)
                        VALUES
                        (?, ?, ?, ?, ?, ?)
                        """,
                        (
                            message_data.get('message_id', ''),
                            message_data.get('text', ''),
                            True,
                            message_data.get('date_text', ''),
                            message_data.get('is_joined', False),
                            message_data.get('class_list', '[]')
                        )
                    )
                    # lastrowid is the messages.id primary key, used as the FK
                    # for all child tables below.
                    message_id = self.cursor.lastrowid
                    # Save service action if present
                    service_action = message_data.get('service_action')
                    if service_action:
                        self.cursor.execute(
                            """
                            INSERT INTO service_actions
                            (message_id, type, target_message_id, action_text)
                            VALUES
                            (?, ?, ?, ?)
                            """,
                            (
                                message_id,
                                service_action.get('type', ''),
                                service_action.get('target_message_id', ''),
                                service_action.get('action_text', '')
                            )
                        )
                else:
                    # Handle regular message
                    from_name = message_data.get('from_name', '')
                    initials = message_data.get('initials', '')
                    userpic_style = message_data.get('userpic_style', '')
                    userpic_class = message_data.get('userpic_class', '')
                    title = message_data.get('title', '')
                    # Get or create user (anonymous/unknown senders get NULL)
                    if from_name:
                        user_id = self.get_or_create_user(from_name, initials, userpic_style, userpic_class, title)
                    else:
                        user_id = None
                    # Save message
                    self.cursor.execute(
                        """
                        INSERT INTO messages
                        (message_id, user_id, timestamp, text, is_service, reply_to_message_id, is_joined, class_list)
                        VALUES
                        (?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            message_data.get('message_id', ''),
                            user_id,
                            message_data.get('timestamp', ''),
                            message_data.get('text', ''),
                            False,
                            message_data.get('reply_to_message_id', None),
                            message_data.get('is_joined', False),
                            message_data.get('class_list', '[]')
                        )
                    )
                    message_id = self.cursor.lastrowid
                    # Save links if present
                    links = message_data.get('links', [])
                    for link_item in links:
                        self.cursor.execute(
                            """
                            INSERT INTO links
                            (message_id, url, text)
                            VALUES
                            (?, ?, ?)
                            """,
                            (
                                message_id,
                                link_item.get('url', ''),
                                link_item.get('text', '')
                            )
                        )
                    # Save media if present
                    media_list = message_data.get('media', [])
                    for media_item in media_list:
                        self.cursor.execute(
                            """
                            INSERT INTO media
                            (message_id, type, title, description, file_info, url, class_list)
                            VALUES
                            (?, ?, ?, ?, ?, ?, ?)
                            """,
                            (
                                message_id,
                                media_item.get('type', ''),
                                media_item.get('title', ''),
                                media_item.get('description', ''),
                                media_item.get('file_info', ''),
                                media_item.get('url', ''),
                                media_item.get('class_list', '[]')
                            )
                        )
                    # Save poll if present
                    poll = message_data.get('poll')
                    if poll:
                        self.cursor.execute(
                            """
                            INSERT INTO polls
                            (message_id, question, is_anonymous, details)
                            VALUES
                            (?, ?, ?, ?)
                            """,
                            (
                                message_id,
                                poll.get('question', ''),
                                poll.get('is_anonymous', False),
                                poll.get('details', '')
                            )
                        )
                        poll_id = self.cursor.lastrowid
                        # Save poll options
                        options = poll.get('options', [])
                        for option_text in options:
                            self.cursor.execute(
                                """
                                INSERT INTO poll_options
                                (poll_id, text)
                                VALUES
                                (?, ?)
                                """,
                                (poll_id, option_text)
                            )
                    # Save sticker if present
                    sticker = message_data.get('sticker')
                    if sticker:
                        self.cursor.execute(
                            """
                            INSERT INTO stickers
                            (message_id, emoji, file_path)
                            VALUES
                            (?, ?, ?)
                            """,
                            (
                                message_id,
                                sticker.get('emoji', ''),
                                sticker.get('file_path', '')
                            )
                        )
                    # Save reactions if present
                    reactions_list = message_data.get('reactions', [])
                    for reaction_item in reactions_list:
                        self.cursor.execute(
                            """
                            INSERT INTO reactions
                            (message_id, emoji, count)
                            VALUES
                            (?, ?, ?)
                            """,
                            (
                                message_id,
                                reaction_item.get('emoji', ''),
                                reaction_item.get('count', 0)
                            )
                        )
                        reaction_id = self.cursor.lastrowid
                        # Save reaction users if present (reactors are looked
                        # up/created by display name only)
                        users = reaction_item.get('users', [])
                        for user_name in users:
                            user_id = self.get_or_create_user(user_name)
                            self.cursor.execute(
                                """
                                INSERT INTO reaction_users
                                (reaction_id, user_id)
                                VALUES
                                (?, ?)
                                """,
                                (reaction_id, user_id)
                            )
                    # Save forwarded message if present
                    forwarded = message_data.get('forwarded', None)
                    if forwarded:
                        self.cursor.execute(
                            """
                            INSERT INTO forwarded_messages
                            (message_id, from_name, timestamp, text)
                            VALUES
                            (?, ?, ?, ?)
                            """,
                            (
                                message_id,
                                forwarded.get('from_name', ''),
                                forwarded.get('timestamp', ''),
                                forwarded.get('text', '')
                            )
                        )
                        forwarded_message_id = self.cursor.lastrowid
                        # Save forwarded links if present
                        # NOTE(review): forwarded_message_id is captured above
                        # but never used -- forwarded links are attached to the
                        # parent messages row via message_id; confirm intended.
                        forwarded_links = forwarded.get('links', [])
                        for link_item in forwarded_links:
                            self.cursor.execute(
                                """
                                INSERT INTO links
                                (message_id, url, text)
                                VALUES
                                (?, ?, ?)
                                """,
                                (
                                    message_id,
                                    link_item.get('url', ''),
                                    link_item.get('text', '')
                                )
                            )
            # Commit the transaction
            self.conn.commit()
        except Exception as e:
            # Roll back the whole batch; the error is reported but swallowed
            # so one bad batch does not abort the overall run.
            self.conn.rollback()
            print(f"Error saving message batch: {e}")
def process_html_file(self, html_file): | |
"""Process a single HTML file and return message data""" | |
print(f"Parsing file: {html_file}") | |
message_batch = [] | |
try: | |
with open(html_file, 'r', encoding='utf-8') as f: | |
html_content = f.read() | |
# Use lxml parser for better performance | |
soup = BeautifulSoup(html_content, 'lxml') | |
messages = soup.select('.message') | |
for message in messages: | |
message_data = self.parse_message(message) | |
message_batch.append(message_data) | |
return message_batch | |
except Exception as e: | |
print(f"Error processing file {html_file}: {e}") | |
return [] | |
def parse_html_file_single_process(self, html_file): | |
"""Parse a single HTML file and extract all messages (used for single process mode)""" | |
message_batch = self.process_html_file(html_file) | |
self.save_message_batch(message_batch) | |
def process_files_parallel(self, html_files, max_workers=None): | |
"""Process HTML files in parallel""" | |
if max_workers is None: | |
max_workers = min(multiprocessing.cpu_count(), len(html_files)) | |
print(f"Processing {len(html_files)} files using {max_workers} workers") | |
# Use ProcessPoolExecutor for parallel processing | |
with ProcessPoolExecutor(max_workers=max_workers) as executor: | |
# Map each HTML file to a worker process | |
results = list(executor.map(self._process_file_worker, html_files)) | |
# Flatten the results | |
all_messages = [] | |
for message_batch in results: | |
all_messages.extend(message_batch) | |
# Save all messages to the database | |
for i in range(0, len(all_messages), self.batch_size): | |
batch = all_messages[i:i + self.batch_size] | |
self.save_message_batch(batch) | |
print(f"Saved batch {i//self.batch_size + 1}/{(len(all_messages) + self.batch_size - 1)//self.batch_size}") | |
@staticmethod | |
def _process_file_worker(html_file): | |
"""Worker function for parallel processing""" | |
parser = TelegramExportParser("", "") # Dummy initialization | |
return parser.process_html_file(html_file) | |
def process_all_files(self, use_parallel=True): | |
"""Process all HTML files and save data to database""" | |
self.initialize_db() | |
html_files = self.find_html_files() | |
print(f"Found {len(html_files)} HTML files to process") | |
print("Files will be processed in this order:") | |
for i, file in enumerate(html_files): | |
print(f"{i+1}. {os.path.basename(file)}") | |
if use_parallel and len(html_files) > 1: | |
# Process files in parallel | |
self.process_files_parallel(html_files) | |
else: | |
# Process files sequentially | |
for html_file in html_files: | |
self.parse_html_file_single_process(html_file) | |
print("Processing complete!") | |
print(f"Database saved to: {self.db_path}") | |
def close(self): | |
"""Close database connection""" | |
if self.conn: | |
# Final optimizations | |
self.cursor.execute("PRAGMA optimize") | |
self.conn.close() | |
if __name__ == "__main__":
    import argparse

    # Fix: the original reused the name `parser` for the argparse parser AND
    # the TelegramExportParser instance, shadowing the module-level
    # `from dateutil import parser` import — three meanings for one name.
    arg_parser = argparse.ArgumentParser(description='Parse Telegram HTML export to SQLite database')
    arg_parser.add_argument('export_dir', help='Directory containing Telegram HTML export files')
    arg_parser.add_argument('--db', default='telegram_export.db', help='Output SQLite database path')
    arg_parser.add_argument('--batch-size', type=int, default=1000, help='Batch size for database operations')
    arg_parser.add_argument('--no-parallel', action='store_true', help='Disable parallel processing')
    args = arg_parser.parse_args()

    exporter = TelegramExportParser(args.export_dir, args.db, args.batch_size)
    try:
        exporter.process_all_files(not args.no_parallel)
    finally:
        # Always close (and PRAGMA optimize) even if processing fails.
        exporter.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment