Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
spring:
  rabbitmq:
    host: rabbitmq.service.consul
    port: 5672
    virtualHost: /
    template:
      retry:
        enabled: true
- @Autowired private RabbitTemplate rt; // From RabbitAutoConfiguration
- @Bean
- public DirectExchange emailExchange() {
- return new DirectExchange("email");
- }
- public void sendEmail() {
- this.rt.send("email", "email.send", "test payload");
- }
- spring.rabbitmq.username=test
- spring.rabbitmq.password=test
- spring.rabbitmq.template.retry.enabled=true
- spring.rabbitmq.template.retry.initial-interval=1ms
- logging.level.org.springframework.retry=DEBUG
- @SpringBootApplication
- public class So49155945Application {
- public static void main(String[] args) {
- ConfigurableApplicationContext applicationContext = SpringApplication.run(So49155945Application.class, args);
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- try {
- rabbitTemplate.convertAndSend("foo", "foo");
- }
- catch (AmqpException e) {
- System.err.println("Error during sending: " + e.getCause().getCause().getMessage());
- }
- }
- }
- Error during sending: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
- spring.rabbitmq.publisher-confirms=true
- spring.rabbitmq.template.mandatory=true
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.annotation.*;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
- import org.springframework.cloud.endpoint.RefreshEndpoint;
- import org.springframework.messaging.MessageHeaders;
- import org.springframework.stereotype.Component;
- import static org.springframework.amqp.core.ExchangeTypes.TOPIC;
- @Component
- public class ReuathenticationListener {
- private static Logger log = LoggerFactory.getLogger(ReuathenticationListener.class);
- @Autowired private RabbitProperties rabbitProperties;
- @Autowired private RefreshEndpoint refreshEndpoint;
- @Autowired private CachingConnectionFactory connectionFactory;
- @RabbitListener(
- id = "credential_expiry_listener",
- bindings = @QueueBinding(value = @Queue(value="credentials.expiry", autoDelete="true", durable="false"),
- exchange = @Exchange(value="amq.rabbitmq.event", type=TOPIC, internal="true", durable="true"),
- key = "user.#")
- )
- public void expiryHandler(final MessageHeaders headers) {
- final String key = (String) headers.get("amqp_receivedRoutingKey");
- // See: https://www.rabbitmq.com/event-exchange.html
- if (!key.equals("user.deleted") &&
- !key.equals("user.authentication.failure")) {
- return;
- }
- final String failedName = (String) headers.get("name");
- final String prevUsername = rabbitProperties.getUsername();
- if (!failedName.equals(prevUsername)) {
- log.debug("Ignore expiry of unrelated user: " + failedName);
- return;
- }
- log.info("Refreshing Rabbit credentials...");
- refreshEndpoint.refresh();
- log.info("Refreshed username: '" + prevUsername + "' => '" + rabbitProperties.getUsername() + "'");
- connectionFactory.setUsername(rabbitProperties.getUsername());
- connectionFactory.setPassword(rabbitProperties.getPassword());
- connectionFactory.resetConnection();
- log.info("CachingConnectionFactory reset, reconnection should now begin.");
- }
- }
Add Comment
Please, Sign In to add comment