@bitwheeze

Как подружить web flux long polling запрос и scheduled метод?

Не получается заставить работать параллельно long polling запрос и scheduled метод. У меня есть внешний источник данных. Раз в 3 секунды scheduled метод обращается к этому источнику данных и забирает некоторые данные сохраняя их в моей базе данных, добавляя временную метку (timestamp атрибут), когда данные были обновлены. Что бы облегчить жизнь веб приложению, я решил добавить long polling метод, который бы ждал изменения timestamp поля и в случае изменения возвращал бы dto с обновленными данными. Или старый dto если за 12 секунд не было изменений.

Scheduled метод понятно, простой метод с аннотацией @Schedule(fixedDelay)
Long polling методы выглядит примерно так

@GetMapping("/hook")
    public Mono<DataDto> publisher(@PathVariable String account) {
        var currentSession = service.load(account);
		if(currentSession.isEmpty()) {
			return Mono.empty();
		}
		try {
			Thread.sleep(3000);
			for(int i = 0; i < 4; i++) {
				var tempSession = service.load(account);
				if (currentSession.get().getTimestamp() < tempSession.get().getTimestamp()) {
					Mono.just(tempSession.get().dto());
				}
				Thread.sleep(3000);
			}
			Mono.just(currentSession.get().dto());
		} catch (Exception e) {
			log.error("error waiting new data", e);
		}
		throw new RuntimeException("dvferfge");
    }


Я сильно упростил, и поменял названия, что бы не выдавать секретов фирмы.

Так вот проблема в том, что все работает, но только на моем компьютере в Intellij. Если я запаковываю все в контейнер и запускаю в нем как java -jar app.jar, то почему то как только приходит long polling запрос, scheduled метод не срабатывает. Как только 12 секунд пройдут и long polling метод вернет старые данные, начинает снова работать scheduled метод и обновляет в базе данные. Это срабатывает и на сервере порой, то-есть scheduled метод после прихода запроса и во время его, но редко. Web flux же вроде должен работать в своем потоке. Насколько я это понимаю. Я грешу на service.load(account); Этот метод аннотирован @Transactional по сути я думаю ceuurentSession должно быть в статусе detached.

Мне только остается Scheduled метод переписать как отдельный поток, иначе не понимаю, почему он блокируется на время выполнения long polling метода.
  • Вопрос задан
  • 59 просмотров
Решения вопроса 1
@bitwheeze Автор вопроса
Заработало вроде. Переписал все реактивно. Обработчик перестал блокировать скедулер. Может с двумя Mono.defer перебор, но по другому пока не умею. Еще неясно мне пока с take(4) на своем ли месте этот вызов.

@GetMapping("/hooks/{account}/new_game")
    public Mono<TableauDto> publisher(@PathVariable String account) {
        log.debug("hooks/new_game {}", account);
        return Mono.defer(() -> {
            log.debug("get current startBlock {}", account);
            final var currentStartBlock = service
                    .load(account)
                    .map(session -> session.getStartBlock())
                    .orElse(0l);

            var res = Mono.defer(() -> service.getTableau(account, currentStartBlock))
                    .repeatWhenEmpty(c -> c.delayElements(Duration.ofMillis(3000)).take(4));

            log.debug("return res {}", res.single());

            return res;
        });
    }
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы