Задать вопрос
@constantinesx

Как правильно управлять жизненным циклом сессии sqlalchemy в долгих задачах?

Возник вопрос по построению правильной архитектуры приложения для эффективной работы с БД и, желательно, соблюдением принципов DI, ну и в целом чтобы почище было.

Изначальная ситуация:
В своих проектах использую dishka для инъекции зависимостей.
Например, вот такой код для управления объектом сессии sqlalchemy:

class ProviderDatabase(Provider):
    @provide(scope=Scope.APP)
    def provide_async_engine(self, config: DatabaseConfig) -> AsyncEngine:
        return create_async_engine(
            url=config.url,
            echo=config.sqlalchemy_echo,
        )

    @provide(scope=Scope.APP)
    def provide_async_session_maker(
        self, engine: AsyncEngine
    ) -> async_sessionmaker[AsyncSession]:
        return async_sessionmaker(
            bind=engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

    @provide(scope=Scope.REQUEST)
    async def provide_async_session(
        self, sessionmaker: async_sessionmaker[AsyncSession]
    ) -> AsyncIterable[AsyncSession]:
        async with sessionmaker() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

Вот так выглядит провайдер дишка для слоя репозиториев:

class ProviderRepositories(Provider):
    @provide(scope=Scope.REQUEST)
    async def provide_job_repository(self, session: AsyncSession) -> JobRepository:
        return JobRepository(session)


Вот так выглядит провайдер дишка для слоя сервисов:
class ProviderServices(Provider):
    @provide(scope=Scope.REQUEST)
    async def provide_job_service(self, job_repo: JobRepository) -> JobService:
        return JobService(job_repo)

Получается следующая цепочка зависимостей: сервисный слой -> слой репозиториев -> объект сессии -> построитель сессии -> движок БД. Это достаточно хорошо работает в веб приложениях, где на каждый запрос создается объект сессии, репозиторий и сервис (движок и построитель живут пока запущено приложение).

Описание проблемы:
Проблема заключается в том, что данных подход не годиться для долгих задач, выполнение которых занимает несколько минут, объект сессии "протухает".
Например (использую taskiq):

@broker.task
async def long_task(job_service: FromDishka[JobService]):
    job = await job_service.start()
    ... # Тут выполняется некая долгая логика, порядка нескольких минут. Например, получение всех товаров с WB
    await job_service.finish()

При выполнении await job_service.finish() достаточно часто (но не всегда) происходят ошибки обращения к БД. ChatGPT говорит, что нельзя долго держать объект сессии и я склонен ему доверять в этом вопросе.

Возможные варианты решения:
1. Дробить одну большую задачу на три помельче: в первой стартовать джоб, во второй выполнять долгие операции, которые не требует сессии БД, в третьей записывать полученные данные в БД и финализировать джоб.
Плюсы: Не придётся менять существующий бойлерплэйт.
Минусы: Много задач для одного действия, расходы на сериализацию и десериализацию данных, передача большого объема данных через очередь. Возможны трудновоспроизводимые ошибки.
2. В конструктор репозитория передавать не объект сессии, а построитель сессии (async_sessionmaker).
Плюсы: Позволяет создавать сессию в методах репозитория и она живет очень короткое время.
Минусы: commit приходится делать тоже в методах репозитория, в связи с этим не ясно как откатывать изменения если что-то пойдет не так.
3. В конструктор сервиса передавать построитель сессии (async_sessionmaker).
Плюсы: Позволяет создавать сессию в методах сервиса, затем прокидывать ее в конструктор репозитория, сессия живет короткое время. Транзакцией можно управлять из метода сервиса, откатиться в случае ошибки можно без особых проблем.
Минусы: Создание зависимости (репозитория) в методах сервиса - DI идет нафиг.

В общем, идеального решения со своего уровня знаний и опыта не вижу.

Вопросы:
1. Вряд ли до меня никто не сталкивался с данной проблемой, хочется узнать мнение более опытных людей на этот счёт - как управлять объектом сессии в долгих задачах?
2. Может dishka из коробки предлагает какое-то решение?
3. Слабо понимаю паттерн UnitOfWork, стоит ли мне пытаться применить его для решения данной проблемы?
  • Вопрос задан
  • 86 просмотров
Подписаться 1 Средний Комментировать
Помогут разобраться в теме Все курсы
  • Нетология
    Fullstack-разработчик на Python + нейросети
    20 месяцев
    Далее
  • Академия Эдюсон
    Python-разработчик + ИИ
    9 месяцев
    Далее
  • ProductStar × РБК
    Python-разработчик с нуля
    6 месяцев
    Далее
Пригласить эксперта
Ответы на вопрос 1
opium
@opium
Просто люблю качественно работать
вариант 3 рабочий, "DI идёт нафиг" — это преувеличение. Прокидывай sessionmaker в сервис и в методах делай async with self.sessionmaker.begin() as session: — транзакция сама откроется и закроется с коммитом/роллбэком.

Кста, dishka нативно поддерживает taskiq (dishka.integrations.taskiq), там REQUEST-скоуп создаётся на каждый таск отдельно. Это не панацея от долгих сессий, но структуру правильную даёт сразу.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы