Guest User

Untitled

a guest
Sep 20th, 2018
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.01 KB | None | 0 0
  1. @Configuration
  2. @IntegrationComponentScan
  3. public class RabbitConfig {
  4.  
  5. @Autowired // TODO: move to constructor injection. NOTE(review): nothing visible in this listing reads this field (articleListenerContainer receives its own ConnectionFactory parameter) - confirm whether it is still needed
  6. private ConnectionFactory connectionFactory;
  7.  
  8. public RabbitConfig( // stores the three names resolved from the article.inbound.* properties
  9. @Value("${article.inbound.queue}") String queueName,
  10. @Value("${article.inbound.exchange}") String exchangeName,
  11. @Value("${article.inbound.routingkey}") String routingKey) {
  12. this.routingKey = routingKey; // used as the binding key in binding()
  13. this.exchangeName = exchangeName; // name of the topic exchange built by exchange()
  14. this.queueName = queueName; // name of the durable queue built by queue()
  15. }
  16.  
  17. @Bean
  18. Exchange exchange() {
  19. return ExchangeBuilder
  20. .topicExchange(this.exchangeName)
  21. .durable(true)
  22. .build();
  23. }
  24.  
  25. @Bean
  26. Queue queue() { // durable queue named by the injected queueName; bound in binding() and consumed by articleListenerContainer()
  27. return QueueBuilder.durable(this.queueName).build();
  28. }
  29.  
  30. @Bean
  31. Binding binding() {
  32. return BindingBuilder.bind(queue())
  33. .to(exchange())
  34. .with(routingKey)
  35. .noargs();
  36. }
  37.  
  38. @Bean
  39. public MessageConverter jsonMessageConverter() { // exposes a Jackson2JsonMessageConverter as a bean; articleListenerContainer() passes it to the listener container
  40. return new Jackson2JsonMessageConverter();
  41. }
  42.  
  43. @Bean
  44. public SimpleMessageListenerContainer articleListenerContainer(ConnectionFactory connectionFactory) {
  45. // Listener container on the supplied connection factory, consuming from the queue() bean.
  46. final SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(connectionFactory);
  47. listener.setQueues(queue());
  48. listener.setMessageConverter(jsonMessageConverter()); // NOTE(review): Spring AMQP 2.0 documents this container-level setter as deprecated and not used by the container itself - confirm the converter is also configured where messages are actually converted
  49. return listener;
  50. }
  51.  
  52. @Bean
  53. IntegrationFlow fromMessageBroker(SimpleMessageListenerContainer messageListener) { // inbound flow: AMQP adapter -> log -> one-way handler that logs the assetId header
  54. return IntegrationFlows
  55. .from(Amqp.inboundAdapter(messageListener).messageConverter(jsonMessageConverter())) // converter is set on the adapter: per Spring AMQP 2.0 docs a converter set only on the listener container is not applied
  56. .log() // NOTE(review): presumably this is what installs the WireTap seen in the stack trace below; that trace mixes spring-integration-core 5.0.6 with spring-messaging 5.0.5 - looks like a dependency mismatch, align the versions
  57. .handle(message -> { // handler produces no reply, so the flow ends here
  58. final Object assetId = message.getHeaders().get("assetId");
  59. log.info(assetId);
  60. })
  61. .get();
  62. }
  63.  
  64. 2018-09-20 13:11:08.240 INFO 49400 --- [ main] c.p.ftppush.article.MessageConsumer : Started MessageConsumer in 1.743 seconds (JVM running for 2.386)
  65. 2018-09-20 13:11:12.309 INFO 49400 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=byte[0], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=article.original.orgn.123, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_receivedExchange=que.article.content.pf.normal.trigger, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_deliveryTag=1, assetId=1qh22m9027k6d1jz29tsi510x5, amqp_consumerQueue=exc.article.content, amqp_redelivered=false, id=0d04f22a-991f-000b-63f5-5cb087b915ab, amqp_consumerTag=amq.ctag-bK2-KaIxWpk57HJeQ_38AQ, timestamp=1537441872308}]
  66. 2018-09-20 13:11:12.310 INFO 49400 --- [erContainer#0-1] com.perform.ftppush.article.Flow : 1qh22m9027k6d1jz29tsi510x5
  67. 2018-09-20 13:11:12.317 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.
  68.  
  69. java.lang.AbstractMethodError: org.springframework.integration.channel.interceptor.WireTap.postSend(Lorg/springframework/messaging/Message;Lorg/springframework/messaging/MessageChannel;Z)V
  70. at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.postSend(AbstractMessageChannel.java:607) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
  71. at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:460) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
  72. at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
  73. at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:227) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
  74. at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
  75. at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
  76. at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
  77. at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:497) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
  78. at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:465) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
  79. at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$1000(AmqpInboundGateway.java:66) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
  80. at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.process(AmqpInboundGateway.java:315) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
  81. at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.onMessage(AmqpInboundGateway.java:263) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
  82. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  83. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  84. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  85. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  86. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  87. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  88. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  89. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
  90. at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
  91.  
  92. 2018-09-20 13:11:12.322 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
  93. 2018-09-20 13:11:12.322 INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
  94. 2018-09-20 13:11:12.322 INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
Add Comment
Please sign in to add a comment.