@painor
Last active October 14, 2024 08:25
This will increase the download/upload speed when using Telethon.

Based on parallel_file_transfer.py from mautrix-telegram, with permission to distribute under the MIT license. Copyright (C) 2019 Tulir Asokan - https://github.com/tulir/mautrix-telegram

Update :

  • added support for flood waits
  • added an example of how it works
  • updated to work with the latest telethon version 1.21
  • newest update. it's working fine again thanks to tulir's update
  • abandon all hope. nothing is working
  • it's actually working now
  • :D :( :D
  • even progress_callbacks ? idk maybe ?
  • use a different session for downloading because telegram might close the connection
  • ok broken again. the files' md5 is wrong and they are corrupted
  • nvm found why. asyncio.wait doesn't return in order, so the file wasn't being written in the correct order
  • used asyncio.gather instead, which returns in order. not sure if it's the best solution but it's working
  • using more than 20 workers will cause all of them to stop working (needs more testing with real accounts)
  • broken if the file lives in another DC, until Lonami fixes it

Notes :

  • progress_callbacks are working fine.
  • the number of workers is chosen dynamically based on the file size (see the sketch after this list)
  • increase the number of workers to increase the speed (the faster your internet connection, the more workers you can use)
  • if you set the number of workers too high you'll get flood waits
  • the file uploaded will always be a bare document without any attributes. you can add attributes to it using utils.get_input_media() and utils.get_attributes()
  • if you want it to be even faster make sure to use https://github.com/painor/pyaesni instead of cryptg
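
For reference, the dynamic worker count mentioned above comes from ParallelTransferrer._get_connection_count in the code below: it scales linearly with file size and caps at 20 connections once the file reaches 100 MiB. A minimal standalone sketch of the same formula (the function name here is just for illustration):

import math

def get_connection_count(file_size: int, max_count: int = 20,
                         full_size: int = 100 * 1024 * 1024) -> int:
    # Mirrors ParallelTransferrer._get_connection_count below: files of
    # 100 MiB or more get the full 20 connections, smaller files get a
    # proportional share, rounded up.
    if file_size > full_size:
        return max_count
    return math.ceil((file_size / full_size) * max_count)

print(get_connection_count(10 * 1024 * 1024))   # 10 MiB file  -> 2 connections
print(get_connection_count(500 * 1024 * 1024))  # 500 MiB file -> 20 connections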

How to use :

  1. Put all of the FastTelethon code below in a separate file (e.g. FastTelethon.py, matching the import in the example).
  2. Import download_file and upload_file from it.
  3. You can use download_file like so: await download_file(client, msg.document, file, progress_callback=prog)
  4. You can use upload_file like so: r = await upload_file(client, file, progress_callback=prog)
  5. file is an object with .read()/.write() methods (see the note and sketch after this list).
  6. Check the example file below.
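
For downloads specifically, the out object only needs .write() (and .tell() if you pass a progress_callback), so an in-memory buffer works too; for uploads the code reads the size via os.path.getsize(file.name), so the file should be opened from disk. A minimal sketch of an in-memory download (the helper name download_to_memory and the client/msg objects are placeholders, not part of the gist):

import io

from FastTelethon import download_file

async def download_to_memory(client, msg) -> bytes:
    # BytesIO provides the .write()/.tell() interface download_file expects.
    buffer = io.BytesIO()
    await download_file(client, msg.document, buffer)
    return buffer.getvalue()

The full example file follows below.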
# Example: a simple bot that downloads any incoming document and
# replies with bunny.mp4 (uploaded in parallel) to everything else.
import time

from telethon import events, utils
from telethon.sync import TelegramClient
from telethon.tl import types

from FastTelethon import download_file, upload_file

api_id: int = 0  # fill in your own values
api_hash: str = ""
token = ""

client = TelegramClient("bot", api_id, api_hash)
client.start(bot_token=token)

file_to_upload = "bunny.mp4"


class Timer:
    def __init__(self, time_between=2):
        self.start_time = time.time()
        self.time_between = time_between

    def can_send(self):
        if time.time() > (self.start_time + self.time_between):
            self.start_time = time.time()
            return True
        return False


@client.on(events.NewMessage())
async def download_or_upload(event):
    type_of = ""
    msg = None
    timer = Timer()

    async def progress_bar(current, total):
        if timer.can_send():
            await msg.edit("{} {}%".format(type_of, current * 100 / total))

    if event.document:
        type_of = "download"
        msg = await event.reply("downloading started")
        with open(event.file.name, "wb") as out:
            await download_file(event.client, event.document, out, progress_callback=progress_bar)
        await msg.edit("Finished downloading")
    else:
        type_of = "upload"
        msg = await event.reply("uploading started")
        with open(file_to_upload, "rb") as out:
            res = await upload_file(client, out, progress_callback=progress_bar)
        # result is InputFile()
        # you can add more data to it
        attributes, mime_type = utils.get_attributes(
            file_to_upload,
        )
        media = types.InputMediaUploadedDocument(
            file=res,
            mime_type=mime_type,
            attributes=attributes,
            # not needed for most files, thumb=thumb,
            force_file=False
        )
        await msg.edit("Finished uploading")
        await event.reply(file=media)
        # or just send it as it is
        await event.reply(file=res)


client.run_until_disconnected()

# FastTelethon.py (save the code below under this name so the example's import works)
# copied from https://github.com/tulir/mautrix-telegram/blob/master/mautrix_telegram/util/parallel_file_transfer.py
# Copyright (C) 2021 Tulir Asokan
import asyncio
import hashlib
import inspect
import logging
import math
import os
from collections import defaultdict
from typing import Optional, List, AsyncGenerator, Union, Awaitable, DefaultDict, Tuple, BinaryIO

from telethon import utils, helpers, TelegramClient
from telethon.crypto import AuthKey
from telethon.network import MTProtoSender
from telethon.tl.alltlobjects import LAYER
from telethon.tl.functions import InvokeWithLayerRequest
from telethon.tl.functions.auth import ExportAuthorizationRequest, ImportAuthorizationRequest
from telethon.tl.functions.upload import (GetFileRequest, SaveFilePartRequest,
                                          SaveBigFilePartRequest)
from telethon.tl.types import (Document, InputFileLocation, InputDocumentFileLocation,
                               InputPhotoFileLocation, InputPeerPhotoFileLocation, TypeInputFile,
                               InputFileBig, InputFile)

try:
    from mautrix.crypto.attachments import async_encrypt_attachment
except ImportError:
    async_encrypt_attachment = None

log: logging.Logger = logging.getLogger("telethon")

TypeLocation = Union[Document, InputDocumentFileLocation, InputPeerPhotoFileLocation,
                     InputFileLocation, InputPhotoFileLocation]

class DownloadSender:
    client: TelegramClient
    sender: MTProtoSender
    request: GetFileRequest
    remaining: int
    stride: int

    def __init__(self, client: TelegramClient, sender: MTProtoSender, file: TypeLocation,
                 offset: int, limit: int, stride: int, count: int) -> None:
        self.sender = sender
        self.client = client
        self.request = GetFileRequest(file, offset=offset, limit=limit)
        self.stride = stride
        self.remaining = count

    async def next(self) -> Optional[bytes]:
        if not self.remaining:
            return None
        result = await self.client._call(self.sender, self.request)
        self.remaining -= 1
        self.request.offset += self.stride
        return result.bytes

    def disconnect(self) -> Awaitable[None]:
        return self.sender.disconnect()

class UploadSender:
    client: TelegramClient
    sender: MTProtoSender
    request: Union[SaveFilePartRequest, SaveBigFilePartRequest]
    part_count: int
    stride: int
    previous: Optional[asyncio.Task]
    loop: asyncio.AbstractEventLoop

    def __init__(self, client: TelegramClient, sender: MTProtoSender, file_id: int,
                 part_count: int, big: bool, index: int,
                 stride: int, loop: asyncio.AbstractEventLoop) -> None:
        self.client = client
        self.sender = sender
        self.part_count = part_count
        if big:
            self.request = SaveBigFilePartRequest(file_id, index, part_count, b"")
        else:
            self.request = SaveFilePartRequest(file_id, index, b"")
        self.stride = stride
        self.previous = None
        self.loop = loop

    async def next(self, data: bytes) -> None:
        if self.previous:
            await self.previous
        self.previous = self.loop.create_task(self._next(data))

    async def _next(self, data: bytes) -> None:
        self.request.bytes = data
        log.debug(f"Sending file part {self.request.file_part}/{self.part_count}"
                  f" with {len(data)} bytes")
        await self.client._call(self.sender, self.request)
        self.request.file_part += self.stride

    async def disconnect(self) -> None:
        if self.previous:
            await self.previous
        return await self.sender.disconnect()

class ParallelTransferrer:
    client: TelegramClient
    loop: asyncio.AbstractEventLoop
    dc_id: int
    senders: Optional[List[Union[DownloadSender, UploadSender]]]
    auth_key: AuthKey
    upload_ticker: int

    def __init__(self, client: TelegramClient, dc_id: Optional[int] = None) -> None:
        self.client = client
        self.loop = self.client.loop
        self.dc_id = dc_id or self.client.session.dc_id
        self.auth_key = (None if dc_id and self.client.session.dc_id != dc_id
                         else self.client.session.auth_key)
        self.senders = None
        self.upload_ticker = 0

    async def _cleanup(self) -> None:
        await asyncio.gather(*[sender.disconnect() for sender in self.senders])
        self.senders = None

    @staticmethod
    def _get_connection_count(file_size: int, max_count: int = 20,
                              full_size: int = 100 * 1024 * 1024) -> int:
        if file_size > full_size:
            return max_count
        return math.ceil((file_size / full_size) * max_count)

    async def _init_download(self, connections: int, file: TypeLocation, part_count: int,
                             part_size: int) -> None:
        minimum, remainder = divmod(part_count, connections)

        def get_part_count() -> int:
            nonlocal remainder
            if remainder > 0:
                remainder -= 1
                return minimum + 1
            return minimum

        # The first cross-DC sender will export+import the authorization, so we always create it
        # before creating any other senders.
        self.senders = [
            await self._create_download_sender(file, 0, part_size, connections * part_size,
                                               get_part_count()),
            *await asyncio.gather(
                *[self._create_download_sender(file, i, part_size, connections * part_size,
                                               get_part_count())
                  for i in range(1, connections)])
        ]

    async def _create_download_sender(self, file: TypeLocation, index: int, part_size: int,
                                      stride: int, part_count: int) -> DownloadSender:
        return DownloadSender(self.client, await self._create_sender(), file, index * part_size,
                              part_size, stride, part_count)

    async def _init_upload(self, connections: int, file_id: int, part_count: int, big: bool
                           ) -> None:
        self.senders = [
            await self._create_upload_sender(file_id, part_count, big, 0, connections),
            *await asyncio.gather(
                *[self._create_upload_sender(file_id, part_count, big, i, connections)
                  for i in range(1, connections)])
        ]

    async def _create_upload_sender(self, file_id: int, part_count: int, big: bool, index: int,
                                    stride: int) -> UploadSender:
        return UploadSender(self.client, await self._create_sender(), file_id, part_count, big,
                            index, stride, loop=self.loop)

    async def _create_sender(self) -> MTProtoSender:
        dc = await self.client._get_dc(self.dc_id)
        sender = MTProtoSender(self.auth_key, loggers=self.client._log)
        await sender.connect(self.client._connection(dc.ip_address, dc.port, dc.id,
                                                     loggers=self.client._log,
                                                     proxy=self.client._proxy))
        if not self.auth_key:
            log.debug(f"Exporting auth to DC {self.dc_id}")
            auth = await self.client(ExportAuthorizationRequest(self.dc_id))
            self.client._init_request.query = ImportAuthorizationRequest(id=auth.id,
                                                                         bytes=auth.bytes)
            req = InvokeWithLayerRequest(LAYER, self.client._init_request)
            await sender.send(req)
            self.auth_key = sender.auth_key
        return sender

    async def init_upload(self, file_id: int, file_size: int, part_size_kb: Optional[float] = None,
                          connection_count: Optional[int] = None) -> Tuple[int, int, bool]:
        connection_count = connection_count or self._get_connection_count(file_size)
        part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024
        part_count = (file_size + part_size - 1) // part_size
        is_large = file_size > 10 * 1024 * 1024
        await self._init_upload(connection_count, file_id, part_count, is_large)
        return part_size, part_count, is_large

    async def upload(self, part: bytes) -> None:
        await self.senders[self.upload_ticker].next(part)
        self.upload_ticker = (self.upload_ticker + 1) % len(self.senders)

    async def finish_upload(self) -> None:
        await self._cleanup()

    async def download(self, file: TypeLocation, file_size: int,
                       part_size_kb: Optional[float] = None,
                       connection_count: Optional[int] = None) -> AsyncGenerator[bytes, None]:
        connection_count = connection_count or self._get_connection_count(file_size)
        part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024
        part_count = math.ceil(file_size / part_size)
        log.debug("Starting parallel download: "
                  f"{connection_count} {part_size} {part_count} {file!s}")
        await self._init_download(connection_count, file, part_count, part_size)

        part = 0
        while part < part_count:
            tasks = []
            for sender in self.senders:
                tasks.append(self.loop.create_task(sender.next()))
            for task in tasks:
                data = await task
                if not data:
                    break
                yield data
                part += 1
                log.debug(f"Part {part} downloaded")

        log.debug("Parallel download finished, cleaning up connections")
        await self._cleanup()

parallel_transfer_locks: DefaultDict[int, asyncio.Lock] = defaultdict(lambda: asyncio.Lock())


def stream_file(file_to_stream: BinaryIO, chunk_size=1024):
    while True:
        data_read = file_to_stream.read(chunk_size)
        if not data_read:
            break
        yield data_read


async def _internal_transfer_to_telegram(client: TelegramClient,
                                         response: BinaryIO,
                                         progress_callback: callable
                                         ) -> Tuple[TypeInputFile, int]:
    file_id = helpers.generate_random_long()
    file_size = os.path.getsize(response.name)

    hash_md5 = hashlib.md5()
    uploader = ParallelTransferrer(client)
    part_size, part_count, is_large = await uploader.init_upload(file_id, file_size)
    buffer = bytearray()
    for data in stream_file(response):
        if progress_callback:
            r = progress_callback(response.tell(), file_size)
            if inspect.isawaitable(r):
                await r
        if not is_large:
            hash_md5.update(data)
        if len(buffer) == 0 and len(data) == part_size:
            await uploader.upload(data)
            continue
        new_len = len(buffer) + len(data)
        if new_len >= part_size:
            cutoff = part_size - len(buffer)
            buffer.extend(data[:cutoff])
            await uploader.upload(bytes(buffer))
            buffer.clear()
            buffer.extend(data[cutoff:])
        else:
            buffer.extend(data)
    if len(buffer) > 0:
        await uploader.upload(bytes(buffer))
    await uploader.finish_upload()
    if is_large:
        return InputFileBig(file_id, part_count, "upload"), file_size
    else:
        return InputFile(file_id, part_count, "upload", hash_md5.hexdigest()), file_size


async def download_file(client: TelegramClient,
                        location: TypeLocation,
                        out: BinaryIO,
                        progress_callback: callable = None
                        ) -> BinaryIO:
    size = location.size
    dc_id, location = utils.get_input_location(location)
    # We lock the transfers because telegram has connection count limits
    downloader = ParallelTransferrer(client, dc_id)
    downloaded = downloader.download(location, size)
    async for x in downloaded:
        out.write(x)
        if progress_callback:
            r = progress_callback(out.tell(), size)
            if inspect.isawaitable(r):
                await r
    return out


async def upload_file(client: TelegramClient,
                      file: BinaryIO,
                      progress_callback: callable = None,
                      ) -> TypeInputFile:
    res = (await _internal_transfer_to_telegram(client, file, progress_callback))[0]
    return res

@Abhi5033

Abhi5033 commented Jun 15, 2024

Hello, I have been using FastTelethon for 8 days and I'm facing the issue below for the last 4 days: Error: RPCError 420: FLOOD_PREMIUM_WAIT_3 (caused by SaveBigFilePartRequest)

code - 1
from FastTelethonhelper import fast_upload
client = TelegramClient(session_name, api_id, api_hash)
await client.connect()

result = await fast_upload(client, file_path)
await client.send_file(group_username, result)
...

code - 2
from FastTelethonhelper import download_file, upload_file
client = TelegramClient(session_name, api_id, api_hash)
await client.connect()

with open(file_path, "rb") as out:
         result = await upload_file(client, out,name=file_path)

...
(maybe both snippets are the same). I'm using Windows 10. I uploaded nearly 9-10 GB (each file around 500 MB to 1 GB) using this method and then it started failing. Looking at FLOOD_PREMIUM_WAIT_3, I thought I had to wait for 3 seconds, but that didn't work; waiting 3 hours didn't work either. It's been 3 days and I'm still receiving this error. Please help.
Day 6: uploads under 100 MB worked with fast upload. Day 7: 100 MB now errors too; under 50 MB still works with fast upload.

Interesting. It seems that Telegram added a new FLOOD_PREMIUM_WAIT error, but, as stated in the Swiftgram post (https://t.me/s/swiftgram?before=73), it's just a notification and should not stop uploading/downloading. However, this is an error, so it does?

Try bumping your Telethon library to the latest commit from master. If you still receive this "error", it may be worth asking in the Issues tab of the Telethon library (https://github.com/LonamiWebs/Telethon/issues). Perhaps it's something we can catch and write to the logs instead of breaking the whole code, if the Swiftgram devs are correct.

It doesn't seem to be an error from the Telethon side, sir; it is from FastTelethonHelper.
When I do
await client.send_file(group_username, file_path, caption=caption)
it uploads very, very slowly but eventually finishes,

but when I do await fast_upload(client, file_path)
it does not upload at all; it throws that FloodWaitError and stops.

@NotStatilko

NotStatilko commented Jun 15, 2024

You're right, but FastTelethon is built around the Telethon library. I believe this error can show up in plain Telethon too (if it's not already fixed there, which I think it is not). As far as I understand, this error is not the same as FloodWaitError; it's just a "Woo, buy our Premium!!!" prompt. Maybe we can fix it here by catching the error. Can you provide the full traceback?

Also, consider looking at this: https://t.me/TelethonChat/611380. Maybe that will be a quick fix for now? I can't test it myself because I haven't seen this error before.
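
For example, one untested option would be to catch the flood error inside UploadSender._next in the gist above and retry the same part after sleeping, instead of letting it abort the upload. A rough sketch of a drop-in replacement for that method, assuming from telethon.errors import FloodError is added at the top of FastTelethon.py (the retry loop and the 1-second fallback are my own assumptions, not something from the gist):

    async def _next(self, data: bytes) -> None:
        self.request.bytes = data
        while True:
            try:
                await self.client._call(self.sender, self.request)
                break
            except FloodError as e:
                # FLOOD_WAIT_X / FLOOD_PREMIUM_WAIT_X are 420 errors; sleep for
                # the reported duration (fall back to 1s) and retry this part.
                seconds = getattr(e, "seconds", 1)
                log.warning(f"Flood wait while saving file part, sleeping {seconds}s")
                await asyncio.sleep(seconds)
        self.request.file_part += self.stride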

@konglquan

how to use pyaesni ?
