lxstvayne
@lxstvayne
Люблю Python

Как коммитить только одно сообщение в kafka-python?

Отложенные задачи помещаются в кафку, отдельный контейнер берёт доступные сообщения и запускает отложенные задачи в фоне, как только задача выполнилась, коммитится офсет, но проблема в том, что не факт, что задачи идут в порядке очереди, поэтому при коммите офсета могут закомититься задачи, которые не были ещё выполнены, соответственно при следующем перезапуске они потеряются. Надеюсь, вы меня поняли.

Вызываю следующий код при коммите офсета:
def commit_message(self, msg):
    offsets = {TopicPartition(msg.topic, msg.partition): OffsetAndMetadata(msg.offset + 1, None)}
    self._consumer.commit(offsets)


Вопрос, как мне коммитить только одно сообщение или, если я делаю что-то не так, как перестроить логику, чтобы сообщения не терялись при перезапуске?
  • Вопрос задан
  • 127 просмотров
Пригласить эксперта
Ответы на вопрос 1
mayton2019
@mayton2019
Bigdata Engineer
Проблема не очень понятна. Но вы можете сделать джобы полностью независимыми создав для каждого класса джобов свой топик.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы