Задать вопрос
maximkv25
@maximkv25
web-developer

Как собрать выполнения всех тасков с одной функции для дальнейшего ответа пользователю?

Приветствую,

задача состоит в ожидании выполнения всех тасков с дальнейшим ответом клиенту, пример кода для более конкретного понимания

@task(base=NotifierTask)
first(*args):
	# runs some time
	return json_resp

@task(base=NotifierTask)
second(*args):
	# runs some time
	return json_resp

@task(base=NotifierTask)
third(*args):
	# runs some time
	return json_resp


# На этот урл идет запрос
# количество выполнения ф-ций может варьироваться в зависимости от запроса 
# может выполниться от 1 до 40 ф-ций
def main(request):
	# there we get some data
	data = request.POST.get('data')

	for some_func in data:
		if some_func == 'first':
			first.delay(args)
		elif some_func == 'second':
			second.delay(args)
		elif some_func == 'third':
			third.delay(args)

# Все это дело попадает на воркер селери

# Notifier Task
class NotifierTask(Task):
    """
    Tasks that sends notification on completion.
    """
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Получаем результат выполнения и прокидываем через сокет на сторону клиента


Задача состоит в получение результата выполнения всех ф-ций, чтобы в дальнейшем сформировать ответ для пользователя.
Предназначение ф-ции main только в принятии запроса от пользователя, быстрое распределение задач по воркерам и ответ без ожидания их выполнения.
Какие варианты можете посоветовать?

Использование celery group
При тесте выполнение тасков ведет себя не предсказуемо
@task(base=NotifierTask)
def celery_test(id, position):
    time.sleep(id)
    return "Time delay: %s, Position: %s" % (str(id), str(position))


def some_request(request):
    from django.http import JsonResponse
    if request.method == 'GET':

        # tasks_list = []
        # count = 0
        # while count < 10:
        #     count += 1
        #     tasks_list.append(celery_test.s(count))
        #
        # job = group(tasks_list)

        job = group([
            celery_test.s(10, 1),
            celery_test.s(2, 2),
            celery_test.s(2, 3),
            celery_test.s(2, 4),
            celery_test.s(5, 5),
        ])

        job.apply_async()


        return JsonResponse(dict(success='Done'))


Результат выполнения
5a43a380d6d2b004415545.png

Видно что таск с id=10 выполнился мгновенно, без задержки и по результатам его значение суммировалось к остальным.
Чем объясняется такое поведение или же я что-то не верно делаю?

Выходит что таски сохраняют порядок выполнения заданный в group, это видно по flower и логу воркера
  • Вопрос задан
  • 151 просмотр
Подписаться 2 Простой 6 комментариев
Решения вопроса 1
собираем таски в лист и скармливаем это все в целери функцию chord
task_ = []
task_.append(clear_old_project.s(api_key=api_key))
for group_ in list_all_groups:
            task_.append(frequency_start.s(api_key=api_key,
                                           job_request_id=job_request_id,
                                           app_secret=app_secret,
                                           project_id=proj_id,
                                           group_id=group_.id,
                                           providers=providers,
                                           region_key=region_key,
                                           phrase_forms=phrase_forms))

chord(task_)(generate_reports_frequency.s(api_key, proj_id, job_request_id, app_secret, providers, region_key,o_currency))


по завершению всех тасков, результаты будут переданы generate_reports_frequency как лист return-ов jn от всех функций в одну переменную (первый параметр)
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы