Задать вопрос
@TheRoSS

Java+netty+kafka: как перейти от многопоточности к мультиплексированию?

Уважаемые знатоки мультиплексорного программирования под java (в частности, netty)
Передо мной стоит следующая задача:
- есть внешний сервер apache kafka (упрощённо - это очередь бинарных сообщений с собственным курсором для каждого подключения; то есть для тех, кто не знаком с кафкой, можно рассматривать например базу данных mysql, суть вопроса это не меняет)
- есть заранее неизвестное количество внешних потребителей этих данных из кафки (ориентируемся на 100+), независимых друг от друга (то есть поток данных от одного мы не можем использовать для другого)
- нужен некий модуль, который будет принимать подключения от этих потребителей, подписываться отдельным соединением на каждого из них в кафку (для создания независимых курсоров), вычитывать данные из кафки, что-то с ними делать, затем передавать преобразованные данные потребителям и управлять курсором кафки (делать коммит) по получению обратной связи от потребителя

Решение в лоб:
- Создаю слушающее tcp гнездо и/или гнездо unix domain socket
- Принимаю соединение от потребителя
- Создаю новый поток для работы с этим потребителем
- В этом потоке слушаю и обрабатываю входящие команды от потребителя
- Создаю ещё один поток для подключения к кафке (использую штатный драйвер apache)
- В этом потоке делаю poll пачки данных из кафки, обрабатываю её и отправляю потребителю

Недостаток этого подхода в огромном количестве потоков и потерях на переключении контекста (величину не оценивал). Расчётные нагрузки - порядка 5-10к сообщений в секунду на одного потребителя.

У задачи есть две особенности, упрощающих решение:
- в самом модуле обработка сообщений очень быстрая
- сообщения обрабатываются только пачками, причём следующая пачка модулем будет читаться только после того, как предыдущая была полностью обработана потребителем, и был от него получен коммит

То есть в идеале я вижу архитектуру модуля, как несколько мультиплексорных потоков, каждый из которых обрабатывает несколько десятков потребителей. Поскольку сам я пришёл из мира node.js, там подобное решается на раз-два. К сожалению, в данном случае использование node.js не представляется возможным.

Попробовал использовать для этих целей netty... Но постоянно натыкаюсь на различные подводные камни.
В частности, хотелось бы иметь возможность перевести на мультиплексоры netty соединения с кафкой без плясок с бубном и переписывания драйвера кафки (подозреваю, не получится)

Или даже самое простое, при попытке поллинга из кафки в том же потоке, что и обработка команд клиента, получаю блокировку потока:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    switch (messageName) {
            case "init":
                initKafkaConsumer(message);
                consume(ctx);
                break;
            case "commit":
                commit(ctx);
                consume(ctx);
                break;
            default:
                throw new BotlaneException("No handler for message " + messageName);
        }
}

private void consume(ChannelHandlerContext ctx) {
    ConsumerRecords<String, KafkaAvroRawDeserializer.Result> records;
    do {
        records = consumer.poll(Duration.ofMillis(pollTimeout));
    } while (records.isEmpty() && !closed);

    ...
}

Здесь, например, после инициализации соединения и отправки первого батча, ни коммит от клиента, ни даже разрыв соединения в channelUnregistered не будет обработано из-за poll и цикла. Но как по другому решить, пока не придумал

А как вы решали подобные задачи? Если можно, с примером кода. Буду очень благодарен
  • Вопрос задан
  • 875 просмотров
Подписаться 1 Сложный 1 комментарий
Решения вопроса 1
sergey-gornostaev
@sergey-gornostaev Куратор тега Java
Седой и строгий
Не работал с Kafka, но на сколько я знаю, она синхронная до безобразия. По крайней мере в вопросе подписки. В голову приходят два способа решить проблему интеграции с асинхронным Netty.

Можно в инициализаторе конвейера или обработчике клиентского соединения запускать периодическую задачу, которая будет опрашивать очередь с нулевым таймаутом:
eventLoop.schedule(() -> {
   ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
   // Какие-либо действия
}, 100, TimeUnit.MILLISECONDS);

Но этот вариант обрушит на сервер Kafka шквал запросов.

Другой вариант - это сделать костыль в виде дополнительной очереди, в которую отправлять сообщения о том, что в какой-либо из клиентских очередей появилось сообщение. Тогда можно в одном потоке заблокировать ожидание сообщений из этой очереди, а при получении порождать событие в цикле событий Netty:
class MessageListener implements Runnable {
    private final ChannelGroup group;
    private volatile boolean run = true;

    public MessageListener(ChannelGroup group) {
        this.group = group;
    }

    public void run() {
        while(run) {
            ConsumerRecords<String, String> records = notificationConsumer.poll(Duration.ofSecond(5));
            if (!records.isEmpty())
                group.forEach(c -> c.pipeline().fireUserEventTriggered(new NewMsgEvent()));
        }
    }

    public void stop() {
        run = false;
    }
}

class SomeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof NewMsgEvent) {
            ConsumerRecords<String, String> records = clientConsumer.poll(Duration.ZERO);
            records.forEach(record -> {
                ctx.write(Unpooled.wrappedBuffer(record.value().getBytes(StandardCharsets.UTF_8)));
            });
            ctx.flush();
        }
        else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

Можно эту идею немного доработать, передавая в очереди уведомлений информацию о том, в какой именно из клиентских очередей появилось новое сообщение, чтобы MessageListener мог отправить событие только в один нужный конвейер или чтобы только нужный обработчик на событие отреагировал.
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
@AlexHell
Я прочитал ответ от Сергей Горностаев в принципе согласен со 2м вариантом, если я его понял, но дополню как бы сделал я:
- кто-то отправляет сообщение к netty серверу "write"
- netty делает добавление в InMemory очередь для kafka (мгновенно)
- netty оповещает остальным заинтересованным (если клиент уже приконекчен, иначе он получит все пакеты в момент конекта) TCP пакет "notify" (мгновенно) чтобы потом клиент послал "readAll" если данные сразу слать не хочется, либо сразу пакет со всеми данными ему шлется предназначенными ему
- (если клиент еще не был приконекчен) заинтересованный клиент конектится и netty отдает ему все его сообщения (не комитит пока) из InMemory
- клиент шлет "commit" - netty фиксирует в InMemory до какого сообщения клиент дочитал, и отправляет в очередь также в kafka

по сути получается одна лишняя очередь, куда сохраняются KafkaTask
и еще текущее состояние InMemory (видимо с удалением когда клиент уж точно прочитал)

в этой очереди из InMemory в отдельном потоке или даже пуле, как и при БД - происходит запись в kafka реально для персистентности
я правильно понял цели?

если бы не персистентность то можно было бы без kafka обойтись даже и просто клиент шлет "write" и всем остальным клиентам рассылается "data" если они уже онлайн или складывается в InMemory и только в момент приконекта клиента отдается

когда персистентность нужна - добавляется только лишняя очередь чтоб в нее в итоге сохранялось, остальное все то же, и при перезапуске сервака - данные из kafka (или БД) бы восстанавливались в InMemory (если их не прям очень много, иначе может отложенным таском в потоке другом)
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы