Skip to content

Instantly share code, notes, and snippets.

@daviddanielng
Last active May 22, 2024 12:24
Show Gist options
  • Save daviddanielng/7450c8f017f5baf3cb40f8a0c72eecef to your computer and use it in GitHub Desktop.
Save daviddanielng/7450c8f017f5baf3cb40f8a0c72eecef to your computer and use it in GitHub Desktop.
import logging
from pathlib import Path
import os, sys
import asyncio
from telegram.request import HTTPXRequest
from telegram import Bot, Message
from utils.service import Service
from utils.log import setup
class FilesBot:
    """Asynchronous helper around a ``telegram.Bot`` used to store and copy files.

    The bot is configured from the environment variables ``FILES_BOT_TOKEN``,
    ``FILES_BOT_SERVER``, ``FILES_GROUP_NAME`` and ``TIMEOUT``. It is created in
    local mode against the server given by ``FILES_BOT_SERVER``.

    Every request first sleeps for the delay returned by
    ``Service.get_bot_queue_wait`` (presumably a rate-limit queue -- confirm
    against ``utils.service``).
    """

    def __init__(self):
        """Read the configuration from the environment and build the bot.

        Terminates the process through ``sys.exit`` when any of the required
        environment variables is unset.
        """
        self.logger = setup("filesbot")
        required = {
            "FILES_GROUP_NAME": os.getenv("FILES_GROUP_NAME"),
            "FILES_BOT_TOKEN": os.getenv("FILES_BOT_TOKEN"),
            "FILES_BOT_SERVER": os.getenv("FILES_BOT_SERVER"),
            "TIMEOUT": os.getenv("TIMEOUT"),
        }
        from utils.log import general_logger

        general_logger()
        self.bot_name = "filesbot"
        self.service = Service()

        # Report the variables that are really missing, by the names this
        # class actually reads.
        missing = [name for name, value in required.items() if value is None]
        if missing:
            problem = (
                f"environment variable(s) {', '.join(missing)} not set\n"
                " please set them in .env file or in the environment variable."
            )
            self.logger.error(f"{problem} exiting ..")
            sys.exit(problem)

        self.group_name = required["FILES_GROUP_NAME"]
        server = required["FILES_BOT_SERVER"]
        timeoutstr = required["TIMEOUT"]
        # TIMEOUT must be present, but an empty value falls back to 5 seconds.
        timeout = int(float(timeoutstr)) if timeoutstr else 5
        http = HTTPXRequest(
            read_timeout=timeout,
            write_timeout=timeout,
            connect_timeout=timeout,
            media_write_timeout=timeout,
        )
        self.bot = Bot(
            token=required["FILES_BOT_TOKEN"],
            base_url=server,
            local_mode=True,
            base_file_url=f"{server}/files",
            request=http,
        )

    async def start_bot(self):
        """Initialize the underlying bot; call once before sending anything."""
        await self.bot.initialize()

    async def send_message(self, message: str):
        """Send a text message to the configured files group."""
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        await self.bot.send_message(self.group_name, message)

    async def send_document(self, path, caption="", filename="file"):
        """Upload the file at ``path`` to the files group, then delete it locally.

        Args:
            path: Location of the file to upload (``str`` or path-like).
            caption: Caption attached to the document.
            filename: Name shown in Telegram, without extension; the extension
                of ``path`` (if any) is appended.

        Returns:
            dict: ``{"status": "success", "data": <message id>}`` on success,
            otherwise ``{"status": "failed", "reason": <text>}``.
        """
        data = {"status": "sending file"}
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        try:
            if not os.path.exists(path):
                data["status"] = "failed"
                data["reason"] = f"file {path} not found"
                return data

            # splitext only looks at the final path component, so a dot in a
            # directory name or a file without suffix yields no extension.
            extension = os.path.splitext(path)[1]
            # The context manager closes the handle even when the upload raises.
            with open(path, "rb") as file:
                result: Message = await self.bot.send_document(
                    self.group_name,
                    file,
                    caption=caption,
                    filename=f"{filename}{extension}",
                )
            if result is not None:
                data["status"] = "success"
                data["data"] = result.message_id
            else:
                data["status"] = "failed"
                data["reason"] = "message Id was not received"
            os.remove(path)
        except Exception as e:
            # Best-effort boundary: callers get a status dict, never an exception.
            data["status"] = "failed"
            data["reason"] = f"An unexpected error {e} occurred"
        return data

    async def copy_file(self, file_id: int, to_chat_id: str, caption=""):
        """Copy a stored message from the files group to another chat.

        Args:
            file_id (int): Message ID, inside the files group, of the file to copy.
            to_chat_id (str): The ID of the chat where the file should be copied to.
            caption (str, optional): Caption for the copied file. Defaults to "".

        Returns:
            dict: A dictionary containing the status and data of the copy operation.
                - status (str): "success" or "failed".
                - data (int): The message ID of the copy (0 unless successful).
                - reason (str, optional): The reason for failure (if applicable).
        """
        data = {"status": "sending file", "data": 0}
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        try:
            result: Message = await self.bot.copy_message(
                chat_id=to_chat_id,
                from_chat_id=self.group_name,
                message_id=file_id,
                caption=caption,
                disable_notification=True,
            )
            if result is not None:
                data["status"] = "success"
                data["data"] = result.message_id
            else:
                data["status"] = "failed"
                data["reason"] = "message Id was not received"
        except Exception as e:
            # Best-effort boundary: callers get a status dict, never an exception.
            data["status"] = "failed"
            data["reason"] = f"An unexpected error {e} occurred"
        return data

    async def send_photo_to(self, chat_id, path, caption=""):
        """Send the image at ``path`` to ``chat_id`` with an HTML caption.

        Returns:
            The message ID of the sent photo, or ``None`` when the response
            carries no message ID (a warning is logged in that case).
        """
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        # The context manager closes the handle even when the upload raises.
        with open(Path(path), "rb") as img:
            sent = await self.bot.send_photo(
                chat_id=chat_id,  # type:ignore
                photo=img,
                caption=caption,  # type:ignore
                parse_mode="html",
            )
        message_id = sent.message_id
        if message_id is None:
            self.logger.warning("An error occurred while trying to upload img")
        return message_id

    async def close_bot(self):
        """Release the resources held by the bot and return ``True``.

        Uses ``Bot.shutdown()``, the counterpart of ``Bot.initialize()``.
        ``Bot.close()`` is a different operation: it calls the Bot API ``close``
        method, which is meant for moving a bot between local servers and
        answers with error 429 (flood control) during the first 10 minutes
        after the bot was launched.
        """
        await asyncio.sleep(self.service.get_bot_queue_wait(self.bot_name))
        await self.bot.shutdown()
        self.logger.info("Bot is closed")
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment