Source code for aiogram.utils.chat_action

import asyncio
import logging
import time
from asyncio import Event, Lock
from contextlib import suppress
from types import TracebackType
from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union

from aiogram import BaseMiddleware, Bot
from aiogram.dispatcher.flags import get_flag
from aiogram.types import Message, TelegramObject

# Module-level logger; ChatActionSender reports worker start, each send and finish at debug level.
logger = logging.getLogger(__name__)
# Default for the ``interval`` argument: seconds between two consecutive chat action sends.
DEFAULT_INTERVAL = 5.0
# Default for the ``initial_sleep`` argument: seconds to wait before the first send.
DEFAULT_INITIAL_SLEEP = 0.0


class ChatActionSender:
    """
    Keep a chat action (e.g. "typing") visible while a long operation runs.

    The sender is an async context manager. On enter it starts a background
    task that sends the
    `chat action <https://core.telegram.org/bots/api#sendchataction>`_
    every ``interval`` seconds (5 by default); on exit it asks that task to
    stop and waits until it has finished.
    """

    def __init__(
        self,
        *,
        bot: Bot,
        chat_id: Union[str, int],
        message_thread_id: Optional[int] = None,
        action: str = "typing",
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> None:
        """
        :param bot: instance of the bot
        :param chat_id: target chat id
        :param message_thread_id: unique identifier for the target message thread; supergroups only
        :param action: chat action type
        :param interval: interval between iterations
        :param initial_sleep: sleep before first sending of the action
        """
        self.chat_id = chat_id
        self.message_thread_id = message_thread_id
        self.action = action
        self.interval = interval
        self.initial_sleep = initial_sleep
        self.bot = bot

        # Serializes _run()/_stop() so start and stop never interleave.
        self._lock = Lock()
        # Set by _stop() to ask the worker to leave its loop.
        self._close_event = Event()
        # Set by the worker (in its ``finally``) once it has finished.
        self._closed_event = Event()
        self._task: Optional[asyncio.Task[Any]] = None

    @property
    def running(self) -> bool:
        """
        Whether a worker task is currently registered.

        The task stays registered until :meth:`_stop` clears it, even if the
        worker itself has already terminated.
        """
        return bool(self._task)

    async def _wait(self, interval: float) -> None:
        """
        Sleep for up to ``interval`` seconds, waking early on a stop request.

        :param interval: maximum time to wait, in seconds
        """
        # A timeout simply means "no stop request yet", so it is not an error.
        with suppress(asyncio.TimeoutError):
            await asyncio.wait_for(self._close_event.wait(), interval)

    async def _worker(self) -> None:
        """
        Background loop: send the chat action until a stop is requested.

        Always sets ``_closed_event`` on the way out, whether the loop ended
        normally or because sending raised.
        """
        logger.debug(
            "Started chat action %r sender in chat_id=%s via bot id=%d",
            self.action,
            self.chat_id,
            self.bot.id,
        )
        try:
            counter = 0
            await self._wait(self.initial_sleep)
            while not self._close_event.is_set():
                start = time.monotonic()
                logger.debug(
                    "Sent chat action %r to chat_id=%s via bot %d (already sent actions %d)",
                    self.action,
                    self.chat_id,
                    self.bot.id,
                    counter,
                )
                await self.bot.send_chat_action(
                    chat_id=self.chat_id,
                    action=self.action,
                    message_thread_id=self.message_thread_id,
                )
                counter += 1

                # Subtract the time spent sending so iterations stay
                # ``self.interval`` apart rather than drifting.
                interval = self.interval - (time.monotonic() - start)
                await self._wait(interval)
        finally:
            logger.debug(
                "Finished chat action %r sender in chat_id=%s via bot id=%d",
                self.action,
                self.chat_id,
                self.bot.id,
            )
            self._closed_event.set()

    async def _run(self) -> None:
        """
        Start the background worker.

        :raises RuntimeError: if a worker task is already registered
        """
        async with self._lock:
            # Validate before touching any state: clearing the events first
            # would erase the "closed" signal of a worker that already
            # terminated, and a later _stop() would then wait on it forever.
            if self.running:
                raise RuntimeError("Already running")
            self._close_event.clear()
            self._closed_event.clear()
            self._task = asyncio.create_task(self._worker())

    async def _stop(self) -> None:
        """
        Ask the worker to stop and wait until it has finished.

        Does nothing when no worker task is registered.
        """
        async with self._lock:
            if not self.running:
                return
            if not self._close_event.is_set():  # pragma: no branches
                self._close_event.set()
                await self._closed_event.wait()
            self._task = None

    async def __aenter__(self) -> "ChatActionSender":
        """Start the worker and return the sender itself."""
        await self._run()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Any:
        """Stop the worker; exceptions from the ``async with`` body are not suppressed."""
        await self._stop()

    @classmethod
    def typing(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `typing` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="typing",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def upload_photo(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `upload_photo` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="upload_photo",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def record_video(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `record_video` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="record_video",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def upload_video(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `upload_video` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="upload_video",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def record_voice(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `record_voice` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="record_voice",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def upload_voice(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `upload_voice` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="upload_voice",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def upload_document(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `upload_document` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="upload_document",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def choose_sticker(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `choose_sticker` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="choose_sticker",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def find_location(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `find_location` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="find_location",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def record_video_note(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `record_video_note` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="record_video_note",
            interval=interval,
            initial_sleep=initial_sleep,
        )

    @classmethod
    def upload_video_note(
        cls,
        chat_id: Union[int, str],
        bot: Bot,
        message_thread_id: Optional[int] = None,
        interval: float = DEFAULT_INTERVAL,
        initial_sleep: float = DEFAULT_INITIAL_SLEEP,
    ) -> "ChatActionSender":
        """Create instance of the sender with `upload_video_note` action"""
        return cls(
            bot=bot,
            chat_id=chat_id,
            message_thread_id=message_thread_id,
            action="upload_video_note",
            interval=interval,
            initial_sleep=initial_sleep,
        )
class ChatActionMiddleware(BaseMiddleware):
    """
    Middleware that wraps every message handler in a chat action sender.

    The action is configured through the handler's ``chat_action`` flag,
    which may be an action name, ``True`` or a dict with any of the keys
    ``action``, ``interval`` and ``initial_sleep``. Without the flag the
    ``typing`` action is used.
    """

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """
        Run ``handler`` while a chat action is being sent to the event's chat.

        :param handler: next handler in the middleware chain
        :param event: incoming event; anything but a message is passed through untouched
        :param data: contextual data, must contain the ``bot`` key for messages
        :return: whatever ``handler`` returns
        """
        # Only messages carry a chat to report the action to.
        if not isinstance(event, Message):
            return await handler(event, data)

        bot = data["bot"]
        flag_value = get_flag(data, "chat_action") or "typing"

        sender_options: Dict[str, Any] = {}
        if isinstance(flag_value, dict):
            # Copy only the options that are present and truthy, so the
            # sender's own defaults apply to everything else.
            for option in ("initial_sleep", "interval", "action"):
                value = flag_value.get(option)
                if value:
                    sender_options[option] = value
        elif isinstance(flag_value, bool):
            sender_options["action"] = "typing"
        else:
            sender_options["action"] = flag_value

        # The thread id is forwarded for topic messages only.
        thread_id = None
        if event.is_topic_message:
            thread_id = event.message_thread_id
        sender_options["message_thread_id"] = thread_id

        sender = ChatActionSender(bot=bot, chat_id=event.chat.id, **sender_options)
        async with sender:
            return await handler(event, data)