sortarage
@sortarage
Я тучка-тучка-тучка, я вовсе не медведь

Можно ли с помощью Kafka объединять результаты задач на базе общего id?

Можно ли, используя Kafka, объединить результаты задач в single data entry (например, JSON) на базе определенного параметра (например, order_id)?

То есть, падает в систему order в виде JSON, для его обработки нужно выполнить N задач - какие-то локальные, какие-то ждут ответа от external API, то есть разное время выполнения. Задачи загружаются в Kafka, которая стучится в микросервисы. Каждая задача получает результат тоже в JSON, включая order_id.

Собственно, вопрос можно ли объединить результаты этих задач автоматически, сразу после того как последняя задача order’а выполнена? Если да, то как?

Если я неправильно уловил логику/порядок событий/etc. - пните. Если неправильно выбрал инструмент и это лучше реализовать через решение X - тоже. В остальном, буду благодарен за любой topic/stream-based совет.
  • Вопрос задан
  • 252 просмотра
Решения вопроса 1
Vamp
@Vamp
Вашу задачу в такой постановке вполне можно решить. Вот только склеиванием результатов придётся заниматься вручную.

Создайте топик с результатами и в качестве ключа возьмите order_id. Далее считывайте результаты из топика и складывайте в коллекцию Map<Integer, Set<TaskResult>> (где Integer - order_id). Как только количество элементов в Set станет равным количеству ранее отправленных задач по данному order_id - можно считать, что все ответы получены и передавать их все разом на дальнейшую обработку.

Останется только продумать крайние случаи. Например, нельзя до бесконечности ждать поступления всех результатов - external api может не ответить, а локальная задача вылететь с эксепшеном и не сгенерировать TaskResult. В этом случае количество ответов будет меньше количества отправленных задач. Придется прикручивать таймауты и/или отправлять задачи повторно. А что делать если вдруг ответов поступит больше, чем отправлялось запросов?

Плюс ещё вопрос когда коммитить офсеты. Если сразу, то возникает опасность получить только половину результатов. Например, если сборщик результатов крашнется после того как соберёт первую половину и закоммитит её, то после рестарта он вычитает только вторую половину и никогда не соберёт полный ответ.

Можно создавать отдельный топик под каждый order. Здесь упрощается обработка некоторых corner кейсов, но возникает проблема если order'ов много (сотни тысяч - миллионы).

С kafka streams не работал, но бегло пробежав по документации, могу предположить, что комбинация groupByKey() + reduce() может решить вопрос меньшим количеством кода, чем у предыдущих двух вариантов.
Ответ написан
Пригласить эксперта
Ответы на вопрос 2
@freeg0r
.. some dude ..
Вопрос сформулирован не понятно. Но в общем Кафка ничего не обьеденяет и никуда не стучится. Кафка это просто распределенный data feeds, очередь сообщений с гарантированной доставкой.
Ответ написан
mayton2019
@mayton2019
Bigdata Engineer
Автор не правильно формулирует задачу.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы