Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.example.reactive;
- import org.springframework.boot.CommandLineRunner;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
- import reactor.core.publisher.BaseSubscriber;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
- import reactor.core.scheduler.Scheduler;
- import reactor.core.scheduler.Schedulers;
- import java.util.Random;
- @SpringBootApplication
- public class ReactiveApplication {
- public static void main(String[] args) {
- SpringApplication.run(ReactiveApplication.class, args);
- }
- @Bean
- public CommandLineRunner runner() {
- return (args) -> hardAndEasy();
- }
- public void hardAndEasy() throws InterruptedException {
- Flux<Integer> publisher = Flux.just(2, 4, -2, 0, 9);
- Scheduler hard = Schedulers.newParallel("hard", 10);
- Scheduler easy = Schedulers.newParallel("easy", 2);
- Flux<Integer> result = publisher
- .publishOn(easy)
- .doOnNext((x) -> System.out.println("easy: " + Thread.currentThread().getName()))
- .flatMap(number -> {
- Random random = new Random();
- int randomDelay = random.nextInt(3000);
- return Mono.just(number)
- .subscribeOn(hard)
- .doOnNext((x) -> System.out.println("hard: " + Thread.currentThread().getName()))
- // hard work
- .delayElement(java.time.Duration.ofMillis(randomDelay))
- .map(x -> x * x);
- })
- .publishOn(easy);
- result
- .doOnNext((x) -> System.out.println("result: " + Thread.currentThread().getName()))
- .subscribe(System.out::println);
- Thread.sleep(5000);
- easy.dispose();
- hard.dispose();
- }
- public void saveOrder() throws InterruptedException {
- Flux<Integer> publisher = Flux.just(2, 4, -2, 0, 9);
- Scheduler s = Schedulers.parallel();
- Flux<Integer> result = publisher
- .doOnNext((x) -> System.out.println("Processing: " + x))
- .flatMapSequential(number -> {
- Random random = new Random();
- int randomDelay = random.nextInt(3000);
- return Mono.just(number)
- .subscribeOn(s)
- // hard work
- .delayElement(java.time.Duration.ofMillis(randomDelay))
- .map(x -> x * x);
- });
- result.publishOn(Schedulers.single())
- .subscribe(System.out::println);
- Thread.sleep(5000);
- s.dispose();
- }
- private void A() {
- Flux.<String>generate((sink) -> {
- sink.next("Hello");
- })
- .take(4)
- .subscribe(System.out::println);
- }
- private void B() {
- Flux.generate(
- () -> 1,
- (state, sink) -> {
- if (state > 5) {
- sink.complete();
- }
- sink.next(state);
- return state + 1;
- }).subscribe(System.out::println);
- }
- private void C() {
- Flux<Object> publisher = getSomePublisher();
- Flux.create(sink -> publisher.subscribe(new BaseSubscriber<>() {
- @Override
- protected void hookOnNext(Object value) {
- sink.next(value);
- }
- @Override
- protected void hookOnComplete() {
- sink.complete();
- }
- })).subscribe(System.out::println);
- }
- public void D() {
- Flux<Object> publisher = getSomePublisher();
- Flux.create(sink -> {
- sink.onRequest(r -> {
- sink.next("DB returns: " + publisher.blockFirst());
- });
- }).subscribe(System.out::println);
- }
- public void firstChain() {
- // первый дженерик - отвечает за то что выдаем, второй - за то, что есть state
- Flux<Integer> a = Flux.<Integer, Integer>generate(() -> 0,
- (state, sink) -> {
- if (state > 5) sink.complete();
- sink.next(state);
- return state + 1;
- })
- .doOnNext(i -> System.out.println("Before: " + Thread.currentThread().getName()))
- .publishOn(Schedulers.newParallel("test"))
- .doOnNext(i -> System.out.println("After: " + Thread.currentThread().getName()))
- .map(i -> {
- System.out.println("In map: " + Thread.currentThread().getName());
- return i * i;
- });
- a.subscribe((i) -> {
- System.out.println("In subscriber: " + Thread.currentThread().getName());
- });
- }
- public void secondChain() {
- Flux<Integer> a = Flux.<Integer, Integer>generate(() -> 0,
- (state, sink) -> {
- if (state > 5) sink.complete();
- sink.next(state);
- return state + 1;
- })
- .doOnNext(i -> System.out.println("Before: " + Thread.currentThread().getName()))
- //.subscribeOn(Schedulers.newParallel("subs_1"))
- //.publishOn(Schedulers.newParallel("test"))
- .doOnNext(i -> System.out.println("After: " + Thread.currentThread().getName()))
- .map(i -> {
- System.out.println("In map: " + Thread.currentThread().getName());
- return i * i;
- })
- .subscribeOn(Schedulers.newParallel("subs_2"));
- a.subscribe((i) -> {
- System.out.println("In subscriber: " + Thread.currentThread().getName());
- });
- }
- private Flux<Object> getSomePublisher() {
- return Flux.generate(
- () -> 0,
- (state, sink) -> {
- if (state > 5) sink.complete();
- sink.next(state + ";");
- return state + 1;
- }
- );
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment