Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.theeven.demo;
- import org.apache.log4j.Logger;
- import javax.jms.ExceptionListener;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.TextMessage;
- import javax.jms.Topic;
- import javax.jms.TopicConnection;
- import javax.jms.TopicConnectionFactory;
- import javax.jms.TopicSession;
- import javax.jms.TopicSubscriber;
- import javax.naming.InitialContext;
- import java.util.concurrent.atomic.AtomicBoolean;
- /**
- * @author <a href="mailto:theeven@gmail.com">Deven Panchal</a>
- */
- public class DemoMessageConsumer implements ExceptionListener {
- private static final Logger LOG = Logger.getLogger(DemoMessageConsumer.class.getName());
- private static final long RETRY_AFTER = 5000;
- private static final int MAX_RETRIES = 10;
- private final AtomicBoolean connected = new AtomicBoolean();
- private TopicConnection connection = null;
- private TopicSubscriber receiver = null;
- private final String topicname;
- private final String selector;
- public DemoMessageConsumer(String topicname, String selector) {
- this.topicname = topicname;
- this.selector = selector;
- }
- @Override
- public void onException(JMSException jmse) {
- LOG.error(jmse.getMessage() + " : " + jmse);
- connected.set(false);
- }
- public TextMessage consume() throws Exception {
- int retries = 0;
- for (;;) {
- try {
- if (!connected.get()) {
- setUp();
- }
- Message message = receiver.receive();
- if (message != null && message instanceof TextMessage) {
- return (TextMessage) message;
- }else{
- LOG.error("not a good message. " + message);
- }
- } catch (JMSException jmse) {
- LOG.error("error while consuming message " + jmse.getMessage());
- if (connected.get() || ++retries == MAX_RETRIES) {
- throw jmse;
- }
- LOG.error("connection error.");
- pause();
- }
- }
- }
- private TopicConnectionFactory createConnectionFactory() throws JMSException {
- try {
- InitialContext iniCtx = new InitialContext();
- return (TopicConnectionFactory) iniCtx.lookup("ConnectionFactory");
- } catch (Exception che) {
- che.printStackTrace();
- JMSException jmse = new JMSException(che.getMessage());
- jmse.setLinkedException(che);
- throw jmse;
- }
- }
- private void setUp() throws Exception {
- if (connected.get() || connection != null) {
- close();
- }
- int retries = 0;
- for (; !connected.get();) {
- try {
- TopicConnectionFactory tcf = createConnectionFactory();
- connection = tcf.createTopicConnection("deven", "panchal");
- connection.setExceptionListener(this);
- connection.start();
- TopicSession session = connection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE);
- Topic topic = session.createTopic(topicname);
- receiver = session.createDurableSubscriber(topic, selector);
- connected.set(true);
- } catch (JMSException jmse) {
- connected.set(false);
- close();
- if (++retries == MAX_RETRIES) {
- throw new RuntimeException("connection error. retry exhuast.", jmse);
- }
- LOG.error("connection error. retrying...");
- pause();
- }
- }
- }
- private static void pause() {
- try {
- Thread.sleep(DemoMessageConsumer.RETRY_AFTER);
- } catch (Exception ignored) {
- }
- }
- void close() {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (Exception e) {
- LOG.error(e.getMessage() + " : " + e);
- } finally {
- connected.set(false);
- connection = null;
- }
- }
- public static void main(String args[]) throws Exception {
- System.out.println("Start durable MessageConsumer");
- DemoMessageConsumer consumer = new DemoMessageConsumer("topic://topicname", "selectorstring");
- Thread consumerThread = new Thread(new Consumer(consumer));
- consumerThread.start();
- }
- public static class Consumer implements Runnable {
- final DemoMessageConsumer consumer;
- public Consumer(final DemoMessageConsumer consumer) {
- this.consumer = consumer;
- }
- @Override
- public void run() {
- while (true) {
- try {
- TextMessage message = consumer.consume();
- message.acknowledge();
- System.out.println("Received " + message.getText());
- } catch (Exception e) {
- System.out.println("Producer " + e);
- }
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement