maximkv25
@maximkv25
web-developer

Как вызвать функцию из task в celery?

Есть функция в директории
app/stats/views.py

@task(name='Sorting response')
def sorting_task_response(*args):
    pass


Есть tasks.py
app/api/tasks.py

from __future__ import absolute_import, unicode_literals
# Create your tasks here
import random
import requests
import json
from datetime import datetime
from celery import task, shared_task, Task, group, chord
from stats.views import sorting_task_response


class NotifierTask(Task):
    """
    Tasks that sends notification on completion.
    """
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        print(status, retval, task_id, args, kwargs, einfo)
        print('Task done')

def notify_client_ws(token, response):
    data = dict(token=token, notice=response[0])
    url = 'http://ws_server:8060/ws/celery/response'
    resp = requests.post(url, data=json.dumps(data))

# Celery callback
@task(name='CallbackNotifier')
def callback_notifier(*args, **kwargs):
    tasks_response = kwargs.get("tasks_response")
    notify_client_ws(token=kwargs.get("user_token"), response=args)
    return True


@task(name='Data after receiving statistics')
def stats_result(*args, **kwargs):
    sorting_task_response.d(args)


При импортировании sorting_task_response получаю ошибку
flower_1     |   File "/app/api/tasks.py", line 8, in <module>
flower_1     |     from stats.views import sorting_task_response
flower_1     | ImportError: cannot import name 'sorting_task_response'


Как импортировать функцию?
  • Вопрос задан
  • 427 просмотров
Пригласить эксперта
Ответы на вопрос 1
Ranc58
@Ranc58
Backend python developer
from .stats.views import sorting_task_response - так тоже выбивает ошибку?
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы