Используется библиотека aiokafka, как в продюсере, так и консьюмере.
Сообщения продюсятся в транзакции.
Есть три брокера. Репликейшен фактор равен 3. min.insync.replicas=2.
Ситуация следующая:
1. Выходит из строя один из брокеров.
2. Продюсер пытается отправить сообщение и вылетает NotEnoughReplicasException.
3. С этим сообщением внутри библиотеки от крутится в ретраях, пока брокер не восстановится.
4. Он отправляет сообщение, но транзакция оборотится из-за протухшего тайм-аута транзакции.
5. Консьюмер получает сообщение, но выкидывает исключение: KeyError
KeyError случается из-за того, что он пытается удалить producer_id из множества aborted_producers, но его там нет и вылетает исключение.
Сама ошибка:
{"asctime": "2022-11-11 14:52:19,121", "process": 1, "levelname": "ERROR", "module": "aiokafka", "name": "common.apache_kafka.aiokafka", "funcName": "_consume", "lineno": 210, "message": "Error while consuming message", "exc_info": "Traceback (most recent call last):\n File \"/app/common/apache_kafka/aiokafka.py\", line 199, in _consume\n message = await self._client.getone()\n File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/consumer.py\", line 1136, in getone\n msg = await self._fetcher.next_record(partitions)\n File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 1030, in next_record\n message = res_or_error.getone()\n File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 117, in getone\n msg = next(self._partition_records)\n File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 198, in __next__\n return next(self._records_iterator)\n File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 223, in _unpack_records\n self._aborted_producers.remove(next_batch.producer_id)\nKeyError: 197072"}
Есть подозрение, что из-за некорректных конфигов Кафки, он не может записать информацию о транзакции в топик __transaction_state и из-за этого консьюмер не может получить информацию о брокере, на котором случилась проблема.
При этом, ожидаемо, если isolation_lavel сделать равным read_uncommited, сообщение свободно прожевывается.
Буду благодарен, если возникнут мысли как справиться с проблемой. Пофиг даже если теоретические)