# Created January 8, 2025 20:10
# Save mshafiee/8670a56304fd394599c5b1d14c503028 to your computer and use it in GitHub Desktop.
# This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
from telethon import TelegramClient | |
from telethon.tl.types import PeerChannel | |
import os | |
# Replace these with your own values
api_id = 'YOUR_API_ID'  # Your API ID from my.telegram.org
api_hash = 'YOUR_API_HASH'  # Your API Hash from my.telegram.org
phone_number = 'YOUR_PHONE_NUMBER'  # Your phone number with country code

# List of private and public channel links
channel_links = [
    'https://t.me/c/1234567890/123',  # Private channel link
    'https://t.me/abcdefghklmnop/123',  # Public channel link
    # Add more links as needed
]

# Base directory to store all channel content
base_download_folder = 'telegram_channels'

# Create the base download folder if it doesn't exist. exist_ok=True replaces
# the separate existence check, so there is no window between checking and
# creating in which another process could create the folder first.
os.makedirs(base_download_folder, exist_ok=True)

# Initialize the Telegram client
client = TelegramClient('session_name', api_id, api_hash)
def _parse_channel_link(channel_link):
    """Classify a t.me link and extract the value that identifies its channel.

    Returns ``('private', channel_id)`` with an ``int`` ID for
    ``https://t.me/c/<id>/...`` links, ``('public', username)`` for other
    ``https://t.me/<username>/...`` links, or ``None`` when the link cannot be
    used.
    """
    prefix = 'https://t.me/'
    if not channel_link.startswith(prefix):
        return None

    # Ignore any query string or fragment, and drop empty segments so that a
    # trailing slash does not shift the positions read below.
    path = channel_link[len(prefix):].split('?', 1)[0].split('#', 1)[0]
    segments = [part for part in path.split('/') if part]
    if not segments:
        return None

    if segments[0] == 'c':
        # Read the channel ID by its position after "c" rather than counting
        # from the end, so extra trailing segments are never mistaken for it.
        if len(segments) < 2 or not segments[1].isdecimal():
            return None
        return 'private', int(segments[1])

    return 'public', segments[0]


def _safe_path_component(name, fallback):
    """Return *name* as a single path component, or *fallback* if unusable."""
    cleaned = name or ''
    for separator in {'/', os.sep, os.altsep} - {None}:
        cleaned = cleaned.replace(separator, '_')
    # '.' and '..' would point at an existing directory instead of naming a
    # new entry under the parent folder.
    if cleaned in ('', '.', '..'):
        return fallback
    return cleaned


async def download_channel_content(channel_link):
    """Save the text messages and documents of one channel to disk.

    The channel is resolved from *channel_link* (a private
    ``https://t.me/c/<id>/<post>`` link or a public
    ``https://t.me/<username>/<post>`` link). Its content is written to a
    sub-folder of ``base_download_folder`` named after the channel title:
    message texts go to ``all_messages.txt`` (rewritten on every call) and
    each document is downloaded next to it, prefixed with its message ID.

    The post ID in the link is not used; all messages of the channel are
    iterated. Problems with the link or with resolving the channel are
    printed and the function returns without raising.

    Args:
        channel_link: Link to any post of the channel to download.

    Returns:
        None.
    """
    parsed = _parse_channel_link(channel_link)
    if parsed is None:
        print(f"Invalid link format: {channel_link}")
        return
    kind, channel_ref = parsed

    # Resolve the channel ID / username to its entity
    try:
        # A numeric ID from a private link is wrapped in PeerChannel; a
        # username from a public link is passed to get_entity as-is.
        target = PeerChannel(channel_ref) if kind == 'private' else channel_ref
        channel = await client.get_entity(target)
        print(f"Fetching content from {kind} channel: {channel.title}")
    except Exception as e:
        print(f"Failed to resolve {kind} channel: {e}")
        return

    # Create a directory for the channel
    channel_folder = os.path.join(
        base_download_folder,
        _safe_path_component(channel.title, f"channel_{channel.id}"),
    )
    os.makedirs(channel_folder, exist_ok=True)

    # File to store all text messages for this channel
    text_file_path = os.path.join(channel_folder, 'all_messages.txt')

    with open(text_file_path, 'w', encoding='utf-8') as text_file:
        # Iterate through all messages in the channel (from the beginning)
        async for message in client.iter_messages(channel, reverse=True):
            if message.text:  # Save text messages to the text file
                text_file.write(
                    f"Message ID: {message.id}\n"
                    f"Date: {message.date}\n"
                    f"Text: {message.text}\n"
                    + "-" * 40 + "\n"
                )
                print(f"Saved text message (ID: {message.id}) from {channel.title}")

            if message.media and hasattr(message.media, 'document'):  # Download documents
                # The name comes from the sender and may be missing, so fall
                # back to 'file' and strip path separators before joining it
                # into a local path.
                original_name = message.file.name if message.file else None
                file_name = f"{message.id}_{_safe_path_component(original_name, 'file')}"
                file_path = os.path.join(channel_folder, file_name)
                print(f"Downloading {file_name} from {channel.title}...")
                saved_path = await message.download_media(file=file_path)
                if saved_path is None:
                    print(f"No media was downloaded for message {message.id}")
                else:
                    # Report the path returned by the download call, which
                    # may differ from the requested one.
                    print(f"Saved to {saved_path}")

    print(f"Download completed for channel: {channel.title}")
async def main():
    """Log in, then download each configured channel one after another."""
    # Sign in with the configured phone number before touching any channel.
    await client.start(phone=phone_number)
    print("Logged in successfully!")

    # Channels are handled sequentially, in the order they are listed.
    for channel_link in channel_links:
        await download_channel_content(channel_link)
# Run the client
# Start explicitly with the configured phone number first: entering
# `with client:` on a client that has not been started would start it with
# default arguments and prompt for the phone number interactively.
client.start(phone=phone_number)
with client:
    client.loop.run_until_complete(main())
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.