Last active
May 22, 2024 12:24
-
-
Save daviddanielng/7450c8f017f5baf3cb40f8a0c72eecef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from pathlib import Path | |
import os, sys | |
import asyncio | |
from telegram.request import HTTPXRequest | |
from telegram import Bot, Message | |
from utils.service import Service | |
from utils.log import setup | |
class FilesBot:
    """Thin async wrapper around a ``telegram.Bot`` used to store and copy files.

    The bot is configured entirely from environment variables:

    * ``FILES_GROUP_NAME`` - chat that ``send_message`` / ``send_document``
      post to and that ``copy_file`` copies from.
    * ``FILES_BOT_TOKEN`` - bot token.
    * ``FILES_BOT_SERVER`` - base URL of the Bot API server; files are served
      from ``<server>/files`` and the bot is created with ``local_mode=True``.
    * ``TIMEOUT`` - request timeout in seconds (must be set; an empty value
      falls back to 5).

    Before every Bot API call the wrapper sleeps for the delay returned by
    ``Service.get_bot_queue_wait(bot_name)``.
    """

    def __init__(self):
        """Read the configuration from the environment and build the bot.

        Raises:
            SystemExit: if any of the required environment variables is unset.
            ValueError: if ``TIMEOUT`` is set to a non-numeric value.
        """
        self.logger = setup("filesbot")
        group_name = os.getenv("FILES_GROUP_NAME")
        token = os.getenv("FILES_BOT_TOKEN")
        server = os.getenv("FILES_BOT_SERVER")
        timeoutstr = os.getenv("TIMEOUT")
        # Kept as a function-scope import, exactly as the original code did.
        from utils.log import general_logger

        general_logger()
        self.bot_name = "filesbot"
        self.service = Service()

        required = {
            "FILES_GROUP_NAME": group_name,
            "FILES_BOT_TOKEN": token,
            "FILES_BOT_SERVER": server,
            "TIMEOUT": timeoutstr,
        }
        missing = [name for name, value in required.items() if value is None]
        if missing:
            # Report the variables that are really missing, using the names
            # this class actually reads.
            message = (
                f"environment variable(s) {', '.join(missing)} not set\n"
                " please set them in .env file or in the environment variable."
            )
            self.logger.error(message + " exiting ..")
            sys.exit(message)

        self.group_name = group_name
        # An empty TIMEOUT means "use the default"; values such as "7.5" are
        # accepted and truncated to whole seconds.
        if timeoutstr.strip() == "":
            timeout = 5
        else:
            timeout = int(float(timeoutstr))
        http = HTTPXRequest(
            read_timeout=timeout,
            write_timeout=timeout,
            connect_timeout=timeout,
            media_write_timeout=timeout,
        )
        self.bot = Bot(
            token=token,
            base_url=server,
            local_mode=True,
            base_file_url=f"{server}/files",
            request=http,
        )

    async def start_bot(self):
        """Initialize the underlying bot; call once before any other method."""
        await self.bot.initialize()

    async def send_message(self, message: str):
        """Send a text message to the configured files group."""
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        await self.bot.send_message(self.group_name, message)

    async def send_document(self, path, caption="", filename="file"):
        """Upload a local file to the files group and delete it afterwards.

        Args:
            path: Path of the file to upload (``str`` or path-like).
            caption: Caption attached to the document.
            filename: Name shown in Telegram, without extension; the
                extension of ``path`` (if any) is appended.

        Returns:
            dict: ``{"status": "success", "data": <message id>}`` on success,
            otherwise ``{"status": "failed", "reason": <text>}``. Exceptions
            are reported through the dict, never raised. On an exception the
            local file is left in place.
        """
        data = {"status": "sending file"}
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        try:
            if not os.path.exists(path):
                data["status"] = "failed"
                data["reason"] = f"file {path} not found"
                return data

            # ``with`` guarantees the handle is closed even when the upload
            # raises. ``Path.suffix`` yields "" for extension-less files and
            # ignores dots in parent directories.
            with open(path, "rb") as document:
                result = await self.bot.send_document(
                    self.group_name,
                    document,
                    caption=caption,
                    filename=filename + Path(path).suffix,
                )

            if result is not None:
                data["status"] = "success"
                data["data"] = result.message_id
            else:
                data["status"] = "failed"
                data["reason"] = "message Id was not received"
            # The local copy is removed once the upload call has returned.
            os.remove(path)
            return data
        except Exception as e:
            data["status"] = "failed"
            data["reason"] = f"An unexpected error {e} occurred"
            return data

    async def copy_file(self, file_id: int, to_chat_id: str, caption=""):
        """Copy a stored file message from the files group to another chat.

        Args:
            file_id (int): Message ID (in the files group) of the file to copy.
            to_chat_id (str): The ID of the chat the file should be copied to.
            caption (str, optional): Caption for the copied file. Defaults to "".

        Returns:
            dict: A dictionary describing the outcome.
                - status (str): "success" or "failed".
                - data (int): Message ID of the copy (0 unless successful).
                - reason (str, optional): Why the copy failed, if it did.
        """
        data = {"status": "sending file", "data": 0}
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        try:
            result = await self.bot.copy_message(
                chat_id=to_chat_id,
                from_chat_id=self.group_name,
                message_id=file_id,
                caption=caption,
                disable_notification=True,
            )
            if result is not None:
                data["status"] = "success"
                data["data"] = result.message_id
            else:
                data["status"] = "failed"
                data["reason"] = "message Id was not received"
            return data
        except Exception as e:
            data["status"] = "failed"
            data["reason"] = f"An unexpected error {e} occurred"
            return data

    async def send_photo_to(self, chat_id, path, caption=""):
        """Send a local image to ``chat_id`` with an HTML-formatted caption.

        Unlike ``send_document`` this method lets exceptions from opening the
        file or from the upload propagate to the caller.

        Returns:
            The message ID of the sent photo, or ``None`` if the response
            carried no message ID.
        """
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        message_id = None
        # ``with`` closes the image even if the upload raises.
        with open(Path(path), "rb") as img:
            sent = await self.bot.send_photo(
                chat_id=chat_id,  # type:ignore
                photo=img,
                caption=caption,  # type:ignore
                parse_mode="html",
            )
        if sent.message_id is not None:
            message_id = sent.message_id
        else:
            self.logger.warning("An error occurred while trying to upload img")
        return message_id

    async def close_bot(self):
        """Release the resources acquired by ``start_bot``.

        Returns:
            bool: Always ``True`` once shutdown has completed.
        """
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        # ``Bot.close()`` is the Bot API ``close`` method (used when moving a
        # bot between local servers); it answers 429 during the first ten
        # minutes after launch, which produced a flood error on every call.
        # ``shutdown()`` is the counterpart of ``initialize()`` and only
        # releases the local request objects.
        await self.bot.shutdown()
        self.logger.info("Bot is closed")
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment