SHARE
TWEET

Untitled

a guest Jun 8th, 2017 63 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package example;
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory;
  4. import org.apache.activemq.command.ActiveMQTopic;
  5.  
  6. import javax.jms.*;
  7.  
  8. class DurableSub {
  9.  
  10.     public static void main(String []args) throws JMSException {
  11.  
  12.         String user = env("ACTIVEMQ_USER", "admin");
  13.         String password = env("ACTIVEMQ_PASSWORD", "password");
  14.         String host = env("ACTIVEMQ_HOST", "localhost");
  15.         int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
  16.         String destination = arg(args, 0, "event");
  17.  
  18.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
  19.  
  20.         Connection connection = factory.createConnection(user, password);
  21.         connection.setClientID("broker1.consumer1");
  22.         connection.start();
  23.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  24.         Topic dest = new ActiveMQTopic(destination);
  25.  
  26.         MessageConsumer consumer = session.createDurableSubscriber(dest,"SubDemo");
  27.         long start = System.currentTimeMillis();
  28.         long count = 1;
  29.         System.out.println("Waiting for messages...");
  30.         while(true) {
  31.             Message msg = consumer.receive();
  32.             if( msg instanceof  TextMessage ) {
  33.                 String body = ((TextMessage) msg).getText();
  34.                 if( "SHUTDOWN".equals(body)) {
  35.                     long diff = System.currentTimeMillis() - start;
  36.                     System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
  37.                     break;
  38.                 } else {
  39.                     if( count != msg.getIntProperty("id") ) {
  40.                         System.out.println("mismatch: "+count+"!="+msg.getIntProperty("id"));
  41.                     }
  42.                     count = msg.getIntProperty("id");
  43.  
  44.                     if( count == 0 ) {
  45.                         start = System.currentTimeMillis();
  46.                     }
  47.                     if( count % 1000 == 0 ) {
  48.                         System.out.println(String.format("Received %d messages.", count));
  49.                     }
  50.                     count ++;
  51.                 }
  52.  
  53.             } else {
  54.                 System.out.println("Unexpected message type: "+msg.getClass());
  55.             }
  56.         }
  57.         connection.close();
  58.     }
  59.  
  60.     private static String env(String key, String defaultValue) {
  61.         String rc = System.getenv(key);
  62.         if( rc== null )
  63.             return defaultValue;
  64.         return rc;
  65.     }
  66.  
  67.     private static String arg(String []args, int index, String defaultValue) {
  68.         if( index < args.length )
  69.             return args[index];
  70.         else
  71.             return defaultValue;
  72.     }
  73. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top