from __future__ import annotations
import abc
import datetime
import json
import secrets
from enum import Enum
from http import HTTPStatus
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Dict,
Final,
Optional,
Type,
cast,
)
from pydantic import ValidationError
from aiogram.exceptions import (
ClientDecodeError,
RestartingTelegram,
TelegramAPIError,
TelegramBadRequest,
TelegramConflictError,
TelegramEntityTooLarge,
TelegramForbiddenError,
TelegramMigrateToChat,
TelegramNotFound,
TelegramRetryAfter,
TelegramServerError,
TelegramUnauthorizedError,
)
from ...methods import Response, TelegramMethod
from ...methods.base import TelegramType
from ...types import InputFile, TelegramObject
from ..default import Default
from ..telegram import PRODUCTION, TelegramAPIServer
from .middlewares.manager import RequestMiddlewareManager
if TYPE_CHECKING:
from ..bot import Bot
_JsonLoads = Callable[..., Any]
_JsonDumps = Callable[..., str]
DEFAULT_TIMEOUT: Final[float] = 60.0
[docs]
class BaseSession(abc.ABC):
"""
This is base class for all HTTP sessions in aiogram.
If you want to create your own session, you must inherit from this class.
"""
def __init__(
self,
api: TelegramAPIServer = PRODUCTION,
json_loads: _JsonLoads = json.loads,
json_dumps: _JsonDumps = json.dumps,
timeout: float = DEFAULT_TIMEOUT,
) -> None:
"""
:param api: Telegram Bot API URL patterns
:param json_loads: JSON loader
:param json_dumps: JSON dumper
:param timeout: Session scope request timeout
"""
self.api = api
self.json_loads = json_loads
self.json_dumps = json_dumps
self.timeout = timeout
self.middleware = RequestMiddlewareManager()
[docs]
def check_response(
self, bot: Bot, method: TelegramMethod[TelegramType], status_code: int, content: str
) -> Response[TelegramType]:
"""
Check response status
"""
try:
json_data = self.json_loads(content)
except Exception as e:
# Handled error type can't be classified as specific error
# in due to decoder can be customized and raise any exception
raise ClientDecodeError("Failed to decode object", e, content)
try:
response_type = Response[method.__returning__] # type: ignore
response = response_type.model_validate(json_data, context={"bot": bot})
except ValidationError as e:
raise ClientDecodeError("Failed to deserialize object", e, json_data)
if HTTPStatus.OK <= status_code <= HTTPStatus.IM_USED and response.ok:
return response
description = cast(str, response.description)
if parameters := response.parameters:
if parameters.retry_after:
raise TelegramRetryAfter(
method=method, message=description, retry_after=parameters.retry_after
)
if parameters.migrate_to_chat_id:
raise TelegramMigrateToChat(
method=method,
message=description,
migrate_to_chat_id=parameters.migrate_to_chat_id,
)
if status_code == HTTPStatus.BAD_REQUEST:
raise TelegramBadRequest(method=method, message=description)
if status_code == HTTPStatus.NOT_FOUND:
raise TelegramNotFound(method=method, message=description)
if status_code == HTTPStatus.CONFLICT:
raise TelegramConflictError(method=method, message=description)
if status_code == HTTPStatus.UNAUTHORIZED:
raise TelegramUnauthorizedError(method=method, message=description)
if status_code == HTTPStatus.FORBIDDEN:
raise TelegramForbiddenError(method=method, message=description)
if status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
raise TelegramEntityTooLarge(method=method, message=description)
if status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
if "restart" in description:
raise RestartingTelegram(method=method, message=description)
raise TelegramServerError(method=method, message=description)
raise TelegramAPIError(
method=method,
message=description,
)
[docs]
@abc.abstractmethod
async def close(self) -> None: # pragma: no cover
"""
Close client session
"""
pass
[docs]
@abc.abstractmethod
async def make_request(
self,
bot: Bot,
method: TelegramMethod[TelegramType],
timeout: Optional[int] = None,
) -> TelegramType: # pragma: no cover
"""
Make request to Telegram Bot API
:param bot: Bot instance
:param method: Method instance
:param timeout: Request timeout
:return:
:raise TelegramApiError:
"""
pass
[docs]
@abc.abstractmethod
async def stream_content(
self,
url: str,
headers: Optional[Dict[str, Any]] = None,
timeout: int = 30,
chunk_size: int = 65536,
raise_for_status: bool = True,
) -> AsyncGenerator[bytes, None]: # pragma: no cover
"""
Stream reader
"""
yield b""
[docs]
def prepare_value(
self,
value: Any,
bot: Bot,
files: Dict[str, Any],
_dumps_json: bool = True,
) -> Any:
"""
Prepare value before send
"""
if value is None:
return None
if isinstance(value, str):
return value
if isinstance(value, Default):
default_value = bot.default[value.name]
return self.prepare_value(default_value, bot=bot, files=files, _dumps_json=_dumps_json)
if isinstance(value, InputFile):
key = secrets.token_urlsafe(10)
files[key] = value
return f"attach://{key}"
if isinstance(value, dict):
value = {
key: prepared_item
for key, item in value.items()
if (
prepared_item := self.prepare_value(
item, bot=bot, files=files, _dumps_json=False
)
)
is not None
}
if _dumps_json:
return self.json_dumps(value)
return value
if isinstance(value, list):
value = [
prepared_item
for item in value
if (
prepared_item := self.prepare_value(
item, bot=bot, files=files, _dumps_json=False
)
)
is not None
]
if _dumps_json:
return self.json_dumps(value)
return value
if isinstance(value, datetime.timedelta):
now = datetime.datetime.now()
return str(round((now + value).timestamp()))
if isinstance(value, datetime.datetime):
return str(round(value.timestamp()))
if isinstance(value, Enum):
return self.prepare_value(value.value, bot=bot, files=files)
if isinstance(value, TelegramObject):
return self.prepare_value(
value.model_dump(warnings=False),
bot=bot,
files=files,
_dumps_json=_dumps_json,
)
if _dumps_json:
return self.json_dumps(value)
return value
async def __call__(
self,
bot: Bot,
method: TelegramMethod[TelegramType],
timeout: Optional[int] = None,
) -> TelegramType:
middleware = self.middleware.wrap_middlewares(self.make_request, timeout=timeout)
return cast(TelegramType, await middleware(bot, method))
async def __aenter__(self) -> BaseSession:
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()