Skip to content

Instantly share code, notes, and snippets.

@KurimuzonAkuma
Created June 3, 2023 13:07
Show Gist options
  • Save KurimuzonAkuma/683eec4d62e111578a42608d4485fc27 to your computer and use it in GitHub Desktop.
Save KurimuzonAkuma/683eec4d62e111578a42608d4485fc27 to your computer and use it in GitHub Desktop.
MySQL and SQLite storages for aiogram framework
import pickle
from asyncio import Lock
from typing import Any, Dict, Optional
import aiomysql
import aiosqlite
from aiogram import Bot
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import BaseStorage, StateType, StorageKey
class MySQLStorage(BaseStorage):
"""
MySQL storage backend for FSM.\n
If database and table does not exist, it will be created automatically.
"""
def __init__(
self,
host: str,
user: str,
password: str,
database: str,
) -> None:
self.host = host
self.user = user
self.password = password
self.database = database
self.connection = None
self.cursor = None
self._lock = Lock()
async def __execute(self, query: str, values: tuple = None, commit: bool = False):
async with self._lock:
await self.connect()
await self.cursor.execute(query, values)
if commit:
await self.connection.commit()
async def __create_tables(self) -> None:
await self.cursor.execute(
f"CREATE DATABASE IF NOT EXISTS `{self.database}`;\n"
f"USE `{self.database}`;\n"
"CREATE TABLE IF NOT EXISTS `aiogram_fsm_states` ("
"`chat_id` BIGINT NOT NULL,"
"`user_id` BIGINT NOT NULL,"
"`state` TEXT,"
"PRIMARY KEY (`chat_id`)"
");\n"
"CREATE TABLE IF NOT EXISTS `aiogram_fsm_data` ("
"`chat_id` BIGINT NOT NULL,"
"`user_id` BIGINT NOT NULL,"
"`data` BLOB,"
"PRIMARY KEY (`chat_id`)"
");"
)
async def connect(self) -> aiomysql.Connection:
if self.connection is None:
self.connection = await aiomysql.connect(
host=self.host,
user=self.user,
password=self.password,
db=self.database,
)
self.cursor = await self.connection.cursor(aiomysql.DictCursor)
await self.__create_tables()
return self.connection
async def close(self) -> None:
if isinstance(self.connection, aiomysql.Connection):
await self.cursor.close()
self.connection.close()
self.connection = None
self.cursor = None
async def set_state(self, bot: Bot, key: StorageKey, state: StateType = None) -> None:
_state = state.state if isinstance(state, State) else state
if _state is None:
await self.__execute(
"DELETE FROM `aiogram_fsm_states` WHERE `chat_id` = %s AND `user_id` = %s;",
(key.chat_id, key.user_id),
commit=True,
)
return
await self.__execute(
"INSERT INTO `aiogram_fsm_states` (`chat_id`, `user_id`, `state`) VALUES (%s, %s, %s) "
"ON DUPLICATE KEY UPDATE `state` = %s;",
(key.chat_id, key.user_id, _state, _state),
commit=True,
)
async def get_state(self, bot: Bot, key: StorageKey) -> Optional[str]:
await self.__execute(
"SELECT `state` FROM `aiogram_fsm_states` WHERE `chat_id` = %s AND `user_id` = %s;",
(key.chat_id, key.user_id),
)
result = await self.cursor.fetchone()
return result["state"] if result else None
async def set_data(self, bot: Bot, key: StorageKey, data: Dict[str, Any]) -> None:
if not data:
await self.__execute(
"DELETE FROM `aiogram_fsm_data` WHERE `chat_id` = %s AND `user_id` = %s;",
(key.chat_id, key.user_id),
commit=True,
)
return
serialized_data = pickle.dumps(data)
await self.__execute(
"INSERT INTO `aiogram_fsm_data` (`chat_id`, `user_id`, `data`) VALUES (%s, %s, %s) "
"ON DUPLICATE KEY UPDATE `data` = %s;",
(key.chat_id, key.user_id, serialized_data, serialized_data),
commit=True,
)
async def get_data(self, bot: Bot, key: StorageKey) -> Dict[str, Any]:
await self.__execute(
"SELECT `data` FROM `aiogram_fsm_data` WHERE `chat_id` = %s AND `user_id` = %s;",
(key.chat_id, key.user_id),
)
result = await self.cursor.fetchone()
return pickle.loads(result["data"]) if result else {}
async def update_data(self, bot: Bot, key: StorageKey, data: Dict[str, Any]) -> Dict[str, Any]:
current_data = await self.get_data(bot=bot, key=key)
current_data.update(data)
await self.set_data(bot=bot, key=key, data=current_data)
return current_data
class SQLiteStorage(BaseStorage):
"""
SQLite storage backend for FSM.\n
Tables will be created automatically.
"""
def __init__(self, path: str) -> None:
self.path = path
self.connection = None
self.cursor = None
self._lock = Lock()
async def __execute(self, query: str, values: tuple = None, commit: bool = False):
async with self._lock:
await self.connect()
self.cursor.execute(query, values)
if commit:
await self.connection.commit()
async def __create_tables(self) -> None:
await self.cursor.execute(
"CREATE TABLE IF NOT EXISTS `aiogram_fsm_states` ("
"`chat_id` BIGINT NOT NULL,"
"`user_id` BIGINT NOT NULL,"
"`state` TEXT,"
"PRIMARY KEY (`chat_id`, `user_id`)"
");\n"
"CREATE TABLE IF NOT EXISTS `aiogram_fsm_data` ("
"`chat_id` BIGINT NOT NULL,"
"`user_id` BIGINT NOT NULL,"
"`data` BLOB,"
"PRIMARY KEY (`chat_id`, `user_id`)"
");"
)
async def connect(self) -> aiosqlite.Connection:
if self.connection is None:
self.connection = await aiosqlite.connect(self.path)
self.connection.row_factory = aiosqlite.Row
self.cursor = await self.connection.cursor()
await self.__create_tables()
return self.connection
async def close(self) -> None:
if isinstance(self.connection, aiosqlite.Connection):
await self.connection.close()
await self.cursor.close()
self.connection = None
self.cursor = None
async def set_state(self, bot: Bot, key: StorageKey, state: StateType = None) -> None:
_state = state.state if isinstance(state, State) else state
if _state is None:
await self.__execute(
"DELETE FROM `aiogram_fsm_states` WHERE `chat_id` = ? AND `user_id` = ?;",
(key.chat_id, key.user_id),
commit=True,
)
return
await self.__execute(
"INSERT INTO `aiogram_fsm_states` (`chat_id`, `user_id`, `state`) VALUES (?, ?, ?) "
"ON CONFLICT(`chat_id`, `user_id`) DO UPDATE SET `state` = ?;",
(key.chat_id, key.user_id, _state, _state),
commit=True,
)
async def get_state(self, bot: Bot, key: StorageKey) -> Optional[str]:
await self.__execute(
"SELECT `state` FROM `aiogram_fsm_states` WHERE `chat_id` = ? AND `user_id` = ?;",
(key.chat_id, key.user_id),
)
result = await self.cursor.fetchone()
return result["state"] if result else None
async def set_data(self, bot: Bot, key: StorageKey, data: Dict[str, Any]) -> None:
if not data:
await self.__execute(
"DELETE FROM `aiogram_fsm_data` WHERE `chat_id` = ? AND `user_id` = ?;",
(key.chat_id, key.user_id),
commit=True,
)
return
serialized_data = pickle.dumps(data)
await self.__execute(
"INSERT INTO `aiogram_fsm_data` (`chat_id`, `user_id`, `data`) VALUES (?, ?, ?) "
"ON CONFLICT(`chat_id`, `user_id`) DO UPDATE SET `data` = ?;",
(key.chat_id, key.user_id, serialized_data, serialized_data),
commit=True,
)
async def get_data(self, bot: Bot, key: StorageKey) -> Dict[str, Any]:
await self.__execute(
"SELECT `data` FROM `aiogram_fsm_data` WHERE `chat_id` = ? AND `user_id` = ?;",
(key.chat_id, key.user_id),
)
result = await self.cursor.fetchone()
return pickle.loads(result["data"]) if result else {}
async def update_data(self, bot: Bot, key: StorageKey, data: Dict[str, Any]) -> Dict[str, Any]:
current_data = await self.get_data(bot=bot, key=key)
current_data.update(data)
await self.set_data(bot=bot, key=key, data=current_data)
return current_data
@messireL
Copy link

Добрый день! Не совсем ясно: как в теле Midlewares оказывается bot: Bot? Ведь мы не передаём в неё это.
Вызов идёт через storage = MysqlStorage(host, user, password, database).
Если удалить из кода bot: Bot, то всё работает отлично.
Ещё было бы здорово проверять наличие этих таблиц, которые создаются в начале.
Благодарю.

@Goofeat
Copy link

Goofeat commented Jul 30, 2024

Добрый день! Не совсем ясно: как в теле Midlewares оказывается bot: Bot? Ведь мы не передаём в неё это. Вызов идёт через storage = MysqlStorage(host, user, password, database). Если удалить из кода bot: Bot, то всё работает отлично. Ещё было бы здорово проверять наличие этих таблиц, которые создаются в начале. Благодарю.

Этому гисту уже более года. Я изменил этот код под себя, но и с этим у меня проблемы: не записываются даты в бд...

@messireL
Copy link

Добрый день! Не совсем ясно: как в теле Midlewares оказывается bot: Bot? Ведь мы не передаём в неё это. Вызов идёт через storage = MysqlStorage(host, user, password, database). Если удалить из кода bot: Bot, то всё работает отлично. Ещё было бы здорово проверять наличие этих таблиц, которые создаются в начале. Благодарю.

Этому гисту уже более года. Я изменил этот код под себя, но и с этим у меня проблемы: не записываются даты в бд...

На самом деле, всё работает и записывает. Нужно смотреть в чём у Вас ошибка (по логам). Но как мидлварь подключается нормально, запись state идёт нормально. Вот только что проверил.
Возможно, проект нуждается в некоторой доработке, но он в рабочем состоянии.

@Goofeat
Copy link

Goofeat commented Jul 31, 2024

На самом деле, всё работает и записывает.

Можете показать мне ваш код?

Но как мидлварь подключается нормально,

Нужно ли регистрировать сторедж в мидлвари? Как?

@KurimuzonAkuma
Copy link
Author

Если удалить из кода bot: Bot, то всё работает отлично.

Это вроде из второй версии такое пошло, для тройки надо убрать

@messireL
Copy link

messireL commented Jul 31, 2024

Нужно ли регистрировать сторедж в мидлвари? Как?
Вы имеете в виду router.message.middleware() ? Или какую ещё регистрацию?
Регистрировать не нужно. Создаём файл с именем storages.py в папке middleware (если у Вас другая папка или имя файла - исправьте по тексту)
`
---импорт класса мидлвари, обратите внимание на имя файла и папку, если нужно исправьте на свои
from middlewares.storages import MySQLStorage
---другие импорты, в том числе, и для этой мидлвари
...
---подключаем хранилище
try:
storage = MySQLStorage(host="", user="", password="", database="") # в значениях указываем свои данные, соответственно
except:
storage = MemoryStorage() # указываем хранилище в памяти, если не удалось инициализировать MySQL, можно заменить на Redis или что-то ещё по вкусу

bot = Bot(token=config('TOKEN'), default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher(storage=storage)
`
Вот и всё.

  1. В тексте кода уберите по тексту bot: Bot
    (больше, по-моему ничего не менял)

@messireL
Copy link

messireL commented Jul 31, 2024

Если удалить из кода bot: Bot, то всё работает отлично.

Это вроде из второй версии такое пошло, для тройки надо убрать

Благодарю за код! Удачно подошёл )
Единственное, не по этому коду, можно спросить? У меня мидлварь наподобие AntiThrottle переделанная из Redis в работу с MySQL. Ловит флуд по @rate_limit
Мне непонятно как зарегистрировать мидлварь в аиограм 3 для дальнейшей работы в системе? Либо в начале каждого файла хендлера прописать router.message.middleware(названиекласса) и router.callbask.middleware(названиекласса) или как-то (как?) зарегить её сразу для всех файлов хендлеров?
Раньше было что-то типа Dispatcher.middleware.setup, но сейчас его вроде нет.
Спасибо!

@Goofeat
Copy link

Goofeat commented Jul 31, 2024

@messireL, Я кстати использую SQLite версию... Стейты у меня записываются на ура, а вот данные (state.set_data, state.update_data) – почему-то нет

UPD: они записываются, но как только хочу получить их через state.get_data, то данные сбрасываются

@messireL
Copy link

@messireL, Я кстати использую SQLite версию... Стейты у меня записываются на ура, а вот данные (state.set_data, state.update_data) – почему-то нет

UPD: они записываются, но как только хочу получить их через state.get_data, то данные сбрасываются

Смотрите по логам что происходит, есть ли реально данные в таблицах (в процессе заполнения), что по программному коду - нет ли state.clear() в ненужном месте и точно ли Вы берёт именно эти данные.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment