Есть задачи (task) такого типа - по заданному набору предложений я получаю список ссылок. Далее получаю содержимое страниц по этих ссылках и их анализирую (сам анализ может длиться долго, от 1 сек и до 5 минут). Конечно, в процессе анализа возникают исключения, которые нужно перехватить и прервать выполнение всего задания в целом. Отсюда следует, что нужно отменить выполнение и других задач, которые уже выполняются.
Сейчас работает примерно такой код
@celery_app.task(
soft_time_limit=180,
ignore_result=False
)
def search_job(...)
try:
markup, error = get_content(url)
# код анализа страницы здесь
except SoftTimeLimitExceeded:
log += url + " - Soft Time Limit Exceeded Error Catch!\n"
result['log'] = log
return result
@celery_app.task(
queue="ta",
soft_time_limit=600,
base=BaseCeleryTask
)
def search(...):
try:
# сдесь получение списка list_urls
group_work = group(search_job.s(..., url) for url in list_urls)()
while not group_work.ready():
time.sleep(1)
except SoftTimeLimitExceeded:
# Если это задание ждет на результат выполнения всех заданий в группе более 600 секунд
# я хочу отменить все "мелкие" задания и считать что задание в целом завершилось с ошибкою
group_work.revoke()
log += "Big task time limit exceeded!"
# Если все норм - получаем результаты. Оказывается, так делать нельзя(((
with allow_join_result:
results = group_work.get()
# Далее их сохраняем
save_results(results)
так вот, на практике мелкие задания зависают, SoftTimeLimitExceeded не ловиться. И они просто висят и кушают память. Такое бывает редко, но все таки.
Какое может быть лучшее решение для этой задачи?