Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @RestController
- public class Controller
- {
- @PostMapping("/doStuff")
- public Mono<Response> doStuff(@RequestBody Mono<Request> request)
- {
- ...
- }
- }
- @PostMapping("/doStuff")
- public Mono<Response> doStuff(@RequestBody Mono<Request> request)
- {
- return request.flux()
- .groupBy(r -> r.someProperty())
- .flatMap(gf -> gf.map(r -> doStuff(r)));
- }
- private AtomicReference<FluxSink<Request>> sink = new AtomicReference<>();
- private Flux<Response> serializingStream;
- public Controller()
- {
- this.serializingStream =
- Flux.<Request>create(fluxSink -> sink.set(fluxSink), ERROR)
- .groupBy(r -> r.someProperty())
- .flatMap(gf -> gf.map(r -> doStuff(r)));
- .publish()
- .autoConnect();
- this.serializingStream.subscribe().dispose(); //dummy subscription to set the sink;
- }
- @PostMapping("/doStuff")
- public Mono<Response> doStuff(@RequestBody Request request)
- {
- req.setReqId(UUID.randomUUID().toString());
- return
- serializingStream
- .doOnSubscribe(__ -> sink.get().next(req))
- .filter(resp -> resp.getReqId().equals(req.getReqId()))
- .take(1)
- .single();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement