Как понять, что все сообщения в очереди обработаны?
Раньше с очередями дела не имел, так что мог соорудить жуткий велосипед. Критика приветствуется. Очень может быть, что я просто не вижу очевидного или сам себя загнал в тупик.
Использую последний beanstalkd (1.13) , клиенты написаны с использованием pheanstalk (v5.0.5).
На всякий случай ремарка: на этих версиях воркер подключается к очереди. даже если она пустая, и бесконечно висит в ожидании задания (job-а). Встроенные таймауты тут неприменимы ( что логично, брокер-то живой и здоровый) или я чего-то не понимаю. Соответственно, узнать длину очереди в таком состоянии воркер не может.
Задача: мне прилетает здоровенный файл, я разбираю его на кусочки и запихиваю в очередь. По ту сторону очереди ждут 4 воркера. После того, как все задания будут обработаны, мне надо запустить некоторые агрегирующие процессы - поэтому я жду опустошения этой очереди. Но очередь опустеет в момент, когда последнее задание только ушло в обработку. К тому же, в теории, воркер может внезапно умереть (ни разу такого не было), и тогда это же задание по таймауту вернётся в очередь и его возьмет другой воркер.
Плюс, воркер в процессе собирает статистику, и должен как-то отдать её в конце работы. Я пока что решил это таким способом: напихиваю в конец очереди N (по числу воркеров) специальных заданий останова - воркер получает такое задание, отключается от очереди и делает всё необходимое.
Но вот как к этой конструкции приспособить агрегатные задачи? Или как изменить конструкцию, чтобы стало легко и просто жить?
( если это важно: агрегатным заданиям не нужно знать, какие именно задания были в очереди. )
Нужно разбираться в предметной области и текущей архитектуре. Возможно, лучше проверять результат работы воркеров, а не послыать сигнал, когда закончилось формирование очереди. Или, например, некий менеджер, который будет собирать информацию с воркеров, и по этим данным будет решать закончилась обработка или нет. Т.е. он знает сколько было отправлено заданий в очередь и от воркеров знает, сколько фактически сделано.
В варианте с заданием-стопером вы создаете зависимость между брокером и воркерами, брокер должен всегда точно знать сколько работает воркеров. Возможно в вашей архитектуре это и не проблема.
Другой момент, такое задание-стопер - это сигнал воркеру, который передается как обычное задание, нужно продумать ситуацию, когда задание-стопер не получилось распознать/обработать. Если ваша очередь останавливается при любой ошибке, то не проблема. Но, то что задание возвращается в очередь по тайм-ауту уже может добавить проблем.
Также, из-за того что эти задания не предназначены определенному воркеру, то один воркер может обработать 2 задания-стопера, а второму не достанется ни одного. Нужно, чтобы задания-стоперы были адресованы конкретному воркеру.
Или, например, некий менеджер, который будет собирать информацию с воркеров
Этот момент мне неясен. В моём представлении, это так:
1) есть некое оперативное хранилище ( например, Redis )
2) Паблишер туда пишет: "я поставил N заданий". И закончил работу.
3) Каждый Воркер в то же хранилище пишет свой инкрементный счётчик, сколько заданий он сделал.
4) Менеджер работает параллельно воркерам и считает сумму их счётчиков. Как только сумма сошлась с числом от паблишера, все задания обработаны.
Так?
В варианте с заданием-стопером вы создаете зависимость между брокером и воркерами, брокер должен всегда точно знать сколько работает воркеров.
Нет, брокер (beanstalkd) знает, сколько воркеров, но не умеет ставить задания. ( не буду дальше позориться и рассказывать устройство, и так вижу, что тут коряво вышло) . Но Вы правы, тут могла быть проблема из-за несоответствия количества воркеров и заданий-стопперов в очереди.
Теперь вижу, что подход в целом неверный. Если я боюсь утечек памяти, надо было брать пример с Апача: Менеджер мониторит количество воркеров и запускает новые, если нужно, а воркер обрабатывает фиксированное число заданий ( тысячу, 10 тысяч ) и завершается. Если хочется принудельно завершить воркера, надо смотреть в сторону сигналов.
Также, из-за того что эти задания не предназначены определенному воркеру, то один воркер может обработать 2 задания-стопера
Нет, к счастью, не может. воркер способен обрабатывать только одно задание за раз, и там всё так примитивно, что он точно завершится. Я бы скорее опасался сценария, что воркер почему-то не смог сообщить брокеру, что он обработал стоппер - тогда beanstalkd вернёт это задание в очередь по таймауту, и в очереди станет больше стопперов, чем живых воркеров.
скидки на товары. В каждой скидке свой перечень товаров, и если цены на какие-то товары из перечня поменялись, то скидку надо "пересчитать". ( сходу не скажу, почему). Из дизайна видно, что расточительно пересчитывать скидку после изменения каждого товара, который в неё входит; лучше дождаться всех изменений по товарам, и только после этого пересчитывать скидки.
Этот момент мне неясен. В моём представлении, это так:
1) есть некое оперативное хранилище ( например, Redis )
2) Паблишер туда пишет: "я поставил N заданий". И закончил работу.
3) Каждый Воркер в то же хранилище пишет свой инкрементный счётчик, сколько заданий он сделал.
4) Менеджер работает параллельно воркерам и считает сумму их счётчиков. Как только сумма сошлась с числом от паблишера, все задания обработаны.
Так?
Можно так. В этой схеме слабое звено, что мы считаем кол-во выполненных заданий, а не знаем какие выполнены. Если поставим инкрементацию кол-ва выполненных до финализации задачи в очереди, то при сбоях в воркере (если он упал после инкрементации, но до финализации задачи) кол-во выполненных может быть больше реального кол-ва задач, некрасиво, но работу не нарушит. Можно фиксировать id выполненных задач, а не просто их считать, тогда не будет расхождений, но особой нужды наверное в этом нет.
Надо учитывать, что когда происходит инкрементация кол-ва, фактически задача воркера уже выполнена, если он потом не сможет ее финализировать, а сделает это позже, это не важно. Т.е. повторное выполнение той же самой задачи воркером никак не должно нам мешать.
Но лучше смотреть не на задачи в очереди, а на выполненную работу. Допустим задачи меняют данные в строчках таблицы, тогда добавляем к каждой строчке updated_at или version, а некий демон периодически или по сигналу проверяет, что какие-то строчки обновились и для них нужно выполнить какие-то действия. Т.е. опираться на бизнесовые изменения, а не на техническую очередь.
не буду дальше позориться и рассказывать устройство, и так вижу, что тут коряво вышло
красиво, производительно и универсально сразу только у теоретиков выходит, на практике решение которое сегодня кажется отличным, завтра может принести множество проблем, из-за банальной ошибки или просто изменятся требования. так что не обращайте внимания, кто что сказал, если в этих словах не видите ничего полезного для себя.
Теперь вижу, что подход в целом неверный. Если я боюсь утечек памяти, надо было брать пример с Апача: Менеджер мониторит количество воркеров и запускает новые, если нужно, а воркер обрабатывает фиксированное число заданий ( тысячу, 10 тысяч ) и завершается. Если хочется принудельно завершить воркера, надо смотреть в сторону сигналов.
Я бы не был столь категоричным. Прям менеджер воркеров, не для сбора инфы с воркеров, а который управляет ими, это не очень тривиальная штука. Самый простой и эффективный вариант - это запуск воркеров через systemd unit, там же прописать их перезапуск при сбое. Упадет воркер - не проблема, его перезапустят. Главное проработать возможные ошибки, если воркер вылетел на середине задачи, а затем начал ее сначала. Но это в любом случае должно быть.
Я говорю про архитектуру, когда воркеры запущены постоянно, если нет задач, они простаивают. Их кол-во фиксированно и изменяется редко.
Если поставим инкрементацию кол-ва выполненных до финализации задачи в очереди, то при сбоях в воркере (если он упал после инкрементации, но до финализации задачи) кол-во выполненных может быть больше реального кол-ва задач, некрасиво, но работу не нарушит.
Тут только 2 варианта последовательности действий воркера:
1) увеличить свой счётчик, затем сообщить брокеру, что задача готова;
2) сообщить брокеру, что задача готова, затем увеличить свой счётчик.
На мой взгляд, второй вариант выглядит "более правильным". Но тогда возможна ситуация, когда сумма счётчиков будет меньше, чем количество выполненных задач.
В менеджер можно добавить такой вариант: если сумма счётчиков не меняется в течение какого-то времени, то спросить у брокера количество задач в очереди. Если там ноль, то можно считать, что все задачи выполнены.
Т.е. повторное выполнение той же самой задачи воркером никак не должно нам мешать.
Это добавляет баллов второму варианту. В нём, если воркер упал до того, как сообщить брокеру, что задача выполнена, счётчик не увеличится.
Главное проработать возможные ошибки, если воркер вылетел на середине задачи, а затем начал ее сначала.
За это отвечает брокер - если по таймауту задача не подтверждена, она автоматически возвращается в очередь. Это была одна из причин, почему я выбрал beanstalk, а не Redis Pub/Sub ( у Redis сейчас заявлен новый механизм, Streams , но мне он показался сложнее )
сообщить брокеру, что задача готова, затем увеличить свой счётчик
это плохой вариант, появляется вариант, когда задачи по факту выполнены, но весь процесс остановится, так как счетчик не заполнится полностью.
если сумма счётчиков не меняется в течение какого-то времени, то спросить у брокера количество задач в очереди
я так понимаю это не вариант, т.к. в очереди могут быть задачи из другой новой группы
За это отвечает брокер - если по таймауту задача не подтверждена, она автоматически возвращается в очередь. Это была одна из причин, почему я выбрал beanstalk, а не Redis Pub/Sub ( у Redis сейчас заявлен новый механизм, Streams , но мне он показался сложнее )
Это понятно, но и воркер должен работать так, что при повторной обработке задачи не было проблем.
Зато там есть потоки и группы и скорее всего через них можно решить текущую проблему, но нужно убедится, что можно определять что в группе нет заданий.
когда задачи по факту выполнены, но весь процесс остановится, так как счетчик не заполнится полностью.
В моём случае остановка на 5 минут - не страшно. ( Но для кого-то ещё, кто увидит это обсуждение, может быть важным, так что спасибо за замечание ).
я так понимаю это не вариант, т.к. в очереди могут быть задачи из другой новой группы
опять же, я прикинул - в моём случае это нормально. Было бы плохо, если бы задания прилетали в очередь достаточно плотно, но в моей реальности этого нет.
Это понятно, но и воркер должен работать так, что при повторной обработке задачи не было проблем.
В моём случае проблем нет. Обработка - это возможный апдейт значений. При повторной обработке даже апдейта не будет.
Если чтото сможет диагностировать, что произошел сбой, тогда да. Но, лучше если такой сбой вообще невозможен.
опять же, я прикинул - в моём случае это нормально. Было бы плохо, если бы задания прилетали в очередь достаточно плотно, но в моей реальности этого нет.
Опять же, сейчас так, но в будущем может изменится.
Тут, нужно оценивать критичность подобных сбоев, если решаете конкретную задачу, а не делаете универсальное решение для всех, то в некоторых случаях можно оставить нерешенными отдельные проблемы, если они требуют сложных и долгих решений.
По мне, так для того, чтобы знать когда задача обработана, достаточно отправлять сообщение об окончании в отдельную очередь "оповещения". Обработчик очереди будет слушать ее и может запустить "агрегирующие процессы".
Соответственно, узнать длину очереди в таком состоянии воркер не может.
Обработчики очередей не должны заботиться о длине очереди. Это не их ответственность. Они находятся в постоянной готовности обработать следующую задачу.
Плюс, воркер в процессе собирает статистику, и должен как-то отдать её в конце работы. Я пока что решил это таким способом: напихиваю в конец очереди N (по числу воркеров) специальных заданий останова - воркер получает такое задание, отключается от очереди и делает всё необходимое.
статистику можно сохранять в СУБД, отправлять в какую-либо отдельную очередь "отчеты" и пр. (в общем обмениваться через IPC).
достаточно отправлять сообщение об окончании в отдельную очередь "оповещения".
В моём случае это избыточно, достаточно использовать счётчик.
Решение, бывшее на старте, частично страдало от опыта предыдущего, однопоточного подхода: получил пачку заданий, в цикле обработал, потом подвёл итоги. С появлением параллельной обработки этот опыт стал непригоден, но от инерции мышления избавиться трудно