Advertisement
Guest User

RabbitMQ consumer

a guest
Jul 23rd, 2019
271
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 8.53 KB | None | 0 0
  1. package hr.sedamit.eboxprocessor.rabbitmq;
  2.  
  3. import com.rabbitmq.client.*;
  4.  
  5. import org.apache.log4j.Logger;
  6.  
  7. import java.io.IOException;
  8. import java.util.UUID;
  9. import java.util.concurrent.TimeoutException;
  10.  
  11. /**
  12.  * Used for consuming and acknowledging messages from defined queue.
  13.  *
  14.  */
  15. public class Consumer {
  16.     private final static Logger logger = Logger.getLogger(Consumer.class);
  17.     // Maximum number of messages that can be on the consumer at a time
  18.     private static int prefetchCount = 1;
  19.    
  20.     // Internal enum which contains queue names and their exchange keys
  21.     private Queue queue;
  22.     private Channel channel;
  23.     private String consumerTag;
  24.     private String uuid = UUID.randomUUID().toString();
  25.     private boolean subscribed = false;
  26.     private DeliverCallback deliverCallback = this::handleDeliver;
  27.     private CancelCallback cancelCallback = this::handleCancel;
  28.     private ConsumerShutdownSignalCallback consumerShutdownSignalCallback = this::handleShutdown;
  29.  
  30.     /**
  31.      * The constructors sets the channel to RabbitMQ broker for the specified queue.
  32.      * Callback for events are set to their default implementation.
  33.      *
  34.      * @param queue RabbitMQ queue - this consumer will be assigned to this queue and will only be able to consume from it.
  35.      * @see #setDeliverCallback(DeliverCallback)
  36.      * @see #setCancelCallback(CancelCallback)
  37.      * @see #setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback)
  38.      */
  39.     public Consumer(Queue queue) {
  40.         this.queue = queue;
  41.  
  42.         try {
  43.             setUpChannel();
  44.  
  45.         } catch (IOException e) {
  46.             e.printStackTrace();
  47.  
  48.         }
  49.     }
  50.  
  51.     public Class getEntityClass() {
  52.         return Queue.getEntityClassForQueue(queue);
  53.     }
  54.  
  55.     public String getUuid() {
  56.         return uuid;
  57.     }
  58.  
  59.     public boolean isSubscribed() {
  60.         return subscribed;
  61.     }
  62.  
  63.     public DeliverCallback getDeliverCallback() {
  64.         return deliverCallback;
  65.     }
  66.  
  67.     public void setDeliverCallback(DeliverCallback deliverCallback) {
  68.         this.deliverCallback = deliverCallback;
  69.     }
  70.  
  71.     public CancelCallback getCancelCallback() {
  72.         return cancelCallback;
  73.     }
  74.  
  75.     public void setCancelCallback(CancelCallback cancelCallback) {
  76.         this.cancelCallback = cancelCallback;
  77.     }
  78.  
  79.     public ConsumerShutdownSignalCallback getConsumerShutdownSignalCallback() {
  80.         return consumerShutdownSignalCallback;
  81.     }
  82.  
  83.     public void setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback consumerShutdownSignalCallback) {
  84.         this.consumerShutdownSignalCallback = consumerShutdownSignalCallback;
  85.     }
  86.  
  87.  
  88.     /**
  89.      * <p>
  90.      * Subscribes to the set queue. The subscription can be cancelled using
  91.      * Checks if the queue is set up properly.
  92.      * </p>
  93.      * <p>
  94.      * Note: this is a non-blocking operation. The client will listen for incoming messages and handle them using
  95.      * the provided DeliveryCallback function but the execution of this operation will be on another thread.
  96.      * </p>
  97.      *
  98.      * @throws IOException if I/O problem is encountered.
  99.      */
  100.     public void subscribeToQueue() throws IOException {
  101.         if (channel != null) {
  102.             consumerTag = channel.basicConsume(
  103.                     queue.getQueueName(),
  104.                     deliverCallback,
  105.                     cancelCallback,
  106.                     consumerShutdownSignalCallback
  107.             );
  108.             subscribed = true;
  109.  
  110.         } else {
  111.             logger.error("Channel does not exist. Unable to consume message.");
  112.  
  113.         }
  114.     }
  115.  
  116.     /**
  117.      * Confirms the message has been successfully processed.
  118.      *
  119.      * @param deliveryTag Unique message tag generated by the server.
  120.      * @throws IOException if I/O problem is encountered.
  121.      */
  122.     public void acknowledgeMessageReceived(long deliveryTag) throws IOException {
  123.         if (channel != null) {
  124.             channel.basicAck(deliveryTag, false);
  125.  
  126.         } else {
  127.             logger.error("Channel does not exist. Unable to acknowledge message delivery.");
  128.  
  129.         }
  130.     }
  131.  
  132.     /**
  133.      * Sends a negative acknowledgement to RabbitMQ without re-queueing the message.
  134.      *
  135.      * @param deliveryTag Unique message tag generated by the server.
  136.      * @throws IOException if I/O problem is encountered.
  137.      */
  138.     public void rejectMessage(long deliveryTag) throws IOException {
  139.         if (channel != null) {
  140.             channel.basicReject(deliveryTag, false);
  141.  
  142.         } else {
  143.             logger.error("Channel does not exist. Unable to reject message delivery.");
  144.  
  145.         }
  146.     }
  147.  
  148.     /**
  149.      * Cancels consumer subscription to the queue.
  150.      * The consumer can be used for acknowledging messages, but will not receive new messages.
  151.      * This does not close the underlying channel. To close the channel use closeChannel() method.
  152.      *
  153.      * @throws IOException
  154.      * @see #subscribeToQueue()
  155.      * @see #closeChannel()
  156.      */
  157.     public void cancelSubscription() throws IOException {
  158.         if (channel != null) {
  159.             channel.basicCancel(this.consumerTag);
  160.             subscribed = false;
  161.  
  162.         } else {
  163.             logger.error("Channel does not exist. Unable to cancel consumer subscription.");
  164.         }
  165.     }
  166.  
  167.     /**
  168.      * Explicitly closes channel to the queue.
  169.      * After doing this you will not be able to use any of the methods of this class.
  170.      *
  171.      * @throws IOException      if I/O problem is encountered.
  172.      * @throws TimeoutException if connection problem occurs.
  173.      */
  174.     public void closeChannel() throws IOException, TimeoutException {
  175.         if (channel != null) {
  176.             channel.close();
  177.             channel = null;
  178.             logger.info("Closing RabbitMQ consumer channel...");
  179.  
  180.         } else {
  181.             logger.error("Channel already closed.");
  182.  
  183.         }
  184.     }
  185.  
  186.     /**
  187.      * Checks if the queue exists and creates the channel.
  188.      * If the queue does not exist channel is set to null and cannot be used.
  189.      *
  190.      * @throws IOException if I/O problem is encountered.
  191.      */
  192.     private void setUpChannel() throws IOException {
  193.  
  194.         channel = ChannelFactory.getInstance().createChannel();
  195.         try {
  196.             channel.queueDeclarePassive(queue.getQueueName());
  197.             channel.basicQos(prefetchCount);
  198.  
  199.         } catch (IOException e) {
  200.             // When this exception occurs it renders the channel unusable so it's best set to null.
  201.             channel = null;
  202.  
  203.             logger.error(String.format("Queue %s does not exist [%s]", queue.getQueueName(), e.getMessage()));
  204.             e.printStackTrace();
  205.  
  206.         }
  207.         logger.info("Setting up RabbitMQ consumer channel. Channel successfully initialized: " + (channel != null));
  208.     }
  209.  
  210.     /**
  211.      * Callback called when a message is delivered to the client.
  212.      * Default implementation. Callback acknowledges message received and does nothing with it.
  213.      * To use custom implementation use setDeliverCallback method.
  214.      *
  215.      * @param consumerTag The consumer tag associated with the consumer.
  216.      * @param message     Message object.
  217.      * @see #setDeliverCallback(DeliverCallback)
  218.      */
  219.     private void handleDeliver(String consumerTag, Delivery message) {
  220.         Envelope envelope = message.getEnvelope();
  221.         long deliveryTag = envelope.getDeliveryTag();
  222.  
  223.         logger.info("Message delivered: " + deliveryTag);
  224.  
  225.         try {
  226.             channel.basicAck(deliveryTag, false);
  227.  
  228.         } catch (IOException e) {
  229.             e.printStackTrace();
  230.  
  231.         }
  232.     }
  233.  
  234.     /**
  235.      * Callback called when a service is cancelled.
  236.      * Default implementation. To use custom implementation specify it in the constructor.
  237.      *
  238.      * @param consumerTag The consumer tag associated with the consumer.
  239.      */
  240.     private void handleCancel(String consumerTag) {
  241.         logger.info("Consumer (" + consumerTag + ") cancelled: ");
  242.     }
  243.  
  244.     /**
  245.      * Called when the consumer is abruptly shutdown due to termination of the underlying connection or channel.
  246.      * Default implementation. To use custom implementation specify it in the constructor.
  247.      *
  248.      * @param consumerTag The consumer tag associated with the consumer.
  249.      * @param exception   Shutdown reason.
  250.      */
  251.     private void handleShutdown(String consumerTag, ShutdownSignalException exception) {
  252.         logger.info(String.format("Consumer (%s) shutdown. Reason: %s", consumerTag, exception.getMessage()));
  253.         logger.info(exception);
  254.     }
  255. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement