Created
July 2, 2024 16:14
-
-
Save daviddanielng/5d7afe64c1be9192fac59e7df13fe7a1 to your computer and use it in GitHub Desktop.
Bot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from pathlib import Path | |
import os, sys | |
import asyncio | |
from telegram.request import HTTPXRequest | |
from telegram import Bot, Message, MessageId | |
from telegram.error import RetryAfter | |
from simpletap.utils.service import Service | |
class FilesBot: | |
def __init__(self): | |
self.logger = logging.getLogger() | |
group_name = os.getenv("FILES_GROUP_NAME") | |
token = os.getenv("FILES_BOT_TOKEN") | |
server = os.getenv("FILES_BOT_SERVER") | |
timeoutstr = os.getenv("TIMEOUT") | |
self.bot_name = "..." | |
self.service = Service() | |
from simpletap.utils.database import Database | |
self.database = Database() | |
self.retryIn = 30 | |
self.maxRetries = 15 | |
if ( | |
token is not None | |
and server is not None | |
and timeoutstr is not None | |
and group_name is not None | |
): | |
self.group_name = group_name | |
if timeoutstr is None or timeoutstr == "": | |
timeout = 5 | |
else: | |
timeout = int(float(timeoutstr)) | |
http = HTTPXRequest( | |
read_timeout=timeout, | |
write_timeout=timeout, | |
connect_timeout=timeout, | |
media_write_timeout=timeout, | |
) | |
self.bot = Bot( | |
token=token, | |
base_url=server, | |
local_mode=True, | |
base_file_url=f"{server}/files", | |
request=http, | |
) | |
else: | |
self.logger.error( | |
"environment variable FILES_BOT_TOKEN or FILES_BOT_SERVER, TIMEOUT is not set\n please set them in .env file or in the environment variable. exiting .." | |
) | |
sys.exit( | |
"environment variable FILES_BOT_TOKEN or FILES_BOT_SERVER, TIMEOUT are not set\n please set them in .env file or in the environment variable." | |
) | |
async def start_bot(self): | |
await self.bot.initialize() | |
async def send_message(self, message: str, retries: int = 0): | |
try: | |
await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name)) | |
await self.bot.send_message(self.group_name, message) | |
except RetryAfter as e: | |
if retries < self.maxRetries: | |
retries += 1 | |
self.logger.warning( | |
f"A retry after error occurred while trying to send message , retrying in {self.retryIn} seconds" | |
) | |
await asyncio.sleep(self.retryIn) | |
await self.send_message(message, retries) | |
else: | |
self.logger.exception( | |
f"A retry after error occurred while trying to send message {e} max retries reached" | |
) | |
async def send_document( | |
self, path: str, filename: str, caption="", retries: int = 0 | |
) -> int: | |
""" | |
Sends a document to a group chat. `Warning: this will remove the file after sending it.` | |
Args: | |
path (str): The path to the document file. | |
filename (str): how you want the document to be uploaded to group eg filename.mp3. | |
caption (str, optional): The caption for the document. Defaults to "". | |
Returns: | |
int: The message ID of the sent document, or 0 if the document failed to send. | |
""" | |
returnData: int = 0 | |
await asyncio.sleep(self.database.getBotDelay("group", self.group_name)) | |
logger = self.logger | |
logger.info(f"Uploading file {filename} to group") | |
try: | |
if os.path.exists(path): | |
with open(path, "rb") as file: | |
result: Message = await self.bot.send_document( | |
self.group_name, | |
file, | |
caption=caption, | |
filename=filename, | |
) | |
if result is not None: | |
returnData = result.message_id | |
os.remove(path) | |
else: | |
logger.error( | |
"An error occurred while trying to upload file, could not get message id" | |
) | |
returnData = 0 | |
else: | |
returnData = 0 | |
logger.error( | |
f"An error occurred while trying to upload file, file {path} does not exist" | |
) | |
except RetryAfter as e: | |
if retries < self.maxRetries: | |
retries += 1 | |
logger.warning( | |
f"A retry after error occurred while trying to upload file , retrying in {self.retryIn} seconds" | |
) | |
await asyncio.sleep(self.retryIn) | |
returnData = await self.send_document(path, filename, caption, retries) | |
logger.info( | |
f"File {filename} uploaded to group with message id {returnData}" | |
) | |
else: | |
logger.exception( | |
f"A retry after error occurred while trying to upload file {e} max retries reached" | |
) | |
returnData = 0 | |
except Exception as e: | |
logger.exception(f"An error occurred while trying to upload file {e}") | |
returnData = 0 | |
return returnData | |
async def copy_file( | |
self, file_id: int, to_chat_id: str, caption="", retries: int = 0 | |
) -> int: | |
""" | |
Copies a file with the given file_id to the specified chat_id. | |
Args: | |
file_id (int): The ID of the file to be copied. | |
to_chat_id (str): The ID of the chat where the file should be copied to. | |
caption (str, optional): The caption to be added to the copied file. Defaults to "". | |
Returns: | |
int: The message ID of the copied file in the destination chat, or 0 if an error occurred. | |
""" | |
returnData = 0 | |
logger = self.logger | |
logger.info(f"Copying file {file_id} to {to_chat_id}") | |
await asyncio.sleep(self.database.getBotDelay("group", self.group_name)) | |
try: | |
newData: MessageId = await self.bot.copy_message( | |
chat_id=to_chat_id, | |
from_chat_id=self.group_name, | |
message_id=file_id, | |
caption=caption, | |
disable_notification=True, | |
) | |
returnData = newData.message_id | |
logger.info( | |
f"File {file_id} copied to {to_chat_id} with message id {returnData}" | |
) | |
except RetryAfter as e: | |
if retries < self.maxRetries: | |
retries += 1 | |
logger.warning( | |
f"A retry after error occurred while trying to copy file , retrying in {self.retryIn} seconds" | |
) | |
await asyncio.sleep(self.retryIn) | |
returnData = await self.copy_file(file_id, to_chat_id, caption, retries) | |
else: | |
logger.exception( | |
f"A retry after error occurred while trying to copy file {e} max retries reached" | |
) | |
returnData = 0 | |
except Exception as e: | |
logger.exception(f"An error occurred while trying to copy file {e}") | |
returnData = 0 | |
return returnData | |
async def send_photo_to( | |
self, chat_id: str | int, path: str, caption: str = "", retries: int = 0 | |
) -> int: | |
""" | |
Sends a photo to the specified chat ID. | |
Args: | |
chat_id (str|int): The name of the chat to send the photo to eg `@chatname` or `-104863737`. | |
path (str): The path to the photo file. | |
caption (str, optional): The caption for the photo. Defaults to "". | |
Returns: | |
int : The message ID of the sent photo, or 0 if an error occurred. | |
""" | |
returnData: int = 0 | |
try: | |
await asyncio.sleep(self.database.getBotDelay("group", self.group_name)) | |
with open(Path(path), "rb") as img: | |
re = await self.bot.send_photo( | |
chat_id=chat_id, | |
photo=img, | |
caption=caption, | |
parse_mode="html", | |
) | |
if re.message_id is not None: | |
returnData = re.message_id | |
else: | |
self.logger.warning("An error occurred while trying to upload img") | |
except RetryAfter as e: | |
if retries < self.maxRetries: | |
retries += 1 | |
self.logger.warning( | |
f"A retry after error occurred while trying to upload img , retrying in {self.retryIn} seconds" | |
) | |
await asyncio.sleep(self.retryIn) | |
returnData = await self.send_photo_to(chat_id, path, caption, retries) | |
else: | |
self.logger.exception( | |
f"A retry after error occurred while trying to upload img {e} max retries reached" | |
) | |
returnData = 0 | |
except Exception as e: | |
self.logger.exception(f"An error occurred while trying to upload img {e}") | |
returnData = 0 | |
return returnData | |
async def deleteMessages( | |
self, chat_id: str | int, message_ids: list[int], retries: int = 0 | |
) -> bool: | |
""" | |
Deletes messages from a chat. | |
Args: | |
chat_id (str|int): The ID of the chat where the messages are located. | |
message_ids (list[int]): The IDs of the messages to delete. | |
Returns: | |
bool: True if the messages were deleted successfully, False otherwise. | |
""" | |
returnData: bool = False | |
try: | |
await asyncio.sleep(self.database.getBotDelay("group", self.group_name)) | |
await self.bot.delete_messages(chat_id, message_ids) | |
returnData = True | |
except RetryAfter as e: | |
if retries < self.maxRetries: | |
retries += 1 | |
self.logger.warning( | |
f"A retry after error occurred while trying to delete messages , retrying in {self.retryIn} seconds" | |
) | |
await asyncio.sleep(self.retryIn) | |
returnData = await self.deleteMessages(chat_id, message_ids, retries) | |
else: | |
self.logger.exception( | |
f"A retry after error occurred while trying to delete messages {e} max retries reached" | |
) | |
returnData = False | |
except Exception as e: | |
self.logger.exception( | |
f"An error occurred while trying to delete messages {e}" | |
) | |
returnData = False | |
return returnData | |
async def close_bot(self): | |
""" | |
Closes the bot. | |
This method does nothing. | |
Returns: | |
bool: Always returns True. | |
""" | |
self.logger.info("Bot is closed") | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment