Created
January 10, 2025 23:20
-
-
Save Tishka17/7ef001551d43f2d9687c3ab7b6dca27a to your computer and use it in GitHub Desktop.
Stateless dialog rendering
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import os | |
from dataclasses import dataclass | |
from typing import Any | |
from aiogram import Bot, Dispatcher, Router, F | |
from aiogram.filters import CommandStart | |
from aiogram.fsm.state import State | |
from aiogram.types import Message, CallbackQuery | |
from aiogram_dialog import DialogManager, Window, ChatEvent | |
from aiogram_dialog.api.entities import EVENT_CONTEXT_KEY, Context | |
from aiogram_dialog.manager.message_manager import MessageManager | |
from aiogram_dialog.utils import CB_SEP, remove_intent_id | |
from aiogram_dialog.widgets.kbd import Select | |
from aiogram_dialog.widgets.text import Format | |
from aiogram_dialog.context.intent_middleware import ( | |
event_context_from_message, | |
event_context_from_callback, | |
) | |
class FakeDialog:
    """Minimal stand-in for the dialog object passed to ``Window`` methods.

    Its single responsibility is to forward data loading to the manager.
    """

    async def load_data(self, manager: DialogManager) -> dict:
        """Return whatever ``manager.load_data()`` produces."""
        loaded = await manager.load_data()
        return loaded
@dataclass
class FakeContext:
    """Bare-bones context object that carries nothing but an identifier."""

    # Identifier of the context; FakeManager fills it with the state's name.
    id: str
class FakeManager:
    """Stateless substitute for a dialog manager, created per event.

    It stores the triggering event, the handler's middleware data and a
    window state, and offers a small subset of manager-like methods.
    """

    def __init__(self, event: ChatEvent, data: dict, state: State):
        self.state = state
        self.middleware_data = data
        self.event = event

    def current_context(self) -> FakeContext:
        """Build a context whose id is the string name of the stored state."""
        context_id = self.state.state
        print("context id", repr(context_id))
        return FakeContext(context_id)

    def is_preview(self):
        """Report that this manager never renders a preview."""
        return False

    async def load_data(self) -> dict:
        """Return the data mapping: empty dialog data and no start data."""
        loaded = dict(
            dialog_data={},
            start_data=None,
            middleware_data=self.middleware_data,
            event=self.event,
        )
        return loaded
def make_manager(
    event: ChatEvent,
    data: dict,
    target_window: "Window | None" = None,
) -> FakeManager:
    """Build a ``FakeManager`` for ``event`` and register it in the data.

    Args:
        event: Incoming ``Message`` or ``CallbackQuery``.
        data: Handler keyword data. It is copied; the caller's dict is
            left untouched.
        target_window: Window whose state the manager should carry. When
            omitted, the module-level ``window`` is used so that existing
            two-argument calls keep working.

    Returns:
        The new manager. Its ``middleware_data`` is the copied dict,
        extended with the ``"dialog_manager"`` and ``EVENT_CONTEXT_KEY``
        entries.

    Raises:
        TypeError: If ``event`` is neither a message nor a callback query.
    """
    if isinstance(event, Message):
        event_context = event_context_from_message(event)
    elif isinstance(event, CallbackQuery):
        event_context = event_context_from_callback(event)
    else:
        raise TypeError("unknown event")
    if target_window is None:
        # Backward-compatible fallback to the module-level demo window.
        target_window = window
    data = data.copy()
    manager = FakeManager(event, data, target_window.state)
    data["dialog_manager"] = manager
    data[EVENT_CONTEXT_KEY] = event_context
    return manager


async def show(event: ChatEvent, window: Window, data: dict) -> Message:
    """Render ``window`` for ``event`` and send the result.

    Args:
        event: Event that triggered the rendering.
        window: Window to render; its state is the one given to the manager.
        data: Handler keyword data; must contain the ``"bot"`` entry.

    Returns:
        Whatever ``MessageManager.send_message`` returns for the rendered
        message.
    """
    # Pass the window explicitly so the manager carries *this* window's
    # state rather than the module-level one.
    manager = make_manager(event, data, window)
    new_message = await window.render(FakeDialog(), manager)
    message_manager = MessageManager()
    bot = data["bot"]
    return await message_manager.send_message(bot, new_message)


def register_window_handler(router: Router, window: Window):
    """Attach callback-query and message handlers for ``window``.

    Both handlers are filtered on payloads that start with the window's
    state name followed by ``CB_SEP``.

    Args:
        router: Router (or dispatcher) that receives the handlers.
        window: Window that processes the matching events.
    """
    prefix = window.state.state + CB_SEP

    async def cb_handler(callback: CallbackQuery, **kwargs: Any):
        # Drop the leading intent id part so the window sees only its own
        # callback data; the manager still receives the original event.
        _intent_id, callback_data = remove_intent_id(callback.data)
        cleaned_callback = callback.model_copy(update={"data": callback_data})
        return await window.process_callback(
            cleaned_callback,
            FakeDialog(),
            make_manager(callback, kwargs, window),
        )

    async def msg_handler(message: Message, **kwargs: Any):
        # window.process_message needs dialog and manager arguments that the
        # router cannot supply, so it is wrapped like the callback handler
        # instead of being registered directly.
        return await window.process_message(
            message,
            FakeDialog(),
            make_manager(message, kwargs, window),
        )

    print("prefix", repr(prefix))
    router.callback_query(F.data.startswith(prefix))(cb_handler)
    router.message(F.text.startswith(prefix))(msg_handler)
# ===================== | |
async def on_age_changed(
    callback: ChatEvent,
    select: Any,
    manager: DialogManager,
    item_id: str,
):
    """Click handler of the age selector: log the choice and confirm it.

    The chosen ``item_id`` is printed and echoed back through
    ``callback.answer``; ``select`` and ``manager`` are not used.
    """
    print("age changed", item_id)
    reply_text = f"You have chosen: {item_id}"
    await callback.answer(reply_text)
async def get_data(dialog_manager: FakeManager, **kwargs):
    """Window getter: expose the event sender's full name as ``name``."""
    sender = dialog_manager.event.from_user
    return dict(name=sender.full_name)
# Age-range selector: every string serves both as the button label and as
# the item id reported to on_age_changed.
_age_select = Select(
    Format("{item}"),
    id="w_age",
    item_id_getter=lambda item: item,
    items=["0-12", "12-18", "18-25", "25-40", "40+"],
    on_click=on_age_changed,
)

# The demo window: a greeting built from get_data plus the selector above.
window = Window(
    Format("{name}! How old are you?"),
    _age_select,
    getter=get_data,
    state=State("w1"),
    preview_data=dict(name="Tishka17"),
)
async def start(message: Message, **kwargs):
    """Show the age window in response to ``message``.

    The handler keyword data is forwarded to ``show`` unchanged.
    """
    await show(event=message, window=window, data=kwargs)
# Bot token, read once at import time from the BOT_TOKEN environment
# variable; None when the variable is not set.
API_TOKEN = os.getenv("BOT_TOKEN")


async def main():
    """Configure logging, wire up the handlers and run long polling.

    Raises:
        RuntimeError: If the BOT_TOKEN environment variable is missing or
            empty, so the problem is reported before a bot is created.
    """
    if not API_TOKEN:
        raise RuntimeError("BOT_TOKEN environment variable is not set")
    # real main
    logging.basicConfig(level=logging.DEBUG)
    bot = Bot(token=API_TOKEN)
    dp = Dispatcher()
    dp.message.register(start, CommandStart())
    register_window_handler(dp, window)
    await dp.start_polling(bot)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment