/**
 * Writes the objects supplied by {@code myService.getObjectsAsFlux()} to the response
 * as one JSON array.
 *
 * <p>The response gets status 200, a JSON content type and a wildcard CORS header.
 * Every object is converted with {@code toJson}; the resulting strings are framed with
 * {@code [} and {@code ]} and separated by commas so that the body as a whole is a valid
 * JSON document. An empty source produces {@code []}.
 *
 * @param req  the incoming request (not read by this handler)
 * @param resp the response to write the status, headers and body to
 * @return the publisher returned by {@code sendString} for the outgoing body
 */
public Publisher<Void> getFilterListByType(HttpServerRequest req, HttpServerResponse resp) {
    return resp
        .status(HttpResponseStatus.OK)
        .addHeader(CONTENT_TYPE, "application/json")
        .addHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*") // CORS: allow any origin
        .sendString(
            myService.getObjectsAsFlux() // obtain the Flux<MyObj>
                .map(item -> toJson(item)) // serialize each object to JSON
                // Prefix every element except the first with a comma; the position
                // comes from index(), so the stream is not buffered.
                .index((position, json) -> position == 0 ? json : "," + json)
                .startWith("[")
                .concatWithValues("]")
        );
}
{"id": 123, "desc": "OLOLO"} {"id": 456, "desc": "AZAZA"}
.map
код:
.reduce((a, b) -> a + "," + b)
.map(item -> "[" + item + "]")
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.IntStream;
/**
 * Demo controller that exposes the same short sequence of {@code MyObj} values twice:
 * once as objects and once as strings already serialized with the injected
 * {@link ObjectMapper}.
 */
@Slf4j
@RequiredArgsConstructor
@RestController
public class WebFluxDemo {

    /** Id of the first demo object. */
    private static final int FIRST_ID = 1;

    /** Id of the last demo object (inclusive). */
    private static final int LAST_ID = 5;

    /** Delay applied to every emitted element. */
    private static final Duration EMIT_DELAY = Duration.ofSeconds(1);

    private final ObjectMapper objectMapper;

    /**
     * Emits the demo objects as {@code MyObj} values, each delayed by {@link #EMIT_DELAY}.
     *
     * @return a logged flux of demo objects
     */
    @GetMapping("/getMyObjs")
    Flux<MyObj> getMyObjs() {
        return demoObjects()
                .delayElements(EMIT_DELAY)
                .log();
    }

    /**
     * Emits the demo objects as JSON strings, each delayed by {@link #EMIT_DELAY}.
     * Serialization happens before the delay is applied.
     *
     * @return a logged flux of JSON strings, one per demo object
     */
    @GetMapping("/getMyObjsAsString")
    Flux<String> getMyObjsAsString() {
        Flux<String> serialized = demoObjects().map(this::toJson);
        return serialized
                .delayElements(EMIT_DELAY)
                .log();
    }

    /**
     * Builds the source sequence shared by both endpoints: one {@code MyObj} per id from
     * {@link #FIRST_ID} to {@link #LAST_ID}, with the id rendered as a string.
     */
    private static Flux<MyObj> demoObjects() {
        return Flux.fromStream(
                        IntStream.rangeClosed(FIRST_ID, LAST_ID).mapToObj(String::valueOf))
                .map(MyObj::new);
    }

    /**
     * Serializes one object with the injected {@link ObjectMapper}.
     * Checked exceptions from the mapper are rethrown as-is via {@code @SneakyThrows}.
     */
    @SneakyThrows
    private String toJson(MyObj source) {
        return objectMapper.writeValueAsString(source);
    }

    /** Minimal payload type carrying only a string id. */
    private record MyObj(String id) {
    }
}
Flux<String> items = ... здесь исходный Flux ... ;
Flux<Integer> numbers = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next(state);
return state + 1;
}
);
items.zipWith(numbers, (i,n) -> n + i)
.map(item-> item.replace("0{", "{").replaceFirst("^[0-9]+", ", "))
.startWith("[")
.concatWithValues("]")
.subscribe(System.out::println);