Как понять, что все сообщения в очереди обработаны?

Раньше с очередями дела не имел, так что мог соорудить жуткий велосипед. Критика приветствуется. Очень может быть, что я просто не вижу очевидного или сам себя загнал в тупик.

Использую последний beanstalkd (1.13) , клиенты написаны с использованием pheanstalk (v5.0.5).
На всякий случай ремарка: на этих версиях воркер подключается к очереди. даже если она пустая, и бесконечно висит в ожидании задания (job-а). Встроенные таймауты тут неприменимы ( что логично, брокер-то живой и здоровый) или я чего-то не понимаю. Соответственно, узнать длину очереди в таком состоянии воркер не может.

Задача: мне прилетает здоровенный файл, я разбираю его на кусочки и запихиваю в очередь. По ту сторону очереди ждут 4 воркера. После того, как все задания будут обработаны, мне надо запустить некоторые агрегирующие процессы - поэтому я жду опустошения этой очереди. Но очередь опустеет в момент, когда последнее задание только ушло в обработку. К тому же, в теории, воркер может внезапно умереть (ни разу такого не было), и тогда это же задание по таймауту вернётся в очередь и его возьмет другой воркер.
Плюс, воркер в процессе собирает статистику, и должен как-то отдать её в конце работы. Я пока что решил это таким способом: напихиваю в конец очереди N (по числу воркеров) специальных заданий останова - воркер получает такое задание, отключается от очереди и делает всё необходимое.

Но вот как к этой конструкции приспособить агрегатные задачи? Или как изменить конструкцию, чтобы стало легко и просто жить?
( если это важно: агрегатным заданиям не нужно знать, какие именно задания были в очереди. )
  • Вопрос задан
  • 178 просмотров
Пригласить эксперта
Ответы на вопрос 1
По мне, так для того, чтобы знать когда задача обработана, достаточно отправлять сообщение об окончании в отдельную очередь "оповещения". Обработчик очереди будет слушать ее и может запустить "агрегирующие процессы".

Соответственно, узнать длину очереди в таком состоянии воркер не может.
Обработчики очередей не должны заботиться о длине очереди. Это не их ответственность. Они находятся в постоянной готовности обработать следующую задачу.

Плюс, воркер в процессе собирает статистику, и должен как-то отдать её в конце работы. Я пока что решил это таким способом: напихиваю в конец очереди N (по числу воркеров) специальных заданий останова - воркер получает такое задание, отключается от очереди и делает всё необходимое.

статистику можно сохранять в СУБД, отправлять в какую-либо отдельную очередь "отчеты" и пр. (в общем обмениваться через IPC).
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы