Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- private MQueue queue(QueueProps p) {
- LOG.log(INFO, "MQ: creating queue [{0}]", p.queueName);
- try {
- Channel channel = conn.createChannel();
- channel.basicQos(p.prefetchCount);
- channel.queueDeclare(p.queueName, p.durable, p.exclusive, p.autoDelete, /*args*/null);
- MQueue mQueue = new MQueue(channel, p.queueName, p.exchangeName);
- channel.basicConsume(p.queueName, p.autoAck, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String replyTo = properties.getReplyTo();
- String routingKey = envelope.getRoutingKey();
- String correlationId = properties.getCorrelationId();
- Object traceHeader = ofNullable(properties.getHeaders()).orElse(emptyMap()).get("trace");
- Trace trace = Trace.of(routingKey, replyTo, correlationId, traceHeader);
- try {
- LOG.log(INFO, "onMessage: {0}", routingKey);
- txManager.txChecked(() -> mQueue.onMessage(new String(body, UTF8), trace));
- if (!p.autoAck) channel.basicAck(envelope.getDeliveryTag(), /*multiple*/false);
- } catch (Exception ex) {
- LOG.log(SEVERE, "", ex);
- if (!p.autoAck) {
- channel.basicNack(envelope.getDeliveryTag(), /*multiple*/false, /*requeue*/true);
- // at this point, the queue is propably clogged, so we do not hurry anywhere
- try {Thread.sleep(3000);} catch (InterruptedException ignore) {}
- }
- }
- }
- });
- return mQueue;
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment