Source code for aiogram.fsm.storage.base

from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, Literal, Optional, Union

from aiogram.fsm.state import State

StateType = Optional[Union[str, State]]

DEFAULT_DESTINY = "default"


@dataclass(frozen=True)
class StorageKey:
    bot_id: int
    chat_id: int
    user_id: int
    thread_id: Optional[int] = None
    business_connection_id: Optional[str] = None
    destiny: str = DEFAULT_DESTINY


[docs] class KeyBuilder(ABC): """Base class for key builder."""
[docs] @abstractmethod def build( self, key: StorageKey, part: Optional[Literal["data", "state", "lock"]] = None, ) -> str: """ Build key to be used in storage's db queries :param key: contextual key :param part: part of the record :return: key to be used in storage's db queries """ pass
[docs] class DefaultKeyBuilder(KeyBuilder): """ Simple key builder with default prefix. Generates a colon-joined string with prefix, chat_id, user_id, optional bot_id, business_connection_id, destiny and field. Format: :code:`<prefix>:<bot_id?>:<business_connection_id?>:<chat_id>:<user_id>:<destiny?>:<field?>` """ def __init__( self, *, prefix: str = "fsm", separator: str = ":", with_bot_id: bool = False, with_business_connection_id: bool = False, with_destiny: bool = False, ) -> None: """ :param prefix: prefix for all records :param separator: separator :param with_bot_id: include Bot id in the key :param with_business_connection_id: include business connection id :param with_destiny: include destiny key """ self.prefix = prefix self.separator = separator self.with_bot_id = with_bot_id self.with_business_connection_id = with_business_connection_id self.with_destiny = with_destiny
[docs] def build( self, key: StorageKey, part: Optional[Literal["data", "state", "lock"]] = None, ) -> str: parts = [self.prefix] if self.with_bot_id: parts.append(str(key.bot_id)) if self.with_business_connection_id and key.business_connection_id: parts.append(str(key.business_connection_id)) parts.append(str(key.chat_id)) if key.thread_id: parts.append(str(key.thread_id)) parts.append(str(key.user_id)) if self.with_destiny: parts.append(key.destiny) elif key.destiny != DEFAULT_DESTINY: error_message = ( "Default key builder is not configured to use key destiny other than the default." "\n\nProbably, you should set `with_destiny=True` in for DefaultKeyBuilder." ) raise ValueError(error_message) if part: parts.append(part) return self.separator.join(parts)
[docs] class BaseStorage(ABC): """ Base class for all FSM storages """
[docs] @abstractmethod async def set_state(self, key: StorageKey, state: StateType = None) -> None: """ Set state for specified key :param key: storage key :param state: new state """ pass
[docs] @abstractmethod async def get_state(self, key: StorageKey) -> Optional[str]: """ Get key state :param key: storage key :return: current state """ pass
[docs] @abstractmethod async def set_data(self, key: StorageKey, data: Dict[str, Any]) -> None: """ Write data (replace) :param key: storage key :param data: new data """ pass
[docs] @abstractmethod async def get_data(self, key: StorageKey) -> Dict[str, Any]: """ Get current data for key :param key: storage key :return: current data """ pass
[docs] async def update_data(self, key: StorageKey, data: Dict[str, Any]) -> Dict[str, Any]: """ Update date in the storage for key (like dict.update) :param key: storage key :param data: partial data :return: new data """ current_data = await self.get_data(key=key) current_data.update(data) await self.set_data(key=key, data=current_data) return current_data.copy()
[docs] @abstractmethod async def close(self) -> None: # pragma: no cover """ Close storage (database connection, file or etc.) """ pass
class BaseEventIsolation(ABC): @abstractmethod @asynccontextmanager async def lock(self, key: StorageKey) -> AsyncGenerator[None, None]: """ Isolate events with lock. Will be used as context manager :param key: storage key :return: An async generator """ yield None @abstractmethod async def close(self) -> None: pass