Skip to content

Instantly share code, notes, and snippets.

@vslinko
Created April 26, 2024 05:45
Show Gist options
  • Save vslinko/f911f652cf2f5e1a7f4148ad3e5decb1 to your computer and use it in GitHub Desktop.
Save vslinko/f911f652cf2f5e1a7f4148ad3e5decb1 to your computer and use it in GitHub Desktop.
tg_backup
import telethon
import telethon.tl.types
import os
import json
import glob
# Telegram API credentials handed to TelegramClient in main().
# NOTE(review): 0 / "" look like placeholders to fill in before running — confirm.
api_id = 0
api_hash = ""
# NOTE(review): not referenced anywhere in the visible code.
limit = 50
# Dialog ids that main() backs up even when the automatic rules would skip them.
force_save = []
# Dialog ids that main() never backs up; checked after force_save, so it wins.
force_skip = []
# Dialog ids for which backup_dialog() does not download message media.
skip_media_for_chats = []
async def main():
    """Back up every eligible dialog, then save info about each user seen.

    Dialogs are listed one by one. Group chats the account has left or that
    are deactivated, and channels flagged ``broadcast`` or ``megagroup``, are
    skipped unless their id is in ``force_save``; any id in ``force_skip`` is
    always skipped. Every other dialog is passed to ``backup_dialog``, which
    also collects user ids into ``users_met``.

    Afterwards each collected user is written to
    ``./data/telegram/users/<id>.json`` and the profile photo is stored in
    ``./data/telegram/users/media/<photo_id>.<ext>`` unless a file with that
    photo id already exists.
    """
    async with telethon.TelegramClient("anon", api_id, api_hash) as client:
        users_met = set()

        print("Available dialogs:")
        async for dialog in client.iter_dialogs():
            entity = dialog.entity
            skip = False
            name = dialog.name

            if isinstance(entity, telethon.tl.types.Chat):
                if entity.left or entity.deactivated:
                    skip = True
            if isinstance(entity, telethon.tl.types.Channel):
                if entity.broadcast or entity.megagroup:
                    skip = True
            if isinstance(entity, telethon.tl.types.User):
                if entity.deleted:
                    name = "Deactivated Account"
                # Only users carry a phone number to fall back on.
                if not name:
                    name = entity.phone

            # Manual overrides; force_skip is tested last so it takes priority.
            if dialog.id in force_save:
                skip = False
            if dialog.id in force_skip:
                skip = True

            print(f" {dialog.id}: {name}")
            if skip:
                print(" skip")
            else:
                print(" backuping...")
                await backup_dialog(client, dialog, users_met)

        print()
        print(f"Users met ({len(users_met)}):")
        users_dir = "./data/telegram/users"
        media_dir = f"{users_dir}/media"
        os.makedirs(media_dir, exist_ok=True)

        for user_id in users_met:
            user_info_file = f"{users_dir}/{user_id}.json"
            user = await client.get_entity(user_id)
            print(f" {user_id}: {user.first_name} {user.last_name} {user.username} {user.phone}")
            with open(user_info_file, "w") as f:
                f.write(user.to_json())

            if not user.photo:
                continue
            # A file named after the photo id means this photo is already saved.
            if glob.glob(f"{media_dir}/{user.photo.photo_id}.*"):
                continue

            print(" downloading user photo")
            path = await client.download_profile_photo(user)
            # The download yields no path when there was nothing to fetch;
            # without this guard os.path.splitext(None) raises TypeError.
            if not path:
                continue
            ext = os.path.splitext(path)[1]
            os.rename(path, f"{media_dir}/{user.photo.photo_id}{ext}")
async def _download_message_media(client, message, media_dir):
    """Download the photo or document attached to ``message`` into ``media_dir``.

    Web pages, contacts, polls, geo points and venues carry no file and are
    ignored. Any other media type prints its structure and stops the program,
    as the original script did (exit status 0).
    """
    media = message.media
    if isinstance(
        media,
        (
            telethon.tl.types.MessageMediaWebPage,
            telethon.tl.types.MessageMediaContact,
            telethon.tl.types.MessageMediaPoll,
            telethon.tl.types.MessageMediaGeo,
            telethon.tl.types.MessageMediaVenue,
        ),
    ):
        return

    if isinstance(media, telethon.tl.types.MessageMediaPhoto):
        payload = media.photo
    elif isinstance(media, telethon.tl.types.MessageMediaDocument):
        payload = media.document
    else:
        print("UNSUPPORTED MEDIA")
        print(media.stringify())
        raise SystemExit(0)

    path = await client.download_media(payload)
    # No path means nothing was downloaded; there is nothing to move.
    if not path:
        return
    ext = os.path.splitext(path)[1]
    os.rename(path, f"{media_dir}/{payload.id}{ext}")


async def backup_dialog(client, dialog, users_met):
    """Incrementally back up one dialog under ``./data/telegram/chats/<id>``.

    Writes ``chat.json`` (id, name and the entity's JSON string), stores the
    chat photo in ``media/`` and appends new messages, one JSON document per
    line, to ``messages.json``. Messages already in the file are not fetched
    again: the id of the last stored line is used as ``min_id``.

    Args:
        client: connected Telethon client.
        dialog: dialog object yielded by ``client.iter_dialogs()``.
        users_met: set updated in place with the ``user_id`` of every
            ``PeerUser`` found in ``peer_id`` / ``from_id``, for both stored
            and newly fetched messages.
    """
    dialog_id = dialog.id
    chat_dir = f"./data/telegram/chats/{dialog_id}"
    media_dir = f"{chat_dir}/media"
    chat_file = f"{chat_dir}/chat.json"
    messages_file = f"{chat_dir}/messages.json"
    os.makedirs(media_dir, exist_ok=True)

    photo = dialog.entity.photo
    if photo and not isinstance(photo, telethon.tl.types.ChatPhotoEmpty):
        if not glob.glob(f"{media_dir}/{photo.photo_id}.*"):
            print(" downloading chat photo")
            path = await client.download_profile_photo(dialog.entity)
            if path:
                ext = os.path.splitext(path)[1]
                os.rename(path, f"{media_dir}/{photo.photo_id}{ext}")

    with open(chat_file, "w") as f:
        f.write(
            json.dumps(
                {
                    "id": dialog.id,
                    "name": dialog.name,
                    # Stored as a JSON string inside the JSON document; kept
                    # unchanged so existing backups stay compatible.
                    "entity": dialog.entity.to_json(),
                }
            )
        )

    last_message_id = 0
    # The file does not exist on the first backup of a dialog.
    if os.path.exists(messages_file):
        with open(messages_file, "r") as f:
            for line in f:
                if not line.strip():
                    continue
                message = json.loads(line)
                # Only PeerUser peers have a user_id; groups and channels
                # use other keys.
                for key in ("peer_id", "from_id"):
                    peer = message.get(key)
                    if peer and peer.get("_") == "PeerUser":
                        users_met.add(peer["user_id"])
                last_message_id = message["id"]

    if dialog.message and dialog.message.id <= last_message_id:
        return

    with open(messages_file, "a") as f:
        async for message in client.iter_messages(
            dialog_id, reverse=True, min_id=last_message_id
        ):
            if message.media and dialog.id not in skip_media_for_chats:
                await _download_message_media(client, message, media_dir)

            for peer in (message.peer_id, message.from_id):
                if isinstance(peer, telethon.tl.types.PeerUser):
                    users_met.add(peer.user_id)

            f.write(message.to_json() + "\n")
# Script entry point: run the asynchronous backup to completion.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
import os
import json
import glob
import gzip
from datetime import datetime
# Maps user id -> photo URL, or None when the user has no stored photo.
user_photo_cache = {}


def _lookup_user_photo(user_id, users_dir):
    """Resolve a user's photo URL from disk, or return None if unavailable."""
    user_info_file = os.path.join(users_dir, f"{user_id}.json")
    try:
        with open(user_info_file, encoding="utf-8") as f:
            user_data = json.load(f)
    except FileNotFoundError:
        return None

    # "photo" may be missing or null in the stored user JSON.
    photo_id = (user_data.get("photo") or {}).get("photo_id")
    if not photo_id:
        return None

    photo_files = glob.glob(f"{users_dir}/media/{photo_id}.*")
    if not photo_files:
        return None
    # Rewrite the backup-relative path so it resolves from the generated
    # chat pages.
    return photo_files[0].replace(
        "./data/telegram/users", "../../data/telegram/users"
    )


def find_user_photo(user_id, users_dir):
    """Return the URL of a user's saved profile photo, or None.

    Reads ``<users_dir>/<user_id>.json`` for the photo id and looks for
    ``<users_dir>/media/<photo_id>.*``. Both hits and misses are memoized in
    ``user_photo_cache`` (keyed by ``user_id`` only), so the files of a user
    are inspected at most once per run.
    """
    if user_id in user_photo_cache:
        return user_photo_cache[user_id]
    url = _lookup_user_photo(user_id, users_dir)
    user_photo_cache[user_id] = url
    return url
def find_chat_media(chat_id, media_id):
    """Locate a downloaded media file of a chat by its media id.

    Searches ``./data/telegram/chats/<chat_id>/media`` for any file named
    ``<media_id>.<ext>`` and returns the first match with the leading
    ``./data/telegram/chats`` rewritten to ``../../data/telegram/chats``.
    Returns None when no such file exists.
    """
    pattern = f"./data/telegram/chats/{chat_id}/media/{media_id}.*"
    matches = glob.glob(pattern)
    if not matches:
        return None
    first_match = matches[0]
    return first_match.replace(
        "./data/telegram/chats", "../../data/telegram/chats"
    )
def generate_media_html(chat_id, msg_data):
    """Render the ``media`` part of a stored message as an HTML fragment.

    Photos and documents are looked up through ``find_chat_media``; the other
    supported kinds (web page, contact, geo point, poll, venue) are rendered
    from the JSON alone. Every value taken from the message is HTML-escaped,
    because it is content written by other people.

    Args:
        chat_id: chat directory name / id used to locate media files.
        msg_data: message dict as loaded from ``messages.json``.

    Returns:
        The HTML fragment as a string.

    Raises:
        SystemExit: with status 1 for an unknown media type or an unknown
            document file extension (after printing it).
    """
    from html import escape

    def esc(value):
        """Escape one interpolated value for element text or an attribute."""
        if value is None:
            return ""
        # NOTE(review): newer API layers appear to store poll texts as
        # {"text": ...} objects instead of plain strings — confirm.
        if isinstance(value, dict):
            value = value.get("text", "")
        return escape(str(value))

    media_html = ""
    media = msg_data.get("media") or {}
    media_type = media.get("_", "")

    if media_type == "MessageMediaPhoto":
        media_id = (media.get("photo") or {}).get("id")
        media_file = find_chat_media(chat_id, media_id)
        if media_file:
            src = escape(media_file)
            onclick = f"window.open('{src}', '_blank');"
            media_html = f'<img src="{src}" alt="Photo" onclick="{onclick}" class="photo-preview" loading=lazy>'
        else:
            media_html = f"<p>Photo not found: {esc(media_id)}</p>"
    elif media_type == "MessageMediaDocument":
        media_id = (media.get("document") or {}).get("id", "")
        media_file = find_chat_media(chat_id, media_id)
        if media_file:
            ext = os.path.splitext(media_file)[1].lower()
            if ext in [".webp", ".png", ".jpg", ".jpeg"]:
                media_html = f'<img src="{escape(media_file)}" alt="Document" class="photo-preview" loading=lazy>'
            elif ext in [".mp4", ".webm", ".avi"]:
                media_html = f'<video src="{escape(media_file)}" controls style="height: 200px"></video>'
            elif ext in [".mp3", ".oga"]:
                media_html = f'<audio src="{escape(media_file)}" controls></audio>'
            elif ext in [".tgs", ".json"]:
                if ext == ".tgs":
                    # A .tgs file is read through gzip; the decompressed bytes
                    # are written next to it as "<file>.tgs.json" and that
                    # file is what the player is pointed at.
                    real_file = media_file.replace(
                        "../../data/telegram/chats", "./data/telegram/chats"
                    )
                    with gzip.open(real_file, "rb") as packed:
                        file_content = packed.read()
                    with open(real_file + ".json", "wb") as unpacked:
                        unpacked.write(file_content)
                    media_file += ".json"
                media_html = f'<lottie-player src="{escape(media_file)}" background="transparent" speed="1" style="width: 300px; height: 300px;" loop autoplay></lottie-player>'
            elif ext in [".pdf", ".docx", ".ics", ".jar", ".txt", ".doc", ".zip", ".octet-stream", ".xlsx", ".csv", ".js", ".gz", ".log"]:
                media_html = f'<a href="{escape(media_file)}">Download file</a>'
            else:
                print(f"Unknown document type: {ext}")
                raise SystemExit(1)
        else:
            media_html = f"<p>Document not found: {esc(media_id)}</p>"
    elif media_type == "MessageMediaWebPage":
        webpage = media.get("webpage") or {}
        url = esc(webpage.get("url", ""))
        title = esc(webpage.get("title", ""))
        description = esc(webpage.get("description", ""))
        media_html = f'<p>Web page: <a href="{url}">{title}</a><br>{description}</p>'
    elif media_type == "MessageMediaContact":
        phone = esc(media.get("phone_number", ""))
        first_name = esc(media.get("first_name", ""))
        last_name = esc(media.get("last_name", ""))
        media_html = f"<p>Contact: {phone} ({first_name} {last_name})</p>"
    elif media_type == "MessageMediaGeo":
        geo = media.get("geo") or {}
        long = esc(geo.get("long", ""))
        lat = esc(geo.get("lat", ""))
        yandex_maps_url = f"https://yandex.ru/maps/?ll={long},{lat}&z=17"
        media_html = f'<p>Location: <a href="{yandex_maps_url}">Yandex Maps</a></p>'
    elif media_type == "MessageMediaPoll":
        poll = media.get("poll") or {}
        question = esc(poll.get("question", ""))
        answers = poll.get("answers") or []
        answers_html = "".join(
            f"<li>{esc(answer.get('text', ''))}</li>" for answer in answers
        )
        media_html = f"<p>Poll: {question}<ul>{answers_html}</ul></p>"
    elif media_type == "MessageMediaVenue":
        geo = media.get("geo") or {}
        long = esc(geo.get("long", ""))
        lat = esc(geo.get("lat", ""))
        yandex_maps_url = f"https://yandex.ru/maps/?ll={long},{lat}&z=17"
        title = esc(media.get("title", ""))
        address = esc(media.get("address", ""))
        media_html = (
            f'<p>Venue: <a href="{yandex_maps_url}">{title}</a><br>{address}</p>'
        )
    else:
        print(f"Unknown media type: {media_type}")
        raise SystemExit(1)

    return media_html
# Entity types that wrap their text in a fixed pair of tags.
_SIMPLE_ENTITY_TAGS = {
    "MessageEntityBold": ("<strong>", "</strong>"),
    "MessageEntityMention": ('<span class="mention">', "</span>"),
    "MessageEntityMentionName": ('<span class="mention">', "</span>"),
    "MessageEntityUnderline": ("<u>", "</u>"),
    "MessageEntityStrike": ("<strike>", "</strike>"),
    "MessageEntityHashtag": ('<span class="hashtag">', "</span>"),
    "MessageEntityPhone": ('<span class="phone">', "</span>"),
    "MessageEntityPre": ("<pre>", "</pre>"),
    "MessageEntityCustomEmoji": ('<span class="emoji">', "</span>"),
    "MessageEntityItalic": ("<em>", "</em>"),
    "MessageEntityCode": ("<code>", "</code>"),
    "MessageEntityBankCard": ('<span class="bank-card">', "</span>"),
    "MessageEntityBotCommand": ('<span class="bot-command">', "</span>"),
}


def _entity_tags(entity, text):
    """Return the (opening, closing) tags for one entity covering ``text``.

    Exits with status 1 on an entity type the renderer does not know.
    """
    from html import escape

    kind = entity["_"]
    if kind in _SIMPLE_ENTITY_TAGS:
        return _SIMPLE_ENTITY_TAGS[kind]
    if kind == "MessageEntityUrl":
        href = text if text.startswith("http") else f"http://{text}"
        return f'<a href="{escape(href)}">', "</a>"
    if kind == "MessageEntityEmail":
        return f'<a href="mailto:{escape(text)}">', "</a>"
    if kind == "MessageEntityTextUrl":
        return f'<a href="{escape(entity["url"])}">', "</a>"
    print(f"Unknown entity type: {kind}")
    raise SystemExit(1)


def extract_message_text(message, msg_data):
    """Convert a message's plain text plus its entities into HTML.

    All text is HTML-escaped and newlines become ``<br>``; each entity in
    ``msg_data["entities"]`` wraps its range in the matching tag. Entity
    ``offset`` / ``length`` are interpreted as UTF-16 code units, so text
    after emoji stays aligned. Nested entities produce nested tags, and
    ``msg_data`` is not modified.

    Args:
        message: the raw message text.
        msg_data: message dict as loaded from ``messages.json``.

    Returns:
        The HTML string.

    Raises:
        SystemExit: with status 1 on an unknown entity type.
    """
    from html import escape

    # Work on UTF-16 code units: 2 bytes per unit in this encoding.
    raw = message.encode("utf-16-le", errors="surrogatepass")
    total_units = len(raw) // 2

    def text_between(begin, finish):
        return raw[2 * begin : 2 * finish].decode("utf-16-le", errors="replace")

    def render_text(chunk):
        return escape(chunk, quote=False).replace("\n", "<br>")

    # One "open" and one "close" event per entity. The sort key puts closes
    # before opens at the same position, opens longer ranges first, and
    # closes the most recently opened range first, so tags nest properly.
    events = []
    for index, entity in enumerate(msg_data.get("entities") or []):
        start = max(0, min(entity["offset"], total_units))
        end = min(entity["offset"] + entity["length"], total_units)
        opening, closing = _entity_tags(entity, text_between(start, end))
        if end <= start:
            continue
        events.append((start, 1, -(end - start), index, opening))
        events.append((end, 0, -start, -index, closing))
    events.sort(key=lambda event: event[:4])

    pieces = []
    cursor = 0
    for position, _, _, _, tag in events:
        if position > cursor:
            pieces.append(render_text(text_between(cursor, position)))
            cursor = position
        pieces.append(tag)
    pieces.append(render_text(text_between(cursor, total_units)))
    return "".join(pieces)
def generate_html_chat_histories():
    """Render every backed-up chat to HTML and build an index page.

    For each directory in ``./data/telegram/chats`` that contains a
    ``chat.json``, writes ``./html_output/chats/<chat>.html`` with one block
    per message from ``messages.json``. A chat whose ``messages.json`` is
    missing or empty still gets a page, without the scroll-to-last-message
    script. ``./html_output/index.html`` lists the chats that have at least
    one dated message, newest first. Output files are written as UTF-8 to
    match the charset the pages declare.
    """
    from html import escape

    chats_dir = "./data/telegram/chats"
    users_dir = "./data/telegram/users"
    media_dir = f"{users_dir}/media"
    html_dir = "./html_output"
    chats_html_dir = os.path.join(html_dir, "chats")
    index_data = []

    os.makedirs(chats_html_dir, exist_ok=True)

    for chat in os.listdir(chats_dir):
        chat_path = os.path.join(chats_dir, chat)
        chat_file = os.path.join(chat_path, "chat.json")
        messages_file = os.path.join(chat_path, "messages.json")
        if not os.path.exists(chat_file):
            continue

        with open(chat_file, "r", encoding="utf-8") as f:
            chat_data = json.load(f)
        chat_name = chat_data.get("name") or "Unknown chat"

        # One JSON document per line; blank lines (and so an empty file)
        # yield no messages instead of a JSON parse error.
        messages = []
        if os.path.exists(messages_file):
            with open(messages_file, "r", encoding="utf-8") as f:
                messages = [json.loads(line) for line in f if line.strip()]

        # Reset per chat so a chat without messages never reuses the date of
        # the previously processed chat.
        last_message_date = None
        scroll_script = ""
        if messages:
            last_message = messages[-1]
            if last_message.get("date"):
                last_message_date = datetime.strptime(
                    last_message["date"], "%Y-%m-%dT%H:%M:%S%z"
                )
            last_id = last_message["id"]
            scroll_script = (
                "<script>\n"
                "// Scroll to the last message\n"
                f'document.getElementById("message-{last_id}").scrollIntoView();\n'
                "</script>"
            )

        html_content = "\n".join(
            generate_message_html(chat, users_dir, media_dir, msg_data)
            for msg_data in messages
        )
        page_title = escape(str(chat_name))
        html_output = f"""
<html>
<head>
<meta charset="UTF-8">
<title>Chat History: {page_title}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.message {{ margin-bottom: 15px; padding: 10px; border: 1px solid #ccc; border-radius: 8px; }}
.meta-info {{ color: #666; font-size: 14px; margin-bottom: 5px; }}
.message-content {{ font-size: 16px; }}
.avatar {{ height: 50px; width: 50px; border-radius: 25px; }}
.media {{ margin-top: 10px; }}
.photo-preview {{ max-width: 100%; height: 200px; cursor: pointer; }}
a {{ color: #06c; text-decoration: none; }}
a:hover {{ text-decoration: underline; }}
strong {{ font-weight: bold; }}
</style>
</head>
<body>{html_content}</body>
{scroll_script}
<script src="../../lottie-player.js"></script>
</html>
"""
        chat_file_name = f"{chat}.html"
        with open(
            os.path.join(chats_html_dir, chat_file_name), "w", encoding="utf-8"
        ) as html_file:
            html_file.write(html_output)

        if last_message_date:
            index_data.append((chat_name, last_message_date, chat_file_name))

    # Sort chats by the date of the last message, latest first.
    index_data.sort(key=lambda item: item[1], reverse=True)

    with open(
        os.path.join(html_dir, "index.html"), "w", encoding="utf-8"
    ) as index_file:
        index_file.write("<html><head><meta charset=\"utf8\"><title>Chat Index</title></head><body>\n")
        index_file.write("<h1>Chats Overview</h1>\n<ul>\n")
        for chat_name, chat_date, file_name in index_data:
            label = escape(str(chat_name))
            stamp = chat_date.strftime("%Y-%m-%d %H:%M:%S")
            index_file.write(
                f'<li><a href="./chats/{file_name}">{label} - last message at {stamp}</a></li>\n'
            )
        index_file.write("</ul>\n</body></html>")
def _describe_service_action(action_data):
    """Return the human-readable text for a service message's action.

    Exits with status 1 (after printing it) on an action type the renderer
    does not know. Free-text values are HTML-escaped.
    """
    from html import escape

    action = action_data.get("_", "")
    if "MessageActionChatCreate" in action:
        title = escape(str(action_data.get("title", "Unknown")))
        return f"created the chat with title {title}"
    if "MessageActionChatJoinedByLink" in action:
        return "joined the chat by link"
    if "MessageActionChannelMigrateFrom" in action:
        title = escape(str(action_data.get("title", "Unknown")))
        old_chat_id = action_data.get("chat_id")
        return f'channel migrated from "{title}" (chat id {old_chat_id})'
    if "MessageActionPinMessage" in action:
        return "pinned a message"
    if "MessageActionChatEditPhoto" in action:
        return "changed the chat photo"
    if "MessageActionChatAddUser" in action:
        users_str = ", ".join(str(u) for u in action_data.get("users") or [])
        return f"added users {users_str}"
    if "MessageActionChatDeleteUser" in action:
        user_id = action_data.get("user_id", "Unknown user")
        return f"deleted user {user_id}"
    if "MessageActionChatEditTitle" in action:
        title = escape(str(action_data.get("title", "Unknown title")))
        return f"edited the chat title to {title}"
    if "MessageActionContactSignUp" in action:
        return "signed up for Telegram"
    if "MessageActionPhoneCall" in action:
        call_id = action_data.get("call_id", "Unknown call")
        video = action_data.get("video", False)
        # "reason" can be missing or null, which used to crash on ["_"].
        reason = (action_data.get("reason") or {}).get("_", "Unknown reason")
        duration = action_data.get("duration", "Unknown duration")
        return f"started a phone call (call ID {call_id}, video: {video}, reason: {reason}, duration: {duration})"
    if "MessageActionHistoryClear" in action:
        return "cleared the chat history"
    if "MessageActionBotAllowed" in action:
        # "app" can be missing or null; fall back to an empty mapping.
        app = action_data.get("app") or {}
        app_title = escape(str(app.get("title", "Unknown title")))
        return f"allowed the bot {app_title}"
    print(f"Unknown service action: {action}")
    raise SystemExit(1)


def generate_message_html(chat_id, users_dir, media_dir, msg_data):
    """Render one stored message as an HTML ``<div class="message">`` block.

    Args:
        chat_id: chat directory name / id, used to locate the chat's media.
        users_dir: directory holding the saved user JSON files and photos.
        media_dir: unused; kept so existing callers keep working.
        msg_data: message dict as loaded from ``messages.json``.

    Returns:
        The HTML for the message: avatar (if a photo is stored), sender id,
        date, message id, the service-action text or formatted message text,
        and the rendered media.

    Raises:
        SystemExit: with status 1 on an unknown service action, entity type
            or media type.
    """
    msg_id = msg_data.get("id", "Unknown ID")
    msg_type = msg_data.get("_", "")
    out_message = msg_data.get("out", False)
    date = msg_data.get("date", "No date").replace("T", " at ").replace("+00:00", "")
    message_body = msg_data.get("message", "")
    service_action = ""

    from_id = (msg_data.get("from_id") or {}).get("user_id")
    if not from_id:
        # No explicit sender: fall back to the peer's user id, if it has one.
        from_id = (msg_data.get("peer_id") or {}).get("user_id")
    avatar_path = find_user_photo(from_id, users_dir)

    if msg_type == "MessageService":
        service_action = _describe_service_action(msg_data.get("action") or {})

    if msg_type == "Message" and message_body:
        message_body = extract_message_text(message_body, msg_data)

    media_html = ""
    if msg_data.get("media", None):
        media_html = generate_media_html(chat_id, msg_data)

    avatar_html = (
        f'<img src="{avatar_path}" alt="Avatar" class="avatar" loading=lazy>' if avatar_path else ""
    )
    # Outgoing messages get a distinct background.
    message_style = ' style="background-color: lightblue;"' if out_message else ""

    msg_html = f"""
<div class="message" id="message-{msg_id}"{message_style}>
<div class="meta-info">
{avatar_html}
<span class="user">User ID: {from_id}</span>
<span class="date">{date}</span>
<span class="message-id">Message ID: {msg_id}</span>
</div>
<div class="message-content">
{service_action}{message_body}
</div>
<div class="media">
{media_html}
</div>
</div>
"""
    return msg_html
# Script entry point: render all backed-up chats to HTML.
if __name__ == "__main__":
    generate_html_chat_histories()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment