Проект нормально работал, до тех пор, пока не потребовалось внедрить периодические таски.
Так как декоратор убран из функционала использую
это# coding: utf8
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from api.controllers.message_scheduled import scheduled
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')
app = Celery('backend')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
# Redis
app.conf.broker_url = 'redis://redis:6379/0'
# Enable events
# app.control.enable_events()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# sender.add_periodic_task(60.0, message_scheduled.s(), name='Scheduled message')
sender.add_periodic_task(10.0, scheduled.s(), name='add every 10')
# # Calls test('hello') every 10 seconds.
# sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
#
# # Calls test('world') every 30 seconds
# sender.add_periodic_task(30.0, test.s('world'), expires=10)
#
# # Executes every Monday morning at 7:30 a.m.
# sender.add_periodic_task(
# crontab(hour=7, minute=30, day_of_week=1),
# test.s('Happy Mondays!'),
# )
@app.task
def test(arg):
print(arg)
В этом случаи получаю ошибку
web_1 | File "/usr/local/lib/python3.4/site-packages/django/apps/registry.py", line 239, in get_containing_app_config
web_1 | self.check_apps_ready()
web_1 | File "/usr/local/lib/python3.4/site-packages/django/apps/registry.py", line 124, in check_apps_ready
web_1 | raise AppRegistryNotReady("Apps aren't loaded yet.")
web_1 | django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Как импортировать функции из джанго приложения?