Advertisement
Guest User

Untitled

a guest
Jan 15th, 2019
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.98 KB | None | 0 0
  1.  
  2. ## ActiveMQ configuration
  3. activemq.broker-url=(tcp://172.18.13.68:61616,tcp://172.18.13.69:61617)?ha=true&retryInterval=1000&reconnectAttempts=-1
  4. activemq.user=admin
  5. activemq.password=admin
  6. activemq.topic=TestTopic
  7. server.port=8084
  8.  
  9. Code:
  10.  
  11. @Service
  12. public class TopicMessageConsumer implements MessageListener,ExceptionListener, InitializingBean, DisposableBean {
  13.  
  14.  
  15. @Value("${activemq.broker-url}")
  16. private String activeMqBrokerUri;
  17.  
  18. @Value("${activemq.user}")
  19. private String username;
  20.  
  21. @Value("${activemq.password}")
  22. private String password;
  23.  
  24. @Value("${activemq.topic}")
  25. private String topicDestination;
  26.  
  27. private Connection connection;
  28.  
  29. private Session session;
  30.  
  31. private long ms=0;
  32.  
  33. public void run() throws JMSException {
  34. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(activeMqBrokerUri,username, password);
  35. connection = factory.createConnection();
  36. connection.setClientID("clientID2");
  37. connection.setExceptionListener(this);
  38. connection.start();
  39. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  40. Topic destination = session.createTopic(topicDestination);
  41. MessageConsumer consumer = session.createDurableSubscriber(destination,"subsname");
  42. consumer.setMessageListener(this);
  43. System.out.println(String.format("TopicMessageConsumer Waiting for messages at %s %s", topicDestination, this.activeMqBrokerUri));
  44. }
  45.  
  46. @Override
  47. public void onMessage(Message message) {
  48. String msg;
  49. try {
  50. msg = ((TextMessage) message).getText().replaceAll("#", "");
  51. System.out.println(new Date()+ " ["+ms+"] "+ msg);
  52. ms++;
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57.  
  58.  
  59. @Override
  60. public void destroy() throws Exception {
  61. session.close();
  62. connection.close();
  63. }
  64.  
  65. @Override
  66. public void afterPropertiesSet() throws Exception {
  67. run();
  68. }
  69.  
  70. @Override
  71. public void onException(JMSException exception) {
  72. System.out.println("Excepcion---------------------");
  73.  
  74. }
  75.  
  76. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement