Я хочу реализовать операцию блокировки очереди запросов с помощью инструментов Reactor Netty. То есть, если в текущую секунду поступает более 3 запросов, то происходит блокировка на 10 секунд и остальные запросы не обрабатываются. Но мое решение с виртуальным окном не работает (оно застревает в цикле) подскажите пожалуйста, может вы знаете другое решение.
public Mono<Void> handler(@NotNull HttpServerRequest request, HttpServerResponse response, TriFunction<JsonObject, HttpServerRequest, HttpServerResponse, Void> success, @NotNull Consumer<? super Throwable> failure) {
return Flux.interval(Duration.ofSeconds(1))
.window(Duration.ofSeconds(1))
.flatMap(Flux::count)
.doOnNext(System.out::println)
.filter(count -> count <= 3)
.delayElements(Duration.ofSeconds(5))
.then(sendRequest(request, response, success, failure));
}
private @NotNull Mono<Void> sendRequest(@NotNull HttpServerRequest request, HttpServerResponse response, TriFunction<JsonObject, HttpServerRequest, HttpServerResponse, Void> success, @NotNull Consumer<? super Throwable> failure) {
return request.receive()
.aggregate()
.asString()
.publishOn(Schedulers.boundedElastic())
.map(body -> Json.createReader(new StringReader(body)).readObject())
.doOnSuccess(json -> success.apply(json, request, response))
.doOnError(failure)
.then();
}