Advertisement
Guest User

Server

a guest
Jul 31st, 2013
26
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 17.05 KB | None | 0 0
  1. package com.airit.mobile.server.local.topic;
  2.  
  3. import static javax.ejb.ConcurrencyManagementType.BEAN;
  4.  
  5. import java.sql.Timestamp;
  6. import java.text.DateFormat;
  7. import java.text.ParseException;
  8. import java.text.SimpleDateFormat;
  9. import java.util.ArrayList;
  10. import java.util.Arrays;
  11. import java.util.Calendar;
  12. import java.util.GregorianCalendar;
  13. import java.util.HashMap;
  14. import java.util.HashSet;
  15. import java.util.List;
  16. import java.util.Properties;
  17. import java.util.Set;
  18. import java.util.concurrent.ConcurrentHashMap;
  19.  
  20. import javax.annotation.PostConstruct;
  21. import javax.annotation.PreDestroy;
  22. import javax.ejb.ConcurrencyManagement;
  23. import javax.ejb.Singleton;
  24. import javax.ejb.Startup;
  25. import javax.enterprise.context.ApplicationScoped;
  26. import javax.inject.Inject;
  27. import javax.jms.ExceptionListener;
  28. import javax.jms.JMSException;
  29. import javax.jms.Message;
  30. import javax.jms.MessageListener;
  31. import javax.jms.TextMessage;
  32. import javax.jms.Topic;
  33. import javax.jms.TopicConnection;
  34. import javax.jms.TopicConnectionFactory;
  35. import javax.jms.TopicSession;
  36. import javax.jms.TopicSubscriber;
  37. import javax.naming.Context;
  38. import javax.naming.InitialContext;
  39. import javax.naming.NamingException;
  40.  
  41. import org.apache.log4j.Logger;
  42. import org.jboss.errai.bus.client.api.QueueSession;
  43. import org.jboss.errai.bus.client.api.SessionEndEvent;
  44. import org.jboss.errai.bus.client.api.SessionEndListener;
  45. import org.jboss.errai.bus.client.api.base.MessageBuilder;
  46. import org.jboss.errai.bus.client.api.messaging.MessageBus;
  47. import org.jboss.errai.bus.client.api.messaging.MessageCallback;
  48. import org.jboss.errai.bus.client.api.messaging.RequestDispatcher;
  49. import org.jboss.errai.bus.server.annotations.Service;
  50. import org.jboss.errai.common.client.protocols.MessageParts;
  51. import org.jboss.errai.common.client.protocols.Resources;
  52.  
  53. import com.airit.aep.common.exception.UBException;
  54. import com.airit.aep.common.transfer.UBTransfer;
  55. import com.airit.aep.common.transfer.ws.UBTransferBridgeUtil;
  56. import com.airit.aep.common.util.UBNullComparator;
  57. import com.airit.mobile.client.shared.ClientMessage;
  58. import com.airit.mobile.client.shared.Flight;
  59. import com.airit.mobile.client.shared.FlightQuery;
  60. import com.airit.mobile.client.shared.ServerMessage;
  61. import com.airit.mobile.server.local.util.FlightBuilder;
  62. @Service("FlightUpdateService")
  63. @Singleton
  64. @ApplicationScoped
  65. @Startup
  66. @ConcurrencyManagement(BEAN)
  67.  
  68. public class FlightUpdateServiceImpl implements FlightUpdateService,MessageListener, ExceptionListener,MessageCallback,SessionEndListener {
  69. private TopicConnection conn ;
  70. private RequestDispatcher dispatcher;
  71. private ConcurrentHashMap<String,HashSet<Long>> sessionMap;
  72. private ConcurrentHashMap<String,FlightQuery> queryMap;
  73. private ConcurrentHashMap<String,HashMap<String,ClientMessage>> cachedMessagesMap;
  74. private HashSet<String> trackedFields;
  75. // @Inject Event<FlightUpdate> flightUpdateEvent;
  76. private static Logger LOGGER = Logger
  77. .getLogger(FlightUpdateServiceImpl.class);
  78. @Inject
  79. public FlightUpdateServiceImpl(RequestDispatcher dispatcher,MessageBus bus) {
  80. this.dispatcher = dispatcher;
  81. bus.subscribe("FlightUpdateService", this);
  82. initTrackedFields();
  83. }
  84. public FlightUpdateServiceImpl(){
  85. initTrackedFields();
  86. }
  87. private void initTrackedFields(){
  88. trackedFields = new HashSet<String> ();
  89. trackedFields.add("odFlightNumber");
  90. trackedFields.add("odRapCodeDestination");
  91. trackedFields.add("odRfsCode");
  92. trackedFields.add("odBtd");
  93. trackedFields.add("odStd");
  94. trackedFields.add("odRspCode");
  95. trackedFields.add("odGatePcaInd");
  96. trackedFields.add("odRalCode");
  97. trackedFields.add("odPaxTotal");
  98. trackedFields.add("odInternalComment");
  99. }
  100. @Override
  101. public void onMessage(Message message) {
  102. if ( message != null )
  103. {
  104. LOGGER.debug("Received message");
  105. List<UBTransfer> response;
  106. try {
  107. response = UBTransferBridgeUtil.convertUBTransferInternalMarkupToUBTransferList(Arrays.asList(((TextMessage)message).getText()), true);
  108. for ( final UBTransfer transfer : response )
  109. {
  110. // System.out.println(UBTransferUtil.generateDebugString(transfer));
  111. if (transfer.getSimpleTransferType() != null
  112. && "com.airit.fim.ejb.entity.OpDepartures"
  113. .equals(transfer.getSimpleTransferType())) {
  114. if (transfer.getAction() != null && !transfer.getAction().toString().equals(ClientMessage.NO_ACTION) && getIsChanged(transfer)) {
  115. FlightBuilder fb = new FlightBuilder();
  116. LOGGER.info("Transfer Type:"
  117. + transfer.getTransferType());
  118. LOGGER.info("Action:"
  119. + transfer.getAction().toString());
  120. LOGGER.info("Simple Transfer Type:"
  121. + transfer.getSimpleTransferType());
  122. ClientMessage fm = new ClientMessage();
  123. fm.setAction(transfer.getAction().toString());
  124. fm.setFlight(fb.buildFlight(transfer, false));
  125. LOGGER.info("Processing Flight:"+fm.getFlight().getFlightNumber());
  126. sendFlightToClient(fm);
  127. }else{
  128. LOGGER.debug("Message Not Qualified");
  129. }
  130. }
  131. // transfer.getValue("odFlightNumber");
  132. }
  133. } catch (UBException e) {
  134. // TODO Auto-generated catch block
  135. e.printStackTrace();
  136. } catch (JMSException e) {
  137. // TODO Auto-generated catch block
  138. e.printStackTrace();
  139. }
  140. }
  141.  
  142. }
  143. private boolean getIsChanged(UBTransfer transfer) {
  144.  
  145. for(String field : trackedFields){
  146. if(transfer.isChanged(field)){
  147. LOGGER.info("Field Changed:"+field);
  148. return true;
  149. }
  150. }
  151. return false;
  152. }
  153. private void sendFlightToClient(ClientMessage cm) {
  154.  
  155. if(cm.getAction().equals(cm.INSERT_ACTION)){
  156. HashSet<String> sessionList = buildSessionList(cm.getFlight());
  157. for (String sessionId : sessionList){
  158. sendMessageToClient(sessionId,cm);
  159. }
  160. }else{
  161. Long id = cm.getFlight().getId();
  162. Set<String> sessionList = getSessionMap().keySet();
  163. for (String sessionId : sessionList){
  164. if(getSessionMap().get(sessionId).contains(id)){
  165. sendMessageToClient(sessionId,cm);
  166. }
  167. }
  168. }
  169.  
  170. }
  171. private HashSet<String> buildSessionList(Flight f) {
  172. Set<String> sessionList = getQueryMap().keySet();
  173. HashSet<String> qualifiedList = new HashSet<String>();
  174. UBNullComparator nc = new UBNullComparator();
  175. for (String sessionId : sessionList){
  176. boolean isValid = true;
  177. FlightQuery query = getQueryMap().get(sessionId);
  178. try {
  179. Timestamp start = parseTimestampFromString(query.getStartDate());
  180. Timestamp end = parseTimestampFromString(query.getEndDate());
  181. if(nc.compare(f.getStd(), start) < 0 ){
  182. LOGGER.info("Query 1 start:"+start);
  183. LOGGER.info("Query 1 end:"+end);
  184. LOGGER.info("Flight 1 std:"+f.getStd());
  185. LOGGER.info("Compare 1:"+nc.compare(f.getStd(), start));
  186. isValid = false;
  187. }
  188. if(isValid && nc.compare(f.getStd(), end) > 0 ){
  189. LOGGER.info("Query 2 start:"+start);
  190. LOGGER.info("Query 2 end:"+end);
  191. LOGGER.info("Flight 2 std:"+f.getStd());
  192. LOGGER.info("Compare 2:"+nc.compare(f.getStd(), end));
  193. isValid = false;
  194. }
  195. if(isValid && query.getAirline()!=null && nc.compare(f.getAirline(), query.getAirline()) != 0 ){
  196. LOGGER.info("Check 3 Flight Airline:"+f.getAirline());
  197. LOGGER.info("Query 3 Airline:"+query.getAirline());
  198. LOGGER.info("Compare 3:"+nc.compare(f.getAirline(), query.getAirline()));
  199. isValid = false;
  200. }
  201. if(isValid && query.getDestination()!=null && nc.compare(f.getDestination(), query.getDestination()) != 0 ){
  202. LOGGER.info("Check 4 Flight Dest:"+f.getDestination());
  203. LOGGER.info("Query 4 Dest:"+query.getDestination());
  204. LOGGER.info("Compare 4:"+nc.compare(f.getDestination(), query.getDestination()));
  205. isValid = false;
  206. }
  207.  
  208. if(isValid){
  209. qualifiedList.add(sessionId);
  210. }
  211. } catch (ParseException e) {
  212. // TODO Auto-generated catch block
  213. e.printStackTrace();
  214. }
  215.  
  216.  
  217. }
  218.  
  219. return qualifiedList;
  220. }
  221. public Timestamp parseTimestampFromString(String timestamp) throws ParseException {
  222. String format = "MM/dd/yyyy HH:mm";
  223. DateFormat df = new SimpleDateFormat(format);
  224. // Set to comply with ISO 8601 standard
  225. Calendar c = new GregorianCalendar();
  226. df.setCalendar(c);
  227.  
  228. return new Timestamp(df.parse(timestamp).getTime());
  229. }
  230. @PostConstruct
  231. void init() {
  232. LOGGER.info("STARTING MY BEAN...");
  233. initConnection();
  234. // other initialization logic
  235.  
  236. }
  237.  
  238. private void initConnection() {
  239. Properties properties = new Properties();
  240. properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  241. properties.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
  242. properties.put(Context.PROVIDER_URL, "jnp://10.6.164.36:1099");
  243. InitialContext iniCtx;
  244. try {
  245. iniCtx = new InitialContext(properties);
  246. Object connectionFactory = iniCtx.lookup("ConnectionFactory");
  247. TopicConnectionFactory qcf = (TopicConnectionFactory) connectionFactory;
  248. conn = qcf.createTopicConnection();
  249.  
  250. Topic updateTopic = (Topic) iniCtx.lookup("topic/MobileUpdateTopic");
  251. TopicSession session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
  252.  
  253. conn.start();
  254.  
  255. TopicSubscriber subscriber = session.createSubscriber(updateTopic);
  256. LOGGER.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>msg listener ADDED");
  257. subscriber.setMessageListener(this);
  258. conn.setExceptionListener(this);
  259. } catch (NamingException e) {
  260. // TODO Auto-generated catch block
  261. e.printStackTrace();
  262. } catch (JMSException e) {
  263. // TODO Auto-generated catch block
  264. e.printStackTrace();
  265. }
  266.  
  267. }
  268.  
  269. private void closeConnection() {
  270. try {
  271. conn.close();
  272. System.out.println("msg listener closed");
  273. } catch(Exception e) { }
  274.  
  275. }
  276. @PreDestroy
  277. void cleanUp() {
  278. closeConnection();
  279. // other cleanup logic
  280.  
  281. }
  282. @Override
  283. public void onException(JMSException arg0) {
  284. cleanUp();
  285. // TODO Auto-generated method stub
  286.  
  287. }
  288. @Override
  289. public void callback(
  290. org.jboss.errai.bus.client.api.messaging.Message message) {
  291. ServerMessage serverMessage = message.get(ServerMessage.class, "serverMessage");
  292. if(serverMessage.getAction() != null){
  293. if(serverMessage.getAction().equals(ServerMessage.SESSION_ID_REQUEST)){
  294. LOGGER.info(">>>>>>> Received Session Id Request from client");
  295. String sessionId = getSessionId(message);
  296. LOGGER.info(">>>>>>> SENDING Session ID:"+sessionId);
  297. }else if(serverMessage.getAction().equals(ServerMessage.ACK_MESSAGE_RECEIVED) && serverMessage.getMessageId()!= null && serverMessage.getSessionId()!= null){
  298. LOGGER.info(">>>>>>> ReceivedACK Message from client >>"+serverMessage.getMessageId());
  299. removeCachedMessage(serverMessage.getSessionId(),serverMessage.getMessageId());
  300. }else if(serverMessage.getAction().equals(ServerMessage.REQ_OFFLINE_MESSAGES) && serverMessage.getSessionId()!= null){
  301. LOGGER.info(">>>>>>> Requesting Offline Message from client:"+serverMessage.getSessionId());
  302. checkCachedMessages(serverMessage.getSessionId());
  303. }
  304. }
  305.  
  306. }
  307. private String getSessionId(org.jboss.errai.bus.client.api.messaging.Message message){
  308. QueueSession sess = message.getResource(QueueSession.class, Resources.Session.name());
  309. sess.addSessionEndListener(this);
  310. String sessionId = sess.getSessionId();
  311. ClientMessage clientMessage = new ClientMessage();
  312. clientMessage.setType(ClientMessage.INIT_SESSION_MESSAGE_TYPE);
  313. clientMessage.setSessionId(sessionId);
  314.  
  315. MessageBuilder.createConversation(message)
  316. .toSubject("FlightClientReceiver") // (1)
  317. .signalling()
  318. // .with(MessageParts.SessionID, sessionId)// (2)
  319. // .with(MessageParts.CommandType, "getSessionId")
  320. .with("clientMessage", clientMessage)
  321. .noErrorHandling() // (4)
  322. .sendNowWith(dispatcher);
  323. return sessionId;
  324. }
  325. private void checkCachedMessages(String sessionId) {
  326. LOGGER.info(">>>>>>>CCM Checking Cached Messages for "+sessionId);
  327. if(getCachedMessagesMap().containsKey(sessionId)){
  328. HashMap<String,ClientMessage> messageMap = getCachedMessagesMap().get(sessionId);
  329. Set<String> messageIds = messageMap.keySet();
  330. LOGGER.info("<<<<<<<< Found Messages CCM - " +messageIds.size());
  331. for(String messageId : messageIds){
  332. ClientMessage cm = messageMap.get(messageId);
  333. sendMessageToClient(sessionId,cm);
  334. }
  335. }else{
  336. LOGGER.info("<<<<<<<< No Messages Found >>>>>>>>>>> ");
  337. }
  338.  
  339. }
  340. private void removeCachedMessage(String sessionId, String messageId) {
  341. LOGGER.info(">>>>>>>RCM Removing Cached Messages for "+sessionId);
  342. if(getCachedMessagesMap().containsKey(sessionId)){
  343. LOGGER.info("<<<<<<<< Found Messages RCM- " +getCachedMessagesMap().get(sessionId).entrySet().size());
  344. HashMap<String,ClientMessage> messageMap = getCachedMessagesMap().get(sessionId);
  345. if(messageMap.containsKey(messageId)){
  346. messageMap.remove(messageId);
  347. }
  348. LOGGER.info("<<<<<<<< POST REMOVE Messages RCM- " +getCachedMessagesMap().get(sessionId).entrySet().size());
  349. }
  350. }
  351. private void cacheMessage(String sessionId, ClientMessage clientMessage,
  352. String messageId) {
  353. if(getCachedMessagesMap().containsKey(sessionId)){
  354. HashMap<String,ClientMessage> messageMap = getCachedMessagesMap().get(sessionId);
  355. messageMap.put(messageId, clientMessage);
  356.  
  357. }else{
  358. HashMap<String,ClientMessage> messageMap = new HashMap<String,ClientMessage>();
  359. messageMap.put(messageId, clientMessage);
  360. getCachedMessagesMap().put(sessionId, messageMap);
  361. }
  362.  
  363. }
  364. private void sendMessageToClient( String sessionId,ClientMessage clientMessage){
  365. if(getQueryMap().containsKey(sessionId)) {
  366. try {
  367. String messageId= getMessageId(sessionId,clientMessage.getFlight().getId());
  368. clientMessage.setMessageId(messageId);
  369. cacheMessage(sessionId,clientMessage,messageId);
  370. clientMessage.setType(ClientMessage.FLIGHT_UPDATE_MESSAGE_TYPE);
  371. clientMessage.setSessionId(sessionId);
  372. System.out.println("Sending Flight Update To Session ID :"+sessionId);
  373. MessageBuilder.createMessage()
  374. .toSubject("FlightClientReceiver") // (1)
  375. .signalling()
  376. .with(MessageParts.SessionID, sessionId)// (2)
  377. .with("clientMessage", clientMessage)
  378. .noErrorHandling() // (4)
  379. .sendNowWith(dispatcher);
  380. } catch (RuntimeException re) {
  381. String message = re.getMessage();
  382. LOGGER.info(">>>>>>> ERROR OCCURRED SENDING MESSAGE:"+message);
  383. } catch (Exception e) {
  384. // TODO Auto-generated catch block
  385. e.printStackTrace();
  386. }
  387. }else{
  388. System.out.println("###<<<<<<<<<<<>>>>>>>>>>>>SESSION ENDED:PLACING MESSAGE IN CACHE:"+sessionId);
  389. // if(getClientCacheMap().containsKey(sessionId)){
  390. // getClientCacheMap().get(sessionId).add(flightMessage);
  391. // }else{
  392. // HashSet<FlightMessage> messages = new HashSet<FlightMessage>();
  393. // messages.add(flightMessage);
  394. // getClientCacheMap().put(sessionId, messages);
  395. // }
  396. }
  397.  
  398.  
  399. }
  400.  
  401. @Override
  402. public void onSessionEnd(SessionEndEvent event) {
  403. // TODO Auto-generated method stub
  404. String sessionId = event.getSession().getSessionId();
  405. System.out.println("SESSION ENDED:"+sessionId);
  406. if(sessionId!= null){
  407. getSessionMap().remove(sessionId);
  408. getQueryMap().remove(sessionId);
  409. getCachedMessagesMap().remove(sessionId);
  410. }
  411. }
  412. @Override
  413. public void updateSessionMap(FlightQuery query,ArrayList<Flight> flightList){
  414. if(query.getSessionId() != null){
  415. getSessionMap().put(query.getSessionId(), getIds(flightList));
  416. getQueryMap().put(query.getSessionId(), query);
  417. }
  418. }
  419. private HashSet<Long> getIds(ArrayList<Flight> flightList) {
  420. HashSet<Long> idList = new HashSet<Long>();
  421. for(Flight f : flightList){
  422. idList.add(f.getId());
  423. }
  424. // TODO Auto-generated method stub
  425. return idList;
  426. }
  427. private ConcurrentHashMap<String,HashSet<Long>> getSessionMap(){
  428. if(sessionMap== null){
  429. sessionMap = new ConcurrentHashMap<String,HashSet<Long>>();
  430. }
  431. return sessionMap;
  432. }
  433.  
  434. private ConcurrentHashMap<String,FlightQuery> getQueryMap(){
  435. if(queryMap== null){
  436. queryMap = new ConcurrentHashMap<String,FlightQuery>();
  437. }
  438. return queryMap;
  439. }
  440. private ConcurrentHashMap<String,HashMap<String,ClientMessage>>getCachedMessagesMap(){
  441. if(cachedMessagesMap== null){
  442. cachedMessagesMap = new ConcurrentHashMap<String,HashMap<String,ClientMessage>>();
  443. }
  444. return cachedMessagesMap;
  445. }
  446. private String getMessageId(String sessionId,Long flightId){
  447. String messageId = flightId .toString()+"-"+sessionId;
  448. return messageId;
  449. }
  450. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement