Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.airit.mobile.server.local.topic;
- import static javax.ejb.ConcurrencyManagementType.BEAN;
- import java.sql.Timestamp;
- import java.text.DateFormat;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Calendar;
- import java.util.GregorianCalendar;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Properties;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import javax.ejb.ConcurrencyManagement;
- import javax.ejb.Singleton;
- import javax.ejb.Startup;
- import javax.enterprise.context.ApplicationScoped;
- import javax.inject.Inject;
- import javax.jms.ExceptionListener;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
- import javax.jms.Topic;
- import javax.jms.TopicConnection;
- import javax.jms.TopicConnectionFactory;
- import javax.jms.TopicSession;
- import javax.jms.TopicSubscriber;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
- import org.apache.log4j.Logger;
- import org.jboss.errai.bus.client.api.QueueSession;
- import org.jboss.errai.bus.client.api.SessionEndEvent;
- import org.jboss.errai.bus.client.api.SessionEndListener;
- import org.jboss.errai.bus.client.api.base.MessageBuilder;
- import org.jboss.errai.bus.client.api.messaging.MessageBus;
- import org.jboss.errai.bus.client.api.messaging.MessageCallback;
- import org.jboss.errai.bus.client.api.messaging.RequestDispatcher;
- import org.jboss.errai.bus.server.annotations.Service;
- import org.jboss.errai.common.client.protocols.MessageParts;
- import org.jboss.errai.common.client.protocols.Resources;
- import com.airit.aep.common.exception.UBException;
- import com.airit.aep.common.transfer.UBTransfer;
- import com.airit.aep.common.transfer.ws.UBTransferBridgeUtil;
- import com.airit.aep.common.util.UBNullComparator;
- import com.airit.mobile.client.shared.ClientMessage;
- import com.airit.mobile.client.shared.Flight;
- import com.airit.mobile.client.shared.FlightQuery;
- import com.airit.mobile.client.shared.ServerMessage;
- import com.airit.mobile.server.local.util.FlightBuilder;
- @Service("FlightUpdateService")
- @Singleton
- @ApplicationScoped
- @Startup
- @ConcurrencyManagement(BEAN)
- public class FlightUpdateServiceImpl implements FlightUpdateService,MessageListener, ExceptionListener,MessageCallback,SessionEndListener {
- private TopicConnection conn ;
- private RequestDispatcher dispatcher;
- private ConcurrentHashMap<String,HashSet<Long>> sessionMap;
- private ConcurrentHashMap<String,FlightQuery> queryMap;
- private ConcurrentHashMap<String,HashMap<String,ClientMessage>> cachedMessagesMap;
- private HashSet<String> trackedFields;
- // @Inject Event<FlightUpdate> flightUpdateEvent;
- private static Logger LOGGER = Logger
- .getLogger(FlightUpdateServiceImpl.class);
- @Inject
- public FlightUpdateServiceImpl(RequestDispatcher dispatcher,MessageBus bus) {
- this.dispatcher = dispatcher;
- bus.subscribe("FlightUpdateService", this);
- initTrackedFields();
- }
- public FlightUpdateServiceImpl(){
- initTrackedFields();
- }
- private void initTrackedFields(){
- trackedFields = new HashSet<String> ();
- trackedFields.add("odFlightNumber");
- trackedFields.add("odRapCodeDestination");
- trackedFields.add("odRfsCode");
- trackedFields.add("odBtd");
- trackedFields.add("odStd");
- trackedFields.add("odRspCode");
- trackedFields.add("odGatePcaInd");
- trackedFields.add("odRalCode");
- trackedFields.add("odPaxTotal");
- trackedFields.add("odInternalComment");
- }
- @Override
- public void onMessage(Message message) {
- if ( message != null )
- {
- LOGGER.debug("Received message");
- List<UBTransfer> response;
- try {
- response = UBTransferBridgeUtil.convertUBTransferInternalMarkupToUBTransferList(Arrays.asList(((TextMessage)message).getText()), true);
- for ( final UBTransfer transfer : response )
- {
- // System.out.println(UBTransferUtil.generateDebugString(transfer));
- if (transfer.getSimpleTransferType() != null
- && "com.airit.fim.ejb.entity.OpDepartures"
- .equals(transfer.getSimpleTransferType())) {
- if (transfer.getAction() != null && !transfer.getAction().toString().equals(ClientMessage.NO_ACTION) && getIsChanged(transfer)) {
- FlightBuilder fb = new FlightBuilder();
- LOGGER.info("Transfer Type:"
- + transfer.getTransferType());
- LOGGER.info("Action:"
- + transfer.getAction().toString());
- LOGGER.info("Simple Transfer Type:"
- + transfer.getSimpleTransferType());
- ClientMessage fm = new ClientMessage();
- fm.setAction(transfer.getAction().toString());
- fm.setFlight(fb.buildFlight(transfer, false));
- LOGGER.info("Processing Flight:"+fm.getFlight().getFlightNumber());
- sendFlightToClient(fm);
- }else{
- LOGGER.debug("Message Not Qualified");
- }
- }
- // transfer.getValue("odFlightNumber");
- }
- } catch (UBException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- private boolean getIsChanged(UBTransfer transfer) {
- for(String field : trackedFields){
- if(transfer.isChanged(field)){
- LOGGER.info("Field Changed:"+field);
- return true;
- }
- }
- return false;
- }
- private void sendFlightToClient(ClientMessage cm) {
- if(cm.getAction().equals(cm.INSERT_ACTION)){
- HashSet<String> sessionList = buildSessionList(cm.getFlight());
- for (String sessionId : sessionList){
- sendMessageToClient(sessionId,cm);
- }
- }else{
- Long id = cm.getFlight().getId();
- Set<String> sessionList = getSessionMap().keySet();
- for (String sessionId : sessionList){
- if(getSessionMap().get(sessionId).contains(id)){
- sendMessageToClient(sessionId,cm);
- }
- }
- }
- }
- private HashSet<String> buildSessionList(Flight f) {
- Set<String> sessionList = getQueryMap().keySet();
- HashSet<String> qualifiedList = new HashSet<String>();
- UBNullComparator nc = new UBNullComparator();
- for (String sessionId : sessionList){
- boolean isValid = true;
- FlightQuery query = getQueryMap().get(sessionId);
- try {
- Timestamp start = parseTimestampFromString(query.getStartDate());
- Timestamp end = parseTimestampFromString(query.getEndDate());
- if(nc.compare(f.getStd(), start) < 0 ){
- LOGGER.info("Query 1 start:"+start);
- LOGGER.info("Query 1 end:"+end);
- LOGGER.info("Flight 1 std:"+f.getStd());
- LOGGER.info("Compare 1:"+nc.compare(f.getStd(), start));
- isValid = false;
- }
- if(isValid && nc.compare(f.getStd(), end) > 0 ){
- LOGGER.info("Query 2 start:"+start);
- LOGGER.info("Query 2 end:"+end);
- LOGGER.info("Flight 2 std:"+f.getStd());
- LOGGER.info("Compare 2:"+nc.compare(f.getStd(), end));
- isValid = false;
- }
- if(isValid && query.getAirline()!=null && nc.compare(f.getAirline(), query.getAirline()) != 0 ){
- LOGGER.info("Check 3 Flight Airline:"+f.getAirline());
- LOGGER.info("Query 3 Airline:"+query.getAirline());
- LOGGER.info("Compare 3:"+nc.compare(f.getAirline(), query.getAirline()));
- isValid = false;
- }
- if(isValid && query.getDestination()!=null && nc.compare(f.getDestination(), query.getDestination()) != 0 ){
- LOGGER.info("Check 4 Flight Dest:"+f.getDestination());
- LOGGER.info("Query 4 Dest:"+query.getDestination());
- LOGGER.info("Compare 4:"+nc.compare(f.getDestination(), query.getDestination()));
- isValid = false;
- }
- if(isValid){
- qualifiedList.add(sessionId);
- }
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- return qualifiedList;
- }
- public Timestamp parseTimestampFromString(String timestamp) throws ParseException {
- String format = "MM/dd/yyyy HH:mm";
- DateFormat df = new SimpleDateFormat(format);
- // Set to comply with ISO 8601 standard
- Calendar c = new GregorianCalendar();
- df.setCalendar(c);
- return new Timestamp(df.parse(timestamp).getTime());
- }
- @PostConstruct
- void init() {
- LOGGER.info("STARTING MY BEAN...");
- initConnection();
- // other initialization logic
- }
- private void initConnection() {
- Properties properties = new Properties();
- properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
- properties.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
- properties.put(Context.PROVIDER_URL, "jnp://10.6.164.36:1099");
- InitialContext iniCtx;
- try {
- iniCtx = new InitialContext(properties);
- Object connectionFactory = iniCtx.lookup("ConnectionFactory");
- TopicConnectionFactory qcf = (TopicConnectionFactory) connectionFactory;
- conn = qcf.createTopicConnection();
- Topic updateTopic = (Topic) iniCtx.lookup("topic/MobileUpdateTopic");
- TopicSession session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
- conn.start();
- TopicSubscriber subscriber = session.createSubscriber(updateTopic);
- LOGGER.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>msg listener ADDED");
- subscriber.setMessageListener(this);
- conn.setExceptionListener(this);
- } catch (NamingException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- private void closeConnection() {
- try {
- conn.close();
- System.out.println("msg listener closed");
- } catch(Exception e) { }
- }
- @PreDestroy
- void cleanUp() {
- closeConnection();
- // other cleanup logic
- }
- @Override
- public void onException(JMSException arg0) {
- cleanUp();
- // TODO Auto-generated method stub
- }
- @Override
- public void callback(
- org.jboss.errai.bus.client.api.messaging.Message message) {
- ServerMessage serverMessage = message.get(ServerMessage.class, "serverMessage");
- if(serverMessage.getAction() != null){
- if(serverMessage.getAction().equals(ServerMessage.SESSION_ID_REQUEST)){
- LOGGER.info(">>>>>>> Received Session Id Request from client");
- String sessionId = getSessionId(message);
- LOGGER.info(">>>>>>> SENDING Session ID:"+sessionId);
- }else if(serverMessage.getAction().equals(ServerMessage.ACK_MESSAGE_RECEIVED) && serverMessage.getMessageId()!= null && serverMessage.getSessionId()!= null){
- LOGGER.info(">>>>>>> ReceivedACK Message from client >>"+serverMessage.getMessageId());
- removeCachedMessage(serverMessage.getSessionId(),serverMessage.getMessageId());
- }else if(serverMessage.getAction().equals(ServerMessage.REQ_OFFLINE_MESSAGES) && serverMessage.getSessionId()!= null){
- LOGGER.info(">>>>>>> Requesting Offline Message from client:"+serverMessage.getSessionId());
- checkCachedMessages(serverMessage.getSessionId());
- }
- }
- }
- private String getSessionId(org.jboss.errai.bus.client.api.messaging.Message message){
- QueueSession sess = message.getResource(QueueSession.class, Resources.Session.name());
- sess.addSessionEndListener(this);
- String sessionId = sess.getSessionId();
- ClientMessage clientMessage = new ClientMessage();
- clientMessage.setType(ClientMessage.INIT_SESSION_MESSAGE_TYPE);
- clientMessage.setSessionId(sessionId);
- MessageBuilder.createConversation(message)
- .toSubject("FlightClientReceiver") // (1)
- .signalling()
- // .with(MessageParts.SessionID, sessionId)// (2)
- // .with(MessageParts.CommandType, "getSessionId")
- .with("clientMessage", clientMessage)
- .noErrorHandling() // (4)
- .sendNowWith(dispatcher);
- return sessionId;
- }
- private void checkCachedMessages(String sessionId) {
- LOGGER.info(">>>>>>>CCM Checking Cached Messages for "+sessionId);
- if(getCachedMessagesMap().containsKey(sessionId)){
- HashMap<String,ClientMessage> messageMap = getCachedMessagesMap().get(sessionId);
- Set<String> messageIds = messageMap.keySet();
- LOGGER.info("<<<<<<<< Found Messages CCM - " +messageIds.size());
- for(String messageId : messageIds){
- ClientMessage cm = messageMap.get(messageId);
- sendMessageToClient(sessionId,cm);
- }
- }else{
- LOGGER.info("<<<<<<<< No Messages Found >>>>>>>>>>> ");
- }
- }
- private void removeCachedMessage(String sessionId, String messageId) {
- LOGGER.info(">>>>>>>RCM Removing Cached Messages for "+sessionId);
- if(getCachedMessagesMap().containsKey(sessionId)){
- LOGGER.info("<<<<<<<< Found Messages RCM- " +getCachedMessagesMap().get(sessionId).entrySet().size());
- HashMap<String,ClientMessage> messageMap = getCachedMessagesMap().get(sessionId);
- if(messageMap.containsKey(messageId)){
- messageMap.remove(messageId);
- }
- LOGGER.info("<<<<<<<< POST REMOVE Messages RCM- " +getCachedMessagesMap().get(sessionId).entrySet().size());
- }
- }
- private void cacheMessage(String sessionId, ClientMessage clientMessage,
- String messageId) {
- if(getCachedMessagesMap().containsKey(sessionId)){
- HashMap<String,ClientMessage> messageMap = getCachedMessagesMap().get(sessionId);
- messageMap.put(messageId, clientMessage);
- }else{
- HashMap<String,ClientMessage> messageMap = new HashMap<String,ClientMessage>();
- messageMap.put(messageId, clientMessage);
- getCachedMessagesMap().put(sessionId, messageMap);
- }
- }
- private void sendMessageToClient( String sessionId,ClientMessage clientMessage){
- if(getQueryMap().containsKey(sessionId)) {
- try {
- String messageId= getMessageId(sessionId,clientMessage.getFlight().getId());
- clientMessage.setMessageId(messageId);
- cacheMessage(sessionId,clientMessage,messageId);
- clientMessage.setType(ClientMessage.FLIGHT_UPDATE_MESSAGE_TYPE);
- clientMessage.setSessionId(sessionId);
- System.out.println("Sending Flight Update To Session ID :"+sessionId);
- MessageBuilder.createMessage()
- .toSubject("FlightClientReceiver") // (1)
- .signalling()
- .with(MessageParts.SessionID, sessionId)// (2)
- .with("clientMessage", clientMessage)
- .noErrorHandling() // (4)
- .sendNowWith(dispatcher);
- } catch (RuntimeException re) {
- String message = re.getMessage();
- LOGGER.info(">>>>>>> ERROR OCCURRED SENDING MESSAGE:"+message);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }else{
- System.out.println("###<<<<<<<<<<<>>>>>>>>>>>>SESSION ENDED:PLACING MESSAGE IN CACHE:"+sessionId);
- // if(getClientCacheMap().containsKey(sessionId)){
- // getClientCacheMap().get(sessionId).add(flightMessage);
- // }else{
- // HashSet<FlightMessage> messages = new HashSet<FlightMessage>();
- // messages.add(flightMessage);
- // getClientCacheMap().put(sessionId, messages);
- // }
- }
- }
- @Override
- public void onSessionEnd(SessionEndEvent event) {
- // TODO Auto-generated method stub
- String sessionId = event.getSession().getSessionId();
- System.out.println("SESSION ENDED:"+sessionId);
- if(sessionId!= null){
- getSessionMap().remove(sessionId);
- getQueryMap().remove(sessionId);
- getCachedMessagesMap().remove(sessionId);
- }
- }
- @Override
- public void updateSessionMap(FlightQuery query,ArrayList<Flight> flightList){
- if(query.getSessionId() != null){
- getSessionMap().put(query.getSessionId(), getIds(flightList));
- getQueryMap().put(query.getSessionId(), query);
- }
- }
- private HashSet<Long> getIds(ArrayList<Flight> flightList) {
- HashSet<Long> idList = new HashSet<Long>();
- for(Flight f : flightList){
- idList.add(f.getId());
- }
- // TODO Auto-generated method stub
- return idList;
- }
- private ConcurrentHashMap<String,HashSet<Long>> getSessionMap(){
- if(sessionMap== null){
- sessionMap = new ConcurrentHashMap<String,HashSet<Long>>();
- }
- return sessionMap;
- }
- private ConcurrentHashMap<String,FlightQuery> getQueryMap(){
- if(queryMap== null){
- queryMap = new ConcurrentHashMap<String,FlightQuery>();
- }
- return queryMap;
- }
- private ConcurrentHashMap<String,HashMap<String,ClientMessage>>getCachedMessagesMap(){
- if(cachedMessagesMap== null){
- cachedMessagesMap = new ConcurrentHashMap<String,HashMap<String,ClientMessage>>();
- }
- return cachedMessagesMap;
- }
- private String getMessageId(String sessionId,Long flightId){
- String messageId = flightId .toString()+"-"+sessionId;
- return messageId;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement