Advertisement
ibragimova_mariam

MyPaste_db

Oct 27th, 2020
1,064
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 42.63 KB | None | 0 0
  1. public class DelayedDataSnapshot extends Action
  2. {
  3.     private static final String QUERIES_PATH = getInstance().getConfigFiles().getCfgDir() +
  4.             "delayedOrderBookSnapshot/sql/";
  5.     private static final String CREATE_RESULT_TABLE = "createResultTable.sql";
  6.  
  7.     private static final String SELECT_ALL_MESSAGES_BY_TIME = "SELECT * FROM %s WHERE " +
  8.             "'#LeftTimestamp'<MessageTimestamp AND MessageTimestamp<='#RightTimestamp' ORDER BY MessageTimestamp";
  9.  
  10.     private static final String MESSAGE_TYPE = "Message Type";
  11.     private static final String SIDE = "Side";
  12.     private static final String ORDER_ID = "Order ID";
  13.     private static final String PRICE = "Price";
  14.     private static final String MESSAGE_TIMESTAMP = "MessageTimestamp";
  15.  
  16.     private static final String AGGREGATED_DATA_TABLE = "AggregatedData";
  17.     private static final String PREVIOUS_SNAPSHOT_TABLE = "PreviousSnapshot", CURRENT_SNAPSHOT_TABLE = "CurrentSnapshot";
  18.     private static final String TEMP_PREVIOUS_SNAPSHOT_TABLE = "TempPreviousSnapshot", TEMP_CURRENT_SNAPSHOT_TABLE =
  19.             "TempCurrentSnapshot";
  20.  
  21.     private GdlGlobalContext globalContext;
  22.     private String addOrderTable, modifyOrderTable, deleteOrderTable, orderBookClearTable, orderBookSnapshotTable,
  23.             orderBookSnapshotTempTable;
  24.     private String cthEndTime;
  25.     private String dbConnectionName;
  26.     private SnapshotHandler snapshotHandler;
  27.     private String previousSnapshotTime, currentSnapshotTime, orderBookSnapshotTime;
  28.     private Map<String, String> currentQueryParams;
  29.     private Set<String> instruments;
  30.     private String insertFromAllDataQuery, insertAddRowToSnapshotQuery, insertDeleteRowToSnapshotQuery,
  31.             insertDeleteRowWithSpecParamsQuery, insertFromAllDataWithSpecParamsQuery,
  32.             insertDeleteRowFromModifyToSnapQuery, deleteFromTempQuery;
  33.  
  34.     private String finalPreviousSnapshotTable, finalCurrentSnapshotTable;
  35.  
  36.     @Override
  37.     protected Result run(StepContext stepContext, MatrixContext matrixContext, GlobalContext globalContext)
  38.             throws ResultException
  39.     {
  40.         this.globalContext = (GdlGlobalContext) globalContext;
  41.         initParameters();
  42.  
  43.         logger.debug("CthEndTime: {} ({})", cthEndTime, GdlTimestampUtils.nanosStringToDateWithFractions(cthEndTime,
  44.                 "6"));
  45.         currentSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(cthEndTime, 0);
  46.         previousSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(currentSnapshotTime, 1);
  47.         orderBookSnapshotTime =
  48.                 GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(previousSnapshotTime, 1);
  49.         logger.debug("CurrentSnapshotTime: {} ({})", currentSnapshotTime,
  50.                 GdlTimestampUtils.nanosStringToDateWithFractions(currentSnapshotTime, "6"));
  51.         logger.debug("PreviousSnapshotTime: {} ({})", previousSnapshotTime,
  52.                 GdlTimestampUtils.nanosStringToDateWithFractions(previousSnapshotTime, "6"));
  53.         logger.debug("OrderBookSnapshotTime: {} ({})", orderBookSnapshotTime,
  54.                 GdlTimestampUtils.nanosStringToDateWithFractions(orderBookSnapshotTime, "6"));
  55.  
  56.         try
  57.         {
  58.             executeQueries();
  59.         }
  60.         catch (SQLException e)
  61.         {
  62.             String errMessage = "Error while committing changes";
  63.             this.logger.error(errMessage, e);
  64.             throw new ResultException(errMessage, e);
  65.         }
  66.  
  67.         return DefaultResult.passed("Two snapshots were made successfully, result in two tables - 'FinalPreviousSnapshotTable' " +
  68.                 "and 'FinalCurrentSnapshotTable'.");
  69.     }
  70.  
  71.     private void initParameters()
  72.     {
  73.         this.getLogger().debug("Initializing special action parameters");
  74.         InputParamsHandler handler = new InputParamsHandler(this.inputParams);
  75.  
  76.         this.addOrderTable = handler.getRequiredString("AddOrderTable");
  77.         this.modifyOrderTable = handler.getRequiredString("ModifyOrderTable");
  78.         this.deleteOrderTable = handler.getRequiredString("DeleteOrderTable");
  79.         this.orderBookClearTable = handler.getRequiredString("OrderBookClearTable");
  80.         this.orderBookSnapshotTable = handler.getRequiredString("OrderBookSnapshotTable");
  81.  
  82.         handler.getRequiredString("Instrument");
  83.         handler.getRequiredString("SourceVenue");
  84.         this.cthEndTime = handler.getRequiredString("CTHEndTime");
  85.  
  86.         this.dbConnectionName = handler.getRequiredString("DbConnection");
  87.  
  88.         this.finalPreviousSnapshotTable = handler.getRequiredString("FinalPreviousSnapshotTable");
  89.         this.finalCurrentSnapshotTable = handler.getRequiredString("FinalCurrentSnapshotTable");
  90.  
  91.         handler.check();
  92.  
  93.         this.orderBookSnapshotTempTable = orderBookSnapshotTable + "TempForDelayed";
  94.  
  95.         insertFromAllDataQuery = loadQuery("insertFromAllData.sql");
  96.         insertAddRowToSnapshotQuery = loadQuery("insertAddRowToSnapshot.sql");
  97.         insertDeleteRowToSnapshotQuery = loadQuery("insertDeleteRowToSnapshot.sql");
  98.         insertDeleteRowWithSpecParamsQuery = loadQuery("insertDeleteRowWithSpecificParams.sql");
  99.         insertFromAllDataWithSpecParamsQuery = loadQuery("insertFromAllDataWithSpecificParams.sql");
  100.         insertDeleteRowFromModifyToSnapQuery = loadQuery("insertDeleteRowFromModifyToSnapshot.sql");
  101.         deleteFromTempQuery = "DELETE FROM %s WHERE Side='#Side' AND \"Order ID\"='#OrderId' AND MessageTimestamp='#Timestamp'";
  102.     }
  103.  
  104.     private void executeQueries() throws SQLException
  105.     {
  106.         Connection connection = this.globalContext.getDbConnectionOrResultEx(dbConnectionName);
  107.         connection.setAutoCommit(false);
  108.         currentQueryParams = new HashMap<>(inputParams);
  109.  
  110.         // Create tables
  111.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), AGGREGATED_DATA_TABLE)); // table for add, modify,
  112.         // delete and clear table data
  113.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_PREVIOUS_SNAPSHOT_TABLE));
  114.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), PREVIOUS_SNAPSHOT_TABLE));
  115.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_CURRENT_SNAPSHOT_TABLE));
  116.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), CURRENT_SNAPSHOT_TABLE));
  117.  
  118.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), orderBookSnapshotTempTable));
  119.  
  120.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalPreviousSnapshotTable));
  121.         executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalCurrentSnapshotTable));
  122.  
  123.         connection.commit();
  124.  
  125.         snapshotHandler = new SnapshotHandler(connection, finalPreviousSnapshotTable, TEMP_PREVIOUS_SNAPSHOT_TABLE);
  126.  
  127.         String instrument = inputParams.get("Instrument");
  128.         if (instrument.trim().equalsIgnoreCase("all")) // we need to execute action for all instruments from input
  129.         // tables.
  130.         {
  131.             cleanAllTempTables(connection);
  132.             // TODO
  133.             // Fill allTablesData table (all instruments)
  134.             executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrderAllInstruments.sql"),
  135.                     AGGREGATED_DATA_TABLE, addOrderTable), currentQueryParams);
  136.             executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrderAllInstruments.sql"),
  137.                     AGGREGATED_DATA_TABLE, modifyOrderTable), currentQueryParams);
  138.             executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDeleteAllInstruments.sql"),
  139.                     AGGREGATED_DATA_TABLE, deleteOrderTable), currentQueryParams);
  140.             executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClearAllInstruments.sql"),
  141.                     AGGREGATED_DATA_TABLE, orderBookClearTable), currentQueryParams);
  142.  
  143.             // Fill temp OrderBookSnapshot
  144.             currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  145.             executeParametrizedQuery(connection, format(loadQuery(
  146.                     "insertFromOrderBookSnapToTempAllInstruments" +
  147.                             ".sql"), orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  148.  
  149.             try(DbDataReader instrumentReader = getTableReader(connection, format(
  150.                     "SELECT DISTINCT \"Instrument ID\" FROM %s",
  151.                     orderBookSnapshotTempTable), currentQueryParams)) // Add to PrevSnapshot
  152.             {
  153.                 instrumentReader.start();
  154.                 while (instrumentReader.hasMoreData())
  155.                 {
  156.                     TableRow<String, String> tableRow = instrumentReader.readRow();
  157.                     String currentInstrument = tableRow.getValue("Instrument ID");
  158.  
  159.                     logger.trace("Current instrument (read from OrderBookSnapshot table): {}.", currentInstrument);
  160.                     currentQueryParams.put("Instrument", currentInstrument);
  161.  
  162.                     snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  163.  
  164.                     String minBuyTop5Price = getWorsePriceInTop5(connection, true, true);
  165.                     logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table: {}", minBuyTop5Price);
  166.                     currentQueryParams.put("MinBuyTop5Price", minBuyTop5Price);
  167.  
  168.                     String maxSellTop5Price = getWorsePriceInTop5(connection, false, true);
  169.                     logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table: {}", maxSellTop5Price);
  170.                     currentQueryParams.put("MaxSellTop5Price", maxSellTop5Price);
  171.  
  172.                     try (DbDataReader reader = getTableReader(connection, format(
  173.                             "SELECT * from %s WHERE \"Message Type\"='70' " +
  174.                                     "AND \"Instrument ID\"='#Instrument' " +
  175.                                     "AND ((Side='66' AND Price>='#MinBuyTop5Price') " +
  176.                                     "OR (Side='83' AND Price<='#MaxSellTop5Price') " +
  177.                                     "OR (\"Order Type\"='1' AND Price='0'))",
  178.                             orderBookSnapshotTempTable), currentQueryParams)) // Add to PrevSnapshot
  179.                     {
  180.                         reader.start();
  181.                         while (reader.hasMoreData())
  182.                         {
  183.                             addRecord(reader.readRow(), currentInstrument);
  184.                         }
  185.                     }
  186.                     try (DbDataReader reader = getTableReader(connection, format(
  187.                             "SELECT * from %s WHERE \"Message Type\"='70'" +
  188.                                     "AND \"Instrument ID\"='#Instrument' " +
  189.                                     "AND NOT ((Side='66' AND Price>='#MinBuyTop5Price') " +
  190.                                     "OR (Side='83' AND Price<='#MaxSellTop5Price') " +
  191.                                     "OR (\"Order Type\"='1' AND Price='0'))",
  192.                             orderBookSnapshotTempTable), currentQueryParams)) // Add to TempPrevSnapshot
  193.                     {
  194.                         reader.start();
  195.                         while (reader.hasMoreData())
  196.                         {
  197.                             addRecord(reader.readRow(), currentInstrument);
  198.                         }
  199.                     }
  200.                     // Stop reading data from OrderBookSnapshot
  201.                 }
  202.             }
  203.             catch (IOException e)
  204.             {
  205.                 String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  206.                 this.logger.error(errMessage, e);
  207.                 throw new ResultException(errMessage, e);
  208.             }
  209.  
  210.  
  211.             snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  212.  
  213.             currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  214.             currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  215.             createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  216.                     AGGREGATED_DATA_TABLE)); // first snapshot
  217.  
  218.             // transfer data from prev snapshot to current
  219.             snapshotHandler.nextSnapshot(finalCurrentSnapshotTable, TEMP_CURRENT_SNAPSHOT_TABLE);
  220.  
  221.             currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  222.             currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  223.             createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  224.                     AGGREGATED_DATA_TABLE)); // second snapshot
  225.  
  226.             //transfer result to final tables
  227.             /*executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  228.                     finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  229.             executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  230.                             finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);*/
  231.             connection.commit();
  232.         }
  233.         else
  234.         {
  235.             String[] allInstruments = instrument.split(",");
  236.             logger.debug("There are {} distinct instruments in input parameter 'Instrument'.", instruments.size());
  237.  
  238.             for (String currentInstrument : allInstruments)
  239.             {
  240.                 cleanAllTempTables(connection);
  241.  
  242.                 currentInstrument = currentInstrument.trim();
  243.                 logger.debug("Current instrument: {}.", currentInstrument);
  244.                 currentQueryParams.put("Instrument", currentInstrument);
  245.  
  246.                 // Fill temp OrderBookSnapshot
  247.                 currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  248.                 executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookSnapToTemp.sql"),
  249.                         orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  250.  
  251.                 // Fill allTablesData table
  252.                 executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrder.sql"), AGGREGATED_DATA_TABLE,
  253.                         addOrderTable), currentQueryParams);
  254.                 executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrder.sql"), AGGREGATED_DATA_TABLE,
  255.                         modifyOrderTable), currentQueryParams);
  256.                 executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDelete.sql"), AGGREGATED_DATA_TABLE,
  257.                         deleteOrderTable), currentQueryParams);
  258.                 executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClear.sql"), AGGREGATED_DATA_TABLE,
  259.                         orderBookClearTable), currentQueryParams);
  260.  
  261.                 snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  262.                 try
  263.                 {
  264.                     String minBuyTop5Price = getWorsePriceInTop5(connection, true, false);
  265.                     logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  266.                             currentInstrument, minBuyTop5Price);
  267.                     currentQueryParams.put("MinBuyTop5Price", minBuyTop5Price);
  268.  
  269.                     String maxSellTop5Price = getWorsePriceInTop5(connection, false, false);
  270.                     logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  271.                             currentInstrument, maxSellTop5Price);
  272.                     currentQueryParams.put("MaxSellTop5Price", maxSellTop5Price);
  273.  
  274.                     try (DbDataReader reader = getTableReader(connection, format(
  275.                             "SELECT * from %s WHERE \"Message Type\"='70' AND " +
  276.                                     "((Side='66' AND Price>='#MinBuyTop5Price') " +
  277.                                     "OR (Side='83' AND Price<='#MaxSellTop5Price') " +
  278.                                     "OR (\"Order Type\"='1' AND Price='0'))",
  279.                             orderBookSnapshotTempTable), currentQueryParams)) // Add to PrevSnapshot
  280.                     {
  281.                         reader.start();
  282.                         while (reader.hasMoreData())
  283.                         {
  284.                             addRecord(reader.readRow(), currentInstrument);
  285.                         }
  286.                     }
  287.                     try (DbDataReader reader = getTableReader(connection, format(
  288.                             "SELECT * from %s WHERE \"Message Type\"='70' AND NOT" +
  289.                                     "((Side='66' AND Price>='#MinBuyTop5Price') " +
  290.                                     "OR (Side='83' AND Price<='#MaxSellTop5Price') " +
  291.                                     "OR (\"Order Type\"='1' AND Price='0'))",
  292.                             orderBookSnapshotTempTable), currentQueryParams)) // Add to TempPrevSnapshot
  293.                     {
  294.                         reader.start();
  295.                         while (reader.hasMoreData())
  296.                         {
  297.                             addRecord(reader.readRow(), currentInstrument);
  298.                         }
  299.                     }
  300.                 }
  301.                 catch (IOException e)
  302.                 {
  303.                     String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  304.                     this.logger.error(errMessage, e);
  305.                     throw new ResultException(errMessage, e);
  306.                 }
  307.                 snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  308.  
  309.                 currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  310.                 currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  311.                 createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  312.                         AGGREGATED_DATA_TABLE)); // first snapshot
  313.  
  314.                 // transfer data from prev snapshot to current
  315.                 snapshotHandler.nextSnapshot(CURRENT_SNAPSHOT_TABLE, TEMP_CURRENT_SNAPSHOT_TABLE);
  316.  
  317.                 currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  318.                 currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  319.                 createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  320.                         AGGREGATED_DATA_TABLE)); // second snapshot
  321.  
  322.                 //transfer result to final tables
  323.                 executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  324.                         finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  325.                 executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  326.                         finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  327.                 connection.commit();
  328.             }
  329.         }
  330.     }
  331.  
  332.     private void cleanAllTempTables(Connection connection)
  333.     {
  334.         executeParametrizedQuery(connection, format("DELETE FROM %s", AGGREGATED_DATA_TABLE), currentQueryParams);
  335.         executeParametrizedQuery(connection, format("DELETE FROM %s", orderBookSnapshotTempTable), currentQueryParams);
  336.  
  337.         executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  338.         executeParametrizedQuery(connection, format("DELETE FROM %s", PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  339.         executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  340.         executeParametrizedQuery(connection, format("DELETE FROM %s", CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  341.     }
  342.  
  343.     /*
  344.     private void printTableColumns(StringTableData td)
  345.     {
  346.         CommaBuilder commaBuilder = new CommaBuilder();
  347.         Iterator<String> headerIterator = td.getHeader().iterator();
  348.         String currentColumn;
  349.         while(headerIterator.hasNext())
  350.         {
  351.             currentColumn = headerIterator.next();
  352.             String newColumn = "'" + currentColumn + "'";
  353.             commaBuilder.append(newColumn);
  354.         }
  355.         String columns = commaBuilder.toString();
  356.         logger.trace("Temp allData table columns: {}", columns);
  357.  
  358.     }
  359.      */
  360.  
  361.     private void createSnapshot(Connection connection, String selectOrderBookPartQuery) throws SQLException
  362.     {
  363.         try (DbDataReader reader = getTableReader(connection, selectOrderBookPartQuery, currentQueryParams))
  364.         {
  365.             reader.start();
  366.             //printTableColumns(td);
  367.             while (reader.hasMoreData())
  368.             {
  369.                 TableRow<String, String> tr = reader.readRow();
  370.                 String messageType = tr.getValue(MESSAGE_TYPE);
  371.                 String currentInstrument = tr.getValue("Instrument ID");
  372.                 switch (messageType)
  373.                 {
  374.                     case ADD_ORDER_TYPE_DEC:
  375.                         addRecord(tr, currentInstrument);
  376.                         break;
  377.                     case MODIFY_ORDER_TYPE_DEC:
  378.                         modifyRecord(tr, currentInstrument);
  379.                         break;
  380.                     case DELETE_ORDER_TYPE_DEC:
  381.                         deleteRecord(tr, currentInstrument);
  382.                         break;
  383.                     case CLEAR_BOOK_TYPE_DEC:
  384.                         deleteAllRecords(currentInstrument);
  385.                         break;
  386.                     default:
  387.                         throw new ResultException(format("Invalid value in column %s: %s", MESSAGE_TYPE,
  388.                                 messageType));
  389.                 }
  390.             }
  391.         }
  392.         catch (IOException e)
  393.         {
  394.             String errMessage = "Error while reading data";
  395.             this.logger.error(errMessage, e);
  396.             throw new ResultException(errMessage, e);
  397.         }
  398.     }
  399.  
  400.     private void addRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  401.     {
  402.         String side = tableRow.getValue(SIDE);
  403.         String orderId = tableRow.getValue(ORDER_ID);
  404.         String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  405.  
  406.         currentQueryParams.put(SIDE, side);
  407.         currentQueryParams.put("OrderId", orderId);
  408.         currentQueryParams.put("Timestamp", timestamp);
  409.  
  410.         String price = tableRow.getValue(PRICE);
  411.         String orderType = tableRow.getValue("Order Type");
  412.         String dss = tableRow.getValue("dss_rt_bus_seq_no");
  413.         snapshotHandler.getInstrumentInfo(currentInstrument).addOrder(side, orderId, timestamp,
  414.                 Double.parseDouble(price), orderType, dss);
  415.     }
  416.  
  417.     private void modifyRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  418.     {
  419.         String side = tableRow.getValue(SIDE);
  420.         String orderId = tableRow.getValue(ORDER_ID);
  421.         String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  422.  
  423.         currentQueryParams.put(SIDE, side);
  424.         currentQueryParams.put("OrderId", orderId);
  425.         currentQueryParams.put("Timestamp", timestamp);
  426.  
  427.         String oldPrice = tableRow.getValue("Old Price");
  428.         String newPrice = tableRow.getValue("Price");
  429.         String newSize = tableRow.getValue("Size");
  430.         String orderType = tableRow.getValue("Order Type");
  431.         String dss = tableRow.getValue("dss_rt_bus_seq_no");
  432.         snapshotHandler.getInstrumentInfo(currentInstrument).modifyOrder(side, orderId,
  433.             timestamp, Double.parseDouble(oldPrice), Double.parseDouble(newPrice), newSize, orderType, dss);
  434.     }
  435.  
  436.     private void deleteRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  437.     {
  438.         String side = tableRow.getValue(SIDE);
  439.         String orderId = tableRow.getValue(ORDER_ID);
  440.         String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  441.  
  442.         currentQueryParams.put(SIDE, side);
  443.         currentQueryParams.put("OrderId", orderId);
  444.         currentQueryParams.put("Timestamp", timestamp);
  445.  
  446.         String price = tableRow.getValue(PRICE);
  447.         String orderType = tableRow.getValue("Order Type");
  448.         String dss = tableRow.getValue("dss_rt_bus_seq_no");
  449.         snapshotHandler.getInstrumentInfo(currentInstrument).deleteOrder(side, orderId,
  450.                 Double.parseDouble(price), orderType, timestamp, dss);
  451.     }
  452.  
  453.     private void deleteAllRecords(String currentInstrument) throws SQLException
  454.     {
  455.         snapshotHandler.getInstrumentInfo(currentInstrument).clear();
  456.     }
  457.  
  458.     private String loadQuery(String queryFile)
  459.     {
  460.         String createResultTable;
  461.         try
  462.         {
  463.             createResultTable = SQLUtils.loadQuery(rootRelative(QUERIES_PATH + queryFile));
  464.         }
  465.         catch (IOException e)
  466.         {
  467.             String errMessage = "Error while loading query from file '" + queryFile + "'";
  468.             this.logger.error(errMessage, e);
  469.             throw new ResultException(errMessage, e);
  470.         }
  471.         return createResultTable;
  472.     }
  473.  
  474.     private void executeParametrizedQuery(Connection connection, String query, Map<String, String> params)
  475.     {
  476.         logger.trace("Query before parsing: {}", query);
  477.         ParametrizedQuery paramQuery = SQLUtils.parseSQLTemplate(query);
  478.         logger.trace("Parametrized query: {}", paramQuery.getQuery());
  479.  
  480.         try (PreparedStatement statement = paramQuery.createPreparedStatement(connection, params))
  481.         {
  482.             statement.execute();
  483.         }
  484.         catch (SQLException e)
  485.         {
  486.             String errMessage = "Error while prepared statement creating";
  487.             this.logger.error(errMessage, e);
  488.             throw new ResultException(errMessage, e);
  489.         }
  490.     }
  491.  
  492.     private String getWorsePriceInTop5(Connection connection, boolean isBuy, boolean isSpecificInstrument)
  493.     {
  494.         String query;
  495.         if (isBuy)
  496.         {
  497.             query = format("SELECT MIN(Price) AS Price FROM (SELECT DISTINCT Price FROM %s " +
  498.                             "WHERE \"Message Type\"='70' AND Side='66' " +
  499.                             (isSpecificInstrument ? " AND \"Instrument ID\"='#Instrument'" : "") +
  500.                             "ORDER BY Price DESC LIMIT 5)",
  501.                             orderBookSnapshotTempTable);
  502.         }
  503.         else // Sell
  504.         {
  505.             query = format("SELECT MAX(Price) AS Price FROM (SELECT DISTINCT Price FROM %s " +
  506.                             "WHERE \"Message Type\"='70' AND Side='83' " +
  507.                             (isSpecificInstrument ? " AND \"Instrument ID\"='#Instrument'" : "") +
  508.                             "ORDER BY Price LIMIT 5)",
  509.                     orderBookSnapshotTempTable);
  510.         }
  511.  
  512.         logger.trace("Query before parsing: {}", query);
  513.         ParametrizedQuery paramQuery = SQLUtils.parseSQLTemplate(query);
  514.         logger.trace("Parametrized query: {}", paramQuery.getQuery());
  515.  
  516.         try (PreparedStatement statement = paramQuery.createPreparedStatement(connection, currentQueryParams);
  517.              ResultSet rs = statement.executeQuery())
  518.         {
  519.             return String.valueOf(rs.getDouble("Price"));
  520.         }
  521.         catch (SQLException e)
  522.         {
  523.             String errMessage = "Error while prepared statement creating";
  524.             this.logger.error(errMessage, e);
  525.             throw new ResultException(errMessage, e);
  526.         }
  527.     }
  528.  
  529.     private void executeQuery(Connection connection, String query)
  530.     {
  531.         logger.trace("Query to execute: {}", query);
  532.  
  533.         try (Statement statement = connection.createStatement())
  534.         {
  535.             statement.execute(query);
  536.         }
  537.         catch (SQLException e)
  538.         {
  539.             String errMessage = format("Error while executing query '%s'.", query);
  540.             this.logger.error(errMessage, e);
  541.             throw new ResultException(errMessage, e);
  542.         }
  543.     }
  544.  
  545.     private DbDataReader getTableReader(Connection connection, String sqlTemplate, Map<String, String> params)
  546.     {
  547.         ParametrizedQuery query;
  548.         PreparedStatement statement;
  549.         try
  550.         {
  551.             query = SQLUtils.parseSQLTemplate(sqlTemplate);
  552.             statement = query.createPreparedStatement(connection, params);
  553.         }
  554.         catch (SQLException e)
  555.         {
  556.             String errMessage = "Error while prepared statement creating";
  557.             this.logger.error(errMessage, e);
  558.             throw new ResultException(errMessage, e);
  559.         }
  560.         return new DbDataReader(statement);
  561.     }
  562.  
  563.     private class SnapshotHandler
  564.     {
  565.         private final Connection connection;
  566.         private String snapshotTable;
  567.         private String tempSnapshotTable;
  568.         private Map<String, InstrumentInfo> instrumentsInfo;
  569.         private String sourceTable;
  570.  
  571.         public SnapshotHandler(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  572.         {
  573.             connection = dbConnection;
  574.             this.snapshotTable = snapshotTable;
  575.             this.tempSnapshotTable = tempSnapshotTable;
  576.             instrumentsInfo = new HashMap<>();
  577.         }
  578.  
  579.         public void setOrderDataSourceTable(String sourceTable)
  580.         {
  581.             this.sourceTable = sourceTable;
  582.             for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  583.             {
  584.                 InstrumentInfo ii = instrumentEntry.getValue();
  585.                 ii.setOrderDataSourceTable(sourceTable);
  586.                 instrumentEntry.setValue(ii);
  587.             }
  588.         }
  589.  
  590.         public InstrumentInfo getInstrumentInfo(String instrument)
  591.         {
  592.             InstrumentInfo ii = instrumentsInfo.get(instrument);
  593.             if (ii == null)
  594.             {
  595.                 ii = new InstrumentInfo(connection, snapshotTable, tempSnapshotTable);
  596.                 ii.setOrderDataSourceTable(sourceTable);
  597.                 instrumentsInfo.put(instrument, ii);
  598.             }
  599.             return ii;
  600.         }
  601.  
  602.         public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  603.         {
  604.             this.snapshotTable = newSnapshotTable;
  605.             this.tempSnapshotTable = newTempSnapshotTable;
  606.             for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  607.             {
  608.                 instrumentEntry.getValue().nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  609.             }
  610.         }
  611.     }
  612.  
  613.     private class InstrumentInfo
  614.     {
  615.         private final Connection connection;
  616.         private final PriceTopHandler buyHandler, sellHandler;
  617.         private String snapshotTable;
  618.         private String sourceTable;
  619.        
  620.         public InstrumentInfo(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  621.         {
  622.             connection = dbConnection;
  623.             buyHandler = new PriceTopHandler(true, connection, snapshotTable, tempSnapshotTable);
  624.             sellHandler = new PriceTopHandler(false, connection, snapshotTable, tempSnapshotTable);
  625.             this.snapshotTable = snapshotTable;
  626.         }
  627.  
  628.         public void setOrderDataSourceTable(String sourceTable)
  629.         {
  630.             this.sourceTable = sourceTable;
  631.             buyHandler.setOrderDataSourceTable(sourceTable);
  632.             sellHandler.setOrderDataSourceTable(sourceTable);
  633.         }
  634.  
  635.         public void addOrder(String side, String orderId, String timestamp, double price, String orderType,
  636.                              String dss) throws SQLException
  637.         {
  638.             if (price == 0 && StringUtils.equals(orderType, "1"))
  639.             {
  640.                 // handle zero price
  641.                 executeParametrizedQuery(connection, format(insertFromAllDataQuery, snapshotTable,
  642.                         sourceTable), currentQueryParams);
  643.             }
  644.             else
  645.             {
  646.                 if ("66".equalsIgnoreCase(side))
  647.                     buyHandler.addOrder(orderId, timestamp, price, dss);
  648.                 else if ("83".equalsIgnoreCase(side))
  649.                     sellHandler.addOrder(orderId, timestamp, price, dss);
  650.             }
  651.         }
  652.  
  653.         public void deleteOrder(String side, String orderId, double price, String orderType, String timestamp,
  654.                                 String dss) throws SQLException
  655.         {
  656.             if (price == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  657.             {
  658.                 // handle zero price
  659.                 currentQueryParams.put("CurrentTimestamp", timestamp);
  660.                 currentQueryParams.put("CurrentDss", dss);
  661.                 executeParametrizedQuery(connection, format(insertDeleteRowWithSpecParamsQuery,
  662.                         snapshotTable, sourceTable), currentQueryParams);
  663.             }
  664.             else
  665.             {
  666.                 if ("66".equalsIgnoreCase(side))
  667.                     buyHandler.deleteOrder(orderId, price, timestamp, dss);
  668.                 else if ("83".equalsIgnoreCase(side))
  669.                     sellHandler.deleteOrder(orderId, price, timestamp, dss);
  670.             }
  671.         }
  672.  
  673.         public void modifyOrder(String side, String orderId, String timestamp, double oldPrice, double newPrice,
  674.                                 String newSize, String orderType, String dss) throws SQLException
  675.         {
  676.             if (oldPrice == 0 && newPrice == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  677.             {
  678.                 // handle zero price
  679.                 executeParametrizedQuery(connection, format(insertFromAllDataQuery, snapshotTable,
  680.                         sourceTable), currentQueryParams);
  681.             }
  682.             else
  683.             {
  684.                 if ("66".equalsIgnoreCase(side))
  685.                     buyHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  686.                 else if ("83".equalsIgnoreCase(side))
  687.                     sellHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  688.             }
  689.         }
  690.  
  691.         public void clear() throws SQLException
  692.         {
  693.             buyHandler.clear();
  694.             sellHandler.clear();
  695.         }
  696.  
  697.         public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  698.         {
  699.             buyHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  700.             sellHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  701.             this.snapshotTable = newSnapshotTable;
  702.         }
  703.     }
  704.  
  705.     private class PriceTopHandler
  706.     {
  707.         private class OrderInfo
  708.         {
  709.             private final String orderId;
  710.             private final String timestamp;
  711.  
  712.             public OrderInfo(String orderId)
  713.             {
  714.                 this(orderId, null);
  715.             }
  716.  
  717.             public OrderInfo(String orderId, String timestamp)
  718.             {
  719.                 this.orderId = orderId;
  720.                 this.timestamp = timestamp;
  721.             }
  722.  
  723.             public String getOrderId()
  724.             {
  725.                 return orderId;
  726.             }
  727.  
  728.             public String getTimestamp()
  729.             {
  730.                 return timestamp;
  731.             }
  732.  
  733.             @Override
  734.             public boolean equals(Object o)
  735.             {
  736.                 if (this == o)
  737.                     return true;
  738.  
  739.                 if (o == null || getClass() != o.getClass())
  740.                     return false;
  741.  
  742.                 OrderInfo orderInfo = (OrderInfo) o;
  743.                 return orderId.equals(orderInfo.orderId);
  744.             }
  745.  
  746.             @Override
  747.             public int hashCode()
  748.             {
  749.                 return orderId.hashCode();
  750.             }
  751.         }
  752.  
  753.         private static final int TOP_SIZE = 5;
  754.         private final TreeMap<Double, Set<OrderInfo>> top;
  755.         private final TreeMap<Double, Set<OrderInfo>> rest;
  756.         private final Comparator<Double> comparator;
  757.         private String tempSnapshotTable;
  758.         private String snapshotTable;
  759.         private final Connection connection;
  760.         private String orderDataSourceTable;
  761.         private final String side;
  762.  
  763.         public PriceTopHandler(boolean isBuy, Connection connection, String snapshotTable, String tempSnapshotTable)
  764.         {
  765.             this.connection = connection;
  766.             this.tempSnapshotTable = tempSnapshotTable;
  767.             this.snapshotTable = snapshotTable;
  768.  
  769.             // First = worst, last = best
  770.             comparator = isBuy ? Comparator.naturalOrder() : Comparator.reverseOrder();
  771.             side = isBuy ? "66" : "83";
  772.  
  773.             top = new TreeMap<>(comparator);
  774.             rest = new TreeMap<>(comparator);
  775.         }
  776.  
  777.         public void setOrderDataSourceTable(String orderDataSourceTable)
  778.         {
  779.             this.orderDataSourceTable = orderDataSourceTable;
  780.         }
  781.        
  782.         public void addOrder(String orderId, String timestamp, double price, String dss) throws SQLException
  783.         {
  784.             addOrder(orderId, timestamp, price, dss, false);
  785.         }
  786.  
  787.         private void addOrder(String orderId, String timestamp, double price, String dss, boolean isModify)
  788.                 throws SQLException
  789.         {
  790.             BiFunction<Double, Set<OrderInfo>, Set<OrderInfo>> ADD_ORDER_ID = (k, set) ->
  791.             {
  792.                 if (set == null)
  793.                     set = new HashSet<>();
  794.  
  795.                 set.add(new OrderInfo(orderId, timestamp));
  796.                 return set;
  797.             };
  798.  
  799.             if (top.size() == TOP_SIZE && comparator.compare(price, top.firstKey()) < 0)
  800.             { // 1.5 - worse then top-5, record go to the 'rest'
  801.                 rest.compute(price, ADD_ORDER_ID);
  802.  
  803.                 if (!isModify)
  804.                 {
  805.                     // add to temp
  806.                     currentQueryParams.put("OrderId", orderId);
  807.                     currentQueryParams.put("Timestamp", timestamp);
  808.                     executeParametrizedQuery(connection, format(insertAddRowToSnapshotQuery, tempSnapshotTable,
  809.                             orderDataSourceTable), currentQueryParams);
  810.                 }
  811.             }
  812.             else
  813.             {
  814.                 // 1.2, 1.3
  815.                 top.compute(price, ADD_ORDER_ID);
  816.  
  817.                 if (!isModify)
  818.                 {
  819.                     // add AddOrder to snapshot
  820.                     executeParametrizedQuery(connection, format(insertFromAllDataQuery, snapshotTable,
  821.                             orderDataSourceTable), currentQueryParams);
  822.                 }
  823.  
  824.                 if (top.size() > TOP_SIZE)
  825.                 { // 1.4
  826.                     Map.Entry<Double, Set<OrderInfo>> worst = top.pollFirstEntry(); // poll worse price
  827.                     rest.put(worst.getKey(), worst.getValue());
  828.  
  829.                     if (!isModify)
  830.                     {
  831.                         currentQueryParams.put("CurrentTimestamp", timestamp);
  832.                         currentQueryParams.put("CurrentDss", dss);
  833.                         // add to temp as AddOrder
  834.                         // Remove from snapshot (add row to snapshot as DeleteOrder)
  835.                         for (OrderInfo oi : worst.getValue())
  836.                         {
  837.                             currentQueryParams.put("OrderId", String.valueOf(oi.getOrderId()));
  838.                             currentQueryParams.put("Timestamp", String.valueOf(oi.getTimestamp()));
  839.                             executeParametrizedQuery(connection, format(insertAddRowToSnapshotQuery,
  840.                                     tempSnapshotTable, snapshotTable), currentQueryParams);
  841.                             executeParametrizedQuery(connection, format(insertDeleteRowWithSpecParamsQuery,
  842.                                     snapshotTable, snapshotTable), currentQueryParams);
  843.                         }
  844.                     }
  845.                 }
  846.             }
  847.         }
  848.  
  849.         public void deleteOrder(String orderId, double price, String timestamp, String dss) throws SQLException
  850.         {
  851.             deleteOrder(orderId, price, timestamp, dss, false);
  852.         }
  853.  
  854.         private void deleteOrder(String orderId, double price, String timestamp, String dss, boolean isModify)
  855.                 throws SQLException
  856.         {
  857.             if (top.containsKey(price))
  858.             {
  859.                 if (!isModify)
  860.                 {
  861.                     // delete from snapshot (Add delete row)
  862.                     executeParametrizedQuery(connection, format(insertDeleteRowToSnapshotQuery,
  863.                             snapshotTable, orderDataSourceTable), currentQueryParams);
  864.                 }
  865.                 Set<OrderInfo> orderIdsByPrice = top.get(price);
  866.                 orderIdsByPrice.remove(new OrderInfo(orderId)); // 3.1.1, 3.1.2
  867.  
  868.                 if (orderIdsByPrice.isEmpty()) // add from Temp
  869.                 { // 3.1.1
  870.                     top.remove(price);
  871.  
  872.                     if (!rest.isEmpty())
  873.                     {
  874.                         double bestFromTemp = rest.lastKey(); // get best price from temp
  875.                         top.put(bestFromTemp, rest.remove(bestFromTemp));
  876.  
  877.                         currentQueryParams.put("CurrentTimestamp", timestamp);
  878.                         currentQueryParams.put("CurrentDss", dss);
  879.  
  880.                         if (!isModify)
  881.                         {
  882.                             for (OrderInfo oi : top.get(bestFromTemp))
  883.                             {
  884.                                 currentQueryParams.put("OrderId", String.valueOf(oi.getOrderId()));
  885.                                 currentQueryParams.put("Timestamp", String.valueOf(oi.getTimestamp()));
  886.                                 // add to snapshot
  887.                                 executeParametrizedQuery(connection, format(insertFromAllDataWithSpecParamsQuery,
  888.                                         snapshotTable, tempSnapshotTable), currentQueryParams);
  889.                                 // remove from temp
  890.                                 executeParametrizedQuery(connection,
  891.                                         format(deleteFromTempQuery, tempSnapshotTable), currentQueryParams);
  892.                             }
  893.                         }
  894.                     }
  895.                 }
  896.             }
  897.             else
  898.             {
  899.                 rest.remove(price);
  900.  
  901.                 if (!isModify)
  902.                 {
  903.                     // HARD delete from temp
  904.                     currentQueryParams.put("OrderId", String.valueOf(orderId)); // in Temp only Add Order, so we do
  905.                     // not need Message Timestamp
  906.                     executeParametrizedQuery(connection,
  907.                             format("DELETE FROM %s WHERE Side='#Side' AND \"Order ID\"='#OrderId'",
  908.                                     tempSnapshotTable), currentQueryParams);
  909.                 }
  910.             }
  911.         }
  912.  
  913.         private boolean containsInTop(String orderId, double oldPrice)
  914.         {
  915.             if (!top.containsKey(oldPrice))
  916.                 return false;
  917.  
  918.             for (OrderInfo oi : top.get(oldPrice))
  919.             {
  920.                 if (oi.getOrderId().equals(orderId))
  921.                     return true;
  922.             }
  923.             return false;
  924.         }
  925.  
  926.         public void modifyOrder(String orderId, String timestamp, double oldPrice, double newPrice, String newSize,
  927.                                 String dss) throws SQLException
  928.         {
  929.             // write modify
  930.             boolean wasTop = containsInTop(orderId, oldPrice);
  931.             Integer numberOfOrdersWithOldPrice = top.containsKey(oldPrice) ? top.get(oldPrice).size() : null;
  932.             double wasWorseTopPrice = top.isEmpty() ? 0 : top.firstKey();
  933.             double wasBestRestPrice = rest.isEmpty() ? 0 : rest.firstKey();
  934.             OrderInfo orderInfoBeforeChange = null;
  935.             if (!wasTop)
  936.             {
  937.                 if (rest.containsKey(oldPrice))
  938.                 {
  939.                     for (OrderInfo oi : rest.get(oldPrice))
  940.                     {
  941.                         if (oi.getOrderId().equals(orderId))
  942.                             orderInfoBeforeChange = oi;
  943.                     }
  944.                 }
  945.             }
  946.  
  947.             deleteOrder(orderId, oldPrice, null, null, true);
  948.             addOrder(orderId, timestamp, newPrice, null, true);
  949.  
  950.             boolean isTop = containsInTop(orderId, newPrice);
  951.  
  952.             if (wasTop)
  953.             {
  954.                 if (isTop)
  955.                 {
  956.                     // Add modify record
  957.                     executeParametrizedQuery(connection, format(insertFromAllDataQuery, snapshotTable,
  958.                             orderDataSourceTable), currentQueryParams);
  959.  
  960.                     currentQueryParams.put("CurrentTimestamp", timestamp);
  961.                     currentQueryParams.put("CurrentDss", dss);
  962.                     if (numberOfOrdersWithOldPrice != null && numberOfOrdersWithOldPrice > 1 && top.get(newPrice).size() == 1) // new price push out worse price from top
  963.                     {
  964.                         Double restFirstKey = rest.isEmpty() ? null : rest.firstKey(); // best in rest
  965.                         if (restFirstKey != null)
  966.                         {
  967.                             for (OrderInfo oi : rest.get(restFirstKey))
  968.                             {
  969.                                 currentQueryParams.put("OrderId", String.valueOf(oi.getOrderId()));
  970.                                 currentQueryParams.put("Timestamp", String.valueOf(oi.getTimestamp()));
  971.                                 // add delete to snap
  972.                                 executeParametrizedQuery(connection, format(insertDeleteRowWithSpecParamsQuery,
  973.                                         snapshotTable, snapshotTable), currentQueryParams);
  974.                                 // add to temp
  975.                                 executeParametrizedQuery(connection, format(insertAddRowToSnapshotQuery,
  976.                                         tempSnapshotTable, orderDataSourceTable), currentQueryParams);
  977.                             }
  978.                         }
  979.                     }
  980.                     if (newPrice == wasBestRestPrice)
  981.                     {
  982.                         for (OrderInfo oi : top.get(wasBestRestPrice))
  983.                         {
  984.                             if (!oi.getOrderId().equals(orderId))
  985.                             {
  986.                                 // Add record
  987.                                 currentQueryParams.put("OrderId", oi.getOrderId());
  988.                                 currentQueryParams.put("Timestamp", oi.getTimestamp());
  989.                                 executeParametrizedQuery(connection, format(insertFromAllDataWithSpecParamsQuery,
  990.                                         snapshotTable, orderDataSourceTable), currentQueryParams);
  991.                             }
  992.                         }
  993.                     }
  994.                     currentQueryParams.put("OrderId", orderId);
  995.                     currentQueryParams.put("Timestamp", timestamp);
  996.                 }
  997.                 else // !isTop
  998.                 {
  999.                     for (OrderInfo oi : rest.get(newPrice))
  1000.                     {
  1001.                         currentQueryParams.put("OrderId", oi.getOrderId());
  1002.                         currentQueryParams.put("Timestamp", oi.getTimestamp());
  1003.                         executeParametrizedQuery(connection, format(insertDeleteRowFromModifyToSnapQuery,
  1004.                                 snapshotTable, orderDataSourceTable), currentQueryParams);
  1005.                         executeParametrizedQuery(connection, format(insertAddRowToSnapshotQuery,
  1006.                                 tempSnapshotTable, orderDataSourceTable), currentQueryParams);
  1007.                     }
  1008.  
  1009.                     double worseTopPrice = top.firstKey(); // worse in top
  1010.                     if (wasWorseTopPrice != worseTopPrice)
  1011.                     {
  1012.                         currentQueryParams.put("CurrentTimestamp", timestamp);
  1013.                         currentQueryParams.put("CurrentDss", dss);
  1014.                         for (OrderInfo oi : top.get(worseTopPrice))
  1015.                         {
  1016.                             currentQueryParams.put("OrderId", String.valueOf(oi.getOrderId()));
  1017.                             currentQueryParams.put("Timestamp", String.valueOf(oi.getTimestamp()));
  1018.                             // add to snap
  1019.                             executeParametrizedQuery(connection, format(insertFromAllDataWithSpecParamsQuery,
  1020.                                     snapshotTable, tempSnapshotTable), currentQueryParams);
  1021.                             // remove hard from temp
  1022.                             executeParametrizedQuery(connection,
  1023.                                     format(deleteFromTempQuery, tempSnapshotTable), currentQueryParams);
  1024.                         }
  1025.                     }
  1026.                     currentQueryParams.put("OrderId", orderId);
  1027.                     currentQueryParams.put("Timestamp", timestamp);
  1028.                 }
  1029.             }
  1030.             else // !wasTop
  1031.             {
  1032.                 if (isTop)
  1033.                 {
  1034.                     // add to snap
  1035.                     executeParametrizedQuery(connection, format(insertAddRowToSnapshotQuery,
  1036.                             snapshotTable, orderDataSourceTable), currentQueryParams);
  1037.                     // remove hard from temp
  1038.                     currentQueryParams.put("OrderId", orderInfoBeforeChange.getOrderId());
  1039.                     currentQueryParams.put("Timestamp", orderInfoBeforeChange.getTimestamp());
  1040.                     executeParametrizedQuery(connection,
  1041.                             format(deleteFromTempQuery, tempSnapshotTable), currentQueryParams);
  1042.  
  1043.                     if (top.get(newPrice).size() == 1) // some orders were pushed off from top by order with new price
  1044.                     {
  1045.                         Double restLastKey = rest.isEmpty() ? null : rest.lastKey(); // best in rest
  1046.                         if (restLastKey != null)
  1047.                         {
  1048.                             currentQueryParams.put("CurrentTimestamp", timestamp);
  1049.                             currentQueryParams.put("CurrentDss", dss);
  1050.                             for (OrderInfo oi : rest.get(restLastKey))
  1051.                             {
  1052.                                 currentQueryParams.put("OrderId", String.valueOf(oi.getOrderId()));
  1053.                                 currentQueryParams.put("Timestamp", String.valueOf(oi.getTimestamp()));
  1054.                                 // add delete to snap
  1055.                                 executeParametrizedQuery(connection, format(insertDeleteRowWithSpecParamsQuery,
  1056.                                         snapshotTable, orderDataSourceTable), currentQueryParams);
  1057.                                 // add to temp
  1058.                                 executeParametrizedQuery(connection, format(insertAddRowToSnapshotQuery,
  1059.                                         tempSnapshotTable, orderDataSourceTable), currentQueryParams);
  1060.                             }
  1061.                         }
  1062.                     }
  1063.                 }
  1064.                 else // !isTop
  1065.                 {
  1066.                     // remove hard from temp
  1067.                     executeParametrizedQuery(connection,
  1068.                             format(deleteFromTempQuery, tempSnapshotTable), currentQueryParams);
  1069.                     // add to temp
  1070.                     executeParametrizedQuery(connection, format(insertAddRowToSnapshotQuery,
  1071.                             tempSnapshotTable, orderDataSourceTable), currentQueryParams);
  1072.                 }
  1073.             }
  1074.         }
  1075.  
  1076.         public void clear() throws SQLException
  1077.         {
  1078.             top.values().stream()
  1079.                     .flatMap(Collection::stream)
  1080.                     .forEach(orderInfo -> {
  1081.                         // delete from snapshot (add row as DeleteOder to snap)
  1082.                         currentQueryParams.put("OrderId", String.valueOf(orderInfo.getOrderId()));
  1083.                         currentQueryParams.put("Timestamp", String.valueOf(orderInfo.getTimestamp()));
  1084.                         executeParametrizedQuery(connection, format(insertDeleteRowToSnapshotQuery,
  1085.                                 snapshotTable, snapshotTable), currentQueryParams);
  1086.                     });
  1087.  
  1088.             top.clear();
  1089.  
  1090.             for (Iterator<Double> it = rest.descendingKeySet().iterator(); it.hasNext() && top.size() < TOP_SIZE; )
  1091.             {
  1092.                 double price = it.next();
  1093.                 top.put(price, rest.get(price));
  1094.                 it.remove();
  1095.  
  1096.                 for (OrderInfo oi : top.get(price))
  1097.                 {
  1098.                     currentQueryParams.put("OrderId", String.valueOf(oi.getOrderId()));
  1099.                     currentQueryParams.put("Timestamp", String.valueOf(oi.getTimestamp()));
  1100.                     // add to snapshot
  1101.                     executeParametrizedQuery(connection, format(insertFromAllDataQuery, snapshotTable,
  1102.                             orderDataSourceTable), currentQueryParams);
  1103.                     // remove from temp
  1104.                     executeParametrizedQuery(connection,
  1105.                             format(deleteFromTempQuery, tempSnapshotTable), currentQueryParams);
  1106.                 }
  1107.             }
  1108.         }
  1109.  
  1110.         public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  1111.         {
  1112.             rest.values().stream()
  1113.                     .flatMap(Collection::stream)
  1114.                     .forEach(orderData -> {
  1115.                         // select last transaction with orderId and timestamp and insert into new table (temp
  1116.                         //  snapshot)
  1117.                         currentQueryParams.put("OrderId", String.valueOf(orderData.getOrderId()));
  1118.                         currentQueryParams.put("Timestamp", String.valueOf(orderData.getTimestamp()));
  1119.                         currentQueryParams.put(SIDE, side);
  1120.                         executeParametrizedQuery(connection, format(insertFromAllDataQuery,
  1121.                                 newTempSnapshotTable, tempSnapshotTable), currentQueryParams);
  1122.                     });
  1123.  
  1124.             snapshotTable = newSnapshotTable;
  1125.             tempSnapshotTable = newTempSnapshotTable;
  1126.         }
  1127.     }
  1128. }
  1129.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement