Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Slf4j
- @SpringBootApplication
- public class RSocketRequesterApplication {
- public static void main(String[] args) {
- SpringApplication.run(RSocketRequesterApplication.class);
- }
- @Bean
- RSocket rSocket() {
- return RSocketFactory
- .connect()
- .frameDecoder(PayloadDecoder.ZERO_COPY)
- .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
- .transport(TcpClientTransport.create(7000))
- .start()
- .block();
- }
- @Bean
- RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
- return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
- rSocketStrategies);
- }
- @Component
- class CustomerServiceAdapter {
- private final RSocketRequester rSocketRequester;
- CustomerServiceAdapter(RSocketRequester rSocketRequester) {
- this.rSocketRequester = rSocketRequester;
- }
- Mono<CustomerResponse> getCustomer(String id) {
- return rSocketRequester
- .route("customer")
- .data(new CustomerRequest(id))
- .retrieveMono(CustomerResponse.class)
- .doOnNext(customerResponse -> log.info("Received customer as mono [{}]", customerResponse));
- }
- Flux<CustomerResponse> getCustomers(List<String> ids) {
- return rSocketRequester
- .route("customer-stream")
- .data(new MultipleCustomersRequest(ids))
- .retrieveFlux(CustomerResponse.class)
- .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
- }
- Flux<CustomerResponse> getCustomerChannel(Flux<CustomerRequest> customerRequestFlux) {
- return rSocketRequester
- .route("customer-channel")
- .data(customerRequestFlux, CustomerRequest.class)
- .retrieveFlux(CustomerResponse.class)
- .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement