Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Apr 29th, 2012  |  syntax: None  |  size: 7.64 KB  |  hits: 19  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1.  
  2. /**
  3.  * Copyright 2010 JBoss Inc
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  6.  * use this file except in compliance with the License. You may obtain a copy of
  7.  * the License at
  8.  *
  9.  * http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13.  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14.  * License for the specific language governing permissions and limitations under
  15.  * the License.
  16.  */
  17. package org.jbpm.task.service.hornetq;
  18.  
  19. import java.io.ByteArrayInputStream;
  20. import java.io.ByteArrayOutputStream;
  21. import java.io.IOException;
  22. import java.io.ObjectInputStream;
  23. import java.io.ObjectOutputStream;
  24. import java.util.HashMap;
  25. import java.util.Map;
  26. import java.util.concurrent.atomic.AtomicInteger;
  27.  
  28. import org.hornetq.api.core.HornetQException;
  29. import org.hornetq.api.core.TransportConfiguration;
  30. import org.hornetq.api.core.client.ClientConsumer;
  31. import org.hornetq.api.core.client.ClientMessage;
  32. import org.hornetq.api.core.client.ClientProducer;
  33. import org.hornetq.api.core.client.ClientSession;
  34. import org.hornetq.api.core.client.ClientSessionFactory;
  35. import org.hornetq.api.core.client.HornetQClient;
  36. import org.hornetq.api.core.client.ServerLocator;
  37. import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
  38. import org.hornetq.core.remoting.impl.netty.TransportConstants;
  39. import org.jbpm.task.service.BaseClientHandler;
  40. import org.jbpm.task.service.BaseHandler;
  41. import org.jbpm.task.service.TaskClientConnector;
  42. import org.slf4j.Logger;
  43. import org.slf4j.LoggerFactory;
  44.  
  45. public class HornetQTaskClientConnector implements TaskClientConnector {
  46.  
  47.     private static final Logger logger = LoggerFactory.getLogger(HornetQTaskClientConnector.class);
  48.     protected ClientSession session;
  49.     protected final BaseClientHandler handler;
  50.     protected final String name;
  51.     protected AtomicInteger counter;
  52.     private String address;
  53.     private Integer port;
  54.     private ServerLocator serverLocator;
  55.     private ClientProducer producer;
  56.     private ClientConsumer consumer;
  57.  
  58.     public HornetQTaskClientConnector(String name, BaseClientHandler handler) {
  59.         if (name == null) {
  60.             throw new IllegalArgumentException("Name can not be null");
  61.         }
  62.         this.name = name;
  63.         this.handler = handler;
  64.         this.counter = new AtomicInteger();
  65.     }
  66.  
  67.     public boolean connect(String address, int port) {
  68.         this.port = port;
  69.         this.address = address;
  70.         return connect();
  71.     }
  72.  
  73.     public boolean connect() {
  74.         if (session != null && !session.isClosed()) {
  75.             throw new IllegalStateException("Already connected. Disconnect first.");
  76.         }
  77.         try {
  78.             Map<String, Object> connectionParams = new HashMap<String, Object>();
  79.             if (address == null) {
  80.                 address = "127.0.0.1";
  81.             }
  82.             if (port == null) {
  83.                 port = 5445;
  84.             }
  85.             connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
  86.             connectionParams.put(TransportConstants.HOST_PROP_NAME, address);
  87.  
  88.             TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), connectionParams);
  89.             serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfiguration);
  90.             ClientSessionFactory factory = serverLocator.createSessionFactory(transportConfiguration);
  91.             session = factory.createSession();
  92.             producer = session.createProducer(HornetQTaskServer.SERVER_TASK_COMMANDS_QUEUE);
  93.  
  94.             createClientQueue();
  95.  
  96.             Thread responsesThread = new Thread(new Runnable() {
  97.  
  98.                 public void run() {
  99.                     try {
  100.                         consumer = session.createConsumer(name);
  101.                     } catch (HornetQException e) {
  102.                         logger.error("Error creating consumer. ", e);
  103.                         if (e.getCode() == HornetQException.OBJECT_CLOSED) {
  104.                             logger.info(e.getMessage());
  105.                             return;
  106.                         }
  107.                         throw new RuntimeException("Client Exception with class " + getClass()
  108.                                 + " using port " + port, e);
  109.                     }
  110.  
  111.  
  112.                     while (true) {
  113.                         try {
  114.                             ClientMessage serverMessage = consumer.receive();
  115.                             if (serverMessage != null) {
  116.                                 ((HornetQTaskClientHandler) handler).messageReceived(session, readMessage(serverMessage), BaseHornetQTaskServer.SERVER_TASK_COMMANDS_QUEUE);
  117.                             }
  118.                         } catch (HornetQException e) {
  119.                             if (e.getCode() != HornetQException.OBJECT_CLOSED) {
  120.                                 logger.error(e.getMessage());
  121.                                 return;
  122.                             }
  123.                            
  124.                         } catch (Exception e) {
  125.                             // LOG the exception and continue receiving
  126.                                                         // messages.
  127.                             logger.error(e.getMessage());
  128.                         }
  129.                     }
  130.  
  131.                 }
  132.             });
  133.             responsesThread.start();
  134.             session.start();
  135.             return true;
  136.         } catch (Exception e) {
  137.             logger.error(e.getMessage());
  138.             return false;
  139.         }
  140.     }
  141.  
  142.     private Object readMessage(ClientMessage serverMessage) throws IOException, ClassNotFoundException {
  143.         int bodySize = serverMessage.getBodySize();
  144.         byte[] message = new byte[bodySize];
  145.         serverMessage.getBodyBuffer().readBytes(message);
  146.         ByteArrayInputStream bais = new ByteArrayInputStream(message);
  147.         ObjectInputStream ois = new ObjectInputStream(bais);
  148.         return ois.readObject();
  149.     }
  150.  
  151.     private void createClientQueue() {
  152.         try {
  153.             session.createQueue(name, name, true);
  154.         } catch (HornetQException e) {
  155.             if (e.getCode() != HornetQException.QUEUE_EXISTS) {
  156.                 throw new RuntimeException("Client Exception with class " + getClass() + " using port " + port, e);
  157.             }
  158.             logger.info(e.getMessage());
  159.         }
  160.     }
  161.  
  162.     public void disconnect() throws Exception {
  163.         if (session != null && !session.isClosed()) {
  164.             session.close();
  165.             producer.close();
  166.             if (consumer != null) {
  167.                 consumer.close();
  168.             }
  169.             serverLocator.close();
  170.         }
  171.     }
  172.  
  173.     public void write(Object object) {
  174.         ByteArrayOutputStream baos = new ByteArrayOutputStream();
  175.         ObjectOutputStream oout;
  176.         try {
  177.             oout = new ObjectOutputStream(baos);
  178.             oout.writeObject(object);
  179.             ClientMessage message = session.createMessage(true);
  180.             message.getBodyBuffer().writeBytes(baos.toByteArray());
  181.             message.putStringProperty("producerId", name);
  182.             producer.send(message);
  183.         } catch (IOException e) {
  184.             throw new RuntimeException("Error creating message", e);
  185.         } catch (HornetQException e) {
  186.             throw new RuntimeException("Error writing message", e);
  187.         }
  188.     }
  189.  
  190.     public AtomicInteger getCounter() {
  191.         return counter;
  192.     }
  193.  
  194.     public BaseHandler getHandler() {
  195.         return handler;
  196.     }
  197.  
  198.     public String getName() {
  199.         return name;
  200.     }
  201. }