Как избавиться от дублирующихся сообщений в очереди RabbitMq?
Добрый день.
Есть два сервиса. Необходимо, чтобы один сервис при изменении какой-то сущности сообщал об этом другому сервису.
Сейчас обмен происходит с помощью сложных велосипедов и хранением очереди в MySql. Планируется делать это с помощью RabbitMQ, но есть некоторые проблемы.
Первый сервис меняет данные довольно часто (может быть десятки раз в минуту), а второй сервис получает эти данные с периодичность в 2 минуты.
Если класть каждое изменение сущности в очередь RabbitMQ, то при обработке очереди вторым сервисом один и тот же элемент будет обрабатываться многократно.
Как можно реализовать уникальность на момент начала обработки очереди?
Читать всю очередь и циклами группировать элементы - затратно и как-то костыльно.
Если читать по одному элементу, обрабатывать, и при чтении следующего проверять, был ли он обработан, и если был, просто помечать прочитанным, могут потеряться данные, когда мы обработали первый элемент, а уже после этого произошло обновление.
Существует ли в RabbitMQ какой-то механизм, решающий эту проблему?
Если есть какие-то другие предложения без написания своих велосипедов, тоже интересно было бы услышать.
Rabbit не позволяет заменять одно сообщение другим или каким-то другим образом произвести редактирование сообщение. Единственный вариант это фильтровать сообщения на уровне consumer
Использовать symfony lock например или написать простенький сервис для реализации mutex с помощью редиса. Это не костыль, это распространённая практика, которая пригодится не только для очередей, но и для предотвращения других проблем, связанных с многопоточностью, например когда на бэк в несколько потоков загружаются много файлов и надо на основе метаданных из файла создавать таксономийные сущности (может быть конфликт, что такая сущность уже есть).
Принцип такой: перед постановкой задачи получаешь блокировку $mutex->lock($key), если блокировку забрал кто-то другой, то такая задача уже в процессе выполнения, если нет, то ставишь задачу. После выполнения задачи, она снимает блокировку, либо блокировка снимается по TTL в редисе.