3o_3v

Untitled

Nov 3rd, 2023
675
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 6.39 KB | None | 0 0
  1. package com.example.reactive;
  2.  
  3. import org.springframework.boot.CommandLineRunner;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.context.annotation.Bean;
  7. import reactor.core.publisher.BaseSubscriber;
  8. import reactor.core.publisher.Flux;
  9. import reactor.core.publisher.Mono;
  10. import reactor.core.scheduler.Scheduler;
  11. import reactor.core.scheduler.Schedulers;
  12.  
  13. import java.util.Random;
  14.  
  15. @SpringBootApplication
  16. public class ReactiveApplication {
  17.  
  18.     public static void main(String[] args) {
  19.         SpringApplication.run(ReactiveApplication.class, args);
  20.     }
  21.  
  22.     @Bean
  23.     public CommandLineRunner runner() {
  24.         return (args) -> hardAndEasy();
  25.     }
  26.  
  27.     public void hardAndEasy() throws InterruptedException {
  28.         Flux<Integer> publisher = Flux.just(2, 4, -2, 0, 9);
  29.         Scheduler hard = Schedulers.newParallel("hard", 10);
  30.         Scheduler easy = Schedulers.newParallel("easy", 2);
  31.         Flux<Integer> result = publisher
  32.                 .publishOn(easy)
  33.                 .doOnNext((x) -> System.out.println("easy: " + Thread.currentThread().getName()))
  34.                 .flatMap(number -> {
  35.                     Random random = new Random();
  36.                     int randomDelay = random.nextInt(3000);
  37.                     return Mono.just(number)
  38.                             .subscribeOn(hard)
  39.                             .doOnNext((x) -> System.out.println("hard: " + Thread.currentThread().getName()))
  40.                             // hard work
  41.                             .delayElement(java.time.Duration.ofMillis(randomDelay))
  42.                             .map(x -> x * x);
  43.                 })
  44.                 .publishOn(easy);
  45.         result
  46.                 .doOnNext((x) -> System.out.println("result: " + Thread.currentThread().getName()))
  47.                 .subscribe(System.out::println);
  48.         Thread.sleep(5000);
  49.         easy.dispose();
  50.         hard.dispose();
  51.     }
  52.  
  53.     public void saveOrder() throws InterruptedException {
  54.         Flux<Integer> publisher = Flux.just(2, 4, -2, 0, 9);
  55.         Scheduler s = Schedulers.parallel();
  56.         Flux<Integer> result = publisher
  57.                 .doOnNext((x) -> System.out.println("Processing: " + x))
  58.                 .flatMapSequential(number -> {
  59.                     Random random = new Random();
  60.                     int randomDelay = random.nextInt(3000);
  61.                     return Mono.just(number)
  62.                         .subscribeOn(s)
  63.                         // hard work
  64.                         .delayElement(java.time.Duration.ofMillis(randomDelay))
  65.                         .map(x -> x * x);
  66.         });
  67.         result.publishOn(Schedulers.single())
  68.                 .subscribe(System.out::println);
  69.         Thread.sleep(5000);
  70.         s.dispose();
  71.     }
  72.  
  73.     private void A() {
  74.         Flux.<String>generate((sink) -> {
  75.             sink.next("Hello");
  76.         })
  77.                 .take(4)
  78.                 .subscribe(System.out::println);
  79.     }
  80.  
  81.     private void B() {
  82.         Flux.generate(
  83.                 () -> 1,
  84.                 (state, sink) -> {
  85.                     if (state > 5) {
  86.                         sink.complete();
  87.                     }
  88.                     sink.next(state);
  89.                     return state + 1;
  90.                 }).subscribe(System.out::println);
  91.     }
  92.  
  93.     private void C() {
  94.         Flux<Object> publisher = getSomePublisher();
  95.  
  96.         Flux.create(sink -> publisher.subscribe(new BaseSubscriber<>() {
  97.             @Override
  98.             protected void hookOnNext(Object value) {
  99.                 sink.next(value);
  100.             }
  101.  
  102.             @Override
  103.             protected void hookOnComplete() {
  104.                 sink.complete();
  105.             }
  106.         })).subscribe(System.out::println);
  107.     }
  108.  
  109.     public void D() {
  110.         Flux<Object> publisher = getSomePublisher();
  111.  
  112.         Flux.create(sink -> {
  113.             sink.onRequest(r -> {
  114.                 sink.next("DB returns: " + publisher.blockFirst());
  115.             });
  116.         }).subscribe(System.out::println);
  117.     }
  118.  
  119.     public void firstChain() {
  120.         // первый дженерик - отвечает за то что выдаем, второй - за то, что есть state
  121.         Flux<Integer> a = Flux.<Integer, Integer>generate(() -> 0,
  122.                 (state, sink) -> {
  123.                     if (state > 5) sink.complete();
  124.                     sink.next(state);
  125.                     return state + 1;
  126.                 })
  127.                 .doOnNext(i -> System.out.println("Before: " + Thread.currentThread().getName()))
  128.                 .publishOn(Schedulers.newParallel("test"))
  129.                 .doOnNext(i -> System.out.println("After: " + Thread.currentThread().getName()))
  130.                 .map(i -> {
  131.                     System.out.println("In map: "  + Thread.currentThread().getName());
  132.                     return i * i;
  133.                 });
  134.  
  135.         a.subscribe((i) -> {
  136.             System.out.println("In subscriber: " + Thread.currentThread().getName());
  137.         });
  138.     }
  139.  
  140.     public void secondChain() {
  141.         Flux<Integer> a = Flux.<Integer, Integer>generate(() -> 0,
  142.                         (state, sink) -> {
  143.                             if (state > 5) sink.complete();
  144.                             sink.next(state);
  145.                             return state + 1;
  146.                         })
  147.                 .doOnNext(i -> System.out.println("Before: " + Thread.currentThread().getName()))
  148.                 //.subscribeOn(Schedulers.newParallel("subs_1"))
  149.                 //.publishOn(Schedulers.newParallel("test"))
  150.                 .doOnNext(i -> System.out.println("After: " + Thread.currentThread().getName()))
  151.                 .map(i -> {
  152.                     System.out.println("In map: "  + Thread.currentThread().getName());
  153.                     return i * i;
  154.                 })
  155.                 .subscribeOn(Schedulers.newParallel("subs_2"));
  156.  
  157.         a.subscribe((i) -> {
  158.             System.out.println("In subscriber: " + Thread.currentThread().getName());
  159.         });
  160.     }
  161.  
  162.     private Flux<Object> getSomePublisher() {
  163.         return Flux.generate(
  164.                 () -> 0,
  165.                 (state, sink) -> {
  166.                     if (state > 5) sink.complete();
  167.                     sink.next(state + ";");
  168.                     return state + 1;
  169.                 }
  170.         );
  171.     }
  172. }
  173.  
Advertisement
Add Comment
Please, Sign In to add comment