Не работал с Kafka, но на сколько я знаю, она синхронная до безобразия. По крайней мере в вопросе подписки. В голову приходят два способа решить проблему интеграции с асинхронным Netty.
Можно в инициализаторе конвейера или обработчике клиентского соединения запускать периодическую задачу, которая будет опрашивать очередь с нулевым таймаутом:
eventLoop.schedule(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
// Какие-либо действия
}, 100, TimeUnit.MILLISECONDS);
Но этот вариант обрушит на сервер Kafka шквал запросов.
Другой вариант - это сделать костыль в виде дополнительной очереди, в которую отправлять сообщения о том, что в какой-либо из клиентских очередей появилось сообщение. Тогда можно в одном потоке заблокировать ожидание сообщений из этой очереди, а при получении порождать событие в цикле событий Netty:
class MessageListener implements Runnable {
private final ChannelGroup group;
private volatile boolean run = true;
public MessageListener(ChannelGroup group) {
this.group = group;
}
public void run() {
while(run) {
ConsumerRecords<String, String> records = notificationConsumer.poll(Duration.ofSecond(5));
if (!records.isEmpty())
group.forEach(c -> c.pipeline().fireUserEventTriggered(new NewMsgEvent()));
}
}
public void stop() {
run = false;
}
}
class SomeHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof NewMsgEvent) {
ConsumerRecords<String, String> records = clientConsumer.poll(Duration.ZERO);
records.forEach(record -> {
ctx.write(Unpooled.wrappedBuffer(record.value().getBytes(StandardCharsets.UTF_8)));
});
ctx.flush();
}
else {
super.userEventTriggered(ctx, evt);
}
}
}
Можно эту идею немного доработать, передавая в очереди уведомлений информацию о том, в какой именно из клиентских очередей появилось новое сообщение, чтобы
MessageListener
мог отправить событие только в один нужный конвейер или чтобы только нужный обработчик на событие отреагировал.