Advertisement
Guest User

RabbitMQ Event Bus for Axon 2.0

a guest
Mar 22nd, 2013
659
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.15 KB | None | 0 0
  1. import java.util.Arrays;
  2. import java.util.List;
  3. import java.util.concurrent.Executors;
  4.  
  5. import org.axonframework.domain.EventMessage;
  6. import org.axonframework.eventhandling.SimpleEventBus;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.amqp.core.AmqpAdmin;
  10. import org.springframework.amqp.core.AmqpTemplate;
  11. import org.springframework.amqp.core.AnonymousQueue;
  12. import org.springframework.amqp.core.Binding;
  13. import org.springframework.amqp.core.BindingBuilder;
  14. import org.springframework.amqp.core.Exchange;
  15. import org.springframework.amqp.core.FanoutExchange;
  16. import org.springframework.amqp.core.Message;
  17. import org.springframework.amqp.core.MessageListener;
  18. import org.springframework.amqp.core.MessageProperties;
  19. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  20. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  21. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  22. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  23. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  24.  
  25. public class RabbitMQSimpleEventBus extends SimpleEventBus implements MessageListener{
  26.  
  27.     private static final Logger logger = LoggerFactory.getLogger(RabbitMQSimpleEventBus.class);
  28.    
  29.     public static final String FANOUT_EXCHANGE_NAME = "MyFanOut";
  30.    
  31.     private AmqpTemplate template;
  32.     private KryoMessgeConverter kryoMessgeConverter = new KryoMessgeConverter();
  33.    
  34.  
  35.     public RabbitMQSimpleEventBus() {
  36.         ConnectionFactory connectionFactory = new CachingConnectionFactory();
  37.         AmqpAdmin admin = new RabbitAdmin(connectionFactory);
  38.  
  39.         Exchange topicExchange = createTopicExchange();
  40.         admin.declareExchange(topicExchange);
  41.  
  42.         AnonymousQueue queue = new AnonymousQueue();
  43.         admin.declareQueue(queue);
  44.         Binding b = BindingBuilder.bind(queue).to(topicExchange).with("*").noargs();
  45.         admin.declareBinding(b);
  46.  
  47.         // receive messages async
  48.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  49.         container.setConnectionFactory(connectionFactory);
  50.         container.setQueueNames(queue.getName());
  51.         container.setMessageListener(this);
  52.         container.setTaskExecutor(Executors.newCachedThreadPool());
  53.         container.start();
  54.        
  55.         template = new RabbitTemplate(connectionFactory);
  56.     }
  57.  
  58.     private Exchange createTopicExchange() {
  59.         FanoutExchange fanout = new FanoutExchange(FANOUT_EXCHANGE_NAME);
  60.         return fanout;
  61.     }
  62.  
  63.     @Override
  64.     public void publish(EventMessage... events) {
  65.         publishToRabbitMQ(events);
  66.     }
  67.    
  68.     private void publishToRabbitMQ(EventMessage[] events) {
  69.         List<EventMessage> eventAsList =  Arrays.asList(events);
  70.        
  71.         Message message = kryoMessgeConverter.toMessage(eventAsList, new MessageProperties());
  72.        
  73.         template.send(FANOUT_EXCHANGE_NAME, "*", message);
  74.        
  75.         logger.debug("send message : " + eventAsList);
  76.     }
  77.  
  78.     @Override
  79.     public void onMessage(Message message) {
  80.        
  81.         logger.debug("got message: "+ message);
  82.        
  83.         List<EventMessage> eventAsList = (List<EventMessage>) kryoMessgeConverter.fromMessage(message);
  84.        
  85.         logger.debug("decoded as: " + eventAsList);
  86.        
  87.         super.publish(eventAsList.toArray(new EventMessage[0]));
  88.     }
  89. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement