Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package transformador;
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
- import org.json.JSONArray;
- import org.json.JSONException;
- import org.json.JSONObject;
- import org.json.XML;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- public class Transformador {
- private final static String NOMBRE_EXCHANGE = "ajuntament";
- public static void main(String[] args) throws IOException, TimeoutException, JSONException {
- // Conectarse con RabbitMQ
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- // Crear un canal de comunicaci�n de entrada
- Channel channelEnt = connection.createChannel();
- Channel channelOut = connection.createChannel();
- // Declaramos un Exchange de tipo Topic
- channelEnt.exchangeDeclare(NOMBRE_EXCHANGE, BuiltinExchangeType.TOPIC);
- channelOut.exchangeDeclare(NOMBRE_EXCHANGE, BuiltinExchangeType.TOPIC);
- // Solicitar la creaci�n de una cola y asociarle los temas de los mensajes a
- // recibir
- String COLA_CONSUMER = channelEnt.queueDeclare().getQueue();
- Scanner ent = new Scanner(System.in);
- System.out.println("Introduce trafico o bicis: ");
- String texto = ent.nextLine();
- ent.close();
- channelEnt.queueBind(COLA_CONSUMER, NOMBRE_EXCHANGE, texto);
- // Crear un manejador de mensajes
- Consumer consumer = new DefaultConsumer(channelEnt) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- // System.out.println("Recibido: '" + message + "'");
- System.out.println("Recibido, entrando en " + texto);
- String punto, tipo, descripcion;
- int latitud, longitud;
- JSONArray coleccion;
- try {
- coleccion = new JSONArray(message);
- // Le introduce los valores del mensaje
- if (texto == "trafico") { // es un json
- System.out.println("Mensaje de trafico, parseando");
- for (int i = 0; i < coleccion.length(); i++) {
- JSONObject obj = coleccion.getJSONObject(i);
- punto = obj.get("denominacion").toString();
- tipo = "datos de trafico";
- latitud = obj.getInt("coordinates[0]");
- longitud = obj.getInt("coordinates[1]");
- // 0=fluido, 1=denso, 2=congestionado, 3=cortado
- String[] estados = { "fluido", "denso", "congestionado", "cortado" };
- int desc = obj.getInt("estado");
- descripcion = estados[desc];
- // Mensaje canonico
- JSONObject json = new JSONObject();
- json.put("punto", punto);
- json.put("tipo", tipo);
- json.put("latitud", latitud);
- json.put("longitud", longitud);
- json.put("descripcion", descripcion);
- System.out.println(json.toString());
- channelOut.basicPublish(NOMBRE_EXCHANGE, "logger", null, json.toString().getBytes());
- }
- } else if (texto == "bicis") { // es un xml
- JSONObject xml = XML.toJSONObject(message);
- channelOut.basicPublish(NOMBRE_EXCHANGE, texto, null, xml.toString().getBytes());
- }
- } catch (JSONException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- };
- channelEnt.basicConsume(COLA_CONSUMER, true, consumer);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement