Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package hr.sedamit.eboxprocessor.rabbitmq;

import com.rabbitmq.client.*;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
- /**
- * Used for consuming and acknowledging messages from defined queue.
- *
- */
- public class Consumer {
- private final static Logger logger = Logger.getLogger(Consumer.class);
- // Maximum number of messages that can be on the consumer at a time
- private static int prefetchCount = 1;
- // Internal enum which contains queue names and their exchange keys
- private Queue queue;
- private Channel channel;
- private String consumerTag;
- private String uuid = UUID.randomUUID().toString();
- private boolean subscribed = false;
- private DeliverCallback deliverCallback = this::handleDeliver;
- private CancelCallback cancelCallback = this::handleCancel;
- private ConsumerShutdownSignalCallback consumerShutdownSignalCallback = this::handleShutdown;
- /**
- * The constructors sets the channel to RabbitMQ broker for the specified queue.
- * Callback for events are set to their default implementation.
- *
- * @param queue RabbitMQ queue - this consumer will be assigned to this queue and will only be able to consume from it.
- * @see #setDeliverCallback(DeliverCallback)
- * @see #setCancelCallback(CancelCallback)
- * @see #setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback)
- */
- public Consumer(Queue queue) {
- this.queue = queue;
- try {
- setUpChannel();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- public Class getEntityClass() {
- return Queue.getEntityClassForQueue(queue);
- }
- public String getUuid() {
- return uuid;
- }
- public boolean isSubscribed() {
- return subscribed;
- }
- public DeliverCallback getDeliverCallback() {
- return deliverCallback;
- }
- public void setDeliverCallback(DeliverCallback deliverCallback) {
- this.deliverCallback = deliverCallback;
- }
- public CancelCallback getCancelCallback() {
- return cancelCallback;
- }
- public void setCancelCallback(CancelCallback cancelCallback) {
- this.cancelCallback = cancelCallback;
- }
- public ConsumerShutdownSignalCallback getConsumerShutdownSignalCallback() {
- return consumerShutdownSignalCallback;
- }
- public void setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback consumerShutdownSignalCallback) {
- this.consumerShutdownSignalCallback = consumerShutdownSignalCallback;
- }
- /**
- * <p>
- * Subscribes to the set queue. The subscription can be cancelled using
- * Checks if the queue is set up properly.
- * </p>
- * <p>
- * Note: this is a non-blocking operation. The client will listen for incoming messages and handle them using
- * the provided DeliveryCallback function but the execution of this operation will be on another thread.
- * </p>
- *
- * @throws IOException if I/O problem is encountered.
- */
- public void subscribeToQueue() throws IOException {
- if (channel != null) {
- consumerTag = channel.basicConsume(
- queue.getQueueName(),
- deliverCallback,
- cancelCallback,
- consumerShutdownSignalCallback
- );
- subscribed = true;
- } else {
- logger.error("Channel does not exist. Unable to consume message.");
- }
- }
- /**
- * Confirms the message has been successfully processed.
- *
- * @param deliveryTag Unique message tag generated by the server.
- * @throws IOException if I/O problem is encountered.
- */
- public void acknowledgeMessageReceived(long deliveryTag) throws IOException {
- if (channel != null) {
- channel.basicAck(deliveryTag, false);
- } else {
- logger.error("Channel does not exist. Unable to acknowledge message delivery.");
- }
- }
- /**
- * Sends a negative acknowledgement to RabbitMQ without re-queueing the message.
- *
- * @param deliveryTag Unique message tag generated by the server.
- * @throws IOException if I/O problem is encountered.
- */
- public void rejectMessage(long deliveryTag) throws IOException {
- if (channel != null) {
- channel.basicReject(deliveryTag, false);
- } else {
- logger.error("Channel does not exist. Unable to reject message delivery.");
- }
- }
- /**
- * Cancels consumer subscription to the queue.
- * The consumer can be used for acknowledging messages, but will not receive new messages.
- * This does not close the underlying channel. To close the channel use closeChannel() method.
- *
- * @throws IOException
- * @see #subscribeToQueue()
- * @see #closeChannel()
- */
- public void cancelSubscription() throws IOException {
- if (channel != null) {
- channel.basicCancel(this.consumerTag);
- subscribed = false;
- } else {
- logger.error("Channel does not exist. Unable to cancel consumer subscription.");
- }
- }
- /**
- * Explicitly closes channel to the queue.
- * After doing this you will not be able to use any of the methods of this class.
- *
- * @throws IOException if I/O problem is encountered.
- * @throws TimeoutException if connection problem occurs.
- */
- public void closeChannel() throws IOException, TimeoutException {
- if (channel != null) {
- channel.close();
- channel = null;
- logger.info("Closing RabbitMQ consumer channel...");
- } else {
- logger.error("Channel already closed.");
- }
- }
- /**
- * Checks if the queue exists and creates the channel.
- * If the queue does not exist channel is set to null and cannot be used.
- *
- * @throws IOException if I/O problem is encountered.
- */
- private void setUpChannel() throws IOException {
- channel = ChannelFactory.getInstance().createChannel();
- try {
- channel.queueDeclarePassive(queue.getQueueName());
- channel.basicQos(prefetchCount);
- } catch (IOException e) {
- // When this exception occurs it renders the channel unusable so it's best set to null.
- channel = null;
- logger.error(String.format("Queue %s does not exist [%s]", queue.getQueueName(), e.getMessage()));
- e.printStackTrace();
- }
- logger.info("Setting up RabbitMQ consumer channel. Channel successfully initialized: " + (channel != null));
- }
- /**
- * Callback called when a message is delivered to the client.
- * Default implementation. Callback acknowledges message received and does nothing with it.
- * To use custom implementation use setDeliverCallback method.
- *
- * @param consumerTag The consumer tag associated with the consumer.
- * @param message Message object.
- * @see #setDeliverCallback(DeliverCallback)
- */
- private void handleDeliver(String consumerTag, Delivery message) {
- Envelope envelope = message.getEnvelope();
- long deliveryTag = envelope.getDeliveryTag();
- logger.info("Message delivered: " + deliveryTag);
- try {
- channel.basicAck(deliveryTag, false);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- /**
- * Callback called when a service is cancelled.
- * Default implementation. To use custom implementation specify it in the constructor.
- *
- * @param consumerTag The consumer tag associated with the consumer.
- */
- private void handleCancel(String consumerTag) {
- logger.info("Consumer (" + consumerTag + ") cancelled: ");
- }
- /**
- * Called when the consumer is abruptly shutdown due to termination of the underlying connection or channel.
- * Default implementation. To use custom implementation specify it in the constructor.
- *
- * @param consumerTag The consumer tag associated with the consumer.
- * @param exception Shutdown reason.
- */
- private void handleShutdown(String consumerTag, ShutdownSignalException exception) {
- logger.info(String.format("Consumer (%s) shutdown. Reason: %s", consumerTag, exception.getMessage()));
- logger.info(exception);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement