Guest User

Untitled

a guest
Nov 22nd, 2018
151
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.54 KB | None | 0 0
  1. public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
  2. private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
  3. private final List<URI> endpoints;
  4. private final boolean isSsl;
  5.  
  6. public StompTcpFactory(List<String> endpoints) {
  7. this.endpoints = endpoints.stream()
  8. .map(e -> contains(e, "://") ? e : "fake://" + e)
  9. .map(URI::create)
  10. .collect(toList());
  11. isSsl = this.endpoints.stream().anyMatch(StompTcpFactory::isSsl);
  12.  
  13. boolean anyNotSsl = this.endpoints.stream().anyMatch(not(StompTcpFactory::isSsl));
  14. if (isSsl && anyNotSsl)
  15. throw new IllegalArgumentException("Cannot configure STOMP to use SSL and regular connections at the same time: " + endpoints);
  16. }
  17.  
  18. @Override
  19. public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
  20. return tcpClientSpec
  21. .env(environment)
  22. .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
  23. .ssl(isSsl ? new SslOptions() : null)
  24. .connect(new InetSocketAddressSupplier(endpoints));
  25. }
  26.  
  27. private static boolean isSsl(URI endpoint) {
  28. return containsIgnoreCase(endpoint.getScheme(), "ssl");
  29. }
  30.  
  31. private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
  32. @Override
  33. public ReactorConfiguration read() {
  34. return new ReactorConfiguration(emptyList(), "sync", new Properties());
  35. }
  36. }
  37.  
  38. }
  39.  
  40. public class InetSocketAddressSupplier implements Supplier<InetSocketAddress> {
  41.  
  42. private static final AtomicInteger counter = new AtomicInteger(0);
  43.  
  44. private final Logger logger = LoggerFactory.getLogger(getClass());
  45. private final List<URI> endpoints;
  46.  
  47. public InetSocketAddressSupplier(List<URI> endpoints) {
  48. this.endpoints = endpoints;
  49. }
  50.  
  51. @Override
  52. public InetSocketAddress get() {
  53. int endpointIndex = counter.getAndUpdate(i -> ++i % endpoints.size());
  54. URI endpoint = endpoints.get(endpointIndex);
  55.  
  56. logger.info("nnConnecting to broker[{}]: {}:{}nn", endpointIndex, endpoint.getHost(), endpoint.getPort());
  57. return new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
  58. }
  59.  
  60. }
  61.  
  62. @Override
  63. public void configureMessageBroker(MessageBrokerRegistry registry) {
  64. String environment = configService.getString(ENVIRONMENT);
  65.  
  66. String user = configService.getString(WEBSOCKET_EXTERNAL_BROKER_USER);
  67. String password = configService.getString(WEBSOCKET_EXTERNAL_BROKER_PASSWORD);
  68. registry
  69. .enableStompBrokerRelay("/queue", "/topic")
  70. .setRelayHost(UNUSED_RELAY_HOST)
  71. .setClientLogin(user)
  72. .setClientPasscode(password)
  73. .setSystemLogin(user)
  74. .setSystemPasscode(password)
  75. .setUserDestinationBroadcast(format("/topic/%s-unresolved-user-destination", environment))
  76. .setUserRegistryBroadcast(format("/topic/%s-simp-user-registry", environment))
  77. .setTcpClient(createTcpClient());
  78.  
  79. registry.setApplicationDestinationPrefixes(format("/%s-websocket-app", environment));
  80. }
  81.  
  82.  
  83. private TcpOperations<byte[]> createTcpClient() {
  84. List<String> endpoints = asList(configService.getStringArray(WEBSOCKET_EXTERNAL_BROKER_ENDPOINTS));
  85. logger.info("Configuring websocket brokers to: {}", endpoints);
  86. return new Reactor2TcpClient<>(new StompTcpFactory(endpoints));
  87. }
Add Comment
Please, Sign In to add comment