Advertisement
Guest User

Untitled

a guest
Apr 21st, 2016
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.51 KB | None | 0 0
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package example;
  18.  
  19. import org.apache.activemq.ActiveMQConnectionFactory;
  20. import org.apache.activemq.command.ActiveMQTopic;
  21.  
  22. import javax.jms.*;
  23.  
  24. class DurableSub {
  25.  
  26. public static void main(String []args) throws JMSException {
  27.  
  28. String user = env("ACTIVEMQ_USER", "admin");
  29. String password = env("ACTIVEMQ_PASSWORD", "password");
  30. String host = env("ACTIVEMQ_HOST", "localhost");
  31. int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
  32. String destination = arg(args, 0, "event");
  33.  
  34. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
  35.  
  36. Connection connection = factory.createConnection(user, password);
  37. connection.setClientID("broker1.consumer1");
  38. connection.start();
  39. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  40. Topic dest = new ActiveMQTopic(destination);
  41.  
  42. MessageConsumer consumer = session.createDurableSubscriber(dest,"SubDemo");
  43. long start = System.currentTimeMillis();
  44. long count = 1;
  45. System.out.println("Waiting for messages...");
  46. while(true) {
  47. Message msg = consumer.receive();
  48. if( msg instanceof TextMessage ) {
  49. String body = ((TextMessage) msg).getText();
  50. if( "SHUTDOWN".equals(body)) {
  51. long diff = System.currentTimeMillis() - start;
  52. System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
  53. break;
  54. } else {
  55. if( count != msg.getIntProperty("id") ) {
  56. System.out.println("mismatch: "+count+"!="+msg.getIntProperty("id"));
  57. }
  58. count = msg.getIntProperty("id");
  59.  
  60. if( count == 0 ) {
  61. start = System.currentTimeMillis();
  62. }
  63. if( count % 1000 == 0 ) {
  64. System.out.println(String.format("Received %d messages.", count));
  65. }
  66. count ++;
  67. }
  68.  
  69. } else {
  70. System.out.println("Unexpected message type: "+msg.getClass());
  71. }
  72. }
  73. connection.close();
  74. }
  75.  
  76. private static String env(String key, String defaultValue) {
  77. String rc = System.getenv(key);
  78. if( rc== null )
  79. return defaultValue;
  80. return rc;
  81. }
  82.  
  83. private static String arg(String []args, int index, String defaultValue) {
  84. if( index < args.length )
  85. return args[index];
  86. else
  87. return defaultValue;
  88. }
  89. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement