Как балансировать рассылку сообщений в Kafka на примере микросервисов на Nest.js?
Я создал приложение и несколько инстансов микросервиса Nest.js, которые взаимодействуют через Kafka.
Сейчас первый запущенный микросервис принимает сообщения, а второй и последующие копии - не получают сообщения.
Если я закрою первый микросервис, то спустя несколько секунд, сообщения начнёт получать тот, который подключился вторым.
Как мне разбалансировать рассылку сообщений, что б были задействованы все микросервисы? Пример с официально сайта Nest.js взял по взаимодействию с кафкой.
Любой Kafka-consumer имеет атрибут group.id. И если у нескольких консюмеров он совпадает
то они начинают делить топик поровну (по round-robin). В более расширенном варианте - нужно
настраивать поля самого сообщения чтоб был более умный партишенинг.
Несмотря на то что Кафка работает быстро, сам процесс включения и перебалансировки консюмеров
может быть не очень быстрым поэтому такие манипуляции следует делать не очень часто.
mayton2019 вот у меня как раз сейчас они не рандомно получают, а именно тот, кто подключился первый. У них один groupId.
Если каждой копии микросервиса даю свой groupId- то принимают все сразу,
а мне хочется балансировки, что б взял 10 копий поднял на каждое ядро по одному, и каждый запрос от основной аппки рандомно обрабатывал кто-то из них
И вот я пишу туда. Код на Java но суть - одинаковая во всех Kafka клиентах.
Не обращай внимания на транзакции. Смотри как я генерирую partition некое число
которое просто от 0 до 3 в диапазоне. В данном случае оно не несет никакого смысла.
Главное что оно распределяет месседжи на 4 partitions. Формулу распределения
ты можешь сам создать например исходя из бизнес-смыслов.
try(KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
ProducerCallback producerCallback = new ProducerCallback();
Random r = new Random();
ProducerRecord<String, String> kafkaRecord;
producer.initTransactions();
try {
for (int i = begin; i < begin + size; i++) {
UUID key = UUID.randomUUID();
int value = (int) (30 * r.nextGaussian());
int partition = Math.abs(value % 4);
kafkaRecord = new ProducerRecord(
topic,
partition,
System.currentTimeMillis(),
key,
value);
producer.send(kafkaRecord, producerCallback);
}
producer.commitTransaction();