Guest User

RabbitMq Reconnection

a guest
May 14th, 2019
406
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.63 KB | None | 0 0
  1. package com.highq.workflow.rabbitmq;
  2.  
  3. import java.util.Map;
  4. import java.util.Map.Entry;
  5. import java.util.concurrent.ConcurrentHashMap;
  6. import org.springframework.amqp.rabbit.listener.AsyncConsumerStartedEvent;
  7. import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
  8. import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerTerminatedEvent;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  11. import org.springframework.context.event.EventListener;
  12. import org.springframework.scheduling.annotation.Scheduled;
  13. import org.springframework.stereotype.Component;
  14. import com.highq.workflow.utility.RabbitMQListenerUtil;
  15. import lombok.extern.slf4j.Slf4j;
  16.  
  17. /**
  18.  * The Class RabbitmqReConnection.
  19.  */
  20. @Component
  21. @Slf4j
  22. public class RabbitmqReConnection
  23. {
  24.  
  25.     /** The rabbit MQ listener util. */
  26.     @Autowired
  27.     RabbitMQListenerUtil rabbitMQListenerUtil;
  28.  
  29.     /** The queue connection track map. */
  30.     private Map<String, Boolean> queueConnectionTrackMap = new ConcurrentHashMap<>();
  31.  
  32.     /** The queue message container map. */
  33.     private Map<String, DirectMessageListenerContainer> queueMessageContainerMap = new ConcurrentHashMap<>();
  34.  
  35.     /**
  36.      * On listener container consumer terminated event.
  37.      *
  38.      * @param event the event
  39.      */
  40.     @EventListener
  41.     public void OnListenerContainerConsumerTerminatedEvent(ListenerContainerConsumerTerminatedEvent event)
  42.     {
  43.         DirectMessageListenerContainer directMessageListenerContainer = ((org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer) event.getSource());
  44.         String queueName = directMessageListenerContainer.getQueueNames()[0];
  45.         log.info(event.getReason() + " event " + event.toString() + " queuename " + queueName + " directMessageListenerContainer " + directMessageListenerContainer.toString());
  46.  
  47.         queueConnectionTrackMap.put(queueName, false);
  48.         queueMessageContainerMap.put(queueName, directMessageListenerContainer);
  49.     }
  50.  
  51.     /**
  52.      * On async consumer started event.
  53.      *
  54.      * @param event the event
  55.      */
  56.     @EventListener
  57.     public void onAsyncConsumerStartedEvent(AsyncConsumerStartedEvent event)
  58.     {
  59.         String consumerToString = event.getConsumer().toString();
  60.         String queueName = consumerToString.substring(consumerToString.indexOf("queue=") + 6, consumerToString.indexOf(","));
  61.         log.info("This is queuename start : " + queueName + event.getConsumer() + " event " + event.toString());
  62.  
  63.         queueConnectionTrackMap.put(queueName, true);
  64.     }
  65.  
  66.     /**
  67.      * Verify queue consumer and create if not exist.
  68.      */
  69.     @Scheduled(cron = "${rabbitmq.reconnect}") // 0 0/2 * * * ?
  70.     public void verifyQueueConsumerAndCreateIfNotExist()
  71.     {
  72.         log.info("This Map is showing which queue is conencted : " + queueConnectionTrackMap.toString());
  73.  
  74.         if (queueConnectionTrackMap.containsValue(true))// if all queue has false value then we won't do this
  75.         {
  76.             for (Entry<String, Boolean> queueEntry : queueConnectionTrackMap.entrySet())
  77.             {
  78.                 if (!queueEntry.getValue())
  79.                 {
  80.                     try
  81.                     {
  82.                         // First we will stope the previous one re connectino thread if it is still there and will create new one for this
  83.                         DirectMessageListenerContainer currentQueueMessageContainer = queueMessageContainerMap.get(queueEntry.getKey());
  84.                         currentQueueMessageContainer.stop();
  85.                         Thread.sleep(1000);
  86.                         // This will start previous container it self and stopped queue will start again without creating new listener for that queue.
  87.                         currentQueueMessageContainer.start();
  88.                     }
  89.                     catch (Exception e)
  90.                     {
  91.                         log.error(e.getMessage(), e);
  92.                     }
  93.                 }
  94.             }
  95.         }
  96.     }
  97. }
Add Comment
Please, Sign In to add comment