Какую выбрать архитектуру для обработки событий, когда возникают ситуации, что еще необработанное событие уже не нужно обрабатывать?
Давайте вместе подумаем над решением вот такой задачи.
Есть входящий поток прайслистов, которые нужно обновлять в базе.
Прайс проходит две стадии обработки.
1. Предобработка (нормализация)
2. Загрузка в базу
Поставщиков прайсов тысячи.
В среднем один прайс обновляется один раз в день.
Но возникают ситуации, когда один поставщик может прислать быстро два-три прайса и актуальный - это именно последний. Избежать предобработки каждого из прайсов не получится.
А вот в базу хотелось бы грузить не все.
Например, представьте такую хронологию. Это парйсы одного конкретного поставщика.
1 ноября 8:00 новый прайс
2 ноября 8:00 новый прайс
2 ноября 8:10 новый прайс
2 ноября 8:20 новый прайс
Один прайс - это несколько миллионов строк. Обработка и загрузка в базу - это трата вычислительных ресурсов и времени. Обычно после предобработки о непосредственно загрузкой в базу проходит 30-60 минут.
КЛЮЧЕВОЙ вопрос: как не грузить прайсы в базу, приступившие 2 ноября в 8:00 и 8:10, а загрузить только тот, который пришел в 8:20 ? Т.е. явно будет такой момент, когда все три прайса (8:00 8:10 и 8:20) уже прошли предобработку и ждут (видимо в какой-то очереди), когда они перейдут на второй этап загрузки. И вот тут хотелось не выполнять холостую работу, по загрузке прайсов 8:00 и 8:10, а загрузить сразу самый актуальный, поступивший в 8:20
На первый взгляд тут все просто.
Но вот дополнительные вводные:
- предобработчиков много и они работают в параллеле и друг о друге ничего не знают, тем более они не знают о загрузчиках в базу
- загрузчиков в базу много и они работают в параллеле и друг о друге ничего не знают
- ВАЖНО: чтобы было минимум блокировок и никаких deadlock'ов
Я сейчас не буду описывать наше решение.
Хотелось бы услышать от вас, как архитектурно вы бы решили эту задачу.
Стали бы все делать только в базе? Возьмете в помощники какую-нибудь систему очередей типа RabbitMQ? Все сделаете в ней или база тоже понадобится? Может быть вообще и не база и не кролик?
Если что - я не троллю. Это реальная задача и хочется понять, насколько мы правильно ее решили.
RabbitMQ позволяет выставить время жизни каждого отдельного задания.
Это подходит при более менее строгой периодичности заливки заданий.
В вашем случае перед заливкой очередного прайса очищать все очереди.
Либо рассовывать на обработку через промежуточную очередь в виде обычной БД.
В Rabbit отправлять из неё порциями после завершения обработки предыдущей порции.
Очищать очередь в БД при необходимости заливки очередного прайса.
Время жизни сообщения к кролике нам не подойдет, т.к. не известно, через сколько штатно будет обработан прайс.
Не понял идею про очистку всех очередей. Можете развить?
Очередь в чистой БД не очень хорошо. Как на чистой БД гарантировать, что два воркера не возьмутся за одну и ту же задачу? Блокировать таблицы? Не хочется :(
И я просто не понял, зачем тогда Rabbit? Пожалуйста, развейте тему. Вдруг это то, что нам нужно?
Я очищаю очереди в кроле пустыми воркерами, их задача просто принять и сразу же подтвердить задание. Очищает не моментально, но довольно быстро.
Про БД:
Все задания сначала складываются в промежуточную БД.
Из неё в кроля порциями допустим по 1000 заданий, в момент когда хотя бы один из воркеров встал без дела.
Как только подгружают новый прайс очистить все в промежуточной БД и залить в ней новые задаиния.
Можно сделать плагин для RabbitMQ. При добавлении задания в очередь будет проходить очередь и удалять задания от того же поставщика. Само собой, если поставщик уже на обработке, то нужно усложнять решение.
IMHO для такой задачи писать и поддерживать плагин - это из пушки по воробьям. Правда я не знаю, как писать плагины к кролику и окажется, что это 2 строчки кода на эрланге, но в любом случае в команде знакомых с эрлангом нет и в продакшене на такое решение я не пойду.
Можно сохранять в базе версию прайса (timestamp его появления, например) и пробрасывать её с задачей. Когда задача приходит на этап загрузки в базу - сверять версию с актуальной, и если меньше - отбрасывать её. Возможна ситуация, когда уже после начала загрузки приходит новый прайс, тогда для прерывания загрузки устаревшего прайса можно в процессе загрузки периодически ходить в базу и сверять версию. Очереди можно гонять через rabbitmq, если он уже есть в проекте, или запилить в базе, если его нет и плодить сущности не хочется.