Source code for aiogram.dispatcher.router

from __future__ import annotations

from typing import Any, Dict, Final, Generator, List, Optional, Set

from ..types import TelegramObject
from .event.bases import REJECTED, UNHANDLED
from .event.event import EventObserver
from .event.telegram import TelegramEventObserver

INTERNAL_UPDATE_TYPES: Final[frozenset[str]] = frozenset({"update", "error"})


[docs] class Router: """ Router can route update, and it nested update types like messages, callback query, polls and all other event types. Event handlers can be registered in observer by two ways: - By observer method - :obj:`router.<event_type>.register(handler, <filters, ...>)` - By decorator - :obj:`@router.<event_type>(<filters, ...>)` """
[docs] def __init__(self, *, name: Optional[str] = None) -> None: """ :param name: Optional router name, can be useful for debugging """ self.name = name or hex(id(self)) self._parent_router: Optional[Router] = None self.sub_routers: List[Router] = [] # Observers self.message = TelegramEventObserver(router=self, event_name="message") self.edited_message = TelegramEventObserver(router=self, event_name="edited_message") self.channel_post = TelegramEventObserver(router=self, event_name="channel_post") self.edited_channel_post = TelegramEventObserver( router=self, event_name="edited_channel_post" ) self.inline_query = TelegramEventObserver(router=self, event_name="inline_query") self.chosen_inline_result = TelegramEventObserver( router=self, event_name="chosen_inline_result" ) self.callback_query = TelegramEventObserver(router=self, event_name="callback_query") self.shipping_query = TelegramEventObserver(router=self, event_name="shipping_query") self.pre_checkout_query = TelegramEventObserver( router=self, event_name="pre_checkout_query" ) self.poll = TelegramEventObserver(router=self, event_name="poll") self.poll_answer = TelegramEventObserver(router=self, event_name="poll_answer") self.my_chat_member = TelegramEventObserver(router=self, event_name="my_chat_member") self.chat_member = TelegramEventObserver(router=self, event_name="chat_member") self.chat_join_request = TelegramEventObserver(router=self, event_name="chat_join_request") self.message_reaction = TelegramEventObserver(router=self, event_name="message_reaction") self.message_reaction_count = TelegramEventObserver( router=self, event_name="message_reaction_count" ) self.chat_boost = TelegramEventObserver(router=self, event_name="chat_boost") self.removed_chat_boost = TelegramEventObserver( router=self, event_name="removed_chat_boost" ) self.deleted_business_messages = TelegramEventObserver( router=self, event_name="deleted_business_messages" ) self.business_connection = TelegramEventObserver( router=self, event_name="business_connection" ) self.edited_business_message = TelegramEventObserver( router=self, event_name="edited_business_message" ) self.business_message = TelegramEventObserver(router=self, event_name="business_message") self.errors = self.error = TelegramEventObserver(router=self, event_name="error") self.startup = EventObserver() self.shutdown = EventObserver() self.observers: Dict[str, TelegramEventObserver] = { "message": self.message, "edited_message": self.edited_message, "channel_post": self.channel_post, "edited_channel_post": self.edited_channel_post, "inline_query": self.inline_query, "chosen_inline_result": self.chosen_inline_result, "callback_query": self.callback_query, "shipping_query": self.shipping_query, "pre_checkout_query": self.pre_checkout_query, "poll": self.poll, "poll_answer": self.poll_answer, "my_chat_member": self.my_chat_member, "chat_member": self.chat_member, "chat_join_request": self.chat_join_request, "message_reaction": self.message_reaction, "message_reaction_count": self.message_reaction_count, "chat_boost": self.chat_boost, "removed_chat_boost": self.removed_chat_boost, "deleted_business_messages": self.deleted_business_messages, "business_connection": self.business_connection, "edited_business_message": self.edited_business_message, "business_message": self.business_message, "error": self.errors, }
def __str__(self) -> str: return f"{type(self).__name__} {self.name!r}" def __repr__(self) -> str: return f"<{self}>"
[docs] def resolve_used_update_types(self, skip_events: Optional[Set[str]] = None) -> List[str]: """ Resolve registered event names Is useful for getting updates only for registered event types. :param skip_events: skip specified event names :return: set of registered names """ handlers_in_use: Set[str] = set() if skip_events is None: skip_events = set() skip_events = {*skip_events, *INTERNAL_UPDATE_TYPES} for router in self.chain_tail: for update_name, observer in router.observers.items(): if observer.handlers and update_name not in skip_events: handlers_in_use.add(update_name) return list(sorted(handlers_in_use)) # NOQA: C413
async def propagate_event(self, update_type: str, event: TelegramObject, **kwargs: Any) -> Any: kwargs.update(event_router=self) observer = self.observers.get(update_type) async def _wrapped(telegram_event: TelegramObject, **data: Any) -> Any: return await self._propagate_event( observer=observer, update_type=update_type, event=telegram_event, **data ) if observer: return await observer.wrap_outer_middleware(_wrapped, event=event, data=kwargs) return await _wrapped(event, **kwargs) async def _propagate_event( self, observer: Optional[TelegramEventObserver], update_type: str, event: TelegramObject, **kwargs: Any, ) -> Any: response = UNHANDLED if observer: # Check globally defined filters before any other handler will be checked. # This check is placed here instead of `trigger` method to add possibility # to pass context to handlers from global filters. result, data = await observer.check_root_filters(event, **kwargs) if not result: return UNHANDLED kwargs.update(data) response = await observer.trigger(event, **kwargs) if response is REJECTED: # pragma: no cover # Possible only if some handler returns REJECTED return UNHANDLED if response is not UNHANDLED: return response for router in self.sub_routers: response = await router.propagate_event(update_type=update_type, event=event, **kwargs) if response is not UNHANDLED: break return response @property def chain_head(self) -> Generator[Router, None, None]: router: Optional[Router] = self while router: yield router router = router.parent_router @property def chain_tail(self) -> Generator[Router, None, None]: yield self for router in self.sub_routers: yield from router.chain_tail @property def parent_router(self) -> Optional[Router]: return self._parent_router @parent_router.setter def parent_router(self, router: Router) -> None: """ Internal property setter of parent router fot this router. Do not use this method in own code. All routers should be included via `include_router` method. Self- and circular- referencing are not allowed here :param router: """ if not isinstance(router, Router): raise ValueError(f"router should be instance of Router not {type(router).__name__!r}") if self._parent_router: raise RuntimeError(f"Router is already attached to {self._parent_router!r}") if self == router: raise RuntimeError("Self-referencing routers is not allowed") parent: Optional[Router] = router while parent is not None: if parent == self: raise RuntimeError("Circular referencing of Router is not allowed") parent = parent.parent_router self._parent_router = router router.sub_routers.append(self)
[docs] def include_routers(self, *routers: Router) -> None: """ Attach multiple routers. :param routers: :return: """ if not routers: raise ValueError("At least one router must be provided") for router in routers: self.include_router(router)
[docs] def include_router(self, router: Router) -> Router: """ Attach another router. :param router: :return: """ if not isinstance(router, Router): raise ValueError( f"router should be instance of Router not {type(router).__class__.__name__}" ) router.parent_router = self return router
async def emit_startup(self, *args: Any, **kwargs: Any) -> None: """ Recursively call startup callbacks :param args: :param kwargs: :return: """ kwargs.update(router=self) await self.startup.trigger(*args, **kwargs) for router in self.sub_routers: await router.emit_startup(*args, **kwargs) async def emit_shutdown(self, *args: Any, **kwargs: Any) -> None: """ Recursively call shutdown callbacks to graceful shutdown :param args: :param kwargs: :return: """ kwargs.update(router=self) await self.shutdown.trigger(*args, **kwargs) for router in self.sub_routers: await router.emit_shutdown(*args, **kwargs)