Advertisement
Guest User

Untitled

a guest
Jul 28th, 2016
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.71 KB | None | 0 0
  1.  
  2. package example;
  3.  
  4. import org.apache.activemq.ActiveMQConnectionFactory;
  5. import org.apache.activemq.command.ActiveMQTopic;
  6.  
  7. import javax.jms.*;
  8.  
  9. class DurableSub {
  10.  
  11. public static void main(String []args) throws JMSException {
  12.  
  13. String user = env("ACTIVEMQ_USER", "admin");
  14. String password = env("ACTIVEMQ_PASSWORD", "password");
  15. String host = env("ACTIVEMQ_HOST", "localhost");
  16. int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
  17. String destination = arg(args, 0, "event");
  18.  
  19. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
  20.  
  21. Connection connection = factory.createConnection(user, password);
  22. connection.setClientID("broker1.consumer1");
  23. connection.start();
  24. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  25. Topic dest = new ActiveMQTopic(destination);
  26.  
  27. MessageConsumer consumer = session.createDurableSubscriber(dest,"SubDemo");
  28. long start = System.currentTimeMillis();
  29. long count = 1;
  30. System.out.println("Waiting for messages...");
  31. while(true) {
  32. Message msg = consumer.receive();
  33. if( msg instanceof TextMessage ) {
  34. String body = ((TextMessage) msg).getText();
  35. if( "SHUTDOWN".equals(body)) {
  36. long diff = System.currentTimeMillis() - start;
  37. System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
  38. break;
  39. } else {
  40. if( count != msg.getIntProperty("id") ) {
  41. System.out.println("mismatch: "+count+"!="+msg.getIntProperty("id"));
  42. }
  43. count = msg.getIntProperty("id");
  44.  
  45. if( count == 0 ) {
  46. start = System.currentTimeMillis();
  47. }
  48. if( count % 1000 == 0 ) {
  49. System.out.println(String.format("Received %d messages.", count));
  50. }
  51. count ++;
  52. }
  53.  
  54. } else {
  55. System.out.println("Unexpected message type: "+msg.getClass());
  56. }
  57. }
  58. connection.close();
  59. }
  60.  
  61. private static String env(String key, String defaultValue) {
  62. String rc = System.getenv(key);
  63. if( rc== null )
  64. return defaultValue;
  65. return rc;
  66. }
  67.  
  68. private static String arg(String []args, int index, String defaultValue) {
  69. if( index < args.length )
  70. return args[index];
  71. else
  72. return defaultValue;
  73. }
  74. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement