Guest User

Re: connections and data sources

a guest
Nov 18th, 2014
234
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.89 KB | None | 0 0
  1. private MQueue queue(QueueProps p) {
  2.     LOG.log(INFO, "MQ: creating queue [{0}]", p.queueName);
  3.     try {
  4.         Channel channel = conn.createChannel();
  5.         channel.basicQos(p.prefetchCount);
  6.         channel.queueDeclare(p.queueName, p.durable, p.exclusive, p.autoDelete, /*args*/null);
  7.         MQueue mQueue = new MQueue(channel, p.queueName, p.exchangeName);
  8.         channel.basicConsume(p.queueName, p.autoAck, new DefaultConsumer(channel) {
  9.  
  10.             @Override
  11.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
  12.                     byte[] body) throws IOException {
  13.  
  14.                 String replyTo = properties.getReplyTo();
  15.                 String routingKey = envelope.getRoutingKey();
  16.                 String correlationId = properties.getCorrelationId();
  17.                 Object traceHeader = ofNullable(properties.getHeaders()).orElse(emptyMap()).get("trace");
  18.                 Trace trace = Trace.of(routingKey, replyTo, correlationId, traceHeader);
  19.                 try {
  20.                     LOG.log(INFO, "onMessage: {0}", routingKey);
  21.                     txManager.txChecked(() -> mQueue.onMessage(new String(body, UTF8), trace));
  22.                     if (!p.autoAck) channel.basicAck(envelope.getDeliveryTag(), /*multiple*/false);
  23.                 } catch (Exception ex) {
  24.                     LOG.log(SEVERE, "", ex);
  25.                     if (!p.autoAck) {
  26.                         channel.basicNack(envelope.getDeliveryTag(), /*multiple*/false, /*requeue*/true);
  27.                         // at this point, the queue is propably clogged, so we do not hurry anywhere
  28.                         try {Thread.sleep(3000);} catch (InterruptedException ignore) {}
  29.                     }
  30.                 }
  31.             }
  32.         });
  33.         return mQueue;
  34.     } catch (IOException ex) {
  35.         throw new UncheckedIOException(ex);
  36.     }
  37. }
Advertisement
Add Comment
Please, Sign In to add comment