/**
 * Verifies that several independent producers can feed one consumer.
 *
 * <p>Three producer sinks, each throttled with a different per-element delay
 * (100 ms, 200 ms and 300 ms), are merged with a fourth "common" sink into a
 * single stream. Each producer emits 100 integers from its own range
 * (0.., 1000.. and 2000..), after which every sink is completed so the merged
 * stream terminates once all delayed elements have been delivered.
 *
 * <p>The test waits for that terminal signal rather than sleeping for a fixed
 * time: the slowest producer needs about 100 x 300 ms = 30 s, so a fixed
 * 10 s sleep would return before most elements were delivered.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 *         for the merged stream to terminate
 */
@Test
void testMultiproducer() throws InterruptedException {
    final int elementsPerProducer = 100;
    final int expectedTotal = 3 * elementsPerProducer;

    Sinks.Many<Integer> sinksA = Sinks.many().multicast().onBackpressureBuffer();
    Sinks.Many<Integer> sinksB = Sinks.many().multicast().onBackpressureBuffer();
    Sinks.Many<Integer> sinksC = Sinks.many().multicast().onBackpressureBuffer();
    Sinks.Many<Integer> sinkCommon = Sinks.many()
            .multicast()
            .onBackpressureBuffer();

    // java.util.concurrent types are fully qualified on purpose so that this
    // method does not rely on imports that may be missing from the file.
    java.util.concurrent.CountDownLatch terminated =
            new java.util.concurrent.CountDownLatch(1);
    java.util.concurrent.atomic.AtomicInteger received =
            new java.util.concurrent.atomic.AtomicInteger();
    java.util.concurrent.atomic.AtomicReference<Throwable> failure =
            new java.util.concurrent.atomic.AtomicReference<>();

    // Consumer: subscribe before producing, and release the latch on either
    // terminal signal so the test thread never waits longer than necessary.
    sinkCommon.asFlux()
            .mergeWith(sinksA.asFlux().delayElements(Duration.ofMillis(100)))
            .mergeWith(sinksB.asFlux().delayElements(Duration.ofMillis(200)))
            .mergeWith(sinksC.asFlux().delayElements(Duration.ofMillis(300)))
            .subscribe(
                    e -> {
                        received.incrementAndGet();
                        log.info("Element {}", e);
                    },
                    error -> {
                        failure.set(error);
                        terminated.countDown();
                    },
                    terminated::countDown);

    // Producers: orThrow() turns a rejected emission into an exception
    // instead of silently discarding the EmitResult.
    IntStream.range(0, elementsPerProducer)
            .forEach(i -> sinksA.tryEmitNext(i).orThrow());
    IntStream.range(1000, 1000 + elementsPerProducer)
            .forEach(i -> sinksB.tryEmitNext(i).orThrow());
    IntStream.range(2000, 2000 + elementsPerProducer)
            .forEach(i -> sinksC.tryEmitNext(i).orThrow());

    // Complete every source (including the otherwise idle common sink);
    // the merged stream only terminates once all of its sources have.
    sinksA.tryEmitComplete().orThrow();
    sinksB.tryEmitComplete().orThrow();
    sinksC.tryEmitComplete().orThrow();
    sinkCommon.tryEmitComplete().orThrow();

    // Upper bound only: the slowest producer needs roughly 30 s.
    if (!terminated.await(60, java.util.concurrent.TimeUnit.SECONDS)) {
        throw new AssertionError("Merged stream did not terminate in time; received "
                + received.get() + " of " + expectedTotal + " elements");
    }
    if (failure.get() != null) {
        throw new AssertionError("Merged stream terminated with an error", failure.get());
    }
    if (received.get() != expectedTotal) {
        throw new AssertionError("Expected " + expectedTotal + " elements but received "
                + received.get());
    }
}