@toobinks

Flask Celery Singleton как?

Хочу сделать задачу синглтоном.

utils/celery.py
def init_celery(app, celery):
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask


tasks.py
class CelerySingleton(celery.Task, Singleton):
    pass

@celery.task(base=CelerySingleton)
def calc():
# при обращении к sqlalchemy получаю
# application not registered on db instance and no application bound to current context


Я так понимаю, что в tasks.py класс CelerySingleton наследует celery.Task, который совсем не ContextTask. Но как сделать правильно?
  • Вопрос задан
  • 349 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы