Advertisement
Guest User

Untitled

a guest
Jun 8th, 2017
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.71 KB | None | 0 0
  1. package example;
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory;
  4. import org.apache.activemq.command.ActiveMQTopic;
  5.  
  6. import javax.jms.*;
  7.  
  8. class DurableSub {
  9.  
  10. public static void main(String []args) throws JMSException {
  11.  
  12. String user = env("ACTIVEMQ_USER", "admin");
  13. String password = env("ACTIVEMQ_PASSWORD", "password");
  14. String host = env("ACTIVEMQ_HOST", "localhost");
  15. int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
  16. String destination = arg(args, 0, "event");
  17.  
  18. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
  19.  
  20. Connection connection = factory.createConnection(user, password);
  21. connection.setClientID("broker1.consumer1");
  22. connection.start();
  23. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  24. Topic dest = new ActiveMQTopic(destination);
  25.  
  26. MessageConsumer consumer = session.createDurableSubscriber(dest,"SubDemo");
  27. long start = System.currentTimeMillis();
  28. long count = 1;
  29. System.out.println("Waiting for messages...");
  30. while(true) {
  31. Message msg = consumer.receive();
  32. if( msg instanceof TextMessage ) {
  33. String body = ((TextMessage) msg).getText();
  34. if( "SHUTDOWN".equals(body)) {
  35. long diff = System.currentTimeMillis() - start;
  36. System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
  37. break;
  38. } else {
  39. if( count != msg.getIntProperty("id") ) {
  40. System.out.println("mismatch: "+count+"!="+msg.getIntProperty("id"));
  41. }
  42. count = msg.getIntProperty("id");
  43.  
  44. if( count == 0 ) {
  45. start = System.currentTimeMillis();
  46. }
  47. if( count % 1000 == 0 ) {
  48. System.out.println(String.format("Received %d messages.", count));
  49. }
  50. count ++;
  51. }
  52.  
  53. } else {
  54. System.out.println("Unexpected message type: "+msg.getClass());
  55. }
  56. }
  57. connection.close();
  58. }
  59.  
  60. private static String env(String key, String defaultValue) {
  61. String rc = System.getenv(key);
  62. if( rc== null )
  63. return defaultValue;
  64. return rc;
  65. }
  66.  
  67. private static String arg(String []args, int index, String defaultValue) {
  68. if( index < args.length )
  69. return args[index];
  70. else
  71. return defaultValue;
  72. }
  73. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement