Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.highq.workflow.rabbitmq;
- import java.util.Map;
- import java.util.Map.Entry;
- import java.util.concurrent.ConcurrentHashMap;
- import org.springframework.amqp.rabbit.listener.AsyncConsumerStartedEvent;
- import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
- import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerTerminatedEvent;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.context.event.EventListener;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import com.highq.workflow.utility.RabbitMQListenerUtil;
- import lombok.extern.slf4j.Slf4j;
- /**
- * The Class RabbitmqReConnection.
- */
- @Component
- @Slf4j
- public class RabbitmqReConnection
- {
- /** The rabbit MQ listener util. */
- @Autowired
- RabbitMQListenerUtil rabbitMQListenerUtil;
- /** The queue connection track map. */
- private Map<String, Boolean> queueConnectionTrackMap = new ConcurrentHashMap<>();
- /** The queue message container map. */
- private Map<String, DirectMessageListenerContainer> queueMessageContainerMap = new ConcurrentHashMap<>();
- /**
- * On listener container consumer terminated event.
- *
- * @param event the event
- */
- @EventListener
- public void OnListenerContainerConsumerTerminatedEvent(ListenerContainerConsumerTerminatedEvent event)
- {
- DirectMessageListenerContainer directMessageListenerContainer = ((org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer) event.getSource());
- String queueName = directMessageListenerContainer.getQueueNames()[0];
- log.info(event.getReason() + " event " + event.toString() + " queuename " + queueName + " directMessageListenerContainer " + directMessageListenerContainer.toString());
- queueConnectionTrackMap.put(queueName, false);
- queueMessageContainerMap.put(queueName, directMessageListenerContainer);
- }
- /**
- * On async consumer started event.
- *
- * @param event the event
- */
- @EventListener
- public void onAsyncConsumerStartedEvent(AsyncConsumerStartedEvent event)
- {
- String consumerToString = event.getConsumer().toString();
- String queueName = consumerToString.substring(consumerToString.indexOf("queue=") + 6, consumerToString.indexOf(","));
- log.info("This is queuename start : " + queueName + event.getConsumer() + " event " + event.toString());
- queueConnectionTrackMap.put(queueName, true);
- }
- /**
- * Verify queue consumer and create if not exist.
- */
- @Scheduled(cron = "${rabbitmq.reconnect}") // 0 0/2 * * * ?
- public void verifyQueueConsumerAndCreateIfNotExist()
- {
- log.info("This Map is showing which queue is conencted : " + queueConnectionTrackMap.toString());
- if (queueConnectionTrackMap.containsValue(true))// if all queue has false value then we won't do this
- {
- for (Entry<String, Boolean> queueEntry : queueConnectionTrackMap.entrySet())
- {
- if (!queueEntry.getValue())
- {
- try
- {
- // First we will stope the previous one re connectino thread if it is still there and will create new one for this
- DirectMessageListenerContainer currentQueueMessageContainer = queueMessageContainerMap.get(queueEntry.getKey());
- currentQueueMessageContainer.stop();
- Thread.sleep(1000);
- // This will start previous container it self and stopped queue will start again without creating new listener for that queue.
- currentQueueMessageContainer.start();
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- }
- }
- }
- }
- }
- }
Add Comment
Please, Sign In to add comment