Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
- private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
- private final List<URI> endpoints;
- private final boolean isSsl;
- public StompTcpFactory(List<String> endpoints) {
- this.endpoints = endpoints.stream()
- .map(e -> contains(e, "://") ? e : "fake://" + e)
- .map(URI::create)
- .collect(toList());
- isSsl = this.endpoints.stream().anyMatch(StompTcpFactory::isSsl);
- boolean anyNotSsl = this.endpoints.stream().anyMatch(not(StompTcpFactory::isSsl));
- if (isSsl && anyNotSsl)
- throw new IllegalArgumentException("Cannot configure STOMP to use SSL and regular connections at the same time: " + endpoints);
- }
- @Override
- public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
- return tcpClientSpec
- .env(environment)
- .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
- .ssl(isSsl ? new SslOptions() : null)
- .connect(new InetSocketAddressSupplier(endpoints));
- }
- private static boolean isSsl(URI endpoint) {
- return containsIgnoreCase(endpoint.getScheme(), "ssl");
- }
- private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
- @Override
- public ReactorConfiguration read() {
- return new ReactorConfiguration(emptyList(), "sync", new Properties());
- }
- }
- }
- public class InetSocketAddressSupplier implements Supplier<InetSocketAddress> {
- private static final AtomicInteger counter = new AtomicInteger(0);
- private final Logger logger = LoggerFactory.getLogger(getClass());
- private final List<URI> endpoints;
- public InetSocketAddressSupplier(List<URI> endpoints) {
- this.endpoints = endpoints;
- }
- @Override
- public InetSocketAddress get() {
- int endpointIndex = counter.getAndUpdate(i -> ++i % endpoints.size());
- URI endpoint = endpoints.get(endpointIndex);
- logger.info("nnConnecting to broker[{}]: {}:{}nn", endpointIndex, endpoint.getHost(), endpoint.getPort());
- return new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
- }
- }
- @Override
- public void configureMessageBroker(MessageBrokerRegistry registry) {
- String environment = configService.getString(ENVIRONMENT);
- String user = configService.getString(WEBSOCKET_EXTERNAL_BROKER_USER);
- String password = configService.getString(WEBSOCKET_EXTERNAL_BROKER_PASSWORD);
- registry
- .enableStompBrokerRelay("/queue", "/topic")
- .setRelayHost(UNUSED_RELAY_HOST)
- .setClientLogin(user)
- .setClientPasscode(password)
- .setSystemLogin(user)
- .setSystemPasscode(password)
- .setUserDestinationBroadcast(format("/topic/%s-unresolved-user-destination", environment))
- .setUserRegistryBroadcast(format("/topic/%s-simp-user-registry", environment))
- .setTcpClient(createTcpClient());
- registry.setApplicationDestinationPrefixes(format("/%s-websocket-app", environment));
- }
- private TcpOperations<byte[]> createTcpClient() {
- List<String> endpoints = asList(configService.getStringArray(WEBSOCKET_EXTERNAL_BROKER_ENDPOINTS));
- logger.info("Configuring websocket brokers to: {}", endpoints);
- return new Reactor2TcpClient<>(new StompTcpFactory(endpoints));
- }
Add Comment
Please, Sign In to add comment