Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class DelayedDataSnapshot extends Action
- {
// Directory of this action's SQL scripts, built from the configuration directory.
- private static final String QUERIES_PATH =
-     getInstance().getConfigFiles().getCfgDir() + "delayedOrderBookSnapshot/sql/";
// Script used to create every working/result table; the table name is supplied through format().
- private static final String CREATE_RESULT_TABLE = "createResultTable.sql";
// Selects the messages with LeftTimestamp < MessageTimestamp <= RightTimestamp, ordered by time.
// The '#...' placeholders are presumably substituted from the query parameter map - confirm in getTableReader.
- private static final String SELECT_ALL_MESSAGES_BY_TIME =
-     "SELECT * FROM %s WHERE '#LeftTimestamp'<MessageTimestamp "
-     + "AND MessageTimestamp<='#RightTimestamp' ORDER BY MessageTimestamp";
// NOTE(review): not referenced in the visible part of the class; presumably the batch flush threshold.
- private static final int MAX_BATCH_SIZE = 100;
// Column names read from table rows.
- private static final String MESSAGE_TYPE = "Message_Type";
- private static final String SIDE = "Side";
- private static final String ORDER_ID = "Order_ID";
- private static final String PRICE = "Price";
- private static final String MESSAGE_TIMESTAMP = "MessageTimestamp";
// Fixed names of the working tables.
- private static final String AGGREGATED_DATA_TABLE = "AggregatedData";
- private static final String PREVIOUS_SNAPSHOT_TABLE = "PreviousSnapshot";
- private static final String CURRENT_SNAPSHOT_TABLE = "CurrentSnapshot";
- private static final String TEMP_PREVIOUS_SNAPSHOT_TABLE = "TempPreviousSnapshot";
- private static final String TEMP_CURRENT_SNAPSHOT_TABLE = "TempCurrentSnapshot";
// Text of insertFromAllData.sql; assigned in initParameters().
- private static String insertFromAllDataQuery;
- private GdlGlobalContext globalContext;
// Input table names taken from the action parameters.
- private String addOrderTable;
- private String modifyOrderTable;
- private String deleteOrderTable;
- private String orderBookClearTable;
- private String orderBookSnapshotTable;
// orderBookSnapshotTable + "TempForDelayed"; derived in initParameters().
- private String orderBookSnapshotTempTable;
- private String cthEndTime;
- private String dbConnectionName;
- private SnapshotHandler snapshotHandler;
// Snapshot boundaries computed in run() from cthEndTime.
- private String previousSnapshotTime;
- private String currentSnapshotTime;
- private String orderBookSnapshotTime;
// Mutable copy of the input parameters, extended with per-query values (Instrument, Timestamp, ...).
- private Map<String, String> currentQueryParams;
// Prepared statements shared between executeQueries() and the record-handling methods.
- private PreparedStatement insertFromAllData;
- private PreparedStatement insertAddRowToSnapshot;
- private PreparedStatement insertAddRowToTempSnapshot;
- private PreparedStatement insertDeleteRowToSnapshot;
- private PreparedStatement insertDeleteRowWithSpecParams;
- private PreparedStatement insertFromAllDataWithSpecParamsSource;
- private PreparedStatement insertDeleteRowFromModifyToSnap;
- private PreparedStatement deleteFromTemp;
- private PreparedStatement insertAddRowWithSpecParamsSource;
- private PreparedStatement insertAddRowWithSpecParamsOrderBook;
- private PreparedStatement worseBuyPriceInTop5OneInstrument;
- private PreparedStatement worseSellPriceInTop5OneInstrument;
- private PreparedStatement worseBuyPriceInTop5AllInstrument;
- private PreparedStatement worseSellPriceInTop5AllInstrument;
- private PreparedStatement bestTop5OneInstrument;
- private PreparedStatement bestRest5OneInstrument;
- private PreparedStatement bestTop5AllInstrument;
- private PreparedStatement bestRest5AllInstrument;
// Re-created as an empty map at the start of every run().
- private Map<PreparedStatement, Integer> batchSize;
// Names of the two result tables, taken from the action parameters.
- private String finalPreviousSnapshotTable;
- private String finalCurrentSnapshotTable;
- @Override
- protected Result run(StepContext stepContext, MatrixContext matrixContext, GlobalContext globalContext)
- throws ResultException
- {
- this.globalContext = (GdlGlobalContext) globalContext;
- initParameters();
- batchSize = new HashMap<>();
- logger.debug("CthEndTime: {} ({})", cthEndTime, GdlTimestampUtils.nanosStringToDateWithFractions(cthEndTime,
- "6"));
- currentSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(cthEndTime, 0);
- previousSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(currentSnapshotTime, 1);
- orderBookSnapshotTime =
- GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(previousSnapshotTime, 1);
- logger.debug("CurrentSnapshotTime: {} ({})", currentSnapshotTime,
- GdlTimestampUtils.nanosStringToDateWithFractions(currentSnapshotTime, "6"));
- logger.debug("PreviousSnapshotTime: {} ({})", previousSnapshotTime,
- GdlTimestampUtils.nanosStringToDateWithFractions(previousSnapshotTime, "6"));
- logger.debug("OrderBookSnapshotTime: {} ({})", orderBookSnapshotTime,
- GdlTimestampUtils.nanosStringToDateWithFractions(orderBookSnapshotTime, "6"));
- try
- {
- executeQueries();
- }
- catch (SQLException e)
- {
- String errMessage = "Error while committing changes";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- return DefaultResult.passed("Two snapshots were made successfully, result in two tables - 'FinalPreviousSnapshotTable' " +
- "and 'FinalCurrentSnapshotTable'.");
- }
/**
 * Reads the required action parameters into fields, lets the handler validate them, and derives the
 * temporary order-book snapshot table name.
 */
- private void initParameters()
- {
-     getLogger().debug("Initializing special action parameters");
-     InputParamsHandler required = new InputParamsHandler(inputParams);
-     addOrderTable = required.getRequiredString("AddOrderTable");
-     modifyOrderTable = required.getRequiredString("ModifyOrderTable");
-     deleteOrderTable = required.getRequiredString("DeleteOrderTable");
-     orderBookClearTable = required.getRequiredString("OrderBookClearTable");
-     orderBookSnapshotTable = required.getRequiredString("OrderBookSnapshotTable");
      // These two are only requested, not stored: the returned values are discarded here.
-     required.getRequiredString("Instrument");
-     required.getRequiredString("SourceVenue");
-     cthEndTime = required.getRequiredString("CTHEndTime");
-     dbConnectionName = required.getRequiredString("DbConnection");
-     finalPreviousSnapshotTable = required.getRequiredString("FinalPreviousSnapshotTable");
-     finalCurrentSnapshotTable = required.getRequiredString("FinalCurrentSnapshotTable");
-     required.check();
-     orderBookSnapshotTempTable = orderBookSnapshotTable + "TempForDelayed";
-     insertFromAllDataQuery = loadQuery("insertFromAllData.sql");
- }
/**
 * Creates two indexes on {@code table}, one on Order_ID and one on MessageTimestamp.
 * The table name is appended to each index name.
 *
 * @param connection connection the statements are executed on
 * @param table      name of the table to index
 */
- private void createIndexes(Connection connection, String table)
- {
-     String[] indexStatements = {
-         format("CREATE INDEX idx_orderid%1$s ON %1$s (Order_ID)", table),
-         format("CREATE INDEX idx_message_timestamp%1$s ON %1$s (MessageTimestamp)", table)
-     };
-     for (String indexStatement : indexStatements)
-     {
-         executeQuery(connection, indexStatement);
-     }
- }
/**
 * Drops the Order_ID and MessageTimestamp indexes of {@code table}, using the same index names
 * that {@code createIndexes} builds.
 *
 * @param connection connection the statements are executed on
 * @param table      name of the table whose indexes are dropped
 */
- private void dropIndexes(Connection connection, String table)
- {
-     String[] dropStatements = {
-         format("DROP INDEX idx_orderid%1$s ON %1$s", table),
-         format("DROP INDEX idx_message_timestamp%1$s ON %1$s", table)
-     };
-     for (String dropStatement : dropStatements)
-     {
-         executeQuery(connection, dropStatement);
-     }
- }
/**
 * Builds the previous and the current snapshot on the connection named by {@code dbConnectionName}.
 * <p>
 * Auto-commit is switched off, then the working tables and the two final result tables are created from
 * {@code createResultTable.sql}, each followed by {@code createIndexes}. After that one of two paths runs,
 * selected by the {@code Instrument} input parameter:
 * <ul>
 * <li>{@code "all"} (trimmed, case-insensitive): the {@code *AllInstruments.sql} scripts fill
 * {@code AggregatedData} and the temporary order-book snapshot table, the instruments are read from that
 * temporary table, and both snapshots are written directly to the final tables. Indexes are dropped at the
 * end of this path.</li>
 * <li>anything else: the value is split on commas and every instrument is processed on its own through
 * {@code PreviousSnapshot}/{@code CurrentSnapshot}, which are then copied into the final tables with
 * {@code insertToFinalSnapshot.sql}.</li>
 * </ul>
 * In both paths the first {@code createSnapshot} call runs with LeftTimestamp/RightTimestamp set to
 * orderBookSnapshotTime/previousSnapshotTime and the second with previousSnapshotTime/currentSnapshotTime.
 *
 * @throws SQLException if a statement cannot be prepared or a commit on the connection fails
 */
- private void executeQueries() throws SQLException
- {
- Connection connection = this.globalContext.getDbConnectionOrResultEx(dbConnectionName);
- connection.setAutoCommit(false);
- currentQueryParams = new HashMap<>(inputParams);
- // Create tables
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), AGGREGATED_DATA_TABLE)); // table for add, modify,
- // delete and clear table data
- createIndexes(connection, AGGREGATED_DATA_TABLE);
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_PREVIOUS_SNAPSHOT_TABLE));
- createIndexes(connection, TEMP_PREVIOUS_SNAPSHOT_TABLE);
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), PREVIOUS_SNAPSHOT_TABLE));
- createIndexes(connection, PREVIOUS_SNAPSHOT_TABLE);
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_CURRENT_SNAPSHOT_TABLE));
- createIndexes(connection, TEMP_CURRENT_SNAPSHOT_TABLE);
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), CURRENT_SNAPSHOT_TABLE));
- createIndexes(connection, CURRENT_SNAPSHOT_TABLE);
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), orderBookSnapshotTempTable));
- createIndexes(connection, orderBookSnapshotTempTable);
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalPreviousSnapshotTable));
- createIndexes(connection, finalPreviousSnapshotTable);
- executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalCurrentSnapshotTable));
- createIndexes(connection, finalCurrentSnapshotTable);
// The handler starts out on the final previous-snapshot table and its temp companion.
- snapshotHandler = new SnapshotHandler(connection, finalPreviousSnapshotTable, TEMP_PREVIOUS_SNAPSHOT_TABLE);
- String instrument = inputParams.get("Instrument");
- if (instrument.trim().equalsIgnoreCase("all")) // we need to execute action for all instruments from input
- // tables.
- {
// Fifth-best distinct price per side for one instrument (bound as parameter 1);
// presumably executed by getWorsePriceInTop5 - confirm there.
- worseBuyPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
- "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' " +
- "AND Instrument_ID=? ORDER BY Price DESC LIMIT 5) AS distPrices", orderBookSnapshotTempTable));
- worseSellPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
- "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' " +
- "AND Instrument_ID=? ORDER BY Price LIMIT 5) as distPrices", orderBookSnapshotTempTable));
- cleanAllTempTables(connection);
- // Fill allTablesData table (all instruments)
- executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrderAllInstruments.sql"),
- AGGREGATED_DATA_TABLE, addOrderTable), currentQueryParams);
- connection.commit();
- executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrderAllInstruments.sql"),
- AGGREGATED_DATA_TABLE, modifyOrderTable), currentQueryParams);
- connection.commit();
- executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDeleteAllInstruments.sql"),
- AGGREGATED_DATA_TABLE, deleteOrderTable), currentQueryParams);
- connection.commit();
- executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClearAllInstruments.sql"),
- AGGREGATED_DATA_TABLE, orderBookClearTable), currentQueryParams);
- connection.commit();
- // Fill temp OrderBookSnapshot
- currentQueryParams.put("Timestamp", orderBookSnapshotTime);
- executeParametrizedQuery(connection, format(loadQuery(
- "insertFromOrderBookSnapToTempAllInstruments" +
- ".sql"), orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
- connection.commit();
- insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
- finalPreviousSnapshotTable, orderBookSnapshotTempTable));
- insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
- try(DbDataReader instrumentReader = getTableReader(connection, format("SELECT DISTINCT Instrument_ID FROM %s",
- orderBookSnapshotTempTable), currentQueryParams)) // Add to PrevSnapshot
- {
- instrumentReader.start();
- while (instrumentReader.hasMoreData())
- {
- TableRow<String, String> tableRow = instrumentReader.readRow();
- String currentInstrument = tableRow.getValue("Instrument_ID");
- logger.debug("Current instrument (read from OrderBookSnapshot table): {}.", currentInstrument);
- currentQueryParams.put("Instrument", currentInstrument);
- snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
// NOTE(review): both statements are prepared again on every iteration, but the only explicit
// closeResource calls for them come after the loop. Confirm DbDataReader.close() releases the
// statement it wraps; otherwise every iteration except the last leaves two statements open.
- bestTop5OneInstrument = connection.prepareStatement(format(
- "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
- "AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
- "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
- bestRest5OneInstrument = connection.prepareStatement(format(
- "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
- "AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
- "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
- bestTop5OneInstrument.setString(1, currentInstrument);
- bestRest5OneInstrument.setString(1, currentInstrument);
- String minBuyTop5Price = getWorsePriceInTop5( true, true, currentInstrument);
- logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table: {}", minBuyTop5Price);
- bestTop5OneInstrument.setString(2, minBuyTop5Price);
- bestRest5OneInstrument.setString(2, minBuyTop5Price);
- String maxSellTop5Price = getWorsePriceInTop5(false, true, currentInstrument);
- logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table: {}", maxSellTop5Price);
- bestTop5OneInstrument.setString(3, maxSellTop5Price);
- bestRest5OneInstrument.setString(3, maxSellTop5Price);
- try (DbDataReader reader = new DbDataReader(bestTop5OneInstrument)) // Add to PrevSnapshot
- {
- reader.start();
- while (reader.hasMoreData())
- {
- addRecord(reader.readRow(), currentInstrument);
- }
- }
- try (DbDataReader reader = new DbDataReader(bestRest5OneInstrument)) // Add to TempPrevSnapshot
- {
- reader.start();
- while (reader.hasMoreData())
- {
- addRecord(reader.readRow(), currentInstrument);
- }
- }
- // Stop reading data from OrderBookSnapshot
- }
- }
- catch (IOException e)
- {
- String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- executeBatch(insertFromAllData, "insertFromAllData");
- executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
- Utils.closeResource(insertFromAllData);
- Utils.closeResource(insertAddRowToTempSnapshot);
- Utils.closeResource(worseBuyPriceInTop5OneInstrument);
- Utils.closeResource(worseSellPriceInTop5OneInstrument);
- Utils.closeResource(bestTop5OneInstrument);
- Utils.closeResource(bestRest5OneInstrument);
- connection.commit();
// From here on the source of order data is AggregatedData and the statements target the final previous-snapshot table.
- snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
- insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
- finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
- insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
- insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
- finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
- insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
- insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertFromAllDataWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
- insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowFromModifyToSnapshot.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
- deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?",
- TEMP_PREVIOUS_SNAPSHOT_TABLE));
- insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
- insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, orderBookSnapshotTempTable));
- currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
- currentQueryParams.put("RightTimestamp", previousSnapshotTime);
- createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
- AGGREGATED_DATA_TABLE)); // first snapshot
- executeBatch(insertFromAllData, "insertFromAllData");
- executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
- executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
- executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
- executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
- executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
- executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
- executeBatch(deleteFromTemp, "deleteFromTemp");
- executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
- executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
- Utils.closeResource(insertFromAllData);
- Utils.closeResource(insertAddRowToSnapshot);
- Utils.closeResource(insertAddRowToTempSnapshot);
- Utils.closeResource(insertDeleteRowToSnapshot);
- Utils.closeResource(insertDeleteRowWithSpecParams);
- Utils.closeResource(insertFromAllDataWithSpecParamsSource);
- Utils.closeResource(insertDeleteRowFromModifyToSnap);
- Utils.closeResource(deleteFromTemp);
- Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
- Utils.closeResource(insertAddRowWithSpecParamsSource);
- connection.commit();
- // transfer data from prev snapshot to current
- snapshotHandler.nextSnapshot(finalCurrentSnapshotTable, TEMP_CURRENT_SNAPSHOT_TABLE);
// Same statement set as above, now targeting the final current-snapshot table and its temp companion.
- insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
- finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
- insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
- insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
- finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
- insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
- insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertFromAllDataWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
- insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowFromModifyToSnapshot.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
- deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
- insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
- insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, orderBookSnapshotTempTable));
- currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
- currentQueryParams.put("RightTimestamp", currentSnapshotTime);
- createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
- AGGREGATED_DATA_TABLE)); // second snapshot
- executeBatch(insertFromAllData, "insertFromAllData");
- executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
- executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
- executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
- executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
- executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
- executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
- executeBatch(deleteFromTemp, "deleteFromTemp");
- executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
- executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
- Utils.closeResource(insertFromAllData);
- Utils.closeResource(insertAddRowToSnapshot);
- Utils.closeResource(insertAddRowToTempSnapshot);
- Utils.closeResource(insertDeleteRowToSnapshot);
- Utils.closeResource(insertDeleteRowWithSpecParams);
- Utils.closeResource(insertFromAllDataWithSpecParamsSource);
- Utils.closeResource(insertDeleteRowFromModifyToSnap);
- Utils.closeResource(deleteFromTemp);
- Utils.closeResource(insertAddRowWithSpecParamsSource);
- Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
- connection.commit();
// Indexes are dropped only on this path; the per-instrument path below never calls dropIndexes.
- dropIndexes(connection, AGGREGATED_DATA_TABLE);
- dropIndexes(connection, TEMP_PREVIOUS_SNAPSHOT_TABLE);
- dropIndexes(connection, PREVIOUS_SNAPSHOT_TABLE);
- dropIndexes(connection, TEMP_CURRENT_SNAPSHOT_TABLE);
- dropIndexes(connection, CURRENT_SNAPSHOT_TABLE);
- dropIndexes(connection, orderBookSnapshotTempTable);
- dropIndexes(connection, finalPreviousSnapshotTable);
- dropIndexes(connection, finalCurrentSnapshotTable);
- connection.commit();
- //transfer result to final tables
- /*executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
- finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
- executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
- finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);*/
- }
- else
- {
// Same fifth-best-price queries as on the "all" path but without an Instrument_ID filter:
// on this path the temp snapshot table is refilled for one instrument at a time.
- worseBuyPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
- "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' ORDER BY Price DESC " +
- "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
- worseSellPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
- "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' ORDER BY Price " +
- "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
- String[] allInstruments = instrument.split(",");
- logger.debug("There are {} distinct instruments in input parameter 'Instrument'.", allInstruments.length);
- for (String currentInstrument : allInstruments)
- {
- cleanAllTempTables(connection);
- currentInstrument = currentInstrument.trim();
- logger.debug("Current instrument: {}.", currentInstrument);
- currentQueryParams.put("Instrument", currentInstrument);
- // Fill temp OrderBookSnapshot
- currentQueryParams.put("Timestamp", orderBookSnapshotTime);
- executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookSnapToTemp.sql"),
- orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
- bestTop5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
- "Message_Type='70' AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
- "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
- bestRest5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
- "Message_Type='70' AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
- "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
- insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
- finalPreviousSnapshotTable, orderBookSnapshotTempTable));
- insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
- // Fill allTablesData table
- executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrder.sql"), AGGREGATED_DATA_TABLE,
- addOrderTable), currentQueryParams);
- executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrder.sql"), AGGREGATED_DATA_TABLE,
- modifyOrderTable), currentQueryParams);
- executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDelete.sql"), AGGREGATED_DATA_TABLE,
- deleteOrderTable), currentQueryParams);
- executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClear.sql"), AGGREGATED_DATA_TABLE,
- orderBookClearTable), currentQueryParams);
- snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
- try
- {
- String minBuyTop5Price = getWorsePriceInTop5( true, false, null);
- logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
- currentInstrument, minBuyTop5Price);
- bestTop5AllInstrument.setString(1, minBuyTop5Price);
- bestRest5AllInstrument.setString(1, minBuyTop5Price);
- String maxSellTop5Price = getWorsePriceInTop5( false, false, null);
- logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
- currentInstrument, maxSellTop5Price);
- bestTop5AllInstrument.setString(2, maxSellTop5Price);
- bestRest5AllInstrument.setString(2, maxSellTop5Price);
- try (DbDataReader reader = new DbDataReader(bestTop5AllInstrument)) // Add to PrevSnapshot
- {
- reader.start();
- while (reader.hasMoreData())
- {
- addRecord(reader.readRow(), currentInstrument);
- }
- }
- try (DbDataReader reader = new DbDataReader(bestRest5AllInstrument)) // Add to TempPrevSnapshot
- {
- reader.start();
- while (reader.hasMoreData())
- {
- addRecord(reader.readRow(), currentInstrument);
- }
- }
- }
- catch (IOException e)
- {
- String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- executeBatch(insertFromAllData, "insertFromAllData");
- executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
- Utils.closeResource(insertFromAllData);
- Utils.closeResource(insertAddRowToTempSnapshot);
- Utils.closeResource(bestTop5AllInstrument);
- Utils.closeResource(bestRest5AllInstrument);
// NOTE(review): the seeding insertFromAllData above targets finalPreviousSnapshotTable and snapshotHandler
// was constructed on that table, while every statement below targets PREVIOUS_SNAPSHOT_TABLE. The handler is
// also not visibly moved back to the previous-snapshot tables after nextSnapshot() on later iterations.
// Confirm both are intended.
- snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
- insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
- PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
- PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertFromAllDataWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowFromModifyToSnapshot.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND " +
- "MessageTimestamp=?", TEMP_PREVIOUS_SNAPSHOT_TABLE));
- insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
- currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
- currentQueryParams.put("RightTimestamp", previousSnapshotTime);
- createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
- AGGREGATED_DATA_TABLE)); // first snapshot
- executeBatch(insertFromAllData, "insertFromAllData");
- executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
- executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
- executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
- executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
- executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
- executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
- executeBatch(deleteFromTemp, "deleteFromTemp");
- executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
- executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
- Utils.closeResource(insertFromAllData);
- Utils.closeResource(insertAddRowToSnapshot);
- Utils.closeResource(insertAddRowToTempSnapshot);
- Utils.closeResource(insertDeleteRowToSnapshot);
- Utils.closeResource(insertDeleteRowWithSpecParams);
- Utils.closeResource(insertFromAllDataWithSpecParamsSource);
- Utils.closeResource(insertDeleteRowFromModifyToSnap);
- Utils.closeResource(deleteFromTemp);
- Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
- Utils.closeResource(insertAddRowWithSpecParamsSource);
- // transfer data from prev snapshot to current
- snapshotHandler.nextSnapshot(CURRENT_SNAPSHOT_TABLE, TEMP_CURRENT_SNAPSHOT_TABLE);
- insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
- CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
- TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
- CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertFromAllDataWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
- "insertDeleteRowFromModifyToSnapshot.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? " +
- "AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
- insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
- insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
- "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
- currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
- currentQueryParams.put("RightTimestamp", currentSnapshotTime);
- createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
- AGGREGATED_DATA_TABLE)); // second snapshot
- executeBatch(insertFromAllData, "insertFromAllData");
- executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
- executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
- executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
- executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
- executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
- executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
- executeBatch(deleteFromTemp, "deleteFromTemp");
- executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
- executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
- Utils.closeResource(insertFromAllData);
- Utils.closeResource(insertAddRowToSnapshot);
- Utils.closeResource(insertAddRowToTempSnapshot);
- Utils.closeResource(insertDeleteRowToSnapshot);
- Utils.closeResource(insertDeleteRowWithSpecParams);
- Utils.closeResource(insertFromAllDataWithSpecParamsSource);
- Utils.closeResource(insertDeleteRowFromModifyToSnap);
- Utils.closeResource(deleteFromTemp);
- Utils.closeResource(insertAddRowWithSpecParamsSource);
- Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
- //transfer result to final tables
- executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
- finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
- executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
- finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);
// Single commit per instrument on this path.
- connection.commit();
- }
- Utils.closeResource(worseBuyPriceInTop5AllInstrument);
- Utils.closeResource(worseSellPriceInTop5AllInstrument);
- }
- }
/**
 * Deletes every row from the six working tables (the aggregated data, the temporary order-book snapshot,
 * and the previous/current snapshot tables with their temp companions). The final result tables are not
 * touched.
 *
 * @param connection connection the DELETE statements are executed on
 */
- private void cleanAllTempTables(Connection connection)
- {
-     String[] workingTables = {
-         AGGREGATED_DATA_TABLE,
-         orderBookSnapshotTempTable,
-         TEMP_PREVIOUS_SNAPSHOT_TABLE,
-         PREVIOUS_SNAPSHOT_TABLE,
-         TEMP_CURRENT_SNAPSHOT_TABLE,
-         CURRENT_SNAPSHOT_TABLE
-     };
-     for (String workingTable : workingTables)
-     {
-         executeParametrizedQuery(connection, format("DELETE FROM %s", workingTable), currentQueryParams);
-     }
- }
- /*
- private void printTableColumns(StringTableData td)
- {
- CommaBuilder commaBuilder = new CommaBuilder();
- Iterator<String> headerIterator = td.getHeader().iterator();
- String currentColumn;
- while(headerIterator.hasNext())
- {
- currentColumn = headerIterator.next();
- String newColumn = "'" + currentColumn + "'";
- commaBuilder.append(newColumn);
- }
- String columns = commaBuilder.toString();
- logger.trace("Temp allData table columns: {}", columns);
- }
- */
- private void createSnapshot(Connection connection, String selectOrderBookPartQuery) throws SQLException
- {
- long MAX_ROWS = 100_000, currentRow = 0;
- try (DbDataReader reader = getTableReader(connection, selectOrderBookPartQuery, currentQueryParams))
- {
- reader.start();
- //printTableColumns(td);
- while (reader.hasMoreData())
- {
- if (currentRow == MAX_ROWS)
- {
- connection.commit();
- currentRow = 0;
- }
- else
- {
- currentRow++;
- }
- TableRow<String, String> tr = reader.readRow();
- String messageType = tr.getValue(MESSAGE_TYPE);
- String currentInstrument = tr.getValue("Instrument_ID");
- switch (messageType)
- {
- case ADD_ORDER_TYPE_DEC:
- addRecord(tr, currentInstrument);
- break;
- case MODIFY_ORDER_TYPE_DEC:
- modifyRecord(tr, currentInstrument);
- break;
- case DELETE_ORDER_TYPE_DEC:
- deleteRecord(tr, currentInstrument);
- break;
- case CLEAR_BOOK_TYPE_DEC:
- deleteAllRecords(currentInstrument);
- break;
- default:
- throw new ResultException(format("Invalid value in column %s: %s", MESSAGE_TYPE,
- messageType));
- }
- }
- }
- catch (IOException e)
- {
- String errMessage = "Error while reading data";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- }
- private void addRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
- {
- String side = tableRow.getValue(SIDE);
- String orderId = tableRow.getValue(ORDER_ID);
- String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
- currentQueryParams.put(SIDE, side);
- currentQueryParams.put("OrderId", orderId);
- currentQueryParams.put("Timestamp", timestamp);
- String price = tableRow.getValue(PRICE);
- String orderType = tableRow.getValue("Order_Type");
- String dss = tableRow.getValue("dss_rt_bus_seq_no");
- snapshotHandler.getInstrumentInfo(currentInstrument).addOrder(side, orderId, timestamp,
- Double.parseDouble(price), orderType, dss);
- }
- private void modifyRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
- {
- String side = tableRow.getValue(SIDE);
- String orderId = tableRow.getValue(ORDER_ID);
- String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
- currentQueryParams.put(SIDE, side);
- currentQueryParams.put("OrderId", orderId);
- currentQueryParams.put("Timestamp", timestamp);
- String oldPrice = tableRow.getValue("Old_Price");
- String newPrice = tableRow.getValue("Price");
- String newSize = tableRow.getValue("Size");
- String orderType = tableRow.getValue("Order_Type");
- String dss = tableRow.getValue("dss_rt_bus_seq_no");
- snapshotHandler.getInstrumentInfo(currentInstrument).modifyOrder(side, orderId,
- timestamp, Double.parseDouble(oldPrice), Double.parseDouble(newPrice), newSize, orderType, dss);
- }
- private void deleteRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
- {
- String side = tableRow.getValue(SIDE);
- String orderId = tableRow.getValue(ORDER_ID);
- String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
- currentQueryParams.put(SIDE, side);
- currentQueryParams.put("OrderId", orderId);
- currentQueryParams.put("Timestamp", timestamp);
- String price = tableRow.getValue(PRICE);
- String orderType = tableRow.getValue("Order_Type");
- String dss = tableRow.getValue("dss_rt_bus_seq_no");
- snapshotHandler.getInstrumentInfo(currentInstrument).deleteOrder(side, orderId,
- Double.parseDouble(price), orderType, timestamp, dss);
- }
- private void deleteAllRecords(String currentInstrument) throws SQLException
- {
- snapshotHandler.getInstrumentInfo(currentInstrument).clear();
- }
- private String loadQuery(String queryFile)
- {
- String createResultTable;
- try
- {
- createResultTable = SQLUtils.loadQuery(rootRelative(QUERIES_PATH + queryFile));
- }
- catch (IOException e)
- {
- String errMessage = "Error while loading query from file '" + queryFile + "'";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- return createResultTable;
- }
- private void executeParametrizedQuery(Connection connection, String query, Map<String, String> params)
- {
- logger.trace("Query before parsing: {}", query);
- ParametrizedQuery paramQuery = SQLUtils.parseSQLTemplate(query);
- logger.trace("Parametrized query: {}", paramQuery.getQuery());
- try (PreparedStatement statement = paramQuery.createPreparedStatement(connection, params))
- {
- statement.execute();
- }
- catch (SQLException e)
- {
- String errMessage = "Error while prepared statement creating";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- }
- private String getWorsePriceInTop5(boolean isBuy, boolean isSpecificInstrument,
- String instrument) throws SQLException
- {
- PreparedStatement preparedStatement;
- if (isBuy)
- {
- if (isSpecificInstrument)
- {
- preparedStatement = worseBuyPriceInTop5OneInstrument;
- preparedStatement.setString(1, instrument);
- }
- else
- {
- preparedStatement = worseBuyPriceInTop5AllInstrument;
- }
- }
- else // Sell
- {
- if (isSpecificInstrument)
- {
- preparedStatement = worseSellPriceInTop5OneInstrument;
- preparedStatement.setString(1, instrument);
- }
- else
- {
- preparedStatement = worseSellPriceInTop5AllInstrument;
- }
- }
- try (ResultSet rs = preparedStatement.executeQuery())
- {
- if(rs.next())
- return String.valueOf(rs.getDouble("Price"));
- }
- catch (SQLException e)
- {
- String errMessage = "Error while prepared statement creating";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- return null;
- }
- private void executeQuery(Connection connection, String query)
- {
- logger.trace("Query to execute: {}", query);
- try (Statement statement = connection.createStatement())
- {
- statement.execute(query);
- }
- catch (SQLException e)
- {
- String errMessage = format("Error while executing query '%s'.", query);
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- }
- private DbDataReader getTableReader(Connection connection, String sqlTemplate, Map<String, String> params)
- {
- ParametrizedQuery query;
- PreparedStatement statement;
- try
- {
- query = SQLUtils.parseSQLTemplate(sqlTemplate);
- statement = query.createPreparedStatement(connection, params);
- }
- catch (SQLException e)
- {
- String errMessage = "Error while prepared statement creating";
- this.logger.error(errMessage, e);
- throw new ResultException(errMessage, e);
- }
- return new DbDataReader(statement);
- }
- private class SnapshotHandler
- {
- private final Connection connection;
- private String snapshotTable;
- private String tempSnapshotTable;
- private Map<String, InstrumentInfo> instrumentsInfo;
- private String sourceTable;
- public SnapshotHandler(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
- {
- connection = dbConnection;
- this.snapshotTable = snapshotTable;
- this.tempSnapshotTable = tempSnapshotTable;
- instrumentsInfo = new HashMap<>();
- }
- public void setOrderDataSourceTable(String sourceTable)
- {
- this.sourceTable = sourceTable;
- for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
- {
- InstrumentInfo ii = instrumentEntry.getValue();
- ii.setOrderDataSourceTable(sourceTable);
- instrumentEntry.setValue(ii);
- }
- }
- public InstrumentInfo getInstrumentInfo(String instrument)
- {
- InstrumentInfo ii = instrumentsInfo.get(instrument);
- if (ii == null)
- {
- ii = new InstrumentInfo(connection, snapshotTable, tempSnapshotTable);
- ii.setOrderDataSourceTable(sourceTable);
- instrumentsInfo.put(instrument, ii);
- }
- return ii;
- }
- public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
- {
- this.snapshotTable = newSnapshotTable;
- this.tempSnapshotTable = newTempSnapshotTable;
- for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
- {
- instrumentEntry.getValue().nextSnapshot(newSnapshotTable, newTempSnapshotTable);
- }
- }
- }
- private class InstrumentInfo
- {
- private final Connection connection;
- private final PriceTopHandler buyHandler, sellHandler;
- private String snapshotTable;
- private String sourceTable;
- public InstrumentInfo(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
- {
- connection = dbConnection;
- buyHandler = new PriceTopHandler(true, connection, snapshotTable, tempSnapshotTable);
- sellHandler = new PriceTopHandler(false, connection, snapshotTable, tempSnapshotTable);
- this.snapshotTable = snapshotTable;
- }
- public void setOrderDataSourceTable(String sourceTable)
- {
- this.sourceTable = sourceTable;
- buyHandler.setOrderDataSourceTable(sourceTable);
- sellHandler.setOrderDataSourceTable(sourceTable);
- }
- public void addOrder(String side, String orderId, String timestamp, double price, String orderType,
- String dss) throws SQLException
- {
- if (price == 0 && StringUtils.equals(orderType, "1"))
- {
- // handle zero price
- executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
- }
- else
- {
- if ("66".equalsIgnoreCase(side))
- buyHandler.addOrder(orderId, timestamp, price, dss);
- else if ("83".equalsIgnoreCase(side))
- sellHandler.addOrder(orderId, timestamp, price, dss);
- }
- }
- public void deleteOrder(String side, String orderId, double price, String orderType, String timestamp,
- String dss) throws SQLException
- {
- if (price == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
- {
- // handle zero price
- executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams",
- timestamp, dss, side, orderId, timestamp);
- }
- else
- {
- if ("66".equalsIgnoreCase(side))
- buyHandler.deleteOrder(orderId, price, timestamp, dss);
- else if ("83".equalsIgnoreCase(side))
- sellHandler.deleteOrder(orderId, price, timestamp, dss);
- }
- }
- public void modifyOrder(String side, String orderId, String timestamp, double oldPrice, double newPrice,
- String newSize, String orderType, String dss) throws SQLException
- {
- if (oldPrice == 0 && newPrice == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
- {
- // handle zero price
- executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
- }
- else
- {
- if ("66".equalsIgnoreCase(side))
- buyHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
- else if ("83".equalsIgnoreCase(side))
- sellHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
- }
- }
- public void clear() throws SQLException
- {
- buyHandler.clear();
- sellHandler.clear();
- }
- public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
- {
- buyHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
- sellHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
- this.snapshotTable = newSnapshotTable;
- }
- }
- private class PriceTopHandler
- {
- private class OrderInfo
- {
- private final String orderId;
- private final String timestamp;
- public OrderInfo(String orderId)
- {
- this(orderId, null);
- }
- public OrderInfo(String orderId, String timestamp)
- {
- this.orderId = orderId;
- this.timestamp = timestamp;
- }
- public String getOrderId()
- {
- return orderId;
- }
- public String getTimestamp()
- {
- return timestamp;
- }
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- OrderInfo orderInfo = (OrderInfo) o;
- return orderId.equals(orderInfo.orderId);
- }
- @Override
- public int hashCode()
- {
- return orderId.hashCode();
- }
- }
- private static final int TOP_SIZE = 5;
- private final TreeMap<Double, Set<OrderInfo>> top;
- private final TreeMap<Double, Set<OrderInfo>> rest;
- private final Comparator<Double> comparator;
- private String tempSnapshotTable;
- private String snapshotTable;
- private final Connection connection;
- private String orderDataSourceTable;
- private final String side;
- public PriceTopHandler(boolean isBuy, Connection connection, String snapshotTable, String tempSnapshotTable)
- {
- this.connection = connection;
- this.tempSnapshotTable = tempSnapshotTable;
- this.snapshotTable = snapshotTable;
- // First = worst, last = best
- comparator = isBuy ? Comparator.naturalOrder() : Comparator.reverseOrder();
- side = isBuy ? "66" : "83";
- top = new TreeMap<>(comparator);
- rest = new TreeMap<>(comparator);
- }
- public void setOrderDataSourceTable(String orderDataSourceTable)
- {
- this.orderDataSourceTable = orderDataSourceTable;
- }
- public void addOrder(String orderId, String timestamp, double price, String dss) throws SQLException
- {
- addOrder(orderId, timestamp, price, dss, false);
- }
- private void addOrder(String orderId, String timestamp, double price, String dss, boolean isModify)
- throws SQLException
- {
- BiFunction<Double, Set<OrderInfo>, Set<OrderInfo>> ADD_ORDER_ID = (k, set) ->
- {
- if (set == null)
- set = new HashSet<>();
- set.add(new OrderInfo(orderId, timestamp));
- return set;
- };
- if (top.size() == TOP_SIZE && comparator.compare(price, top.firstKey()) < 0)
- { // 1.5 - worse then top-5, record go to the 'rest'
- rest.compute(price, ADD_ORDER_ID);
- if (!isModify)
- {
- // add to temp
- executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot", side, orderId, timestamp);
- }
- }
- else
- {
- // 1.2, 1.3
- top.compute(price, ADD_ORDER_ID);
- if (!isModify)
- {
- // add AddOrder to snapshot
- executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
- }
- if (top.size() > TOP_SIZE)
- { // 1.4
- Map.Entry<Double, Set<OrderInfo>> worst = top.pollFirstEntry(); // poll worse price
- rest.put(worst.getKey(), worst.getValue());
- if (!isModify)
- {
- // add to temp as AddOrder
- // Remove from snapshot (add row to snapshot as DeleteOrder)
- for (OrderInfo oi : worst.getValue())
- {
- executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
- side, oi.getOrderId(), oi.getTimestamp());
- executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams",
- timestamp, dss, side, oi.getOrderId(), oi.getTimestamp());
- }
- }
- }
- }
- }
- public void deleteOrder(String orderId, double price, String timestamp, String dss) throws SQLException
- {
- deleteOrder(orderId, price, timestamp, dss, false);
- }
- private void deleteOrder(String orderId, double price, String timestamp, String dss, boolean isModify)
- throws SQLException
- {
- if (top.containsKey(price))
- {
- if (!isModify)
- {
- // delete from snapshot (Add delete row)
- executePreparedStatement(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot", side, orderId, timestamp);
- }
- Set<OrderInfo> orderIdsByPrice = top.get(price);
- orderIdsByPrice.remove(new OrderInfo(orderId)); // 3.1.1, 3.1.2
- if (orderIdsByPrice.isEmpty()) // add from Temp
- { // 3.1.1
- top.remove(price);
- if (!rest.isEmpty())
- {
- double bestFromTemp = rest.lastKey(); // get best price from temp
- top.put(bestFromTemp, rest.remove(bestFromTemp));
- if (!isModify)
- {
- for (OrderInfo oi : top.get(bestFromTemp))
- {
- // add to snapshot - data can be either in OrderBook, or in Source
- executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook,
- "insertAddRowWithSpecParamsOrderBook",timestamp,
- dss, side, oi.getOrderId(), oi.getTimestamp());
- executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource,
- "insertAddRowWithSpecParamsSource", timestamp,
- dss, side, oi.getOrderId(), oi.getTimestamp());
- // remove from temp
- executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, oi.getOrderId(), oi.getTimestamp());
- }
- }
- }
- }
- }
- else
- {
- rest.remove(price);
- if (!isModify)
- {
- // HARD delete from temp
- currentQueryParams.put("OrderId", String.valueOf(orderId));
- currentQueryParams.put("Side", side); // in Temp only Add Order, so we do
- // not need Message Timestamp
- executeParametrizedQuery(connection,
- format("DELETE FROM %s WHERE Side='#Side' AND Order_ID='#OrderId'",
- tempSnapshotTable), currentQueryParams);
- }
- }
- }
- private boolean containsInTop(String orderId, double oldPrice)
- {
- if (!top.containsKey(oldPrice))
- return false;
- for (OrderInfo oi : top.get(oldPrice))
- {
- if (oi.getOrderId().equals(orderId))
- return true;
- }
- return false;
- }
- public void modifyOrder(String orderId, String timestamp, double oldPrice, double newPrice, String newSize,
- String dss) throws SQLException
- {
- // write modify
- boolean wasTop = containsInTop(orderId, oldPrice);
- Integer numberOfOrdersWithOldPrice = top.containsKey(oldPrice) ? top.get(oldPrice).size() : null;
- double wasWorseTopPrice = top.isEmpty() ? 0 : top.firstKey();
- double wasBestRestPrice = rest.isEmpty() ? 0 : rest.firstKey();
- OrderInfo orderInfoBeforeChange = null;
- if (!wasTop)
- {
- if (rest.containsKey(oldPrice))
- {
- for (OrderInfo oi : rest.get(oldPrice))
- {
- if (oi.getOrderId().equals(orderId))
- orderInfoBeforeChange = oi;
- }
- }
- }
- deleteOrder(orderId, oldPrice, null, null, true);
- addOrder(orderId, timestamp, newPrice, null, true);
- boolean isTop = containsInTop(orderId, newPrice);
- if (wasTop)
- {
- if (isTop)
- {
- // Add modify record
- executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
- if (numberOfOrdersWithOldPrice != null && numberOfOrdersWithOldPrice > 1 && top.get(newPrice).size() == 1) // new price push out worse price from top
- {
- Double restFirstKey = rest.isEmpty() ? null : rest.firstKey(); // best in rest
- if (restFirstKey != null)
- {
- for (OrderInfo oi : rest.get(restFirstKey))
- {
- // add delete to snap
- executePreparedStatementSpecParams(insertDeleteRowWithSpecParams,
- "insertDeleteRowWithSpecParams", timestamp, dss,
- side, oi.getOrderId(), oi.getTimestamp());
- // add to temp
- executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
- side, oi.getOrderId(), oi.getTimestamp());
- }
- }
- }
- if (newPrice == wasBestRestPrice)
- {
- for (OrderInfo oi : top.get(wasBestRestPrice))
- {
- if (!oi.getOrderId().equals(orderId))
- {
- // Add record
- executePreparedStatementSpecParams(insertFromAllDataWithSpecParamsSource,
- "insertFromAllDataWithSpecParamsSource", timestamp,
- dss, side, oi.getOrderId(), oi.getTimestamp());
- }
- }
- }
- }
- else // wasTop && !isTop
- {
- for (OrderInfo oi : rest.get(newPrice))
- {
- executePreparedStatement(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap",
- side, oi.getOrderId(), oi.getTimestamp());
- executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
- side, oi.getOrderId(), oi.getTimestamp());
- }
- double worseTopPrice = top.firstKey(); // worse in top
- if (wasWorseTopPrice != worseTopPrice)
- {
- for (OrderInfo oi : top.get(worseTopPrice))
- {
- // add to snap - data can be either in OrderBook, or in Source
- executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook,
- "insertAddRowWithSpecParamsOrderBook", timestamp, dss,
- side, oi.getOrderId(), oi.getTimestamp());
- executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource,
- "insertAddRowWithSpecParamsSource", timestamp, dss, side,
- oi.getOrderId(), oi.getTimestamp());
- // remove hard from temp
- executePreparedStatement(deleteFromTemp, "deleteFromTemp",
- side, oi.getOrderId(), oi.getTimestamp());
- }
- }
- }
- }
- else // !wasTop
- {
- if (isTop)
- {
- // add to snap
- executePreparedStatement(insertAddRowToSnapshot, "insertAddRowToSnapshot", side, orderId, timestamp);
- // remove hard from temp
- executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, orderInfoBeforeChange.getOrderId(),
- orderInfoBeforeChange.getTimestamp());
- if (top.get(newPrice).size() == 1) // some orders were pushed off from top by order with new price
- {
- Double restLastKey = rest.isEmpty() ? null : rest.lastKey(); // best in rest
- if (restLastKey != null)
- {
- for (OrderInfo oi : rest.get(restLastKey))
- {
- // add delete to snap
- executePreparedStatementSpecParams(insertDeleteRowWithSpecParams,
- "insertDeleteRowWithSpecParams", timestamp, dss,
- side, oi.getOrderId(), oi.getTimestamp());
- // add to temp
- executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
- side, oi.getOrderId(), oi.getTimestamp());
- }
- }
- }
- }
- else // !isTop
- {
- // remove hard from temp
- executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, orderId, timestamp);
- // add to temp
- executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot", side, orderId, timestamp);
- }
- }
- }
- public void clear() throws SQLException
- {
- top.values().stream()
- .flatMap(Collection::stream)
- .forEach(orderInfo -> {
- // delete from snapshot (add row as DeleteOder to snap)
- try
- {
- executePreparedStatement(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot", side,
- orderInfo.getOrderId(), orderInfo.getTimestamp());
- }
- catch (SQLException e)
- {
- throw new ResultException("Error while inserting DeleteOrder row.", e);
- }
- });
- top.clear();
- for (Iterator<Double> it = rest.descendingKeySet().iterator(); it.hasNext() && top.size() < TOP_SIZE; )
- {
- double price = it.next();
- top.put(price, rest.get(price));
- it.remove();
- for (OrderInfo oi : top.get(price))
- {
- // add to snapshot
- executePreparedStatement(insertFromAllData, "insertFromAllData", side, oi.getOrderId(), oi.getTimestamp());
- // remove from temp
- executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, oi.getOrderId(), oi.getTimestamp());
- }
- }
- }
- public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
- {
- insertFromAllData = connection.prepareStatement(format(insertFromAllDataQuery, newTempSnapshotTable, tempSnapshotTable));
- rest.values().stream()
- .flatMap(Collection::stream)
- .forEach(orderData -> {
- // select last transaction with orderId and timestamp and insert into new table (temp
- // snapshot)
- try
- {
- executePreparedStatement(insertFromAllData, "insertFromAllData",
- side, orderData.getOrderId(), orderData.getTimestamp());
- }
- catch (SQLException e)
- {
- throw new ResultException(format("Error while transferring data from %s to %s",
- tempSnapshotTable, newTempSnapshotTable), e);
- }
- });
- executeBatch(insertFromAllData, "insertFromAllData");
- snapshotTable = newSnapshotTable;
- tempSnapshotTable = newTempSnapshotTable;
- }
- }
- private void executePreparedStatement(PreparedStatement ps, String psName, String side, String orderId, String timestamp)
- throws SQLException
- {
- ps.setString(1, side);
- ps.setString(2, orderId);
- ps.setString(3, timestamp);
- ps.addBatch();
- batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
- if (batchSize.get(ps) == MAX_BATCH_SIZE)
- {
- executeBatch(ps, psName);
- }
- }
- private void executePreparedStatementSpecParams(PreparedStatement ps, String psName, String timestampToInsert,
- String dss, String side, String orderId, String timestampToSelect) throws SQLException
- {
- ps.setString(1, timestampToInsert);
- ps.setString(2, dss);
- ps.setString(3, side);
- ps.setString(4, orderId);
- ps.setString(5, timestampToSelect);
- ps.addBatch();
- batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
- if (batchSize.get(ps) == MAX_BATCH_SIZE)
- {
- executeBatch(ps, psName);
- }
- }
- private void executeBatch(PreparedStatement ps, String psName) throws SQLException
- {
- long start = System.currentTimeMillis();
- ps.executeBatch();
- long end = System.currentTimeMillis();
- int psBatchSize = batchSize.get(ps) == null ? 0 : batchSize.get(ps);
- logger.debug("Executing batch {} (size = {}) took {} ms", psName, psBatchSize,
- (end - start));
- batchSize.put(ps, 0);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement