import asyncio
import secrets
from abc import ABC, abstractmethod
from asyncio import Transport
from typing import Any, Awaitable, Callable, Dict, Optional, Set, Tuple, cast
from aiohttp import MultipartWriter, web
from aiohttp.abc import Application
from aiohttp.typedefs import Handler
from aiohttp.web_middlewares import middleware
from aiogram import Bot, Dispatcher, loggers
from aiogram.methods import TelegramMethod
from aiogram.methods.base import TelegramType
from aiogram.types import InputFile
from aiogram.webhook.security import IPFilter
def setup_application(app: Application, dispatcher: Dispatcher, /, **kwargs: Any) -> None:
    """
    Hook the dispatcher's startup and shutdown events into an aiohttp application.

    The keyword arguments emitted with both events are assembled once, at call
    time: ``app`` and ``dispatcher`` first, then ``dispatcher.workflow_data``,
    then ``kwargs``. On a key clash the later source wins.

    :param app: aiohttp application
    :param dispatcher: aiogram dispatcher
    :param kwargs: additional data passed to the startup and shutdown events
    :return:
    """
    context: Dict[str, Any] = {"app": app, "dispatcher": dispatcher}
    context.update(dispatcher.workflow_data)
    context.update(kwargs)

    async def _emit_startup(*_args: Any, **_kwargs: Any) -> None:  # pragma: no cover
        # aiohttp passes the application to the signal handler; it is not needed here
        await dispatcher.emit_startup(**context)

    async def _emit_shutdown(*_args: Any, **_kwargs: Any) -> None:  # pragma: no cover
        await dispatcher.emit_shutdown(**context)

    app.on_startup.append(_emit_startup)
    app.on_shutdown.append(_emit_shutdown)
def check_ip(ip_filter: IPFilter, request: web.Request) -> Tuple[str, bool]:
    """
    Resolve the client IP address of a request and test it against an IP filter.

    The ``X-Forwarded-For`` header is preferred when present; otherwise the
    address is taken from the peer name of the underlying transport.

    :param ip_filter: filter that is queried with ``address in ip_filter``
    :param request: incoming aiohttp request
    :return: tuple of the resolved address (empty string when it cannot be
        resolved) and a flag telling whether the address is accepted
    """
    # Try to resolve client IP over reverse proxy
    # NOTE(review): this header is supplied by the client unless a trusted proxy
    # overwrites it - confirm the deployment always sits behind such a proxy.
    if forwarded_for := request.headers.get("X-Forwarded-For", ""):
        # Get the left-most ip when there is multiple ips
        # (request got through multiple proxy/load balancers)
        # https://github.com/aiogram/aiogram/issues/672
        # Entries are usually separated by ", ", so trim surrounding whitespace.
        client_ip = forwarded_for.split(",", maxsplit=1)[0].strip()
        return client_ip, client_ip in ip_filter

    # When reverse proxy is not configured IP address can be resolved from incoming connection
    transport = request.transport  # may be None once the connection is closed
    peer_name = transport.get_extra_info("peername") if transport is not None else None
    # IPv4 peers are reported as (host, port) while IPv6 peers are reported as
    # (host, port, flowinfo, scope_id), so only the first item is taken instead
    # of unpacking a fixed-size pair.
    if isinstance(peer_name, (tuple, list)) and peer_name:
        host = peer_name[0]
        return host, host in ip_filter

    # No usable peer information (closed connection or a non-IP transport)
    return "", False
def ip_filter_middleware(
    ip_filter: IPFilter,
) -> Callable[[web.Request, Handler], Awaitable[Any]]:
    """
    Build an aiohttp middleware that rejects requests from addresses
    which are not accepted by the given IP filter.

    Rejected requests are logged as a warning and answered by raising
    :class:`aiohttp.web.HTTPUnauthorized`.

    :param ip_filter: filter used to decide whether a client address is accepted
    :return: aiohttp middleware
    """

    @middleware
    async def _ip_filter_middleware(request: web.Request, handler: Handler) -> Any:
        ip_address, accept = check_ip(ip_filter=ip_filter, request=request)
        if not accept:
            loggers.webhook.warning("Blocking request from an unauthorized IP: %s", ip_address)
            raise web.HTTPUnauthorized()
        return await handler(request)

    return _ip_filter_middleware
class BaseRequestHandler(ABC):
    """
    Base class for aiohttp request handlers that pass webhook updates to a Dispatcher.

    Subclasses implement :meth:`resolve_bot`, :meth:`verify_secret` and :meth:`close`.
    """

    def __init__(
        self,
        dispatcher: Dispatcher,
        handle_in_background: bool = False,
        **data: Any,
    ) -> None:
        """
        Base handler that helps to handle incoming request from aiohttp
        and propagate it to the Dispatcher

        :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher`
        :param handle_in_background: respond to Telegram immediately instead of
            waiting for the handler to finish
        :param data: extra keyword arguments passed to the dispatcher with every update
        """
        self.dispatcher = dispatcher
        self.handle_in_background = handle_in_background
        self.data = data
        # References to running background tasks; each task removes itself when done
        self._background_feed_update_tasks: Set[asyncio.Task[Any]] = set()

    def register(self, app: Application, /, path: str, **kwargs: Any) -> None:
        """
        Register route and shutdown callback

        :param app: instance of aiohttp Application
        :param path: route path
        :param kwargs: extra keyword arguments passed to ``app.router.add_route``
        """
        app.on_shutdown.append(self._handle_close)
        app.router.add_route("POST", path, self.handle, **kwargs)

    async def _handle_close(self, app: Application) -> None:
        """Shutdown callback registered on the application; delegates to :meth:`close`."""
        await self.close()

    @abstractmethod
    async def close(self) -> None:
        """Release resources held by the handler; called on application shutdown."""
        pass

    @abstractmethod
    async def resolve_bot(self, request: web.Request) -> Bot:
        """
        This method should be implemented in subclasses of this class.

        Resolve Bot instance from request.

        :param request: incoming aiohttp request
        :return: Bot instance
        """
        pass

    @abstractmethod
    def verify_secret(self, telegram_secret_token: str, bot: Bot) -> bool:
        """
        Check the secret token received with the request.

        :param telegram_secret_token: value of the ``X-Telegram-Bot-Api-Secret-Token``
            header, or an empty string when the header is absent
        :param bot: bot resolved for the request
        :return: ``True`` when the request may be processed
        """
        pass

    async def _background_feed_update(self, bot: Bot, update: Dict[str, Any]) -> None:
        """Feed a raw update to the dispatcher and, if a method is returned, call it."""
        result = await self.dispatcher.feed_raw_update(bot=bot, update=update, **self.data)
        if isinstance(result, TelegramMethod):
            await self.dispatcher.silent_call_request(bot=bot, result=result)

    async def _handle_request_background(self, bot: Bot, request: web.Request) -> web.Response:
        """Schedule processing of the update as a task and answer with an empty JSON object."""
        # The body is read before the task is created, while the request is still being handled
        update = await request.json(loads=bot.session.json_loads)
        feed_update_task = asyncio.create_task(
            self._background_feed_update(bot=bot, update=update)
        )
        self._background_feed_update_tasks.add(feed_update_task)
        feed_update_task.add_done_callback(self._background_feed_update_tasks.discard)
        return web.json_response({}, dumps=bot.session.json_dumps)

    def _build_response_writer(
        self, bot: Bot, result: Optional[TelegramMethod[TelegramType]]
    ) -> MultipartWriter:
        """
        Serialize a method returned by a handler into a multipart ``form-data`` body.

        The body holds a ``method`` part with the API method name, one part per
        non-empty prepared field and one part per attached file. Without a
        result an empty writer is returned.

        :param bot: bot whose session prepares the values
        :param result: method to serialize, if any
        :return: multipart writer to be used as the response body
        """
        writer = MultipartWriter(
            "form-data",
            boundary=f"webhookBoundary{secrets.token_urlsafe(16)}",
        )
        if not result:
            return writer

        payload = writer.append(result.__api_method__)
        payload.set_content_disposition("form-data", name="method")

        # ``prepare_value`` receives this mapping and the files it holds are appended below
        files: Dict[str, InputFile] = {}
        for key, value in result.model_dump(warnings=False).items():
            value = bot.session.prepare_value(value, bot=bot, files=files)
            if not value:
                continue
            payload = writer.append(value)
            payload.set_content_disposition("form-data", name=key)

        for key, value in files.items():
            payload = writer.append(value.read(bot))
            payload.set_content_disposition(
                "form-data",
                name=key,
                filename=value.filename or key,
            )

        return writer

    async def _handle_request(self, bot: Bot, request: web.Request) -> web.Response:
        """Process the update before answering and send the handler's result as the response."""
        result: Optional[TelegramMethod[Any]] = await self.dispatcher.feed_webhook_update(
            bot,
            await request.json(loads=bot.session.json_loads),
            **self.data,
        )
        return web.Response(body=self._build_response_writer(bot=bot, result=result))

    async def handle(self, request: web.Request) -> web.Response:
        """
        Entry point of the route: resolve the bot, verify the secret token and
        process the update either in the background or before answering.

        :param request: incoming aiohttp request
        :return: response for Telegram; status 401 when the secret token is rejected
        """
        bot = await self.resolve_bot(request)
        if not self.verify_secret(request.headers.get("X-Telegram-Bot-Api-Secret-Token", ""), bot):
            return web.Response(body="Unauthorized", status=401)
        if self.handle_in_background:
            return await self._handle_request_background(bot=bot, request=request)
        return await self._handle_request(bot=bot, request=request)

    # Allows an instance to be used directly as an aiohttp handler
    __call__ = handle
class SimpleRequestHandler(BaseRequestHandler):
    """Request handler that serves a single, pre-configured Bot instance."""

    def __init__(
        self,
        dispatcher: Dispatcher,
        bot: Bot,
        handle_in_background: bool = True,
        secret_token: Optional[str] = None,
        **data: Any,
    ) -> None:
        """
        Handler for single Bot instance

        :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher`
        :param handle_in_background: respond to Telegram immediately instead of
            waiting for the handler to finish
        :param bot: instance of :class:`aiogram.client.bot.Bot`
        :param secret_token: expected value of the ``X-Telegram-Bot-Api-Secret-Token``
            header; when empty or ``None`` the header is not checked
        :param data: extra keyword arguments passed to the dispatcher with every update
        """
        super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background, **data)
        self.bot = bot
        self.secret_token = secret_token

    def verify_secret(self, telegram_secret_token: str, bot: Bot) -> bool:
        """
        Compare the received secret token with the configured one.

        :param telegram_secret_token: token taken from the request header
        :param bot: bot resolved for the request (not used here)
        :return: ``True`` when no secret is configured or the tokens are equal
        """
        if not self.secret_token:
            return True
        # ``compare_digest`` rejects ``str`` arguments that contain non-ASCII
        # characters with a TypeError. The header value comes from the client,
        # so compare the encoded bytes instead; "surrogatepass" keeps the
        # encoding from failing on surrogate code points.
        return secrets.compare_digest(
            telegram_secret_token.encode("utf-8", "surrogatepass"),
            self.secret_token.encode("utf-8", "surrogatepass"),
        )

    async def close(self) -> None:
        """
        Close bot session
        """
        await self.bot.session.close()

    async def resolve_bot(self, request: web.Request) -> Bot:
        """
        Return the configured bot; the request is not inspected.

        :param request: incoming aiohttp request
        :return: Bot instance passed to the constructor
        """
        return self.bot
class TokenBasedRequestHandler(BaseRequestHandler):
    """Request handler that serves many bots, selected by a token in the URL path."""

    def __init__(
        self,
        dispatcher: Dispatcher,
        handle_in_background: bool = True,
        bot_settings: Optional[Dict[str, Any]] = None,
        **data: Any,
    ) -> None:
        """
        Handler that supports multiple bots; the context is resolved
        from the path variable 'bot_token'

        .. note::

            This handler is not recommended because the token is visible in the URL
            and can be logged by a reverse proxy server or other middleware.

        :param dispatcher: instance of :class:`aiogram.dispatcher.dispatcher.Dispatcher`
        :param handle_in_background: respond to Telegram immediately instead of
            waiting for the handler to finish
        :param bot_settings: kwargs that will be passed to new Bot instance
        :param data: extra keyword arguments passed to the dispatcher with every update
        """
        super().__init__(dispatcher=dispatcher, handle_in_background=handle_in_background, **data)
        if bot_settings is None:
            bot_settings = {}
        self.bot_settings = bot_settings
        # Bot instances created so far, keyed by token
        self.bots: Dict[str, Bot] = {}

    def verify_secret(self, telegram_secret_token: str, bot: Bot) -> bool:
        """Accept every request; this handler does not check the secret token header."""
        return True

    async def close(self) -> None:
        """Close the session of every bot created by this handler."""
        for bot in self.bots.values():
            await bot.session.close()

    def register(self, app: Application, /, path: str, **kwargs: Any) -> None:
        """
        Validate path, register route and shutdown callback

        :param app: instance of aiohttp Application
        :param path: route path; must contain the ``{bot_token}`` placeholder
        :param kwargs: extra keyword arguments passed on to the base ``register``
        :raises ValueError: when ``path`` has no ``{bot_token}`` placeholder
        """
        if "{bot_token}" not in path:
            raise ValueError("Path should contains '{bot_token}' substring")
        super().register(app, path=path, **kwargs)

    async def resolve_bot(self, request: web.Request) -> Bot:
        """
        Get bot token from a path and create or get from cache Bot instance

        :param request: incoming aiohttp request
        :return: Bot instance for the token found in the path
        """
        token = request.match_info["bot_token"]
        # NOTE(review): every distinct token seen in a URL adds a Bot to the cache
        # and nothing in this class evicts entries - confirm that is acceptable
        # for publicly reachable routes.
        if token not in self.bots:
            self.bots[token] = Bot(token=token, **self.bot_settings)
        return self.bots[token]