@m0ody
backend dev (python, django, postgresql, celery)

Celery: вложенный chord и события

Коллеги, возникла непонятная ситуация с вложенными chord.
Есть таски:
@app.task(base=CheckerTask, name='task_a')
def task_a():
    pass

@app.task(base=CheckerTask, name='task_b')
def task_b():
    pass

@app.task
def complete():
    pass

Создаю несколько chord'ов с данными тасками и с событием по завершению - complete:
chord_a = chord([task_a.s(...).set(queue='st'), task_b.s(...).set(queue='st')], complete.s(...).set(queue='publish'))
chord_b = chord([task_a.s(...).set(queue='st'), task_b.s(...).set(queue='st')], complete.s(...).set(queue='publish'))

Далее создаю "главный" chord с ранее созданными chord'ами и выполняю его:
chord([chord_a.s(), chord_b.s()], main_complete.s().set(queue='publish')).apply_async()

Проблема в том, что таски отрабатывают нормально, а события complete, main_complete не вызываются.
Если вызывать chord_a.apply_async(), chord_b.apply_async() напрямую, то события complete вызываются успешно, но мне нужно событие завершения этих chord'ов.
Брокер: redis, Results backend: redis.
  • Вопрос задан
  • 3948 просмотров
Пригласить эксперта
Ответы на вопрос 1
suguby
@suguby
программист, python, django, mysql, git, hg, linux
Только что сделал тест:
import celery
from celery.canvas import chord

@celery.task
def task_a(*args, **kwargs):
    logger.info('task_a')
    return 1

@celery.task
def task_b(*args, **kwargs):
    logger.info('task_b')
    return 1

@celery.task
def task_c(*args, **kwargs):
    logger.info('task_c')
    return 1

@celery.task
def task_d(*args, **kwargs):
    logger.info('task_d')
    return 1

@celery.task
def notify_a(*args, **kwargs):
    logger.info('notify_a')
    return 1

@celery.task
def notify_b(*args, **kwargs):
    logger.info('notify_b')
    return 1

@celery.task
def finish(*args, **kwargs):
    logger.info('finish')
    return 1


def test_chord_chord():
    ch1 = chord([task_a.s(), task_b.s()], notify_a.s())
    ch2 = chord([task_c.s(), task_d.s()], notify_b.s())

    main_task = chord([ch1, ch2], finish.s())
    main_task.apply_async()

все задачи выполнились.

[2014-06-04 16:30:29,811: INFO/MainProcess] Got task from broker: celery.chord[36d088c0-648b-482e-9075-59e6aa1d519a]
[2014-06-04 16:30:29,883: INFO/MainProcess] Task celery.chord[36d088c0-648b-482e-9075-59e6aa1d519a] succeeded in 0.0439801216125s: <GroupResult:...
[2014-06-04 16:30:30,821: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:30.845580+04:00]
[2014-06-04 16:30:30,825: INFO/MainProcess] Got task from broker: celery.chord[41452081-7a54-4c8f-8328-a5861b9ceefc]
[2014-06-04 16:30:30,828: INFO/MainProcess] Got task from broker: celery.chord[2a448074-02a1-4bb2-bf2a-9942b6157baa]
[2014-06-04 16:30:30,875: INFO/MainProcess] Task celery.chord[2a448074-02a1-4bb2-bf2a-9942b6157baa] succeeded in 0.0350089073181s: <GroupResult:...
[2014-06-04 16:30:30,876: INFO/MainProcess] Task celery.chord[41452081-7a54-4c8f-8328-a5861b9ceefc] succeeded in 0.0363390445709s: <GroupResult:...
[2014-06-04 16:30:30,876: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] retry: Retry in 1s
[2014-06-04 16:30:31,834: INFO/MainProcess] Got task from broker: celery.chord_unlock[6ffd6da8-e2b0-405d-912f-55e2ce52b10b] eta:[2014-06-04 16:30:31.846696+04:00]
[2014-06-04 16:30:31,837: INFO/MainProcess] Got task from broker: aggregator.tasks.task_c[1bc49e88-9348-441d-a5f2-776e35f3d178]
[2014-06-04 16:30:31,840: INFO/MainProcess] Got task from broker: aggregator.tasks.task_d[7ce8e784-2eee-45bf-81c4-ecebe8343c76]
[2014-06-04 16:30:31,843: INFO/MainProcess] Got task from broker: celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] eta:[2014-06-04 16:30:31.845846+04:00]
[2014-06-04 16:30:31,846: INFO/MainProcess] Got task from broker: aggregator.tasks.task_a[f624d9df-a32e-40a7-beff-cbf49f414a2f]
[2014-06-04 16:30:31,850: INFO/MainProcess] Got task from broker: aggregator.tasks.task_b[1979141c-2a9e-4dbb-a56f-848711c58afa]
[2014-06-04 16:30:31,856: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:31.857219+04:00]
[2014-06-04 16:30:31,860: INFO/MainProcess] task_c
[2014-06-04 16:30:31,860: INFO/MainProcess] task_a
[2014-06-04 16:30:31,861: INFO/MainProcess] task_d
[2014-06-04 16:30:31,863: INFO/MainProcess] Task aggregator.tasks.task_c[1bc49e88-9348-441d-a5f2-776e35f3d178] succeeded in 0.00339508056641s: 1
[2014-06-04 16:30:31,865: INFO/MainProcess] task_b
[2014-06-04 16:30:31,866: INFO/MainProcess] Task aggregator.tasks.task_a[f624d9df-a32e-40a7-beff-cbf49f414a2f] succeeded in 0.00567007064819s: 1
[2014-06-04 16:30:31,867: INFO/MainProcess] Task aggregator.tasks.task_d[7ce8e784-2eee-45bf-81c4-ecebe8343c76] succeeded in 0.00664710998535s: 1
[2014-06-04 16:30:31,868: INFO/MainProcess] Task aggregator.tasks.task_b[1979141c-2a9e-4dbb-a56f-848711c58afa] succeeded in 0.00276899337769s: 1
[2014-06-04 16:30:31,870: INFO/MainProcess] Task celery.chord_unlock[6ffd6da8-e2b0-405d-912f-55e2ce52b10b] succeeded in 0.00752401351929s: None
[2014-06-04 16:30:31,885: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] retry: Retry in 1s
[2014-06-04 16:30:31,890: INFO/MainProcess] Task celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] retry: Retry in 1s
[2014-06-04 16:30:32,865: INFO/MainProcess] Got task from broker: aggregator.tasks.notify_b[c7ee5cde-934f-4838-b7ec-4b965deab3f8]
[2014-06-04 16:30:32,869: INFO/MainProcess] Got task from broker: celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] eta:[2014-06-04 16:30:32.870193+04:00]
[2014-06-04 16:30:32,871: INFO/MainProcess] notify_b
[2014-06-04 16:30:32,873: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:32.883596+04:00]
[2014-06-04 16:30:32,874: INFO/MainProcess] Task aggregator.tasks.notify_b[c7ee5cde-934f-4838-b7ec-4b965deab3f8] succeeded in 0.00337815284729s: 1
[2014-06-04 16:30:32,882: INFO/MainProcess] Task celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] succeeded in 0.00427103042603s: None
[2014-06-04 16:30:32,893: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] retry: Retry in 1s
[2014-06-04 16:30:33,881: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:33.888341+04:00]
[2014-06-04 16:30:33,884: INFO/MainProcess] Got task from broker: aggregator.tasks.notify_a[5fb97e6c-5dc3-42a9-b52f-f6aaf228bfbc]
[2014-06-04 16:30:33,888: INFO/MainProcess] notify_a
[2014-06-04 16:30:33,890: INFO/MainProcess] Task aggregator.tasks.notify_a[5fb97e6c-5dc3-42a9-b52f-f6aaf228bfbc] succeeded in 0.00259184837341s: 1
[2014-06-04 16:30:33,898: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] succeeded in 0.00864100456238s: None
[2014-06-04 16:30:34,890: INFO/MainProcess] Got task from broker: aggregator.tasks.finish[1042518e-bea4-49fb-a11c-37b5ffa29c1a]
[2014-06-04 16:30:34,893: INFO/MainProcess] finish
[2014-06-04 16:30:34,895: INFO/MainProcess] Task aggregator.tasks.finish[1042518e-bea4-49fb-a11c-37b5ffa29c1a] succeeded in 0.00239610671997s: 1

requirements.txt:
Django==1.5.5
celery==3.0.19
celery-with-mongodb==3.0
django-celery==3.0.17
kombu==2.5.10

settings.py
INSTALLED_APPS = (
        ...
        'djcelery',
        'celery',
        ...
)
BROKER_URL = MONGO_CONN_STR
CELERY_RESULT_BACKEND = "mongodb"
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы