Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.jleaf.jms;
- import java.io.IOException;
- import java.io.PrintWriter;
- import java.io.StringWriter;
- import java.util.Arrays;
- import java.util.List;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.TextMessage;
- import javax.naming.NamingException;
- import org.jleaf.core.BusinessFunction;
- import org.jleaf.core.BusinessTransaction;
- import org.jleaf.core.CoreException;
- import org.jleaf.core.Dto;
- import org.jleaf.core.GeneralConstants;
- import org.jleaf.jms.bo.JmsConstants;
- import org.jleaf.jms.bo.JmsExceptionConstants;
- import org.jleaf.util.DateUtil;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- /**
- * @author DanieL, 1 Mar 2012
- * @version 1.0
- */
- public abstract class AbstractConsumer implements JleafJmsConsumer {
- private static final Logger log = LoggerFactory.getLogger(AbstractConsumer.class);
- private Long lineOk;
- private Long lineWarning;
- private Long lineError;
- /**
- * Get next line OK
- *
- * @return
- */
- protected Long getNextLineOk() {
- return ++this.lineOk;
- }
- /**
- * Get next line warning
- *
- * @return
- */
- protected Long getNextLineWarning() {
- return ++this.lineWarning;
- }
- /**
- * Get next line error
- *
- * @return
- */
- protected Long getNextLineError() {
- return ++this.lineError;
- }
- /**
- * Main function in Message Listener
- */
- @Override
- public final void onMessage(Message message) {
- if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
- try {
- log.debug("BEGIN Consumer");
- log.info("Received TextMessage : " + textMessage.getText());
- init();
- log.debug("INIT SUCCESS .. ");
- String messageString = textMessage.getText();
- Dto messageDto = new Dto(messageString);
- log.debug("Getting MessageDTO : " + messageDto.toString());
- String processName = messageDto.getString("processName");
- String processNo = messageDto.getString("processNo");
- Long tenantId = messageDto.getLong("tenantId");
- log.debug("Getting : processName:" + processName + ";processNo:" + processNo + ";tenantId:" + tenantId);
- Dto paramDto = prepareParamQueue(processName, processNo, tenantId);
- if (paramDto != null) {
- log.debug("Finish Executing Prepare Param Queue : " + paramDto.toString());
- executeQueue(processName, processNo, tenantId, paramDto);
- log.debug("Finish Executing Queue");
- Dto resultDto = finishExecuteQueue(processName, processNo, tenantId);
- log.debug("Final Result : " + resultDto.toString());
- } else {
- log.error("Cannot processing data (processName:" + processName + ";processNo:" + processNo + ";tenantId:"
- + tenantId + ") because error in prepareParamQueue");
- }
- log.debug("END Consumer");
- } catch (JMSException e) {
- e.printStackTrace();
- log.error("Error reading message: " + getFullPrintStackTrace(e));
- }
- } else {
- // Another Message that not specified in this Abstract Consumer
- // So for now, we list it as an error in Queue Messaging
- System.exit(1);
- }
- }
- /**
- * Initiate line number for Process Ok, Error and Warning
- */
- private void init() {
- this.lineError = 0L;
- this.lineOk = 0L;
- this.lineWarning = 0L;
- }
- /**
- * Get autowired bean from Application context
- *
- * @param beanName
- * @return
- */
- protected final Object getAutowiredBean(String beanName) {
- ClassPathXmlApplicationContext context = getQueueAppContext().getContext();
- return context.getAutowireCapableBeanFactory().getBean(beanName);
- }
- /**
- * Get bean from application context
- *
- * @param beanName
- * @return
- */
- protected final Object getBean(String beanName){
- ClassPathXmlApplicationContext context = getQueueAppContext().getContext();
- return context.getBean(beanName);
- }
- /**
- * Used to get Business Transaction (Autowired Bean)
- *
- * @param beanName
- * @return
- */
- protected final BusinessTransaction getBusinessTransaction(String beanName) {
- return (BusinessTransaction) getAutowiredBean(beanName);
- }
- /**
- * Used to get Business Function (Autowired Bean)
- *
- * @param beanName
- * @return
- */
- private final BusinessFunction getBusinessFunction(String beanName) {
- return (BusinessFunction) getAutowiredBean(beanName);
- }
- /**
- * Write message to process result. Just 4096 character from Message will be saved
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param status
- * @param message
- * @param line
- */
- private void writeMessage(String processName, String processNo, Long tenantId, String status, String message, Long line) {
- // Penambahan logic sesuai kesepakatan 23 Oct 2012, 11:00, bahwa field message harus dibatasi panjangnya hanya 4096
- String messageProcessResult = message;
- if (message.length() > 4096) {
- messageProcessResult = messageProcessResult.substring(0, 4096);
- }
- Dto writeMessageDto = new Dto();
- writeMessageDto.put("processName", processName);
- writeMessageDto.put("processNo", processNo);
- writeMessageDto.put("tenantId", tenantId);
- writeMessageDto.put("status", status);
- writeMessageDto.put("message", messageProcessResult);
- writeMessageDto.put("lineNo", line);
- try {
- getBusinessTransaction("addProcessResult").execute(writeMessageDto);
- } catch (Exception e) {
- log.error("ERROR EXCEPTION : addProcessResult " + getFullPrintStackTrace(e));
- if ((e instanceof CoreException)) {
- CoreException ce = (CoreException) e;
- /*
- * Hanya mungkin kejadian apabila ProcessMessage nya tidak terdaftar.
- * Kejadian tidak terdaftar kemungkinan dikarenakan :
- * - server-config yang salah. Database mengarah ke local, tetapi saat sending Queue, diarahkan ke Server lain
- * - Kecepatan commit database kalah dengan kecepatan sending to Queue yang dengan segera di proses
- */
- if (ce.getErrorKey().equals(JmsExceptionConstants.PROCESS_MESSAGE_NOT_FOUND)) {
- log.error("addProcessResult error because Process Message not exists", ce);
- try {
- String processResultMessage = getFullPrintStackTrace(ce);
- Long userLoginId = GeneralConstants.NULL_REF_VALUE_LONG;
- String dateTime = DateUtil.dateTimeNow();
- Dto inputDto = new Dto();
- inputDto.put("processName", processName);
- inputDto.put("processNo", processNo);
- inputDto.put("tenantId", tenantId);
- inputDto.put("processResultMessage", processResultMessage);
- inputDto.put("userLoginId", userLoginId);
- inputDto.put("datetime", dateTime);
- getBusinessTransaction("addProcessResultFatalError").execute(inputDto);
- } catch (Exception e1) {
- log.error("addProcessResultFalatError error storing to DB", e1);
- }
- } else {
- writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
- }
- } else {
- writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
- }
- }
- }
- /**
- * Write message warning
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param message
- */
- protected final void writeMessageWarning(String processName, String processNo, Long tenantId, String message) {
- writeMessage(processName, processNo, tenantId, JmsConstants.PROCESS_STATUS_WARNING, message, getNextLineWarning());
- }
- /**
- * Write message error
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param message
- */
- protected final void writeMessageError(String processName, String processNo, Long tenantId, String message) {
- writeMessage(processName, processNo, tenantId, JmsConstants.PROCESS_STATUS_ERROR, message, getNextLineError());
- }
- /**
- * Write message ok
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param message
- */
- protected final void writeMessageOk(String processName, String processNo, Long tenantId, String message) {
- writeMessage(processName, processNo, tenantId, JmsConstants.PROCESS_STATUS_OK, message, getNextLineOk());
- }
- /**
- * Preparing parameter for process queue
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @return
- */
- private Dto prepareParamQueue(String processName, String processNo, Long tenantId) {
- Dto prepareParamQueueDto = new Dto();
- prepareParamQueueDto.put("processName", processName);
- prepareParamQueueDto.put("processNo", processNo);
- prepareParamQueueDto.put("tenantId", tenantId);
- Dto paramDto = null;
- try {
- paramDto = getBusinessTransaction("startProcessQueue").execute(prepareParamQueueDto);
- } catch (Exception e) {
- log.error("ERROR EXCEPTION : startProcessQueue " + getFullPrintStackTrace(e));
- if ((e instanceof CoreException)) {
- CoreException ce = (CoreException) e;
- writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
- } else {
- writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
- }
- }
- return paramDto;
- }
- /**
- * Executing process queue
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param paramDto
- */
- private void executeQueue(String processName, String processNo, Long tenantId, Dto paramDto) {
- log.debug("BEGIN EXECUTING QUEUE");
- boolean success = false;
- Exception exception = null;
- try {
- paramDto.put("processName", processName);
- paramDto.put("processNo", processNo);
- processExecuteQueue(processName, processNo, tenantId, paramDto);
- success = true;
- log.debug("FINISH EXECUTING");
- } catch (Exception e) {
- success = false;
- exception = e;
- log.error("ERROR EXCEPTION : executeQueue " + getFullPrintStackTrace(e));
- if ((e instanceof CoreException)) {
- CoreException ce = (CoreException) e;
- writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
- } else {
- writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
- }
- }
- // Process after executing Queue
- try {
- log.debug("After Executing Queue.." + success);
- if (success) {
- onProcessExecuteSuccess(processName, processNo, tenantId, paramDto);
- } else {
- onProcessExecuteError(processName, processNo, tenantId, paramDto, exception);
- }
- } catch (Exception e) {
- log.error("Error After executing Queue.. ");
- e.printStackTrace();
- if ((e instanceof CoreException)) {
- CoreException ce = (CoreException) e;
- writeMessageWarning(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
- } else {
- writeMessageWarning(processName, processNo, tenantId, getFullPrintStackTrace(e));
- }
- }
- log.debug("END EXECUTING QUEUE");
- }
- /**
- * Finishing execute queue.
- * If there is error message in process result, then process status = 'E'
- * else if there is warning message, but there is no error message in process result, then process status = 'W'
- * else if there is ok message, and there is no error and warning message in process result, then process status = 'S'
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @return
- */
- private Dto finishExecuteQueue(String processName, String processNo, Long tenantId) {
- Dto prepareFinishExecuteDto = new Dto();
- prepareFinishExecuteDto.put("processName", processName);
- prepareFinishExecuteDto.put("processNo", processNo);
- prepareFinishExecuteDto.put("tenantId", tenantId);
- Dto finishResultDto = null;
- try {
- finishResultDto = getBusinessTransaction("finishProcessQueue").execute(prepareFinishExecuteDto);
- } catch (Exception e) {
- log.error("ERROR EXCEPTION : finishProcessQueue " + getFullPrintStackTrace(e));
- if ((e instanceof CoreException)) {
- CoreException ce = (CoreException) e;
- writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
- } else {
- writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
- }
- }
- return finishResultDto;
- }
- /**
- * Supplies the application context wrapper used for the queue process.
- * <p>
- * Within this class, {@code getAutowiredBean} and {@code getBean} call {@code getContext()} on
- * the returned object to look beans up.
- *
- * @return the queue application context wrapper; presumably never {@code null} - TODO confirm
- *         with the implementing subclasses
- */
- public abstract AbstractQueueAppContext getQueueAppContext();
- /**
- * Main executing process for the queue, supplied by the concrete consumer.
- * <p>
- * Called by {@code executeQueue} after {@code processName} and {@code processNo} have been put
- * into {@code paramDto}. When this method throws, {@code executeQueue} logs the exception,
- * stores it as an error process result and then calls {@code onProcessExecuteError}; when it
- * returns normally, {@code onProcessExecuteSuccess} is called.
- *
- * @param processName name of the process
- * @param processNo number of the process
- * @param tenantId tenant the process belongs to
- * @param paramDto parameters for the process, as returned by {@code prepareParamQueue}
- * @throws Exception when the process fails
- */
- protected abstract void processExecuteQueue(String processName, String processNo, Long tenantId, Dto paramDto)
- throws Exception;
- /**
- * For adding data to re-process
- * just can add it when status = 'E'
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param lineQueue
- * @param reprocessName
- * @param reprocessNo
- * @param reprocessParameterDto
- * @param userLoginId
- */
- @Deprecated
- protected void prepareReprocess(String processName, String processNo, Long tenantId, String lineQueue, String reprocessName,
- String reprocessNo, Dto reprocessParameterDto, Long userLongId) {
- log.debug("BEGIN PREPARING REPROCESS");
- Dto addReprocessDto = new Dto();
- addReprocessDto.put("processName", processName);
- addReprocessDto.put("processNo", processNo);
- addReprocessDto.put("tenantId", tenantId);
- addReprocessDto.put("lineQueue", lineQueue);
- addReprocessDto.put("reprocessName", reprocessName);
- addReprocessDto.put("reprocessNo", reprocessNo);
- addReprocessDto.put("reprocessParameterDto", reprocessParameterDto);
- addReprocessDto.put("userLoginId", userLongId);
- try {
- getBusinessTransaction("addReprocess").execute(addReprocessDto);
- } catch (Exception e) {
- log.error("ERROR EXCEPTION : addReprocess " + getFullPrintStackTrace(e));
- if ((e instanceof CoreException)) {
- CoreException ce = (CoreException) e;
- writeMessageError(processName, processNo, tenantId, ce.getErrorKey() + Arrays.toString(ce.getParamValues()));
- } else {
- writeMessageError(processName, processNo, tenantId, getFullPrintStackTrace(e));
- }
- }
- log.debug("END PREPARING REPROCESS");
- }
- /**
- * Get Full Stack Trace in String
- *
- * http://stackoverflow.com/questions/4812570/how-to-store-printstacktrace-into-a-string
- * @param ex
- * @return
- */
- String getFullPrintStackTrace(Throwable ex) {
- StringWriter errors = new StringWriter();
- ex.printStackTrace(new PrintWriter(errors));
- return errors.toString();
- }
- /**
- * on Exception
- */
- @Override
- public void onException(JMSException exception) {
- System.out.println("JMS Exception occured. Shutting down client.");
- exception.printStackTrace();
- log.error("JMS Exception occurred.", exception);
- System.exit(1);
- }
- /**
- * on process success
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param paramDto
- * @throws Exception
- */
- void onProcessExecuteSuccess(String processName, String processNo, Long tenantId, Dto paramDto) throws Exception {
- onProcessSuccess(processName, processNo, tenantId, paramDto);
- }
- /**
- * User-definable hook for a successful process, called by {@code onProcessExecuteSuccess} with
- * the same arguments. The implementation in this class does nothing.
- *
- * @param processName name of the process
- * @param processNo number of the process
- * @param tenantId tenant the process belongs to
- * @param paramDto parameters the process was executed with
- * @throws Exception declared so that overriding implementations may throw
- */
- protected void onProcessSuccess(String processName, String processNo, Long tenantId, Dto paramDto) throws Exception {
- // Intentionally empty; subclasses may override.
- }
- /**
- * on process error
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @param paramDto
- * @throws NamingException
- * @throws IOException
- */
- void onProcessExecuteError(String processName, String processNo, Long tenantId, Dto paramDto, Exception exception)
- throws Exception {
- onProcessError(processName, processNo, tenantId, paramDto);
- }
- /**
- * User-definable hook for a failed process, called by {@code onProcessExecuteError} with the
- * process arguments. The implementation in this class does nothing.
- *
- * @param processName name of the process
- * @param processNo number of the process
- * @param tenantId tenant the process belongs to
- * @param paramDto parameters the process was executed with
- * @throws Exception declared so that overriding implementations may throw
- */
- protected void onProcessError(String processName, String processNo, Long tenantId, Dto paramDto) throws Exception {
- // Intentionally empty; subclasses may override.
- }
- /**
- * Get process message by specified id
- * Using BusinessFunction findProcessMessageByIndex
- *
- * @param processName
- * @param processNo
- * @param tenantId
- * @return
- * @throws Exception
- */
- private Long getProcessMessageId(String processName, String processNo, Long tenantId) throws Exception {
- Dto processMessageDto = new Dto();
- processMessageDto.put("processName", processName);
- processMessageDto.put("processNo", processNo);
- processMessageDto.put("tenantId", tenantId);
- processMessageDto = getBusinessFunction("findProcessMessageByIndex").execute(processMessageDto);
- return processMessageDto.getLong("id");
- }
- @SuppressWarnings("unchecked")
- List<Dto> getProcessResultOkList(String processName, String processNo, Long tenantId) throws Exception {
- Long processMessageId = getProcessMessageId(processName, processNo, tenantId);
- Dto processResultOkListDto = new Dto();
- processResultOkListDto.put("processMessageId", processMessageId);
- processResultOkListDto = getBusinessFunction("getProcessResultOkListByProcessMessageId").execute(processResultOkListDto);
- List<Dto> processResultOkList = processResultOkListDto.getList("processResultOkList");
- return processResultOkList;
- }
- @SuppressWarnings("unchecked")
- List<Dto> getProcessResultWarningList(String processName, String processNo, Long tenantId) throws Exception {
- Long processMessageId = getProcessMessageId(processName, processNo, tenantId);
- Dto processResultWarningListDto = new Dto();
- processResultWarningListDto.put("processMessageId", processMessageId);
- processResultWarningListDto = getBusinessFunction("getProcessResultWarningListByProcessMessageId").execute(
- processResultWarningListDto);
- List<Dto> processResultWarningList = processResultWarningListDto.getList("processResultWarningList");
- return processResultWarningList;
- }
- @SuppressWarnings("unchecked")
- List<Dto> getProcessResultErrorList(String processName, String processNo, Long tenantId) throws Exception {
- Long processMessageId = getProcessMessageId(processName, processNo, tenantId);
- Dto processResultErrorListDto = new Dto();
- processResultErrorListDto.put("processMessageId", processMessageId);
- processResultErrorListDto = getBusinessFunction("getProcessResultErrorListByProcessMessageId").execute(
- processResultErrorListDto);
- List<Dto> processResultErrorList = processResultErrorListDto.getList("processResultErrorList");
- return processResultErrorList;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement