Отложенные задачи помещаются в кафку, отдельный контейнер берёт доступные сообщения и запускает отложенные задачи в фоне, как только задача выполнилась, коммитится офсет, но проблема в том, что не факт, что задачи идут в порядке очереди, поэтому при коммите офсета могут закомититься задачи, которые не были ещё выполнены, соответственно при следующем перезапуске они потеряются. Надеюсь, вы меня поняли.
Вызываю следующий код при коммите офсета:
def commit_message(self, msg):
offsets = {TopicPartition(msg.topic, msg.partition): OffsetAndMetadata(msg.offset + 1, None)}
self._consumer.commit(offsets)
Вопрос, как мне коммитить только одно сообщение или, если я делаю что-то не так, как перестроить логику, чтобы сообщения не терялись при перезапуске?