Source code for aiogram.types.input_file
import asyncio
import inspect
import io
import logging
import os
import secrets
from pathlib import Path
from typing import Union, Optional
import aiohttp
from . import base
from ..bot import api
CHUNK_SIZE = 65536
log = logging.getLogger('aiogram')
[docs]class InputFile(base.TelegramObject):
"""
This object represents the contents of a file to be uploaded.
Must be posted using multipart/form-data in the usual way that files are uploaded via the browser.
Also that is not typical TelegramObject!
https://core.telegram.org/bots/api#inputfile
"""
def __init__(self, path_or_bytesio: Union[str, io.IOBase, Path, '_WebPipe'], filename=None, conf=None):
"""
:param path_or_bytesio:
:param filename:
:param conf:
"""
super(InputFile, self).__init__(conf=conf)
if isinstance(path_or_bytesio, str):
# As path
self._file = open(path_or_bytesio, 'rb')
self._path = path_or_bytesio
if filename is None:
filename = os.path.split(path_or_bytesio)[-1]
elif isinstance(path_or_bytesio, (io.IOBase, _WebPipe)):
self._path = None
self._file = path_or_bytesio
elif isinstance(path_or_bytesio, Path):
self._file = path_or_bytesio.open("rb")
self._path = path_or_bytesio.resolve()
if filename is None:
filename = path_or_bytesio.name
else:
raise TypeError('Not supported file type.')
self._filename = filename
self.attachment_key = secrets.token_urlsafe(16)
def __del__(self):
"""
Close file descriptor
"""
if not hasattr(self, '_file'):
return
if inspect.iscoroutinefunction(self._file.close):
return asyncio.ensure_future(self._file.close())
self._file.close()
@property
def filename(self):
if self._filename is None:
self._filename = api.guess_filename(self._file)
return self._filename
@filename.setter
def filename(self, value):
self._filename = value
@property
def attach(self):
return f"attach://{self.attachment_key}"
@property
def file(self):
return self._file
[docs] @classmethod
def from_url(cls, url, filename=None, chunk_size=CHUNK_SIZE):
"""
Download file from URL
Manually is not required action. You can send urls instead!
:param url: target URL
:param filename: optional. set custom file name
:param chunk_size:
:return: InputFile
"""
pipe = _WebPipe(url, chunk_size=chunk_size)
if filename is None:
filename = pipe.name
return cls(pipe, filename)
[docs] def save(self, filename, chunk_size=CHUNK_SIZE):
"""
Write file to disk
:param filename:
:param chunk_size:
"""
with open(filename, 'wb') as fp:
while True:
# Chunk writer
data = self.file.read(chunk_size)
if not data:
break
fp.write(data)
# Flush all data
fp.flush()
# Go to start of file.
if self.file.seekable():
self.file.seek(0)
def __str__(self):
return f"<InputFile 'attach://{self.attachment_key}' with file='{self.file}'>"
__repr__ = __str__
[docs] @classmethod
def to_object(cls, data):
raise TypeError('Object of this type is not importable!')
class _WebPipe:
def __init__(self, url, chunk_size=-1):
self.url = url
self.chunk_size = chunk_size
self._session: Optional[aiohttp.ClientSession] = None
self._response: Optional[aiohttp.ClientResponse] = None
self._reader = None
self._name = None
self._lock = asyncio.Lock()
@property
def name(self):
if not self._name:
*_, part = self.url.rpartition('/')
self._name = part or secrets.token_urlsafe(24)
return self._name
async def open(self):
session = self._session = aiohttp.ClientSession()
self._response = await session.get(self.url) # type: aiohttp.ClientResponse
await self._lock.acquire()
return self
async def close(self):
if self._response and not self._response.closed:
self._response.close()
if self._session and not self._session.closed:
await self._session.close()
if self._lock.locked():
self._lock.release()
@property
def closed(self):
return not self._session or self._session.closed
def __aiter__(self):
return self
async def __anext__(self):
if self.closed:
await self.open()
chunk = await self.read(self.chunk_size)
if not chunk:
await self.close()
raise StopAsyncIteration
return chunk
async def read(self, chunk_size=-1):
if not self._response:
raise LookupError('I/O operation on closed stream')
response: aiohttp.ClientResponse = self._response
reader: aiohttp.StreamReader = response.content
return await reader.read(chunk_size)
def __str__(self):
result = f"WebPipe url='{self.url}', name='{self.name}'"
return '<' + result + '>'
__repr__ = __str__