пишу веб приложение с использованием
FastAPI, запросы у меня выполняются с помощью
aiohttp, сейчас как мне кажется не лучшим способом. По сути для каждого нового запроса в режиме контекстного менеджера - я создаю новую сессию и после запроса закрываю её.
from typing import Any
from aiohttp import ClientSession, ClientResponse
from aiohttp import ClientResponseError, ClientTimeout
from src.logger import get_logger
logger = get_logger(__name__)
class HTTPClient:
def __init__(self, base_url: str = None, timeout: int = 60) -> None:
self.base_url = base_url
self.timeout = ClientTimeout(timeout)
async def _request(
self,
method: str,
url: str,
**kwargs: Any
) -> ClientResponse:
try:
async with ClientSession(
self.base_url, timeout=self.timeout) as session:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
logger.info(f'[{response.method}]'
f'[{response.status}]'
f' -> {response.url}')
return response if response else None
except ClientResponseError as error:
logger.error(f'CRE: [{error.request_info.method}][{error.status}]'
f' {error.request_info.url} -> {error.message}')
return None
except Exception as exc:
logger.error(f'Exception: {exc}')
return None
async def get(self, url: str, **kwargs: Any) -> ClientResponse:
return await self._request('GET', url, **kwargs)
async def post(self, url: str, **kwargs: Any) -> ClientResponse:
return await self._request('POST', url, **kwargs)
async def put(self, url: str, **kwargs: Any) -> ClientResponse:
return await self._request('PUT', url, **kwargs)
async def delete(self, url: str, **kwargs: Any) -> ClientResponse:
return await self._request('DELETE', url, **kwargs)
Такой подход как мне кажется пагубно влияет на производительность. Мне не понятно как используя эту библиотеку - работать с ней максимально правильно.
Возьмем пример с моим классом, допустим у меня есть несколько сервисов на которые я хочу отправлять запросы, для каждого такого сервиса я создал отдельный класс со своими методами и наследуюсь от
HTTPClient чтобы внутри использовать его методы. Допустим мы убрали работу асинхронного менеджера который позволяет нам автоматически закрывать сессию после запроса. И вот мы спокойно работаем через наследование - но вот пришло время остановить приложение и как только я его останавливаю - вероятнее всего у меня посыплются ошибки о не закрытом соединении.
Я хочу понять как это работает в больших приложениях, где куча различных экземпляров классов которые могут или наследоваться или просто создавать экземпляр и присваивать ему переменную. Потому как мне сейчас не понятно как это сделать. Возможно создать в том же файле сервиса с его классом - экземпляр класса и использовать его во всех нужных файлах в том числе и при остановке приложение, при учёте fastapi возможно это будет примерно так:
@asynccontextmanager
async def lifespan(app: FastAPI):
first_instance.init()
second_instance.init()
yield
first_instance.close()
second_instance.close()