Advertisement
aadddrr

AbstractConsumer

Oct 14th, 2018
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package org.jleaf.jms;
  2.  
  3. import java.io.IOException;
  4. import java.io.PrintWriter;
  5. import java.io.StringWriter;
  6. import java.util.Arrays;
  7. import java.util.List;
  8.  
  9. import javax.jms.JMSException;
  10. import javax.jms.Message;
  11. import javax.jms.TextMessage;
  12. import javax.naming.NamingException;
  13.  
  14. import org.jleaf.core.BusinessFunction;
  15. import org.jleaf.core.BusinessTransaction;
  16. import org.jleaf.core.CoreException;
  17. import org.jleaf.core.Dto;
  18. import org.jleaf.core.GeneralConstants;
  19. import org.jleaf.jms.bo.JmsConstants;
  20. import org.jleaf.jms.bo.JmsExceptionConstants;
  21. import org.jleaf.util.DateUtil;
  22. import org.slf4j.Logger;
  23. import org.slf4j.LoggerFactory;
  24. import org.springframework.context.support.ClassPathXmlApplicationContext;
  25.  
  26. /**
  27.  * @author DanieL, 1 Mar 2012
  28.  * @version 1.0
  29.  */
  30.  
  31. public abstract class AbstractConsumer implements JleafJmsConsumer {
  32.  
  33.     private static final Logger log = LoggerFactory.getLogger(AbstractConsumer.class);
  34.  
  35.     private Long lineOk;
  36.     private Long lineWarning;
  37.     private Long lineError;
  38.  
  39.     /**
  40.      * Get next line OK
  41.      *
  42.      * @return
  43.      */
  44.     protected Long getNextLineOk() {
  45.         return ++this.lineOk;
  46.     }
  47.  
  48.     /**
  49.      * Get next line warning
  50.      *
  51.      * @return
  52.      */
  53.     protected Long getNextLineWarning() {
  54.         return ++this.lineWarning;
  55.     }
  56.  
  57.     /**
  58.      * Get next line error
  59.      *
  60.      * @return
  61.      */
  62.     protected Long getNextLineError() {
  63.         return ++this.lineError;
  64.     }
  65.  
  66.     /**
  67.      * Main function in Message Listener
  68.      */
  69.     @Override
  70.     public final void onMessage(Message message) {
  71.         if (message instanceof TextMessage) {
  72.             TextMessage textMessage = (TextMessage) message;
  73.             try {
  74.                 log.debug("BEGIN Consumer");
  75.                 log.info("Received TextMessage : " + textMessage.getText());
  76.  
  77.                 init();
  78.  
  79.                 log.debug("INIT SUCCESS .. ");
  80.  
  81.                 String messageString = textMessage.getText();
  82.                 Dto messageDto = new Dto(messageString);
  83.  
  84.                 log.debug("Getting MessageDTO : " + messageDto.toString());
  85.  
  86.                 String processName = messageDto.getString("processName");
  87.                 String processNo = messageDto.getString("processNo");
  88.                 Long tenantId = messageDto.getLong("tenantId");
  89.  
  90.                 log.debug("Getting : processName:" + processName + ";processNo:" + processNo + ";tenantId:" + tenantId);
  91.  
  92.                 Dto paramDto = prepareParamQueue(processName, processNo, tenantId);
  93.  
  94.                 if (paramDto != null) {
  95.  
  96.                     log.debug("Finish Executing Prepare Param Queue : " + paramDto.toString());
  97.  
  98.                     executeQueue(processName, processNo, tenantId, paramDto);
  99.  
  100.                     log.debug("Finish Executing Queue");
  101.  
  102.                     Dto resultDto = finishExecuteQueue(processName, processNo, tenantId);
  103.  
  104.                     log.debug("Final Result : " + resultDto.toString());
  105.                 } else {
  106.                     log.error("Cannot processing data (processName:" + processName + ";processNo:" + processNo + ";tenantId:"
  107.                             + tenantId + ") because error in prepareParamQueue");
  108.                 }
  109.  
  110.                 log.debug("END Consumer");
  111.             } catch (JMSException e) {
  112.                 e.printStackTrace();
  113.                 log.error("Error reading message: " + getFullPrintStackTrace(e));
  114.             }
  115.         } else {
  116.             // Another Message that not specified in this Abstract Consumer
  117.             // So for now, we list it as an error in Queue Messaging
  118.             System.exit(1);
  119.         }
  120.  
  121.     }
  122.  
  123.     /**
  124.      * Initiate line number for Process Ok, Error and Warning
  125.      */
  126.     private void init() {
  127.         this.lineError = 0L;
  128.         this.lineOk = 0L;
  129.         this.lineWarning = 0L;
  130.     }
  131.  
  132.     /**
  133.      * Get autowired bean from Application context
  134.      *
  135.      * @param beanName
  136.      * @return
  137.      */
  138.     protected final Object getAutowiredBean(String beanName) {
  139.         ClassPathXmlApplicationContext context = getQueueAppContext().getContext();
  140.         return context.getAutowireCapableBeanFactory().getBean(beanName);
  141.     }
  142.  
  143.     /**
  144.      * Get bean from application context
  145.      *
  146.      * @param beanName
  147.      * @return
  148.      */
  149.     protected final Object getBean(String beanName){
  150.         ClassPathXmlApplicationContext context = getQueueAppContext().getContext();
  151.         return context.getBean(beanName);
  152.     }
  153.     /**
  154.      * Used to get Business Transaction (Autowired Bean)
  155.      *
  156.      * @param beanName
  157.      * @return
  158.      */
  159.     protected final BusinessTransaction getBusinessTransaction(String beanName) {
  160.         return (BusinessTransaction) getAutowiredBean(beanName);
  161.     }
  162.  
  163.     /**
  164.      * Used to get Business Function (Autowired Bean)
  165.      *
  166.      * @param beanName
  167.      * @return
  168.      */
  169.     private final BusinessFunction getBusinessFunction(String beanName) {
  170.         return (BusinessFunction) getAutowiredBean(beanName);
  171.     }
  172.  
  173.     /**
  174.      * Write message to process result. Just 4096 character from Message will be saved
  175.      *
  176.      * @param processName
  177.      * @param processNo
  178.      * @param tenantId
  179.      * @param status
  180.      * @param message
  181.      * @param line
  182.      */
  183.     private void writeMessage(String processName, String processNo, Long tenantId, String status, String message, Long line) {
  184.         // Penambahan logic sesuai kesepakatan 23 Oct 2012, 11:00, bahwa field message harus dibatasi panjangnya hanya 4096
  185.         String messageProcessResult = message;
  186.         if (message.length() > 4096) {
  187.             messageProcessResult = messageProcessResult.substring(0, 4096);
  188.         }
  189.  
  190.         Dto writeMessageDto = new Dto();
  191.         writeMessageDto.put("processName", processName);
  192.         writeMessageDto.put("processNo", processNo);
  193.         writeMessageDto.put("tenantId", tenantId);
  194.         writeMessageDto.put("status", status);
  195.         writeMessageDto.put("message", messageProcessResult);
  196.         writeMessageDto.put("lineNo", line);
  197.  
  198.         try {
  199.             getBusinessTransaction("addProcessResult").execute(writeMessageDto);
  200.         } catch (Exception e) {
  201.             log.error("ERROR EXCEPTION : addProcessResult " + getFullPrintStackTrace(e));
  202.             if ((e instanceof CoreException)) {
  203.                 CoreException ce = (CoreException) e;
  204.  
  205.                 /*
  206.                  *  Hanya mungkin kejadian apabila ProcessMessage nya tidak terdaftar.
  207.                  *  Kejadian tidak terdaftar kemungkinan dikarenakan :
  208.                  *  - server-config yang salah. Database mengarah ke local, tetapi saat sending Queue, diarahkan ke Server lain
  209.                  *  - Kecepatan commit database kalah dengan kecepatan sending to Queue yang dengan segera di proses
  210.                  */
  211.                 if (ce.getErrorKey().equals(JmsExceptionConstants.PROCESS_MESSAGE_NOT_FOUND)) {
  212.                     log.error("addProcessResult error because Process Message not exists", ce);
  213.                     try {
  214.                         String processResultMessage = getFullPrintStackTrace(ce);
  215.                         Long userLoginId = GeneralConstants.NULL_REF_VALUE_LONG;
  216.                         String dateTime = DateUtil.dateTimeNow();
  217.  
  218.                         Dto inputDto = new Dto();
  219.                         inputDto.put("processName", processName);
  220.                         inputDto.put("processNo", processNo);
  221.                         inputDto.put("tenantId", tenantId);
  222.                         inputDto.put("processResultMessage", processResultMessage);
  223.                         inputDto.put("userLoginId", userLoginId);
  224.                         inputDto.put("datetime", dateTime);
  225.  
  226.                         getBusinessTransaction("addProcessResultFatalError").execute(inputDto);
  227.                     } catch (Exception e1) {
  228.                         log.error("addProcessResultFalatError error storing to DB", e1);
  229.                     }
  230.                 } else {
  231.                     writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
  232.                 }
  233.             } else {
  234.                 writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
  235.             }
  236.         }
  237.     }
  238.  
  239.     /**
  240.      * Write message warning
  241.      *
  242.      * @param processName
  243.      * @param processNo
  244.      * @param tenantId
  245.      * @param message
  246.      */
  247.     protected final void writeMessageWarning(String processName, String processNo, Long tenantId, String message) {
  248.         writeMessage(processName, processNo, tenantId, JmsConstants.PROCESS_STATUS_WARNING, message, getNextLineWarning());
  249.     }
  250.  
  251.     /**
  252.      * Write message error
  253.      *
  254.      * @param processName
  255.      * @param processNo
  256.      * @param tenantId
  257.      * @param message
  258.      */
  259.     protected final void writeMessageError(String processName, String processNo, Long tenantId, String message) {
  260.         writeMessage(processName, processNo, tenantId, JmsConstants.PROCESS_STATUS_ERROR, message, getNextLineError());
  261.     }
  262.  
  263.     /**
  264.      * Write message ok
  265.      *
  266.      * @param processName
  267.      * @param processNo
  268.      * @param tenantId
  269.      * @param message
  270.      */
  271.     protected final void writeMessageOk(String processName, String processNo, Long tenantId, String message) {
  272.         writeMessage(processName, processNo, tenantId, JmsConstants.PROCESS_STATUS_OK, message, getNextLineOk());
  273.     }
  274.  
  275.     /**
  276.      * Preparing parameter for process queue
  277.      *
  278.      * @param processName
  279.      * @param processNo
  280.      * @param tenantId
  281.      * @return
  282.      */
  283.     private Dto prepareParamQueue(String processName, String processNo, Long tenantId) {
  284.         Dto prepareParamQueueDto = new Dto();
  285.         prepareParamQueueDto.put("processName", processName);
  286.         prepareParamQueueDto.put("processNo", processNo);
  287.         prepareParamQueueDto.put("tenantId", tenantId);
  288.  
  289.         Dto paramDto = null;
  290.  
  291.         try {
  292.             paramDto = getBusinessTransaction("startProcessQueue").execute(prepareParamQueueDto);
  293.         } catch (Exception e) {
  294.             log.error("ERROR EXCEPTION : startProcessQueue " + getFullPrintStackTrace(e));
  295.             if ((e instanceof CoreException)) {
  296.                 CoreException ce = (CoreException) e;
  297.                 writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
  298.             } else {
  299.                 writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
  300.             }
  301.         }
  302.  
  303.         return paramDto;
  304.     }
  305.  
  306.     /**
  307.      * Executing process queue
  308.      *
  309.      * @param processName
  310.      * @param processNo
  311.      * @param tenantId
  312.      * @param paramDto
  313.      */
  314.     private void executeQueue(String processName, String processNo, Long tenantId, Dto paramDto) {
  315.         log.debug("BEGIN EXECUTING QUEUE");
  316.  
  317.         boolean success = false;
  318.         Exception exception = null;
  319.  
  320.         try {
  321.             paramDto.put("processName", processName);
  322.             paramDto.put("processNo", processNo);
  323.  
  324.             processExecuteQueue(processName, processNo, tenantId, paramDto);
  325.             success = true;
  326.             log.debug("FINISH EXECUTING");
  327.         } catch (Exception e) {
  328.             success = false;
  329.             exception = e;
  330.             log.error("ERROR EXCEPTION : executeQueue " + getFullPrintStackTrace(e));
  331.  
  332.             if ((e instanceof CoreException)) {
  333.                 CoreException ce = (CoreException) e;
  334.                 writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
  335.             } else {
  336.                 writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
  337.             }
  338.         }
  339.  
  340.         // Process after executing Queue
  341.         try {
  342.             log.debug("After Executing Queue.." + success);
  343.             if (success) {
  344.                 onProcessExecuteSuccess(processName, processNo, tenantId, paramDto);
  345.             } else {
  346.                 onProcessExecuteError(processName, processNo, tenantId, paramDto, exception);
  347.             }
  348.         } catch (Exception e) {
  349.             log.error("Error After executing Queue.. ");
  350.             e.printStackTrace();
  351.  
  352.             if ((e instanceof CoreException)) {
  353.                 CoreException ce = (CoreException) e;
  354.                 writeMessageWarning(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
  355.             } else {
  356.                 writeMessageWarning(processName, processNo, tenantId, getFullPrintStackTrace(e));
  357.             }
  358.         }
  359.  
  360.         log.debug("END EXECUTING QUEUE");
  361.     }
  362.  
  363.     /**
  364.      * Finishing execute queue.
  365.      * If there is error message in process result, then process status = 'E'
  366.      * else if there is warning message, but there is no error message in process result, then process status = 'W'
  367.      * else if there is ok message, and there is no error and warning message in process result, then process status = 'S'
  368.      *
  369.      * @param processName
  370.      * @param processNo
  371.      * @param tenantId
  372.      * @return
  373.      */
  374.     private Dto finishExecuteQueue(String processName, String processNo, Long tenantId) {
  375.         Dto prepareFinishExecuteDto = new Dto();
  376.         prepareFinishExecuteDto.put("processName", processName);
  377.         prepareFinishExecuteDto.put("processNo", processNo);
  378.         prepareFinishExecuteDto.put("tenantId", tenantId);
  379.  
  380.         Dto finishResultDto = null;
  381.         try {
  382.             finishResultDto = getBusinessTransaction("finishProcessQueue").execute(prepareFinishExecuteDto);
  383.         } catch (Exception e) {
  384.             log.error("ERROR EXCEPTION : finishProcessQueue " + getFullPrintStackTrace(e));
  385.             if ((e instanceof CoreException)) {
  386.                 CoreException ce = (CoreException) e;
  387.                 writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
  388.             } else {
  389.                 writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
  390.             }
  391.         }
  392.  
  393.         return finishResultDto;
  394.     }
  395.  
  396.     /**
  397.      * Get Application Context for Queue Process
  398.      *
  399.      * @return
  400.      */
  401.     public abstract AbstractQueueAppContext getQueueAppContext();
  402.  
  403.     /**
  404.      * Main executing process for queue
  405.      *
  406.      * @param processName
  407.      * @param processNo
  408.      * @param tenantId
  409.      * @param paramDto
  410.      * @throws Exception
  411.      */
  412.     protected abstract void processExecuteQueue(String processName, String processNo, Long tenantId, Dto paramDto)
  413.             throws Exception;
  414.  
  415.     /**
  416.      * For adding data to re-process
  417.      * just can add it when status = 'E'
  418.      *
  419.      * @param processName
  420.      * @param processNo
  421.      * @param tenantId
  422.      * @param lineQueue
  423.      * @param reprocessName
  424.      * @param reprocessNo
  425.      * @param reprocessParameterDto
  426.      * @param userLoginId
  427.      */
  428.     @Deprecated
  429.     protected void prepareReprocess(String processName, String processNo, Long tenantId, String lineQueue, String reprocessName,
  430.             String reprocessNo, Dto reprocessParameterDto, Long userLongId) {
  431.         log.debug("BEGIN PREPARING REPROCESS");
  432.  
  433.         Dto addReprocessDto = new Dto();
  434.         addReprocessDto.put("processName", processName);
  435.         addReprocessDto.put("processNo", processNo);
  436.         addReprocessDto.put("tenantId", tenantId);
  437.         addReprocessDto.put("lineQueue", lineQueue);
  438.         addReprocessDto.put("reprocessName", reprocessName);
  439.         addReprocessDto.put("reprocessNo", reprocessNo);
  440.         addReprocessDto.put("reprocessParameterDto", reprocessParameterDto);
  441.         addReprocessDto.put("userLoginId", userLongId);
  442.  
  443.         try {
  444.             getBusinessTransaction("addReprocess").execute(addReprocessDto);
  445.         } catch (Exception e) {
  446.             log.error("ERROR EXCEPTION : addReprocess " + getFullPrintStackTrace(e));
  447.             if ((e instanceof CoreException)) {
  448.                 CoreException ce = (CoreException) e;
  449.                 writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
  450.             } else {
  451.                 writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
  452.             }
  453.         }
  454.  
  455.         log.debug("END PREPARING REPROCESS");
  456.     }
  457.  
  458.     /**
  459.      * Get Full Stack Trace in String
  460.      *
  461.      * http://stackoverflow.com/questions/4812570/how-to-store-printstacktrace-into-a-string
  462.      * @param ex
  463.      * @return
  464.      */
  465.     String getFullPrintStackTrace(Throwable ex) {
  466.         StringWriter errors = new StringWriter();
  467.         ex.printStackTrace(new PrintWriter(errors));
  468.         return errors.toString();
  469.     }
  470.  
  471.     /**
  472.      * on Exception
  473.      */
  474.     @Override
  475.     public void onException(JMSException exception) {
  476.         System.out.println("JMS Exception occured.  Shutting down client.");
  477.         exception.printStackTrace();
  478.         log.error("JMS Exception occurred.", exception);
  479.         System.exit(1);
  480.     }
  481.  
  482.     /**
  483.      * on process success
  484.      *
  485.      * @param processName
  486.      * @param processNo
  487.      * @param tenantId
  488.      * @param paramDto
  489.      * @throws Exception
  490.      */
  491.     void onProcessExecuteSuccess(String processName, String processNo, Long tenantId, Dto paramDto) throws Exception {
  492.         onProcessSuccess(processName, processNo, tenantId, paramDto);
  493.     }
  494.  
  495.     /**
  496.      * on process success user define
  497.      *
  498.      * @param processName
  499.      * @param processNo
  500.      * @param tenantId
  501.      * @param paramDto
  502.      */
  503.     protected void onProcessSuccess(String processName, String processNo, Long tenantId, Dto paramDto) throws Exception {
  504.         // DO NOTHING
  505.     }
  506.  
  507.     /**
  508.      * on process error
  509.      *
  510.      * @param processName
  511.      * @param processNo
  512.      * @param tenantId
  513.      * @param paramDto
  514.      * @throws NamingException
  515.      * @throws IOException
  516.      */
  517.     void onProcessExecuteError(String processName, String processNo, Long tenantId, Dto paramDto, Exception exception)
  518.             throws Exception {
  519.         onProcessError(processName, processNo, tenantId, paramDto);
  520.     }
  521.  
  522.     /**
  523.      * on process error user define
  524.      *
  525.      * @param processName
  526.      * @param processNo
  527.      * @param tenantId
  528.      * @param paramDto
  529.      */
  530.     protected void onProcessError(String processName, String processNo, Long tenantId, Dto paramDto) throws Exception {
  531.         // DO NOTHING
  532.     }
  533.  
  534.     /**
  535.      * Get process message by specified id
  536.      * Using BusinessFunction findProcessMessageByIndex
  537.      *
  538.      * @param processName
  539.      * @param processNo
  540.      * @param tenantId
  541.      * @return
  542.      * @throws Exception
  543.      */
  544.     private Long getProcessMessageId(String processName, String processNo, Long tenantId) throws Exception {
  545.         Dto processMessageDto = new Dto();
  546.         processMessageDto.put("processName", processName);
  547.         processMessageDto.put("processNo", processNo);
  548.         processMessageDto.put("tenantId", tenantId);
  549.         processMessageDto = getBusinessFunction("findProcessMessageByIndex").execute(processMessageDto);
  550.  
  551.         return processMessageDto.getLong("id");
  552.     }
  553.  
  554.     @SuppressWarnings("unchecked")
  555.     List<Dto> getProcessResultOkList(String processName, String processNo, Long tenantId) throws Exception {
  556.         Long processMessageId = getProcessMessageId(processName, processNo, tenantId);
  557.  
  558.         Dto processResultOkListDto = new Dto();
  559.         processResultOkListDto.put("processMessageId", processMessageId);
  560.         processResultOkListDto = getBusinessFunction("getProcessResultOkListByProcessMessageId").execute(processResultOkListDto);
  561.  
  562.         List<Dto> processResultOkList = processResultOkListDto.getList("processResultOkList");
  563.         return processResultOkList;
  564.     }
  565.  
  566.     @SuppressWarnings("unchecked")
  567.     List<Dto> getProcessResultWarningList(String processName, String processNo, Long tenantId) throws Exception {
  568.         Long processMessageId = getProcessMessageId(processName, processNo, tenantId);
  569.  
  570.         Dto processResultWarningListDto = new Dto();
  571.         processResultWarningListDto.put("processMessageId", processMessageId);
  572.         processResultWarningListDto = getBusinessFunction("getProcessResultWarningListByProcessMessageId").execute(
  573.                 processResultWarningListDto);
  574.  
  575.         List<Dto> processResultWarningList = processResultWarningListDto.getList("processResultWarningList");
  576.         return processResultWarningList;
  577.     }
  578.  
  579.     @SuppressWarnings("unchecked")
  580.     List<Dto> getProcessResultErrorList(String processName, String processNo, Long tenantId) throws Exception {
  581.         Long processMessageId = getProcessMessageId(processName, processNo, tenantId);
  582.  
  583.         Dto processResultErrorListDto = new Dto();
  584.         processResultErrorListDto.put("processMessageId", processMessageId);
  585.         processResultErrorListDto = getBusinessFunction("getProcessResultErrorListByProcessMessageId").execute(
  586.                 processResultErrorListDto);
  587.  
  588.         List<Dto> processResultErrorList = processResultErrorListDto.getList("processResultErrorList");
  589.         return processResultErrorList;
  590.     }
  591. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement