import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.IntStream;
/**
 * Demo WebFlux controller exposing two endpoints that stream the same five
 * {@code MyObj} items (ids "1" through "5"), one element per second: once as
 * objects and once as strings produced by the injected {@link ObjectMapper}.
 */
@Slf4j
@RequiredArgsConstructor
@RestController
public class WebFluxDemo {

    /** Mapper used to turn each {@code MyObj} into its string form. */
    private final ObjectMapper objectMapper;

    /**
     * Streams the demo objects themselves.
     *
     * @return a flux emitting five {@code MyObj} items with a one-second delay between elements
     */
    @GetMapping("/getMyObjs")
    Flux<MyObj> getMyObjs() {
        return demoObjs()
                .delayElements(Duration.ofSeconds(1))
                .log();
    }

    /**
     * Streams the demo objects after converting each one to a string with the object mapper.
     *
     * @return a flux emitting five serialized items with a one-second delay between elements
     */
    @GetMapping("/getMyObjsAsString")
    Flux<String> getMyObjsAsString() {
        return demoObjs()
                .map(this::toJson)
                .delayElements(Duration.ofSeconds(1))
                .log();
    }

    /** Builds a fresh source flux of {@code MyObj} items whose ids are "1" through "5". */
    private Flux<MyObj> demoObjs() {
        return Flux.fromStream(IntStream.rangeClosed(1, 5).mapToObj(String::valueOf))
                .map(MyObj::new);
    }

    /**
     * Serializes one item via {@link ObjectMapper#writeValueAsString(Object)}.
     * {@code @SneakyThrows} lets the checked serialization exception propagate undeclared.
     */
    @SneakyThrows
    private String toJson(MyObj myObj) {
        return objectMapper.writeValueAsString(myObj);
    }

    /** Minimal payload carrying only a string id. */
    private record MyObj(String id) {
    }
}
// Template: issue a GET request through webClient and read the response as a stream.
webClient.get()
.uri("https://your.awesome.uri")
.retrieve()
// Decode the response body into a Flux of YourAwesomeObject elements.
.bodyToFlux(YourAwesomeObject.class)
// Log the reactive signals passing through this point of the chain.
.log()
// TODO: map each element here, e.g. .map(yourAwesomeObject -> ...)
// Recommended way to consume the result: .subscribe(result -> ...);
// Discouraged (blocking) way, per the original note: .block();
static int a
, которая только одна.
@Test
void testMultiproducer() throws InterruptedException {
    // Demo: three producer sinks, each throttled with its own per-element delay,
    // are merged with a shared sink and drained by a single logging consumer.
    final Sinks.Many<Integer> producerA = Sinks.many().multicast().onBackpressureBuffer();
    final Sinks.Many<Integer> producerB = Sinks.many().multicast().onBackpressureBuffer();
    final Sinks.Many<Integer> producerC = Sinks.many().multicast().onBackpressureBuffer();
    final Sinks.Many<Integer> shared = Sinks.many().multicast().onBackpressureBuffer();

    // Per-producer throttling: 100 ms, 200 ms and 300 ms between elements.
    var throttledA = producerA.asFlux().delayElements(Duration.ofMillis(100));
    var throttledB = producerB.asFlux().delayElements(Duration.ofMillis(200));
    var throttledC = producerC.asFlux().delayElements(Duration.ofMillis(300));

    // The consumer: subscribes to the merged stream and logs every element it receives.
    shared.asFlux()
            .mergeWith(throttledA)
            .mergeWith(throttledB)
            .mergeWith(throttledC)
            .subscribe(element -> log.info("Element {}", element));

    // The producers: each pushes 100 values in its own range; emit results are not inspected.
    for (int value = 0; value < 100; value++) {
        producerA.tryEmitNext(value);
    }
    for (int value = 1000; value < 1100; value++) {
        producerB.tryEmitNext(value);
    }
    for (int value = 2000; value < 2100; value++) {
        producerC.tryEmitNext(value);
    }

    // Only needed in a test: keeps the test thread alive while delayed elements are delivered.
    Thread.sleep(10000);
}
/**
 * Handles a report upload: reads the request body as a single string, hands it to the
 * (placeholder) processing step, then replies with an empty 200 OK response that
 * carries a permissive CORS header.
 *
 * <p>The body is aggregated before processing so the response is written exactly once.
 * Mapping every received chunk to a response would try to set the status and headers
 * again for each additional chunk, and would send no explicit response at all when the
 * request body is empty.
 *
 * @param req  the incoming HTTP request whose body is consumed
 * @param resp the HTTP response used to send the reply
 * @return a publisher that completes once the response has been sent, or errors if
 *         reading or processing the body fails
 */
public Publisher<Void> addReport(HttpServerRequest req, HttpServerResponse resp) {
    return req.receive()
            // Collect all body chunks into one buffer so the body is handled as a whole.
            .aggregate()
            .asString()
            .doOnNext(body -> {
                // TODO: do the real work with the request body
                System.out.println(body);
            })
            // Reply once after the body has been consumed; deferred so the status and
            // headers are only touched at that point, including for an empty body.
            .then(Mono.defer(() -> resp.status(HttpResponseStatus.OK)
                    .addHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*") // CORS
                    .sendString(Mono.just(""))
                    .then()));
}