Created
April 26, 2024 05:45
-
-
Save vslinko/f911f652cf2f5e1a7f4148ad3e5decb1 to your computer and use it in GitHub Desktop.
tg_backup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import telethon | |
import telethon.tl.types | |
import os | |
import json | |
import glob | |
# Telegram API credentials handed to telethon.TelegramClient in main().
# NOTE(review): 0 / "" look like placeholders to fill in before running.
api_id = 0
api_hash = ""
# Not referenced by any code visible in this script.
limit = 50
# Dialog ids that main() backs up even when its automatic rules would skip them.
force_save = []
# Dialog ids that main() never backs up; checked after force_save, so it wins.
force_skip = []
# Dialog ids whose message media backup_dialog() does not download.
skip_media_for_chats = []
async def main():
    """Back up every eligible dialog, then the profile of every user seen.

    Dialogs are listed through the Telethon client. Basic groups that were
    left or deactivated, and channels flagged as broadcast or megagroup, are
    skipped unless their id is in ``force_save``; ids in ``force_skip`` are
    always skipped. For each remaining dialog ``backup_dialog`` is awaited and
    collects user ids into ``users_met``. Afterwards each collected user is
    written to ``./data/telegram/users/<id>.json`` and the profile photo is
    stored under ``./data/telegram/users/media`` unless a file for that photo
    id already exists there.
    """
    async with telethon.TelegramClient("anon", api_id, api_hash) as client:
        users_met = set()

        print("Available dialogs:")
        async for dialog in client.iter_dialogs():
            entity = dialog.entity
            skip = False
            name = dialog.name

            if isinstance(entity, telethon.tl.types.Chat):
                if entity.left or entity.deactivated:
                    skip = True
            if isinstance(entity, telethon.tl.types.Channel):
                if entity.broadcast or entity.megagroup:
                    skip = True
            if isinstance(entity, telethon.tl.types.User):
                if entity.deleted:
                    name = "Deactivated Account"
                if not name:
                    name = entity.phone

            # Manual overrides; force_skip is evaluated last, so it wins.
            if dialog.id in force_save:
                skip = False
            if dialog.id in force_skip:
                skip = True

            print(f" {dialog.id}: {name}")
            if skip:
                print(" skip")
            else:
                print(" backuping...")
                await backup_dialog(client, dialog, users_met)

        print()
        print(f"Users met ({len(users_met)}):")
        users_dir = "./data/telegram/users"
        media_dir = f"{users_dir}/media"
        os.makedirs(media_dir, exist_ok=True)

        for user_id in users_met:
            user_info_file = f"{users_dir}/{user_id}.json"
            user = await client.get_entity(user_id)
            print(f" {user_id}: {user.first_name} {user.last_name} {user.username} {user.phone}")
            with open(user_info_file, "w", encoding="utf-8") as f:
                f.write(user.to_json())

            if not user.photo:
                continue
            # A file named after the photo id means this photo is already saved.
            if glob.glob(f"{media_dir}/{user.photo.photo_id}.*"):
                continue
            print(" downloading user photo")
            path = await client.download_profile_photo(user)
            # The download can yield no path; without this guard
            # os.path.splitext(None) would abort the whole run.
            if not path:
                continue
            ext = os.path.splitext(path)[1]
            os.rename(path, f"{media_dir}/{user.photo.photo_id}{ext}")
async def _download_media_object(client, media_object, media_dir):
    """Download one photo/document into ``media_dir`` as ``<id><ext>``.

    Does nothing when there is no object to download or when the download
    yields no file path.
    """
    if media_object is None:
        return
    path = await client.download_media(media_object)
    if not path:
        return
    ext = os.path.splitext(path)[1]
    os.rename(path, f"{media_dir}/{media_object.id}{ext}")


async def backup_dialog(client, dialog, users_met):
    """Incrementally back up one dialog under ``./data/telegram/chats/<id>``.

    Writes ``chat.json`` (id, name, serialized entity), stores the chat photo
    and message media in ``media/``, and appends every message newer than the
    last stored one to ``messages.json`` (one JSON document per line).

    Args:
        client: Connected Telethon client used for downloads and iteration.
        dialog: The dialog to back up.
        users_met: Set that receives the user id of every user peer or sender
            found in stored and newly fetched messages (mutated in place).

    Raises:
        SystemExit: With status 1 when a message carries a media type this
            script does not know how to store.
    """
    dialog_id = dialog.id
    chat_dir = f"./data/telegram/chats/{dialog_id}"
    media_dir = f"{chat_dir}/media"
    chat_file = f"{chat_dir}/chat.json"
    messages_file = f"{chat_dir}/messages.json"
    os.makedirs(media_dir, exist_ok=True)

    photo = dialog.entity.photo
    if photo and not isinstance(photo, telethon.tl.types.ChatPhotoEmpty):
        # A file named after the photo id means this photo is already saved.
        if not glob.glob(f"{media_dir}/{photo.photo_id}.*"):
            print(" downloading chat photo")
            path = await client.download_profile_photo(dialog.entity)
            if path:
                ext = os.path.splitext(path)[1]
                os.rename(path, f"{media_dir}/{photo.photo_id}{ext}")

    with open(chat_file, "w", encoding="utf-8") as f:
        f.write(
            json.dumps(
                {
                    "id": dialog.id,
                    "name": dialog.name,
                    "entity": dialog.entity.to_json(),
                }
            )
        )

    # Replay what is already on disk to find where to resume. On the first
    # backup of a chat the file does not exist yet, which is not an error.
    last_message_id = 0
    if os.path.exists(messages_file):
        with open(messages_file, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                message = json.loads(line)
                # Only user peers carry a "user_id"; chat/channel peers do not.
                for peer in (message.get("peer_id"), message.get("from_id")):
                    if peer and peer.get("_") == "PeerUser":
                        users_met.add(peer["user_id"])
                last_message_id = message["id"]

    if dialog.message and dialog.message.id <= last_message_id:
        return  # nothing newer than what is already stored

    with open(messages_file, "a", encoding="utf-8") as f:
        async for message in client.iter_messages(
            dialog_id, reverse=True, min_id=last_message_id
        ):
            media = message.media
            if media and dialog_id not in skip_media_for_chats:
                if isinstance(
                    media,
                    (
                        telethon.tl.types.MessageMediaWebPage,
                        telethon.tl.types.MessageMediaContact,
                        telethon.tl.types.MessageMediaPoll,
                        telethon.tl.types.MessageMediaGeo,
                        telethon.tl.types.MessageMediaVenue,
                    ),
                ):
                    pass  # nothing is downloaded for these media types
                elif isinstance(media, telethon.tl.types.MessageMediaPhoto):
                    await _download_media_object(client, media.photo, media_dir)
                elif isinstance(media, telethon.tl.types.MessageMediaDocument):
                    await _download_media_object(client, media.document, media_dir)
                else:
                    print("UNSUPPORTED MEDIA")
                    print(media.stringify())
                    # Abort with a failure status: the backup is incomplete.
                    raise SystemExit(1)

            # Group and channel messages have non-user peers without a
            # ``user_id`` attribute, so only PeerUser instances are recorded.
            for peer in (message.peer_id, message.from_id):
                if isinstance(peer, telethon.tl.types.PeerUser):
                    users_met.add(peer.user_id)

            f.write(message.to_json() + "\n")
# Run the backup only when this file is executed as a script.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import glob
import gzip
import html
import json
import os
from datetime import datetime
# Memo for find_user_photo: user_id -> avatar URL, or None when no stored
# photo was found for that user.
user_photo_cache = {}


def _lookup_user_photo(user_id, users_dir):
    """Resolve a user's stored profile photo from disk, without caching."""
    user_info_file = os.path.join(users_dir, f"{user_id}.json")
    try:
        with open(user_info_file, encoding="utf-8") as f:
            user_data = json.load(f)
    except FileNotFoundError:
        return None

    photo_data = user_data.get("photo") or {}
    photo_id = photo_data.get("photo_id")
    if not photo_id:
        return None

    photo_files = glob.glob(f"{users_dir}/media/{photo_id}.*")
    if not photo_files:
        return None
    # Rewrite the on-disk prefix to the "../../" form used in generated HTML.
    return photo_files[0].replace(
        "./data/telegram/users", "../../data/telegram/users"
    )


def find_user_photo(user_id, users_dir):
    """Return the URL of a user's stored profile photo, or None.

    The photo id is read from ``<users_dir>/<user_id>.json`` and matched
    against ``<users_dir>/media/<photo_id>.*``. Both hits and misses are
    memoised in ``user_photo_cache``, so the file read and glob run at most
    once per user id.

    Args:
        user_id: User id to look up; may be None.
        users_dir: Directory holding the per-user JSON files.

    Returns:
        The path of the first matching file, with a leading
        ``./data/telegram/users`` rewritten to ``../../data/telegram/users``,
        or None when the user file or photo file is missing.
    """
    if user_id in user_photo_cache:
        return user_photo_cache[user_id]
    url = _lookup_user_photo(user_id, users_dir)
    user_photo_cache[user_id] = url
    return url
def find_chat_media(chat_id, media_id):
    """Return the URL of a stored media file of a chat, or None.

    Looks for ``./data/telegram/chats/<chat_id>/media/<media_id>.*`` and
    returns the first match with the leading ``./data/telegram/chats``
    rewritten to ``../../data/telegram/chats``, the form used in the
    generated HTML pages.
    """
    matches = glob.glob(f"./data/telegram/chats/{chat_id}/media/{media_id}.*")
    if not matches:
        return None
    return matches[0].replace(
        "./data/telegram/chats", "../../data/telegram/chats"
    )
def _escape_media_value(value):
    """HTML-escape ``value`` for use as text or attribute; None becomes ''."""
    return "" if value is None else html.escape(str(value), quote=True)


def _yandex_maps_href(geo):
    """Build the escaped Yandex Maps link for a serialized geo point."""
    geo = geo or {}
    long = geo.get("long", "")
    lat = geo.get("lat", "")
    return _escape_media_value(f"https://yandex.ru/maps/?ll={long},{lat}&z=17")


def _document_html(media_file):
    """Render a stored document according to its file extension."""
    ext = os.path.splitext(media_file)[1].lower()
    src = _escape_media_value(media_file)
    if ext in [".webp", ".png", ".jpg", ".jpeg"]:
        return f'<img src="{src}" alt="Document" class="photo-preview" loading=lazy>'
    if ext in [".mp4", ".webm", ".avi"]:
        return f'<video src="{src}" controls style="height: 200px"></video>'
    if ext in [".mp3", ".oga"]:
        return f'<audio src="{src}" controls></audio>'
    if ext in [".tgs", ".json"]:
        if ext == ".tgs":
            # .tgs files are opened with gzip here; the decompressed copy is
            # written next to the original and that copy is what gets embedded.
            real_file = media_file.replace(
                "../../data/telegram/chats", "./data/telegram/chats"
            )
            with gzip.open(real_file, "rb") as f:
                file_content = f.read()
            with open(real_file + ".json", "wb") as f:
                f.write(file_content)
            src = _escape_media_value(media_file + ".json")
        return (
            f'<lottie-player src="{src}" background="transparent" speed="1" '
            'style="width: 300px; height: 300px;" loop autoplay></lottie-player>'
        )
    if ext in [".pdf", ".docx", ".ics", ".jar", ".txt", ".doc", ".zip",
               ".octet-stream", ".xlsx", ".csv", ".js", ".gz", ".log"]:
        return f'<a href="{src}">Download file</a>'
    print(f"Unknown document type: {ext}")
    raise SystemExit(1)


def generate_media_html(chat_id, msg_data):
    """Return the HTML fragment for the media attached to one message.

    All values taken from the message (titles, URLs, names, poll texts) are
    HTML-escaped, because they originate from other chat participants.

    Args:
        chat_id: Chat directory name used to locate downloaded media files.
        msg_data: Deserialized message; its ``"media"`` entry is rendered.

    Returns:
        The HTML fragment, or ``""`` when the message has no media.

    Raises:
        SystemExit: With status 1 for an unknown media or document type.
    """
    media = msg_data.get("media") or {}
    if not media:
        return ""
    esc = _escape_media_value
    media_type = media.get("_", "")

    if media_type == "MessageMediaPhoto":
        # The photo entry may be null, so it is read defensively.
        media_id = (media.get("photo") or {}).get("id")
        media_file = find_chat_media(chat_id, media_id)
        if not media_file:
            return f"<p>Photo not found: {esc(media_id)}</p>"
        src = esc(media_file)
        onclick = f"window.open('{src}', '_blank');"
        return f'<img src="{src}" alt="Photo" onclick="{onclick}" class="photo-preview" loading=lazy>'

    if media_type == "MessageMediaDocument":
        media_id = (media.get("document") or {}).get("id", "")
        media_file = find_chat_media(chat_id, media_id)
        if not media_file:
            return f"<p>Document not found: {esc(media_id)}</p>"
        return _document_html(media_file)

    if media_type == "MessageMediaWebPage":
        webpage = media.get("webpage") or {}
        url = esc(webpage.get("url", ""))
        title = esc(webpage.get("title", ""))
        description = esc(webpage.get("description", ""))
        return f'<p>Web page: <a href="{url}">{title}</a><br>{description}</p>'

    if media_type == "MessageMediaContact":
        phone = esc(media.get("phone_number", ""))
        first_name = esc(media.get("first_name", ""))
        last_name = esc(media.get("last_name", ""))
        return f"<p>Contact: {phone} ({first_name} {last_name})</p>"

    if media_type == "MessageMediaGeo":
        href = _yandex_maps_href(media.get("geo"))
        return f'<p>Location: <a href="{href}">Yandex Maps</a></p>'

    if media_type == "MessageMediaPoll":
        poll = media.get("poll") or {}
        question = esc(poll.get("question", ""))
        answers_html = "".join(
            f'<li>{esc(answer.get("text", ""))}</li>'
            for answer in poll.get("answers") or []
        )
        return f"<p>Poll: {question}<ul>{answers_html}</ul></p>"

    if media_type == "MessageMediaVenue":
        href = _yandex_maps_href(media.get("geo"))
        title = esc(media.get("title", ""))
        address = esc(media.get("address", ""))
        return f'<p>Venue: <a href="{href}">{title}</a><br>{address}</p>'

    print(f"Unknown media type: {media_type}")
    raise SystemExit(1)
# Entity types rendered as a fixed pair of tags around the entity text.
_ENTITY_WRAPPERS = {
    "MessageEntityBold": ("<strong>", "</strong>"),
    "MessageEntityItalic": ("<em>", "</em>"),
    "MessageEntityUnderline": ("<u>", "</u>"),
    "MessageEntityStrike": ("<strike>", "</strike>"),
    "MessageEntityPre": ("<pre>", "</pre>"),
    "MessageEntityCode": ("<code>", "</code>"),
    "MessageEntityMention": ('<span class="mention">', "</span>"),
    "MessageEntityMentionName": ('<span class="mention">', "</span>"),
    "MessageEntityHashtag": ('<span class="hashtag">', "</span>"),
    "MessageEntityPhone": ('<span class="phone">', "</span>"),
    "MessageEntityCustomEmoji": ('<span class="emoji">', "</span>"),
    "MessageEntityBankCard": ('<span class="bank-card">', "</span>"),
    "MessageEntityBotCommand": ('<span class="bot-command">', "</span>"),
}


def _entity_tags(entity, text):
    """Return the (opening, closing) HTML tags for one message entity.

    ``text`` is the raw, unescaped message text covered by the entity; it is
    needed for link targets that are derived from the text itself.
    """
    kind = entity["_"]
    if kind in _ENTITY_WRAPPERS:
        return _ENTITY_WRAPPERS[kind]
    if kind == "MessageEntityUrl":
        href = text if text.startswith("http") else f"http://{text}"
    elif kind == "MessageEntityEmail":
        href = f"mailto:{text}"
    elif kind == "MessageEntityTextUrl":
        href = entity["url"]
    else:
        print(f"Unknown entity type: {kind}")
        raise SystemExit(1)
    return f'<a href="{html.escape(href, quote=True)}">', "</a>"


def extract_message_text(message, msg_data):
    """Render a message body as HTML, applying its formatting entities.

    Every piece of message text is HTML-escaped and newlines become ``<br>``,
    whether or not the text lies inside an entity. Entities may nest or share
    boundaries. ``msg_data`` is not modified.

    Args:
        message: Plain message text.
        msg_data: Deserialized message; ``"entities"`` lists dicts with
            ``"_"``, ``"offset"`` and ``"length"`` (plus ``"url"`` for
            ``MessageEntityTextUrl``).

    Returns:
        The HTML string.

    Raises:
        SystemExit: With status 1 when an entity type is not supported.
    """
    # Telegram expresses entity offsets and lengths in UTF-16 code units, so
    # slicing happens on the UTF-16 encoding rather than on code points.
    encoded = message.encode("utf-16-le", "surrogatepass")
    total_units = len(encoded) // 2

    def text_between(first, last):
        return encoded[first * 2 : last * 2].decode("utf-16-le", "replace")

    opening = {}
    closing = {}
    for order, entity in enumerate(msg_data.get("entities") or []):
        start = max(0, min(entity["offset"], total_units))
        end = max(start, min(entity["offset"] + entity["length"], total_units))
        open_tag, close_tag = _entity_tags(entity, text_between(start, end))
        if start == end:
            continue
        # Sort keys keep nesting well-formed: at one position the longest
        # entity opens first, and the most recently opened one closes first.
        opening.setdefault(start, []).append((start - end, order, open_tag))
        closing.setdefault(end, []).append((end - start, -order, close_tag))

    boundaries = sorted({0, total_units, *opening, *closing})
    pieces = []
    for position, following in zip(boundaries, boundaries[1:] + [total_units]):
        pieces.extend(tag for *_, tag in sorted(closing.get(position, ())))
        pieces.extend(tag for *_, tag in sorted(opening.get(position, ())))
        segment = text_between(position, following)
        pieces.append(html.escape(segment, quote=False).replace("\n", "<br>"))
    return "".join(pieces)
def generate_html_chat_histories():
    """Render every backed-up chat to HTML and write an index page.

    For each directory under ``./data/telegram/chats`` that contains a
    ``chat.json``, the lines of ``messages.json`` are rendered with
    ``generate_message_html`` into ``./html_output/chats/<chat>.html``.
    Chats with at least one message are listed in
    ``./html_output/index.html``, newest last message first. A chat whose
    message file is missing or empty still gets a page, without the
    scroll-to-last-message script, but is left out of the index.
    """
    chats_dir = "./data/telegram/chats"
    users_dir = "./data/telegram/users"
    media_dir = f"{users_dir}/media"
    html_dir = "./html_output"
    pages_dir = "./html_output/chats"
    index_data = []

    os.makedirs(pages_dir, exist_ok=True)

    for chat in os.listdir(chats_dir):
        chat_path = os.path.join(chats_dir, chat)
        chat_file = os.path.join(chat_path, "chat.json")
        messages_file = os.path.join(chat_path, "messages.json")
        if not os.path.exists(chat_file):
            continue

        with open(chat_file, "r", encoding="utf-8") as f:
            chat_data = json.load(f)
        # The name ends up inside HTML markup, so it is escaped once here.
        chat_name = html.escape(str(chat_data.get("name") or "Unknown chat"))

        messages = []
        if os.path.exists(messages_file):
            with open(messages_file, "r", encoding="utf-8") as f:
                # One JSON document per line; blank lines carry no message.
                messages = [json.loads(line) for line in f if line.strip()]

        # Reset per chat so a value from the previous chat cannot leak in.
        last_message_date = None
        scroll_script = ""
        if messages:
            last_message = messages[-1]
            last_message_date = datetime.strptime(
                last_message["date"], "%Y-%m-%dT%H:%M:%S%z"
            )
            scroll_script = (
                "<script>\n"
                "// Scroll to the last message\n"
                f'document.getElementById("message-{last_message["id"]}").scrollIntoView();\n'
                "</script>"
            )

        html_content = "\n".join(
            generate_message_html(chat, users_dir, media_dir, msg_data)
            for msg_data in messages
        )
        html_output = f"""
<html>
<head>
<meta charset="UTF-8">
<title>Chat History: {chat_name}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.message {{ margin-bottom: 15px; padding: 10px; border: 1px solid #ccc; border-radius: 8px; }}
.meta-info {{ color: #666; font-size: 14px; margin-bottom: 5px; }}
.message-content {{ font-size: 16px; }}
.avatar {{ height: 50px; width: 50px; border-radius: 25px; }}
.media {{ margin-top: 10px; }}
.photo-preview {{ max-width: 100%; height: 200px; cursor: pointer; }}
a {{ color: #06c; text-decoration: none; }}
a:hover {{ text-decoration: underline; }}
strong {{ font-weight: bold; }}
</style>
</head>
<body>{html_content}</body>
{scroll_script}
<script src="../../lottie-player.js"></script>
</html>
"""
        chat_file_name = f"{chat}.html"
        # The page declares UTF-8, so it must be written as UTF-8 regardless
        # of the platform's default encoding.
        with open(
            os.path.join(pages_dir, chat_file_name), "w", encoding="utf-8"
        ) as html_file:
            html_file.write(html_output)

        if last_message_date is not None:
            index_data.append((chat_name, last_message_date, chat_file_name))

    # Sort chats by the date of the last message, latest first.
    index_data.sort(key=lambda item: item[1], reverse=True)

    with open(
        os.path.join(html_dir, "index.html"), "w", encoding="utf-8"
    ) as index_file:
        index_file.write("<html><head><meta charset=\"utf8\"><title>Chat Index</title></head><body>\n")
        index_file.write("<h1>Chats Overview</h1>\n<ul>\n")
        for chat_name, chat_date, chat_file in index_data:
            index_file.write(
                f'<li><a href="./chats/{chat_file}">{chat_name} - last message at {chat_date.strftime("%Y-%m-%d %H:%M:%S")}</a></li>\n'
            )
        index_file.write("</ul>\n</body></html>")
def _escape_message_value(value):
    """HTML-escape ``value`` for use as text or attribute; None becomes ''."""
    return "" if value is None else html.escape(str(value), quote=True)


def generate_message_html(chat_id, users_dir, media_dir, msg_data):
    """Render one stored message (regular or service) as an HTML block.

    Args:
        chat_id: Chat directory name, forwarded to ``generate_media_html``.
        users_dir: Directory with per-user JSON files, used for the avatar.
        media_dir: Accepted for interface compatibility; not used here.
        msg_data: Deserialized message.

    Returns:
        A ``<div class="message">`` fragment whose id is ``message-<id>``.

    Raises:
        SystemExit: With status 1 for an unknown service action.
    """
    esc = _escape_message_value
    msg_id = msg_data.get("id", "Unknown ID")
    msg_type = msg_data.get("_", "")
    out_message = msg_data.get("out", False)
    from_id_data = msg_data.get("from_id")
    from_id = None if not from_id_data else from_id_data.get("user_id", None)
    date = msg_data.get("date", "No date").replace("T", " at ").replace("+00:00", "")
    message_body = msg_data.get("message", "")
    service_action = ""

    if not from_id:
        # No sender recorded: fall back to the peer's user id, if it has one.
        peer_id_data = msg_data.get("peer_id")
        from_id = None if not peer_id_data else peer_id_data.get("user_id", None)
    avatar_path = find_user_photo(from_id, users_dir)

    if msg_type == "MessageService":
        action_data = msg_data.get("action") or {}
        action = action_data.get("_", "")
        if "MessageActionChatCreate" in action:
            title = esc(action_data.get("title", "Unknown"))
            service_action = f"created the chat with title {title}"
        elif "MessageActionChatJoinedByLink" in action:
            service_action = "joined the chat by link"
        elif "MessageActionChannelMigrateFrom" in action:
            title = esc(action_data.get("title", "Unknown"))
            # Kept in its own name so the chat_id parameter, which is used
            # further down to locate media, is not overwritten.
            old_chat_id = esc(action_data.get("chat_id"))
            service_action = f'channel migrated from "{title}" (chat id {old_chat_id})'
        elif "MessageActionPinMessage" in action:
            service_action = "pinned a message"
        elif "MessageActionChatEditPhoto" in action:
            service_action = "changed the chat photo"
        elif "MessageActionChatAddUser" in action:
            users = action_data.get("users") or []
            users_str = ", ".join(esc(u) for u in users)
            service_action = f"added users {users_str}"
        elif "MessageActionChatDeleteUser" in action:
            user_id = esc(action_data.get("user_id", "Unknown user"))
            service_action = f"deleted user {user_id}"
        elif "MessageActionChatEditTitle" in action:
            title = esc(action_data.get("title", "Unknown title"))
            service_action = f"edited the chat title to {title}"
        elif "MessageActionContactSignUp" in action:
            service_action = "signed up for Telegram"
        elif "MessageActionPhoneCall" in action:
            call_id = esc(action_data.get("call_id", "Unknown call"))
            video = action_data.get("video", False)
            # A missing or null reason must not abort the export.
            reason = esc((action_data.get("reason") or {}).get("_", "Unknown reason"))
            duration = esc(action_data.get("duration", "Unknown duration"))
            service_action = f"started a phone call (call ID {call_id}, video: {video}, reason: {reason}, duration: {duration})"
        elif "MessageActionHistoryClear" in action:
            service_action = "cleared the chat history"
        elif "MessageActionBotAllowed" in action:
            # A missing or null app must not abort the export.
            app = action_data.get("app") or {}
            app_title = esc(app.get("title", "Unknown title"))
            service_action = f"allowed the bot {app_title}"
        else:
            print(f"Unknown service action: {action}")
            raise SystemExit(1)

    if msg_type == "Message":
        if message_body:
            message_body = extract_message_text(message_body, msg_data)

    media_html = ""
    if msg_data.get("media", None):
        media_html = generate_media_html(chat_id, msg_data)

    avatar_html = (
        f'<img src="{esc(avatar_path)}" alt="Avatar" class="avatar" loading=lazy>'
        if avatar_path
        else ""
    )
    # Outgoing messages get a distinct background.
    message_style = ' style="background-color: lightblue;"' if out_message else ""
    msg_html = f"""
<div class="message" id="message-{esc(msg_id)}"{message_style}>
<div class="meta-info">
{avatar_html}
<span class="user">User ID: {esc(from_id)}</span>
<span class="date">{esc(date)}</span>
<span class="message-id">Message ID: {esc(msg_id)}</span>
</div>
<div class="message-content">
{service_action}{message_body}
</div>
<div class="media">
{media_html}
</div>
</div>
"""
    return msg_html
# Generate the HTML export only when this file is executed as a script.
if __name__ == "__main__":
    generate_html_chat_histories()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment