Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // similar to Flux::groupBy but closes the GroupedFlux whenever a different key value appears potentially opening another one for the same value later.
- @Test
- public void groupOnSwitch() {
- StepVerifier
- .create(
- groupOnSwitch(
- Flux.just("one", "two", "twenty", "tissue", "berta", "blot", "thousand"),
- s -> s.substring(0, 1))
- .flatMap(Flux::materialize)
- .map(s -> s.isOnComplete() ? "WINDOW CLOSED" : s.get())
- )
- .expectNext("one", "WINDOW CLOSED")
- .expectNext("two", "twenty", "tissue", "WINDOW CLOSED")
- .expectNext("berta", "blot", "WINDOW CLOSED")
- .expectNext("thousand", "WINDOW CLOSED")
- .verifyComplete();
- }
- @Test
- public void groupOnSwitchKeys() {
- Flux<GroupedFlux<String, String>> fluxOfGroupedFluxes = groupOnSwitch(
- Flux.just("one", "two", "twenty", "tissue", "berta", "blot", "thousand"),
- s -> s.substring(0, 1));
- StepVerifier.create(
- fluxOfGroupedFluxes.map(gf -> gf.key())
- )
- .expectNext("o", "t", "b", "t")
- .verifyComplete();
- }
- private static <T, X> Flux<GroupedFlux<X, T>> groupOnSwitch(Flux<T> flux, Function<T, X> keyFunction) {
- ChangeTrigger changeTrigger = new ChangeTrigger();
- Flux<GroupedFlux<T, T>> fluxOfGroupedFluxes = flux.windowUntil(l -> changeTrigger.test(keyFunction.apply(l)), true);
- return fluxOfGroupedFluxes.flatMap(gf -> gf.groupBy(t -> keyFunction.apply(gf.key())));
- }
- private static class ChangeTrigger<T> {
- T last = null;
- boolean test(T value) {
- boolean startNew = !Objects.equals(last, value);
- last = value;
- System.out.println(String.format("%s, %s", value, startNew));
- return startNew;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement