Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Scheduled(fixedDelayString="300000", initialDelay=1000)
- public void registerAmqpSubmitEndpoints() {
- List<Credential> credentials = credentialRepository.findByStatus(Status.ENABLED);
- SubmitMessageListener submitMessageListener = applicationContext.getBean(SubmitMessageListener.class);
- credentials.stream().filter(e->e.getBindType() != null && e.getBindType() != BindType.RECEIVER)
- .forEach( e-> {
- String queueName = "submit_sm_queue_" + e.getM360CredentialId();
- RateLimiter rateLimiter = RateLimiter.create(e.getTps());
- submitMessageListener.setRateLimiter(rateLimiter);
- if (containers.get(queueName) == null) {
- createQueueIfAbsent(queueName);
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames(queueName);
- container.setConcurrentConsumers(threadsPerQueue);
- container.setMaxConcurrentConsumers(threadsPerQueue*2);
- container.setMessageListener(submitMessageListener);
- container.setTaskExecutor(executors);
- container.setDefaultRequeueRejected(false);
- containers.put(queueName, container);
- container.start();
- LOGGER.info("Registered AMQP Listener Container for {}", queueName);
- }
- });
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement