@Dogrtt
Qt/Python разработчик

Как правильно организовать связь между сервисами смешанного типа и оповещать пользователя о завершении процесса?

Я вовлечен в создание системы состоящей из нескольких клиентов и сервисов. В качестве клиентов через которые пользователь взаимодействует с системой выступают плагины для настольных приложений и WebUI. Весь список сервисов я указывать не буду, перечислю только те, что важны для обозначения ситуации:

  • Kong API Gateway за которым всё прячется;
  • Сервис работы с базой данных (MongoDB), написан на FastAPI, представляет из себя обычный REST API;
  • Сервис обработки информации, сейчас это тоже FastAPI, порождающий Celery задачи, в которых я запрашиваю файлы из сервиса БД и обрабатываю их;
  • Celery сервис, который принимает задачи и выполняет их (в качестве брокера используется Redis);
  • Сервис написанный на NestJS обслуживающий WebUI.

В текущей версии стандартный порядок работы следующий:
  1. Пользователь с помощью плагина отправляет HTTP запрос с данными к сервису БД;
  2. Сервис БД валидирует информацию, если всё хорошо, пишет в БД;
  3. После создания новой сущности в MongoDB, из сервиса БД отправляется запрос к сервису обработки информации (для конвертации в формат по умолчанию). Ответ не ожидается;
  4. Сервис обработки информации получает запрос и если никто до этого уже не проводил эту же операцию, создает Celery задачу;
  5. Celery сервис производит конвертацию, помещает результат в репозиторий и на этом всё;
  6. Пользователь открывает WebUI и видит сохранённую в БД сущность (сервис NestJS делает запрос к сервису БД), в окне просмотра отображается модель полученная в процессе конвертации (сервис NestJS делает запрос к сервису обработки информации);
  7. Если пользователь хочет конвертировать сущность не в формат по умолчанию, а в какой-нибудь специфический, то он делает запрос к эндпоинту сервиса обработки информации;
  8. Сервис обработки информации создаёт Celery задачу и отдаёт в ответе пользователю её task_id;
  9. Сервис NestJS периодически спрашивает сервис обработки информации о состоянии Celery задачи;
  10. Когда конвертация заканчивается и статус задачи меняется на SUCCEEDED пользователь запрашивает результат у сервиса обработки данных.

Было принято решение перейти от подобной системы, использующей для общения между сервисами HTTP запросы к общению через очередь сообщений. В качестве инструмента для реализации был выбран NATS Streaming. Сейчас, на этапе продумывания изменений возникают вопросы, на которые я, в силу своей неопытности, не могу объективно ответить самому себе.

Например: Если я после создания сущности в БД, помещу сообщение о событии в общую шину NATS, то могу с лёгкостью в новом сервисе произвести нужные действия и так же по их окончанию поместить новое сообщение оповещающее об окончании процесса обработки, тут всё хорошо, НО… Что, если кто-то захочет конвертировать сущность в другой формат? Что тогда? Допустим я оставлю REST сервис обработки данных для инициации этой процедуры, тогда в нём я должен поместить в NATS сообщение о событии “была запрошена конвертация в такой-то формат” вместо создания задачи Celery? Сервис “слушающий” подобные “события” начнёт конвертацию и… И… и что? Пользователь-то по HTTP запрос сделал, эндпоинт создал “эвент” и ответил, что мол ждите, а чего ждать-то? Как оповестить клиент, что конвертация закончилась или, что произошла ошибка? В текущем варианте с Celery есть task_id, по которому можно проверять текущее положение дел, а с новой системой как? Возникает мысль, что комбинировать REST сервисы с сервисами общающимися через MQ не очень хорошая идея. Но что тогда делать? Может создать отдельный сервис который держит постоянную связь с клиентом через WebSocket соединение, такой же как NestJS для WebUI только для других клиентов? Пользователь инициировал конвертацию, сервис слушает подобные события и при ответе оповещает пользователя по WS? Что тогда брать для реализации подобных вещей? Может вообще пускать всех через тот самый NestJS сервис который обслуживает WebUI, а чего, пусть и с плагинами держит соединение?! Но следом возникает вопрос авторизации, как в NATS фильтровать только события, нужные текущему пользователю? Если N пользователей приконнектилось, каждый запустил конвертацию своей сущности из БД в нужный формат, создалось N событий в NATS, вида entity.converted, то что, все сервисы слушающие шину событий на предмет завершения конвертации примут ВСЕ эвенты и вручную надо тело каждого сообщения сериализовать и смотреть моя ли это сущность была конвертирована?

В итоге, главный вопрос такой, что можно почитать, чтобы в голове всё встало на свои места?
  • Вопрос задан
  • 409 просмотров
Пригласить эксперта
Ответы на вопрос 2
REST синхронный, очередь сообщений асинхронная. Если переходите на асинхроту, для HTTP придётся WebSocket внедрить, или long-poll, или костыльно poll'ить раз в секунду, например. (Фу!)
Задачи идентифицировать, чтобы несколько запросивших одно, получили единственное.
Посмотрите паттерн Pub-Sub. Например, отправивший задание "хочу данные X в формате Y" подписывался на сообщения канала "готовченко" и ждал в нём свой "X-Y".
Планировать сообщения и события в асинхронной системе мне удобно в онлайн редакторе UML диаграмм. Сверху участники процессов. Вниз идёт "время". И взаимодействия, кто кому что отправил.
Ответ написан
Комментировать
xmoonlight
@xmoonlight
https://sitecoder.blogspot.com
Всего-то 3 компонента:
1. Генераторы событий - от разных систем.
2. Слушатели событий - те, кто заинтересован в статусе систем.
3. Менеджер очереди событий для конкретного пользователя (один из слушателей событий - это он же).

Доставка из менеджера в браузер - это вообще другой вопрос!
Вариантов - масса: вручную, js-таймаут (или иное событие клиента), long-polling и websocket.

Про UML - уже Сергей Соколов всё верно сказал.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы