Хочу сделать задачу синглтоном.
utils/celery.py
def init_celery(app, celery):
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
tasks.py
class CelerySingleton(celery.Task, Singleton):
pass
@celery.task(base=CelerySingleton)
def calc():
# при обращении к sqlalchemy получаю
# application not registered on db instance and no application bound to current context
Я так понимаю, что в tasks.py класс CelerySingleton наследует celery.Task, который совсем не ContextTask. Но как сделать правильно?