@alexg-nn

Как организовать дедупликацию сообщений?

Привет!

На уровне сервиса хочу организовать дедупликацию входящих от брокера сообщений.

Вводные:
1. У сообщения есть уникальный ID
2. У сообщения есть дата отправки продюсером.

Задачи:
1. Отклонить сообщение, если ID уже зарегистрирован.
2. Отклонить сообщение, если его дата старее, чем последняя дата принятого сообщения (этим добиваемся исключения ошибок в очерёдности доставки).

Решения:
Табличка Mysql видится вроде бы достаточно простой:
| ID | Date | Message data |

Первая задача решается просто из природы primary key во время операции INSERT.
Как решить вторую, чтобы избежать race condition? Я так понимаю, без каких-то локов тут не обойтись? В идеале хотелось бы видеть какую то атомарную операцию "вставил новую строку или отклонил". Смотрел на вариант INSERT...SELECT, но я так понимаю, между селектом и вставкой может вклиниться другая операция вставки и всё пойдёт насмарку.
  • Вопрос задан
  • 150 просмотров
Пригласить эксперта
Ответы на вопрос 3
Rsa97
@Rsa97
Для правильного вопроса надо знать половину ответа
Можно повесить триггер BEFORE INSERT, проверять в нём дату и выбрасывать при необходимости ошибку.
Можно так:
INSERT INTO `table` (`id`, `date`, `message`)
  (
    SELECT `t`.`id`, `t`.`date`, `t`.`message`
      FROM (SELECT :id AS `id`, :date AS `date`, :message AS `message`) AS `t`
      JOIN (SELECT MAX(`date`) AS `date` FROM `table`) AS `m`
      WHERE `t`.`date` > `m`.`date`
  )
Ответ написан
2ord
@2ord
Можно вот так производить вставки:
INSERT INTO `table1` (`date`, `message`) VALUES (?, ?)
ON DUPLICATE KEY UPDATE
    message = IF(VALUES(`date`) > `date`, VALUES(message), message),
    `date`  = IF(VALUES(`date`) > `date`, VALUES(`date`), `date`)

Это позволяет иметь много консьюмеров, обновляющих данные в БД.
IF можно заменить на конструкцию CASE
Ответ написан
@alexg-nn Автор вопроса
Пока не придумал ничего лучше, чем блокирующую транзакцию

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
BEGIN TRAN
INSERT INTO messages (id,date,msg) VALUES(?,?,?)
SELECT id FROM messages ORDER BY date DESC LIMIT 1
COMMIT TRAN

И потом на уровне приложения проверить, что ID из последнего селекта равен ID переданому в инсерт.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы