@PaulTes

Как в Akka Streams выполнить проверку условия перед рассылкой данных через BroadcastHub?

Здравствуйте, я осваиваю библиотеку для работы с сокетами https://github.com/playframework/play-socket.io

У меня следующий вопрос, как с помощью нее я могу обрабатывать сущности до того как они будут разосланы всем подписчикам.

Сейчас пытаюсь сделать вот так:
Pair<Sink<Brand, NotUsed>, Source<Brand, NotUsed>> pair = MergeHub.of(Brand.class)
                .toMat(BroadcastHub.of(Brand.class), Keep.both())
                .run(materializer);

        Source<Brand, NotUsed> modifiedSource = pair.second().flatMapConcat((brand) -> {
            Logger.warn(brand.getName());
            return Source.fromCompletionStage(
                    brandRepository.put(brand)
            );
        });

        Flow<Brand, Brand, NotUsed> chatFlow = Flow.fromSinkAndSourceCoupled(
                pair.first(),
                modifiedSource
        );


Но есть проблема, код сохранения бренда выполняется столько раз сколько и подключений.
И если объясните как все-таки работают MergeHub и BroadcastHub буду благодарен.

Спасибо.
  • Вопрос задан
  • 96 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы