Ответы пользователя по тегу Kafka
  • Можно ли с помощью Kafka объединять результаты задач на базе общего id?

    Vamp
    @Vamp
    Вашу задачу в такой постановке вполне можно решить. Вот только склеиванием результатов придётся заниматься вручную.

    Создайте топик с результатами и в качестве ключа возьмите order_id. Далее считывайте результаты из топика и складывайте в коллекцию Map<Integer, Set<TaskResult>> (где Integer - order_id). Как только количество элементов в Set станет равным количеству ранее отправленных задач по данному order_id - можно считать, что все ответы получены и передавать их все разом на дальнейшую обработку.

    Останется только продумать крайние случаи. Например, нельзя до бесконечности ждать поступления всех результатов - external api может не ответить, а локальная задача вылететь с эксепшеном и не сгенерировать TaskResult. В этом случае количество ответов будет меньше количества отправленных задач. Придется прикручивать таймауты и/или отправлять задачи повторно. А что делать если вдруг ответов поступит больше, чем отправлялось запросов?

    Плюс ещё вопрос когда коммитить офсеты. Если сразу, то возникает опасность получить только половину результатов. Например, если сборщик результатов крашнется после того как соберёт первую половину и закоммитит её, то после рестарта он вычитает только вторую половину и никогда не соберёт полный ответ.

    Можно создавать отдельный топик под каждый order. Здесь упрощается обработка некоторых corner кейсов, но возникает проблема если order'ов много (сотни тысяч - миллионы).

    С kafka streams не работал, но бегло пробежав по документации, могу предположить, что комбинация groupByKey() + reduce() может решить вопрос меньшим количеством кода, чем у предыдущих двух вариантов.
    Ответ написан
    2 комментария