Advertisement
Guest User

Untitled

a guest
Apr 28th, 2015
193
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.99 KB | None | 0 0
  1. package com.theeven.demo;
  2.  
  3. import org.apache.log4j.Logger;
  4.  
  5. import javax.jms.ExceptionListener;
  6. import javax.jms.JMSException;
  7. import javax.jms.Message;
  8. import javax.jms.TextMessage;
  9. import javax.jms.Topic;
  10. import javax.jms.TopicConnection;
  11. import javax.jms.TopicConnectionFactory;
  12. import javax.jms.TopicSession;
  13. import javax.jms.TopicSubscriber;
  14. import javax.naming.InitialContext;
  15. import java.util.concurrent.atomic.AtomicBoolean;
  16.  
  17. /**
  18. * @author <a href="mailto:theeven@gmail.com">Deven Panchal</a>
  19. */
  20. public class DemoMessageConsumer implements ExceptionListener {
  21.  
  22. private static final Logger LOG = Logger.getLogger(DemoMessageConsumer.class.getName());
  23. private static final long RETRY_AFTER = 5000;
  24. private static final int MAX_RETRIES = 10;
  25. private final AtomicBoolean connected = new AtomicBoolean();
  26. private TopicConnection connection = null;
  27. private TopicSubscriber receiver = null;
  28. private final String topicname;
  29. private final String selector;
  30.  
  31. public DemoMessageConsumer(String topicname, String selector) {
  32. this.topicname = topicname;
  33. this.selector = selector;
  34. }
  35.  
  36. @Override
  37. public void onException(JMSException jmse) {
  38. LOG.error(jmse.getMessage() + " : " + jmse);
  39. connected.set(false);
  40. }
  41.  
  42. public TextMessage consume() throws Exception {
  43. int retries = 0;
  44. for (;;) {
  45. try {
  46. if (!connected.get()) {
  47. setUp();
  48. }
  49. Message message = receiver.receive();
  50. if (message != null && message instanceof TextMessage) {
  51. return (TextMessage) message;
  52. }else{
  53. LOG.error("not a good message. " + message);
  54. }
  55. } catch (JMSException jmse) {
  56. LOG.error("error while consuming message " + jmse.getMessage());
  57. if (connected.get() || ++retries == MAX_RETRIES) {
  58. throw jmse;
  59. }
  60. LOG.error("connection error.");
  61. pause();
  62. }
  63. }
  64. }
  65.  
  66. private TopicConnectionFactory createConnectionFactory() throws JMSException {
  67. try {
  68. InitialContext iniCtx = new InitialContext();
  69. return (TopicConnectionFactory) iniCtx.lookup("ConnectionFactory");
  70. } catch (Exception che) {
  71. che.printStackTrace();
  72. JMSException jmse = new JMSException(che.getMessage());
  73. jmse.setLinkedException(che);
  74. throw jmse;
  75. }
  76. }
  77.  
  78. private void setUp() throws Exception {
  79. if (connected.get() || connection != null) {
  80. close();
  81. }
  82. int retries = 0;
  83. for (; !connected.get();) {
  84. try {
  85. TopicConnectionFactory tcf = createConnectionFactory();
  86. connection = tcf.createTopicConnection("deven", "panchal");
  87. connection.setExceptionListener(this);
  88. connection.start();
  89. TopicSession session = connection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE);
  90. Topic topic = session.createTopic(topicname);
  91. receiver = session.createDurableSubscriber(topic, selector);
  92. connected.set(true);
  93. } catch (JMSException jmse) {
  94. connected.set(false);
  95. close();
  96. if (++retries == MAX_RETRIES) {
  97. throw new RuntimeException("connection error. retry exhuast.", jmse);
  98. }
  99. LOG.error("connection error. retrying...");
  100. pause();
  101. }
  102. }
  103. }
  104.  
  105. private static void pause() {
  106. try {
  107. Thread.sleep(DemoMessageConsumer.RETRY_AFTER);
  108. } catch (Exception ignored) {
  109. }
  110. }
  111.  
  112. void close() {
  113. try {
  114. if (connection != null) {
  115. connection.close();
  116. }
  117. } catch (Exception e) {
  118. LOG.error(e.getMessage() + " : " + e);
  119. } finally {
  120. connected.set(false);
  121. connection = null;
  122. }
  123. }
  124.  
  125. public static void main(String args[]) throws Exception {
  126. System.out.println("Start durable MessageConsumer");
  127. DemoMessageConsumer consumer = new DemoMessageConsumer("topic://topicname", "selectorstring");
  128. Thread consumerThread = new Thread(new Consumer(consumer));
  129. consumerThread.start();
  130. }
  131.  
  132. public static class Consumer implements Runnable {
  133. final DemoMessageConsumer consumer;
  134.  
  135. public Consumer(final DemoMessageConsumer consumer) {
  136. this.consumer = consumer;
  137. }
  138.  
  139. @Override
  140. public void run() {
  141. while (true) {
  142. try {
  143. TextMessage message = consumer.consume();
  144. message.acknowledge();
  145. System.out.println("Received " + message.getText());
  146. } catch (Exception e) {
  147. System.out.println("Producer " + e);
  148. }
  149. }
  150. }
  151. }
  152.  
  153. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement