Advertisement
Guest User

Untitled

a guest
Oct 27th, 2016
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 7.45 KB | None | 0 0
  1. /*
  2.  * To change this license header, choose License Headers in Project Properties.
  3.  * To change this template file, choose Tools | Templates
  4.  * and open the template in the editor.
  5.  */
  6. package activeMQ_exercici2;
  7.  
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. import org.apache.activemq.ActiveMQConnection;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12.  
  13. import javax.jms.*;
  14. import java.util.Random;
  15.  
  16. public class Exercici2 {
  17.    
  18.     public static void main(String[] args) throws JMSException, InterruptedException {
  19.  
  20.         Producer producer = new Producer();
  21.         producer.sendMessages("iam36580056.cua.2WIAM.exercici5.exercici7");
  22.        
  23.         Producer producer2 = new Producer();
  24.         producer2.sendMessages("iam36580056.cua.2WIAM.exercici4");
  25.        
  26.         Producer producer3 = new Producer();
  27.         producer3.sendMessages("iam36580056.cua.1WIAM.exercici20");
  28.  
  29.         Consumer consumer = new Consumer();
  30.         consumer.processMessages("iam36580056.cua.2WIAM.>");
  31.     }
  32. }
  33.  
  34. class Producer {
  35.    
  36.     public enum UserAction {
  37.  
  38.         RAMON("M06 - PERSITENCIA"),
  39.         GENIS("M03 - JAVA"),
  40.         PEP("M08 - ANDROID"),
  41.         VAL("M05 - ENTORNS");
  42.  
  43.         private final String userAction;
  44.  
  45.         private UserAction(String userAction) {
  46.             this.userAction = userAction;
  47.         }
  48.  
  49.         public String getActionAsString() {
  50.             return this.userAction;
  51.         }
  52.     }
  53.  
  54.     private static final Random RANDOM = new Random(System.currentTimeMillis());
  55.  
  56.     //private static final String URL = "tcp://192.168.2.59:61616";
  57.     private static final String URL = "tcp://localhost:61616";
  58.  
  59.     private static final String USER = ActiveMQConnection.DEFAULT_USER;
  60.  
  61.     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  62.  
  63.     private static final String DESTINATION_QUEUE = "iam36580056.cua.2WIAM.exercici5.";
  64.     //private static final String DESTINATION_TOPIC = "iam36580056-topic-2WIAM";
  65.  
  66.     private static final boolean TRANSACTED_SESSION = true;
  67.    
  68.     private static final int MESSAGES_TO_SEND = 100;
  69.  
  70.     public void sendMessages(String queue) throws JMSException, InterruptedException {
  71.  
  72.         // Crear una ConectionFactory
  73.         final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
  74.        
  75.         // Crear una conexión nueva
  76.         Connection connection = connectionFactory.createConnection();
  77.         connection.start();
  78.  
  79.         // Crear la sesión
  80.         final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
  81.        
  82.         // Crear la destinacion de los mensajes (Cola o Topic) en esta sesión
  83.         final Destination destination = session.createQueue(queue);
  84.         //final Destination destination = session.createTopic(DESTINATION_TOPIC);
  85.  
  86.         // Crear un MessageProducer que llevará los mensajes a los topics o colas
  87.         final MessageProducer producer = session.createProducer(destination);
  88.         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  89.  
  90.         // Finalmente mandar los mensajes al broker
  91.         sendMessages(session, producer);
  92.         session.commit();
  93.  
  94.         // Cerrar sesión y conexión una vez se han mandado los mensajes
  95.         session.close();
  96.         connection.close();
  97.  
  98.         System.out.println("Tots els missatges enviats correctament");
  99.     }
  100.  
  101.     private void sendMessages(Session session, MessageProducer producer) throws JMSException, InterruptedException {
  102.         final Producer messageSender = new Producer();
  103.         for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
  104.             final UserAction userActionToSend = getRandomUserAction();
  105.             // Enviar un texto cada 5 segundos
  106.             Thread.sleep(10);
  107.             //Thread.sleep(5000);
  108.             messageSender.sendMessage(userActionToSend.getActionAsString(), session, producer);
  109.             System.out.println("Missatge "+i+" enviat correctament");
  110.         }
  111.     }
  112.  
  113.     private void sendMessage(String message, Session session, MessageProducer producer) throws JMSException {
  114.         final TextMessage textMessage = session.createTextMessage(message);
  115.         producer.send(textMessage);
  116.     }
  117.  
  118.     // Crear una accion de forma aleatoria
  119.     private static UserAction getRandomUserAction() {
  120.         final int userActionNumber = (int) (RANDOM.nextFloat() * UserAction.values().length);
  121.         return UserAction.values()[userActionNumber];
  122.     }
  123. }
  124.  
  125. class Consumer {
  126.  
  127.     private static final String URL = "tcp://localhost:61616";
  128.  
  129.     private static final String USER = ActiveMQConnection.DEFAULT_USER;
  130.  
  131.     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  132.  
  133.     private static final String DESTINATION_QUEUE = "iam36580056-cua-2WIAM-exercici5";
  134.     //private static final String DESTINATION_TOPIC = "iam36580056-topic-2WIAM";
  135.  
  136.     private static final boolean TRANSACTED_SESSION = false;
  137.  
  138.     private static final int TIMEOUT = 1000;
  139.  
  140.     private final Map<String, Integer> consumedMessageTypes;
  141.  
  142.     private int totalConsumedMessages = 0;
  143.  
  144.     public Consumer() {
  145.         this.consumedMessageTypes = new HashMap<String, Integer>();
  146.     }
  147.  
  148.     public void processMessages(String queue) throws JMSException {
  149.  
  150.         // Crear una connectionFactory
  151.         final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
  152.        
  153.         // Crear una conexión y iniciarla
  154.         final Connection connection = connectionFactory.createConnection();
  155.         connection.start();
  156.        
  157.         // Iniciar una nueva sesión
  158.         final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
  159.         final Destination destination = session.createQueue(queue);
  160.         final MessageConsumer consumer = session.createConsumer(destination);
  161.  
  162.         processAllMessagesInQueue(consumer);
  163.  
  164.         consumer.close();
  165.         session.close();
  166.         connection.close();
  167.  
  168.         showProcessedResults();
  169.     }
  170.  
  171.     private void processAllMessagesInQueue(MessageConsumer consumer) throws JMSException {
  172.         Message message;
  173.         while ((message = consumer.receive(TIMEOUT)) != null) {
  174.             proccessMessage(message);
  175.         }
  176.     }
  177.  
  178.     private void proccessMessage(Message message) throws JMSException {
  179.         if (message instanceof TextMessage) {
  180.             final TextMessage textMessage = (TextMessage) message;
  181.             final String text = textMessage.getText();
  182.             incrementMessageType(text);
  183.             totalConsumedMessages++;
  184.         }
  185.     }
  186.  
  187.     private void incrementMessageType(String message) {
  188.         if (consumedMessageTypes.get(message) == null) {
  189.             consumedMessageTypes.put(message, 1);
  190.         } else {
  191.             final int numberOfTypeMessages = consumedMessageTypes.get(message);
  192.             consumedMessageTypes.put(message, numberOfTypeMessages + 1);
  193.         }
  194.     }
  195.  
  196.     private void showProcessedResults() {
  197.         System.out.println("Procesados un total de " + totalConsumedMessages + " mensajes");
  198.         for (String messageType : consumedMessageTypes.keySet()) {
  199.             final int numberOfTypeMessages = consumedMessageTypes.get(messageType);
  200.             System.out.println("Tipo " + messageType + " Procesados " + numberOfTypeMessages + " ("
  201.                     + (numberOfTypeMessages * 100 / totalConsumedMessages) + "%)");
  202.         }
  203.     }
  204. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement