Проблема: нужно синхронизировать мои данные со сторонними сервисами.
Дано:
- Мои данные ClientId, CustomerId, CustomerEmail, CustomerPoints
- Четыре (в перспективе до 20) сервиса с которыми нужно синхронизировать мои данные
- Примерно 15 000 клиентов (ClientId), у клиентов от 1000 до 500 000 customer-ов
- У каждого сервиса есть rate-limit. Доступ к каждому сервису для каждого клиента осуществляется по приватному ключу, т.е. rate-limit индивидуальный для связки ClientId-ServiceName
Текущее решение:
- Есть kafka со 100 партициями, партиция это ClientId % 100 = (0 - 99)
- При обновлении данных customer-а в kafka topic пишется сообщение о том чей он клиент (ClientId) и какой у него id
- Проверяется с какими сервисами у клиента настроена синхронизация
- Запускается параллельно синхронизация со всеми активными сервисами
- При неудаче синхронизация повторяется до тех пор пока не выполнится (rate-limit и отказы сервисов учитываются именно сдесь)
Какие сейчас есть проблемы:
- При подключении нового сервиса клиентом, мне необходимо синхронизировать всех его customer-ов с этим сервисом, а их у него может быть до 500 000. Партиция забьётся и те клиенты которым не повезло оказаться с ним в одной партиции будут очень долго синхронизироваться учитывая все rate-limit-ы.
- При достижении rate-limit-а у определённого клиента на каком либо из сервисов, страдает синхронизация для всех остальных сервисов и для всех клиентов сообщения от которых тоже находятся в этой партиции.
- Если сторонний сервис выходит из строя на 20 минут, то каждая партиция в которой появится сообщение для клиента данные которого необходимо синхронизировать с этим сервисом "зависнет" на 20 минут.
Какое решение для очереди сообщений мне нужно реализовать (найти готовое):
- Неограниченное количество партиций
- Партиция должна быть строкой или составным значением (ClientId, ServiceName)
- Каждая партиция должна обрабатываться конкретным consumer-ом
- Когда в партиции заканчиваются данные она пропадает и consumer переключается на обработку другой
- Возможность повторной обработки сообщений в случае ошибок
- Проверка есть ли в очереди сообщение по его ключу и значению (это не обязательно, но желательно что-бы избежать дубликатов)
- Проверка размера необработанных сообщений в определённой партиции (это не обязательно, но желательно)
Сразу отброшу kafka т.к. нет возможности заранее узнать все партиции и держать огромное их количество.
Рассматривал Apache Pulsar, но там количество партиций задаётся глобально в сетингах что тоже не подходит, или я невнимательно изучал этот вопрос, если так то поправьте.
RabbitMQ совсем как мне кажется про другое.
В голову лезут мысли про свой собственный велосипед поверх:
- redis streams
- redis hset+list
- mongodb (но тут точно возникнут проблемы с выполнением на нескольких серверах)
- sqs + в памяти разбивать по очередям для каждого ClientId+ServiceName (плохо параллелится и возможны проблемы по памяти)
Может у кого-то были схожие задачи и есть готовые реализации подобных очередей?