@Dik_Nik

Как избежать ошибки консьюмера после NotEnoughReplicasException в продюсере aiokafka?

Используется библиотека aiokafka, как в продюсере, так и консьюмере.
Сообщения продюсятся в транзакции.
Есть три брокера. Репликейшен фактор равен 3. min.insync.replicas=2.
Ситуация следующая:
1. Выходит из строя один из брокеров.
2. Продюсер пытается отправить сообщение и вылетает NotEnoughReplicasException.
3. С этим сообщением внутри библиотеки от крутится в ретраях, пока брокер не восстановится.
4. Он отправляет сообщение, но транзакция оборотится из-за протухшего тайм-аута транзакции.
5. Консьюмер получает сообщение, но выкидывает исключение: KeyError
KeyError случается из-за того, что он пытается удалить producer_id из множества aborted_producers, но его там нет и вылетает исключение.
Сама ошибка:
{"asctime": "2022-11-11 14:52:19,121", "process": 1, "levelname": "ERROR", "module": "aiokafka", "name": "common.apache_kafka.aiokafka", "funcName": "_consume", "lineno": 210, "message": "Error while consuming message", "exc_info": "Traceback (most recent call last):\n  File \"/app/common/apache_kafka/aiokafka.py\", line 199, in _consume\n    message = await self._client.getone()\n  File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/consumer.py\", line 1136, in getone\n    msg = await self._fetcher.next_record(partitions)\n  File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 1030, in next_record\n    message = res_or_error.getone()\n  File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 117, in getone\n    msg = next(self._partition_records)\n  File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 198, in __next__\n    return next(self._records_iterator)\n  File \"/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py\", line 223, in _unpack_records\n    self._aborted_producers.remove(next_batch.producer_id)\nKeyError: 197072"}


Есть подозрение, что из-за некорректных конфигов Кафки, он не может записать информацию о транзакции в топик __transaction_state и из-за этого консьюмер не может получить информацию о брокере, на котором случилась проблема.
При этом, ожидаемо, если isolation_lavel сделать равным read_uncommited, сообщение свободно прожевывается.
Буду благодарен, если возникнут мысли как справиться с проблемой. Пофиг даже если теоретические)
  • Вопрос задан
  • 222 просмотра
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы
02 дек. 2024, в 20:03
75000 руб./за проект
02 дек. 2024, в 19:15
10000 руб./за проект
02 дек. 2024, в 18:47
300000 руб./за проект