Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.Executors;
- import org.axonframework.domain.EventMessage;
- import org.axonframework.eventhandling.SimpleEventBus;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.AmqpAdmin;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.core.AnonymousQueue;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Exchange;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageListener;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- public class RabbitMQSimpleEventBus extends SimpleEventBus implements MessageListener{
- private static final Logger logger = LoggerFactory.getLogger(RabbitMQSimpleEventBus.class);
- public static final String FANOUT_EXCHANGE_NAME = "MyFanOut";
- private AmqpTemplate template;
- private KryoMessgeConverter kryoMessgeConverter = new KryoMessgeConverter();
- public RabbitMQSimpleEventBus() {
- ConnectionFactory connectionFactory = new CachingConnectionFactory();
- AmqpAdmin admin = new RabbitAdmin(connectionFactory);
- Exchange topicExchange = createTopicExchange();
- admin.declareExchange(topicExchange);
- AnonymousQueue queue = new AnonymousQueue();
- admin.declareQueue(queue);
- Binding b = BindingBuilder.bind(queue).to(topicExchange).with("*").noargs();
- admin.declareBinding(b);
- // receive messages async
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames(queue.getName());
- container.setMessageListener(this);
- container.setTaskExecutor(Executors.newCachedThreadPool());
- container.start();
- template = new RabbitTemplate(connectionFactory);
- }
- private Exchange createTopicExchange() {
- FanoutExchange fanout = new FanoutExchange(FANOUT_EXCHANGE_NAME);
- return fanout;
- }
- @Override
- public void publish(EventMessage... events) {
- publishToRabbitMQ(events);
- }
- private void publishToRabbitMQ(EventMessage[] events) {
- List<EventMessage> eventAsList = Arrays.asList(events);
- Message message = kryoMessgeConverter.toMessage(eventAsList, new MessageProperties());
- template.send(FANOUT_EXCHANGE_NAME, "*", message);
- logger.debug("send message : " + eventAsList);
- }
- @Override
- public void onMessage(Message message) {
- logger.debug("got message: "+ message);
- List<EventMessage> eventAsList = (List<EventMessage>) kryoMessgeConverter.fromMessage(message);
- logger.debug("decoded as: " + eventAsList);
- super.publish(eventAsList.toArray(new EventMessage[0]));
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement