Skip to content

Instantly share code, notes, and snippets.

@daviddanielng
Created July 2, 2024 16:14
Show Gist options
  • Save daviddanielng/5d7afe64c1be9192fac59e7df13fe7a1 to your computer and use it in GitHub Desktop.
Save daviddanielng/5d7afe64c1be9192fac59e7df13fe7a1 to your computer and use it in GitHub Desktop.
Bot
import logging
from pathlib import Path
import os, sys
import asyncio
from telegram.request import HTTPXRequest
from telegram import Bot, Message, MessageId
from telegram.error import RetryAfter
from simpletap.utils.service import Service
class FilesBot:
def __init__(self):
self.logger = logging.getLogger()
group_name = os.getenv("FILES_GROUP_NAME")
token = os.getenv("FILES_BOT_TOKEN")
server = os.getenv("FILES_BOT_SERVER")
timeoutstr = os.getenv("TIMEOUT")
self.bot_name = "..."
self.service = Service()
from simpletap.utils.database import Database
self.database = Database()
self.retryIn = 30
self.maxRetries = 15
if (
token is not None
and server is not None
and timeoutstr is not None
and group_name is not None
):
self.group_name = group_name
if timeoutstr is None or timeoutstr == "":
timeout = 5
else:
timeout = int(float(timeoutstr))
http = HTTPXRequest(
read_timeout=timeout,
write_timeout=timeout,
connect_timeout=timeout,
media_write_timeout=timeout,
)
self.bot = Bot(
token=token,
base_url=server,
local_mode=True,
base_file_url=f"{server}/files",
request=http,
)
else:
self.logger.error(
"environment variable FILES_BOT_TOKEN or FILES_BOT_SERVER, TIMEOUT is not set\n please set them in .env file or in the environment variable. exiting .."
)
sys.exit(
"environment variable FILES_BOT_TOKEN or FILES_BOT_SERVER, TIMEOUT are not set\n please set them in .env file or in the environment variable."
)
async def start_bot(self):
await self.bot.initialize()
async def send_message(self, message: str, retries: int = 0):
try:
await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
await self.bot.send_message(self.group_name, message)
except RetryAfter as e:
if retries < self.maxRetries:
retries += 1
self.logger.warning(
f"A retry after error occurred while trying to send message , retrying in {self.retryIn} seconds"
)
await asyncio.sleep(self.retryIn)
await self.send_message(message, retries)
else:
self.logger.exception(
f"A retry after error occurred while trying to send message {e} max retries reached"
)
async def send_document(
self, path: str, filename: str, caption="", retries: int = 0
) -> int:
"""
Sends a document to a group chat. `Warning: this will remove the file after sending it.`
Args:
path (str): The path to the document file.
filename (str): how you want the document to be uploaded to group eg filename.mp3.
caption (str, optional): The caption for the document. Defaults to "".
Returns:
int: The message ID of the sent document, or 0 if the document failed to send.
"""
returnData: int = 0
await asyncio.sleep(self.database.getBotDelay("group", self.group_name))
logger = self.logger
logger.info(f"Uploading file {filename} to group")
try:
if os.path.exists(path):
with open(path, "rb") as file:
result: Message = await self.bot.send_document(
self.group_name,
file,
caption=caption,
filename=filename,
)
if result is not None:
returnData = result.message_id
os.remove(path)
else:
logger.error(
"An error occurred while trying to upload file, could not get message id"
)
returnData = 0
else:
returnData = 0
logger.error(
f"An error occurred while trying to upload file, file {path} does not exist"
)
except RetryAfter as e:
if retries < self.maxRetries:
retries += 1
logger.warning(
f"A retry after error occurred while trying to upload file , retrying in {self.retryIn} seconds"
)
await asyncio.sleep(self.retryIn)
returnData = await self.send_document(path, filename, caption, retries)
logger.info(
f"File {filename} uploaded to group with message id {returnData}"
)
else:
logger.exception(
f"A retry after error occurred while trying to upload file {e} max retries reached"
)
returnData = 0
except Exception as e:
logger.exception(f"An error occurred while trying to upload file {e}")
returnData = 0
return returnData
async def copy_file(
self, file_id: int, to_chat_id: str, caption="", retries: int = 0
) -> int:
"""
Copies a file with the given file_id to the specified chat_id.
Args:
file_id (int): The ID of the file to be copied.
to_chat_id (str): The ID of the chat where the file should be copied to.
caption (str, optional): The caption to be added to the copied file. Defaults to "".
Returns:
int: The message ID of the copied file in the destination chat, or 0 if an error occurred.
"""
returnData = 0
logger = self.logger
logger.info(f"Copying file {file_id} to {to_chat_id}")
await asyncio.sleep(self.database.getBotDelay("group", self.group_name))
try:
newData: MessageId = await self.bot.copy_message(
chat_id=to_chat_id,
from_chat_id=self.group_name,
message_id=file_id,
caption=caption,
disable_notification=True,
)
returnData = newData.message_id
logger.info(
f"File {file_id} copied to {to_chat_id} with message id {returnData}"
)
except RetryAfter as e:
if retries < self.maxRetries:
retries += 1
logger.warning(
f"A retry after error occurred while trying to copy file , retrying in {self.retryIn} seconds"
)
await asyncio.sleep(self.retryIn)
returnData = await self.copy_file(file_id, to_chat_id, caption, retries)
else:
logger.exception(
f"A retry after error occurred while trying to copy file {e} max retries reached"
)
returnData = 0
except Exception as e:
logger.exception(f"An error occurred while trying to copy file {e}")
returnData = 0
return returnData
async def send_photo_to(
self, chat_id: str | int, path: str, caption: str = "", retries: int = 0
) -> int:
"""
Sends a photo to the specified chat ID.
Args:
chat_id (str|int): The name of the chat to send the photo to eg `@chatname` or `-104863737`.
path (str): The path to the photo file.
caption (str, optional): The caption for the photo. Defaults to "".
Returns:
int : The message ID of the sent photo, or 0 if an error occurred.
"""
returnData: int = 0
try:
await asyncio.sleep(self.database.getBotDelay("group", self.group_name))
with open(Path(path), "rb") as img:
re = await self.bot.send_photo(
chat_id=chat_id,
photo=img,
caption=caption,
parse_mode="html",
)
if re.message_id is not None:
returnData = re.message_id
else:
self.logger.warning("An error occurred while trying to upload img")
except RetryAfter as e:
if retries < self.maxRetries:
retries += 1
self.logger.warning(
f"A retry after error occurred while trying to upload img , retrying in {self.retryIn} seconds"
)
await asyncio.sleep(self.retryIn)
returnData = await self.send_photo_to(chat_id, path, caption, retries)
else:
self.logger.exception(
f"A retry after error occurred while trying to upload img {e} max retries reached"
)
returnData = 0
except Exception as e:
self.logger.exception(f"An error occurred while trying to upload img {e}")
returnData = 0
return returnData
async def deleteMessages(
self, chat_id: str | int, message_ids: list[int], retries: int = 0
) -> bool:
"""
Deletes messages from a chat.
Args:
chat_id (str|int): The ID of the chat where the messages are located.
message_ids (list[int]): The IDs of the messages to delete.
Returns:
bool: True if the messages were deleted successfully, False otherwise.
"""
returnData: bool = False
try:
await asyncio.sleep(self.database.getBotDelay("group", self.group_name))
await self.bot.delete_messages(chat_id, message_ids)
returnData = True
except RetryAfter as e:
if retries < self.maxRetries:
retries += 1
self.logger.warning(
f"A retry after error occurred while trying to delete messages , retrying in {self.retryIn} seconds"
)
await asyncio.sleep(self.retryIn)
returnData = await self.deleteMessages(chat_id, message_ids, retries)
else:
self.logger.exception(
f"A retry after error occurred while trying to delete messages {e} max retries reached"
)
returnData = False
except Exception as e:
self.logger.exception(
f"An error occurred while trying to delete messages {e}"
)
returnData = False
return returnData
async def close_bot(self):
"""
Closes the bot.
This method does nothing.
Returns:
bool: Always returns True.
"""
self.logger.info("Bot is closed")
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment