Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
- */
- package activeMQ_exercici2;
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.activemq.ActiveMQConnection;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- import java.util.Random;
- public class Exercici2 {
- public static void main(String[] args) throws JMSException, InterruptedException {
- Producer producer = new Producer();
- producer.sendMessages("iam36580056.cua.2WIAM.exercici5.exercici7");
- Producer producer2 = new Producer();
- producer2.sendMessages("iam36580056.cua.2WIAM.exercici4");
- Producer producer3 = new Producer();
- producer3.sendMessages("iam36580056.cua.1WIAM.exercici20");
- Consumer consumer = new Consumer();
- consumer.processMessages("iam36580056.cua.2WIAM.>");
- }
- }
- class Producer {
- public enum UserAction {
- RAMON("M06 - PERSITENCIA"),
- GENIS("M03 - JAVA"),
- PEP("M08 - ANDROID"),
- VAL("M05 - ENTORNS");
- private final String userAction;
- private UserAction(String userAction) {
- this.userAction = userAction;
- }
- public String getActionAsString() {
- return this.userAction;
- }
- }
- private static final Random RANDOM = new Random(System.currentTimeMillis());
- //private static final String URL = "tcp://192.168.2.59:61616";
- private static final String URL = "tcp://localhost:61616";
- private static final String USER = ActiveMQConnection.DEFAULT_USER;
- private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
- private static final String DESTINATION_QUEUE = "iam36580056.cua.2WIAM.exercici5.";
- //private static final String DESTINATION_TOPIC = "iam36580056-topic-2WIAM";
- private static final boolean TRANSACTED_SESSION = true;
- private static final int MESSAGES_TO_SEND = 100;
- public void sendMessages(String queue) throws JMSException, InterruptedException {
- // Crear una ConectionFactory
- final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
- // Crear una conexión nueva
- Connection connection = connectionFactory.createConnection();
- connection.start();
- // Crear la sesión
- final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
- // Crear la destinacion de los mensajes (Cola o Topic) en esta sesión
- final Destination destination = session.createQueue(queue);
- //final Destination destination = session.createTopic(DESTINATION_TOPIC);
- // Crear un MessageProducer que llevará los mensajes a los topics o colas
- final MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- // Finalmente mandar los mensajes al broker
- sendMessages(session, producer);
- session.commit();
- // Cerrar sesión y conexión una vez se han mandado los mensajes
- session.close();
- connection.close();
- System.out.println("Tots els missatges enviats correctament");
- }
- private void sendMessages(Session session, MessageProducer producer) throws JMSException, InterruptedException {
- final Producer messageSender = new Producer();
- for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
- final UserAction userActionToSend = getRandomUserAction();
- // Enviar un texto cada 5 segundos
- Thread.sleep(10);
- //Thread.sleep(5000);
- messageSender.sendMessage(userActionToSend.getActionAsString(), session, producer);
- System.out.println("Missatge "+i+" enviat correctament");
- }
- }
- private void sendMessage(String message, Session session, MessageProducer producer) throws JMSException {
- final TextMessage textMessage = session.createTextMessage(message);
- producer.send(textMessage);
- }
- // Crear una accion de forma aleatoria
- private static UserAction getRandomUserAction() {
- final int userActionNumber = (int) (RANDOM.nextFloat() * UserAction.values().length);
- return UserAction.values()[userActionNumber];
- }
- }
- class Consumer {
- private static final String URL = "tcp://localhost:61616";
- private static final String USER = ActiveMQConnection.DEFAULT_USER;
- private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
- private static final String DESTINATION_QUEUE = "iam36580056-cua-2WIAM-exercici5";
- //private static final String DESTINATION_TOPIC = "iam36580056-topic-2WIAM";
- private static final boolean TRANSACTED_SESSION = false;
- private static final int TIMEOUT = 1000;
- private final Map<String, Integer> consumedMessageTypes;
- private int totalConsumedMessages = 0;
- public Consumer() {
- this.consumedMessageTypes = new HashMap<String, Integer>();
- }
- public void processMessages(String queue) throws JMSException {
- // Crear una connectionFactory
- final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
- // Crear una conexión y iniciarla
- final Connection connection = connectionFactory.createConnection();
- connection.start();
- // Iniciar una nueva sesión
- final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
- final Destination destination = session.createQueue(queue);
- final MessageConsumer consumer = session.createConsumer(destination);
- processAllMessagesInQueue(consumer);
- consumer.close();
- session.close();
- connection.close();
- showProcessedResults();
- }
- private void processAllMessagesInQueue(MessageConsumer consumer) throws JMSException {
- Message message;
- while ((message = consumer.receive(TIMEOUT)) != null) {
- proccessMessage(message);
- }
- }
- private void proccessMessage(Message message) throws JMSException {
- if (message instanceof TextMessage) {
- final TextMessage textMessage = (TextMessage) message;
- final String text = textMessage.getText();
- incrementMessageType(text);
- totalConsumedMessages++;
- }
- }
- private void incrementMessageType(String message) {
- if (consumedMessageTypes.get(message) == null) {
- consumedMessageTypes.put(message, 1);
- } else {
- final int numberOfTypeMessages = consumedMessageTypes.get(message);
- consumedMessageTypes.put(message, numberOfTypeMessages + 1);
- }
- }
- private void showProcessedResults() {
- System.out.println("Procesados un total de " + totalConsumedMessages + " mensajes");
- for (String messageType : consumedMessageTypes.keySet()) {
- final int numberOfTypeMessages = consumedMessageTypes.get(messageType);
- System.out.println("Tipo " + messageType + " Procesados " + numberOfTypeMessages + " ("
- + (numberOfTypeMessages * 100 / totalConsumedMessages) + "%)");
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement