@Test
void testMultiproducer() throws InterruptedException {
Sinks.Many<Integer> sinksA = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<Integer> sinksB = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<Integer> sinksC = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<Integer> sinkCommon = Sinks.many()
.multicast()
.onBackpressureBuffer();
sinkCommon.asFlux()
.mergeWith(sinksA.asFlux().delayElements(Duration.ofMillis(100)))
.mergeWith(sinksB.asFlux().delayElements(Duration.ofMillis(200)))
.mergeWith(sinksC.asFlux().delayElements(Duration.ofMillis(300)))
.subscribe(e -> log.info("Element {}", e)); // Your consumer
IntStream.range(0, 100)
.forEach(sinksA::tryEmitNext); // Your producers
IntStream.range(1000, 1100)
.forEach(sinksB::tryEmitNext);
IntStream.range(2000, 2100)
.forEach(sinksC::tryEmitNext);
Thread.sleep(10000); // Is necessary for test only
}
public Publisher<Void> addReport(HttpServerRequest req, HttpServerResponse resp) {
return req.receive()
.asString()
.doOnNext(string -> {
// todo :: make some handy job with string
System.out.println(string);
})
.flatMap(__ -> resp.status(HttpResponseStatus.OK)
.addHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*") // CORS
.sendString(Mono.just("")));
}
var collectionOfElements = Stream.concat(highs.stream(), lows.stream())
.collect(Collectors.toMap(MyObject::id, Function.identity(), (o1, o2) -> {
return ... // TODO :: merge two objects;
})).values();