Уважаемые знатоки мультиплексорного программирования под java (в частности, netty)
Передо мной стоит следующая задача:
- есть внешний сервер apache kafka (упрощённо - это очередь бинарных сообщений с собственным курсором для каждого подключения; то есть для тех, кто не знаком с кафкой, можно рассматривать например базу данных mysql, суть вопроса это не меняет)
- есть заранее неизвестное количество внешних потребителей этих данных из кафки (ориентируемся на 100+), независимых друг от друга (то есть поток данных от одного мы не можем использовать для другого)
- нужен некий модуль, который будет принимать подключения от этих потребителей, подписываться отдельным соединением на каждого из них в кафку (для создания независимых курсоров), вычитывать данные из кафки, что-то с ними делать, затем передавать преобразованные данные потребителям и управлять курсором кафки (делать коммит) по получению обратной связи от потребителя
Решение в лоб:
- Создаю слушающее tcp гнездо и/или гнездо unix domain socket
- Принимаю соединение от потребителя
- Создаю новый поток для работы с этим потребителем
- В этом потоке слушаю и обрабатываю входящие команды от потребителя
- Создаю ещё один поток для подключения к кафке (использую штатный драйвер apache)
- В этом потоке делаю poll пачки данных из кафки, обрабатываю её и отправляю потребителю
Недостаток этого подхода в огромном количестве потоков и потерях на переключении контекста (величину не оценивал). Расчётные нагрузки - порядка 5-10к сообщений в секунду на одного потребителя.
У задачи есть две особенности, упрощающих решение:
- в самом модуле обработка сообщений очень быстрая
- сообщения обрабатываются только пачками, причём следующая пачка модулем будет читаться только после того, как предыдущая была полностью обработана потребителем, и был от него получен коммит
То есть в идеале я вижу архитектуру модуля, как несколько мультиплексорных потоков, каждый из которых обрабатывает несколько десятков потребителей. Поскольку сам я пришёл из мира node.js, там подобное решается на раз-два. К сожалению, в данном случае использование node.js не представляется возможным.
Попробовал использовать для этих целей netty... Но постоянно натыкаюсь на различные подводные камни.
В частности, хотелось бы иметь возможность перевести на мультиплексоры netty соединения с кафкой без плясок с бубном и переписывания драйвера кафки (подозреваю, не получится)
Или даже самое простое, при попытке поллинга из кафки в том же потоке, что и обработка команд клиента, получаю блокировку потока:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
switch (messageName) {
case "init":
initKafkaConsumer(message);
consume(ctx);
break;
case "commit":
commit(ctx);
consume(ctx);
break;
default:
throw new BotlaneException("No handler for message " + messageName);
}
}
private void consume(ChannelHandlerContext ctx) {
ConsumerRecords<String, KafkaAvroRawDeserializer.Result> records;
do {
records = consumer.poll(Duration.ofMillis(pollTimeout));
} while (records.isEmpty() && !closed);
...
}
Здесь, например, после инициализации соединения и отправки первого батча, ни коммит от клиента, ни даже разрыв соединения в channelUnregistered не будет обработано из-за poll и цикла. Но как по другому решить, пока не придумал
А как вы решали подобные задачи? Если можно, с примером кода. Буду очень благодарен