На сервисе реализована идемпотентная кафка.
Настройки app.yaml
# Kafka CONSUMER
spring:
kafka:
consumer:
bootstrap-servers: localhost:29092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
isolation-level: read_committed
enable-auto-commit: false
auto-offset-reset: earliest
# Kafka PRODUCER
producer:
bootstrap-servers: localhost:29092
transaction-id-prefix: tx-
properties:
enable.idempotence: true
acks: all
retries: 3
max.in.flight.requests.per.connection: 5
max.block.ms: 10000
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Listener:
@KafkaListener(topics = "#{'${topic.consume_topic}'}")
public void listenInbound(MessageDto messageDto) {
processService.handleMessage(messageDto);
}
Сценарий такой :
Консюмер вычитывает сообщение, передает его на обработку и если происходит ошибка , то кафка консьюмер откатывает и оставляет незакомиченным. Пробелема в том, что Lisener видит что сообщение незакомиченное и пытается вычитать его снова , из-за этого в эластик попадают дублирующие данные.
Можно ли как-то указать кафка, что не нужно пытаться вычитывать ошибочные сообщения повторно или какую логику обычно реализовывают в таких сценариях?