<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-core</artifactId>
<version>4.3.11.Final</version>
</dependency>
public static void main(String[] args) throws Exception {
Sinks.Many<Integer> sinks = Sinks.many()
.multicast()
.onBackpressureBuffer(); // Изучайте документацию. Тут много вариантов
sinks.asFlux()
.subscribe(System.out::println);
// В очереди ничего нет. Ничего не происходит
Thread.sleep(1000);
// Наполняем очередь элементами
sinks.tryEmitNext(1);
sinks.tryEmitNext(2);
sinks.tryEmitNext(3);
}
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.IntStream;
@Slf4j
@RequiredArgsConstructor
@RestController
public class WebFluxDemo {
private final ObjectMapper objectMapper;
@GetMapping("/getMyObjs")
Flux<MyObj> getMyObjs() {
return Flux.fromStream(IntStream.range(1, 6).boxed()
.map(String::valueOf)
)
.map(MyObj::new)
.delayElements(Duration.ofSeconds(1))
.log()
;
}
@GetMapping("/getMyObjsAsString")
Flux<String> getMyObjsAsString() {
return Flux.fromStream(IntStream.range(1, 6).boxed()
.map(String::valueOf))
.map(MyObj::new)
.map(this::mapToObj)
.delayElements(Duration.ofSeconds(1))
.log()
;
}
@SneakyThrows
private String mapToObj(MyObj myObj) {
return objectMapper.writeValueAsString(myObj);
}
private record MyObj(String id) {
}
}