Kafka Consumer re-consume незакомиченные сообщения?

На сервисе реализована идемпотентная кафка.
Настройки app.yaml
# Kafka CONSUMER
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:29092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      isolation-level: read_committed
      enable-auto-commit: false
      auto-offset-reset: earliest
    # Kafka PRODUCER
    producer:
      bootstrap-servers: localhost:29092
      transaction-id-prefix: tx-
      properties:
        enable.idempotence: true
        acks: all
        retries: 3
        max.in.flight.requests.per.connection: 5
        max.block.ms: 10000
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer


Listener:
@KafkaListener(topics = "#{'${topic.consume_topic}'}")
  public void listenInbound(MessageDto messageDto) {
    processService.handleMessage(messageDto);
  }


Сценарий такой :
Консюмер вычитывает сообщение, передает его на обработку и если происходит ошибка , то кафка консьюмер откатывает и оставляет незакомиченным. Пробелема в том, что Lisener видит что сообщение незакомиченное и пытается вычитать его снова , из-за этого в эластик попадают дублирующие данные.

Можно ли как-то указать кафка, что не нужно пытаться вычитывать ошибочные сообщения повторно или какую логику обычно реализовывают в таких сценариях?
  • Вопрос задан
  • 91 просмотр
Пригласить эксперта
Ответы на вопрос 1
@mayton2019
Bigdata Engineer
Консюмер вычитывает сообщение, передает его на обработку и если происходит ошибка , то кафка консьюмер откатывает и оставляет незакомиченным.

Покажи код консьюмера. Это твоя логика. И ты решаешь где тебе делать фиксацию пачки и где не делать.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы