Created
June 30, 2024 13:43
-
-
Save DavideGalilei/950842e8708b997cb326e39165013c80 to your computer and use it in GitHub Desktop.
Send a requestPeer button with pyrogram excerpt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import traceback | |
from collections import defaultdict | |
from dataclasses import dataclass | |
from enum import Enum, auto | |
from hydrogram import Client, filters, raw | |
from hydrogram.raw.base import RequestPeerType, Update | |
from hydrogram.raw.types import ( | |
KeyboardButtonRequestPeer, | |
MessageService, | |
RequestPeerTypeBroadcast, | |
RequestPeerTypeChat, | |
UpdateNewMessage, | |
) | |
from hydrogram.types import ( | |
Chat, | |
InlineKeyboardButton, | |
InlineKeyboardMarkup, | |
KeyboardButton, | |
Message, | |
ReplyKeyboardMarkup, | |
ReplyKeyboardRemove, | |
User, | |
CallbackQuery, | |
) | |
from hydrogram.utils import get_channel_id | |
from loguru import logger | |
from plugins.shared import ChatsDB, Shared, background | |
logger.info("Loading the repost plugin") | |
class Status(Enum): | |
SELECT_CHAT = auto() | |
@dataclass | |
class State: | |
status: Status = Status.SELECT_CHAT | |
chat_id: int | None = None | |
class DestinationButton(KeyboardButton): | |
def __init__( | |
self, | |
text: str, | |
button_id: int, | |
peer_type: RequestPeerType | RequestPeerTypeChat | RequestPeerTypeBroadcast, | |
): | |
super().__init__(text=text) | |
self.button_id = button_id | |
self.peer_type = peer_type | |
def write(self): | |
return KeyboardButtonRequestPeer( | |
text=self.text, | |
button_id=self.button_id, | |
peer_type=self.peer_type, | |
) | |
state: defaultdict[int, State] = defaultdict(State) | |
@Client.on_message(Shared.admins & filters.private & filters.command("start")) | |
async def start(bot: Client, message: Message): | |
keyboard = [] | |
for chat in ChatsDB.select(): | |
chat: ChatsDB | |
keyboard.append( | |
[ | |
InlineKeyboardButton( | |
f"{chat.name or chat.chat_id}", callback_data=f"chat {chat.chat_id}" | |
) | |
] | |
) | |
await message.reply_text( | |
"To add a new channel, send /add", | |
reply_markup=InlineKeyboardMarkup(keyboard) if keyboard else None, | |
) | |
@Client.on_message(Shared.admins & filters.private & filters.command("add")) | |
async def add(bot: Client, message: Message): | |
state[message.chat.id] = State() | |
await message.reply_text( | |
"Select the channel", | |
reply_markup=ReplyKeyboardMarkup( | |
[ | |
[ | |
DestinationButton( | |
"Add channel", | |
1, | |
RequestPeerTypeBroadcast(), | |
) | |
], | |
[ | |
DestinationButton( | |
"Add chat", | |
2, | |
RequestPeerTypeChat(), | |
) | |
], | |
] | |
), | |
) | |
@Client.on_message(Shared.admins & filters.private & filters.command("cancel")) | |
async def cancel(bot: Client, message: Message): | |
state[message.chat.id] = State() | |
await message.reply_text("Operation cancelled", reply_markup=ReplyKeyboardRemove()) | |
@Client.on_callback_query( | |
Shared.admins & filters.regex(r"^chat (?P<chat_id>[-\d]+)$") | |
) | |
async def chat_info(bot: Client, query: CallbackQuery): | |
chat_id = int(query.matches[0].group("chat_id")) | |
chat = ChatsDB.get_or_none(chat_id=chat_id) | |
if not chat: | |
return await query.edit_message_text("Chat not found") | |
await query.edit_message_text( | |
f"Chat: {chat.name or chat.chat_id}", | |
reply_markup=InlineKeyboardMarkup( | |
[ | |
[ | |
InlineKeyboardButton( | |
"Remove", callback_data=f"delete_chat {chat.chat_id}" | |
) | |
] | |
] | |
), | |
) | |
@Client.on_callback_query( | |
Shared.admins & filters.regex(r"delete_chat (?P<chat_id>[-\d]+)$") | |
) | |
async def delete_chat(bot: Client, query: CallbackQuery): | |
chat_id = int(query.matches[0].group("chat_id")) | |
chat = ChatsDB.get_or_none(chat_id=chat_id) | |
if not chat: | |
return await query.edit_message_text("Chat not found") | |
chat.delete_instance() | |
await query.edit_message_text("Chat removed") | |
@Client.on_raw_update() | |
async def service( | |
bot: Client, update: Update, users: dict[int, User], _chats: dict[int, Chat] | |
): | |
try: | |
if isinstance(update, UpdateNewMessage): | |
if isinstance(update.message, MessageService): | |
message = update.message | |
# print(users, chats, update) | |
user = users[message.peer_id.user_id] | |
# destination = chats[message.action.peer.channel_id] | |
# print(user, destination) | |
if state[user.id].status != Status.SELECT_CHAT: | |
logger.info("Not in the right state") | |
return | |
elif user.id not in Shared.admins: | |
return | |
try: | |
raw_chat_id = message.action.peer.channel_id | |
chat_id = get_channel_id(raw_chat_id) | |
peer = await bot.resolve_peer(chat_id) | |
if isinstance(peer, raw.types.InputPeerChannel): | |
r = await bot.invoke( | |
raw.functions.channels.GetFullChannel(channel=peer) | |
) | |
chat = next(chat for chat in r.chats if chat.id == raw_chat_id) | |
name = chat.title | |
elif isinstance( | |
peer, (raw.types.InputPeerUser, raw.types.InputPeerSelf) | |
): | |
return await bot.send_message( | |
user.id, | |
"You can't add an user", | |
reply_markup=ReplyKeyboardRemove(), | |
) | |
else: | |
r = await bot.invoke( | |
raw.functions.messages.GetFullChat(chat_id=peer.chat_id) | |
) | |
chat = next(chat for chat in r.chats if chat.id == raw_chat_id) | |
name = chat.title | |
# TODO: basic group? | |
except Exception as e: | |
traceback.print_exc() | |
return await bot.send_message( | |
user.id, | |
f"Chat not found. Add the bot and try again. Error:\n{type(e).__name__}: {e}", | |
) | |
logger.info("Chat: {chat_id} {name}", chat_id=chat_id, name=name) | |
if ChatsDB.get_or_none(chat_id=chat_id): | |
return await bot.send_message( | |
user.id, | |
"Chat già aggiunta", | |
reply_markup=ReplyKeyboardRemove(), | |
) | |
logger.info("Adding the chat") | |
state[user.id].status = Status.SELECT_CHAT | |
state[user.id].chat_id = chat_id | |
chat_db = ChatsDB.create( | |
chat_id=chat_id, | |
name=name, | |
) | |
logger.success("Chat added") | |
await bot.send_message( | |
user.id, | |
"Chat added", | |
reply_markup=ReplyKeyboardRemove(), | |
) | |
background( | |
Shared.worker( | |
bot=bot, | |
chat=chat_db, | |
) | |
) | |
except Exception: | |
traceback.print_exc() | |
finally: | |
Message.continue_propagation(update) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment