Advertisement
Guest User

Untitled

a guest
Jun 19th, 2019
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.21 KB | None | 0 0
  1. @RestController
  2. public class Controller
  3. {
  4. @PostMapping("/doStuff")
  5. public Mono<Response> doStuff(@RequestBody Mono<Request> request)
  6. {
  7. ...
  8. }
  9. }
  10.  
  11. @PostMapping("/doStuff")
  12. public Mono<Response> doStuff(@RequestBody Mono<Request> request)
  13. {
  14. return request.flux()
  15. .groupBy(r -> r.someProperty())
  16. .flatMap(gf -> gf.map(r -> doStuff(r)));
  17. }
  18.  
  19. private AtomicReference<FluxSink<Request>> sink = new AtomicReference<>();
  20. private Flux<Response> serializingStream;
  21.  
  22.  
  23. public Controller()
  24. {
  25. this.serializingStream =
  26. Flux.<Request>create(fluxSink -> sink.set(fluxSink), ERROR)
  27. .groupBy(r -> r.someProperty())
  28. .flatMap(gf -> gf.map(r -> doStuff(r)));
  29. .publish()
  30. .autoConnect();
  31.  
  32. this.serializingStream.subscribe().dispose(); //dummy subscription to set the sink;
  33. }
  34.  
  35. @PostMapping("/doStuff")
  36. public Mono<Response> doStuff(@RequestBody Request request)
  37. {
  38. req.setReqId(UUID.randomUUID().toString());
  39. return
  40. serializingStream
  41. .doOnSubscribe(__ -> sink.get().next(req))
  42. .filter(resp -> resp.getReqId().equals(req.getReqId()))
  43. .take(1)
  44. .single();
  45. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement