Advertisement
ibragimova_mariam

Untitled

Nov 3rd, 2020
40
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 57.80 KB | None | 0 0
  1. public class DelayedDataSnapshot extends Action
  2. {
  3. private static final String QUERIES_PATH = getInstance().getConfigFiles().getCfgDir() +
  4. "delayedOrderBookSnapshot/sql/";
  5. private static final String CREATE_RESULT_TABLE = "createResultTable.sql";
  6.  
  7. private static final String SELECT_ALL_MESSAGES_BY_TIME = "SELECT * FROM %s WHERE " +
  8. "'#LeftTimestamp'<MessageTimestamp AND MessageTimestamp<='#RightTimestamp' ORDER BY MessageTimestamp";
  9. private static final int MAX_BATCH_SIZE = 100;
  10.  
  11. private static final String MESSAGE_TYPE = "Message_Type";
  12. private static final String SIDE = "Side";
  13. private static final String ORDER_ID = "Order_ID";
  14. private static final String PRICE = "Price";
  15. private static final String MESSAGE_TIMESTAMP = "MessageTimestamp";
  16.  
  17. private static final String AGGREGATED_DATA_TABLE = "AggregatedData";
  18. private static final String PREVIOUS_SNAPSHOT_TABLE = "PreviousSnapshot", CURRENT_SNAPSHOT_TABLE = "CurrentSnapshot";
  19. private static final String TEMP_PREVIOUS_SNAPSHOT_TABLE = "TempPreviousSnapshot", TEMP_CURRENT_SNAPSHOT_TABLE =
  20. "TempCurrentSnapshot";
  21.  
  22. private static String insertFromAllDataQuery;
  23.  
  24. private GdlGlobalContext globalContext;
  25. private String addOrderTable, modifyOrderTable, deleteOrderTable, orderBookClearTable, orderBookSnapshotTable,
  26. orderBookSnapshotTempTable;
  27. private String cthEndTime;
  28. private String dbConnectionName;
  29. private SnapshotHandler snapshotHandler;
  30. private String previousSnapshotTime, currentSnapshotTime, orderBookSnapshotTime;
  31. private Map<String, String> currentQueryParams;
  32. private PreparedStatement insertFromAllData, insertAddRowToSnapshot, insertAddRowToTempSnapshot,
  33. insertDeleteRowToSnapshot, insertDeleteRowWithSpecParams, insertFromAllDataWithSpecParamsSource,
  34. insertDeleteRowFromModifyToSnap, deleteFromTemp, insertAddRowWithSpecParamsSource,
  35. insertAddRowWithSpecParamsOrderBook, worseBuyPriceInTop5OneInstrument, worseSellPriceInTop5OneInstrument,
  36. worseBuyPriceInTop5AllInstrument, worseSellPriceInTop5AllInstrument,
  37. bestTop5OneInstrument, bestRest5OneInstrument, bestTop5AllInstrument, bestRest5AllInstrument;
  38. private Map<PreparedStatement, Integer> batchSize;
  39.  
  40. private String finalPreviousSnapshotTable, finalCurrentSnapshotTable;
  41.  
  42. @Override
  43. protected Result run(StepContext stepContext, MatrixContext matrixContext, GlobalContext globalContext)
  44. throws ResultException
  45. {
  46. this.globalContext = (GdlGlobalContext) globalContext;
  47. initParameters();
  48.  
  49. batchSize = new HashMap<>();
  50. logger.debug("CthEndTime: {} ({})", cthEndTime, GdlTimestampUtils.nanosStringToDateWithFractions(cthEndTime,
  51. "6"));
  52. currentSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(cthEndTime, 0);
  53. previousSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(currentSnapshotTime, 1);
  54. orderBookSnapshotTime =
  55. GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(previousSnapshotTime, 1);
  56. logger.debug("CurrentSnapshotTime: {} ({})", currentSnapshotTime,
  57. GdlTimestampUtils.nanosStringToDateWithFractions(currentSnapshotTime, "6"));
  58. logger.debug("PreviousSnapshotTime: {} ({})", previousSnapshotTime,
  59. GdlTimestampUtils.nanosStringToDateWithFractions(previousSnapshotTime, "6"));
  60. logger.debug("OrderBookSnapshotTime: {} ({})", orderBookSnapshotTime,
  61. GdlTimestampUtils.nanosStringToDateWithFractions(orderBookSnapshotTime, "6"));
  62.  
  63. try
  64. {
  65. executeQueries();
  66. }
  67. catch (SQLException e)
  68. {
  69. String errMessage = "Error while committing changes";
  70. this.logger.error(errMessage, e);
  71. throw new ResultException(errMessage, e);
  72. }
  73.  
  74. return DefaultResult.passed("Two snapshots were made successfully, result in two tables - 'FinalPreviousSnapshotTable' " +
  75. "and 'FinalCurrentSnapshotTable'.");
  76. }
  77.  
  78. private void initParameters()
  79. {
  80. this.getLogger().debug("Initializing special action parameters");
  81. InputParamsHandler handler = new InputParamsHandler(this.inputParams);
  82.  
  83. this.addOrderTable = handler.getRequiredString("AddOrderTable");
  84. this.modifyOrderTable = handler.getRequiredString("ModifyOrderTable");
  85. this.deleteOrderTable = handler.getRequiredString("DeleteOrderTable");
  86. this.orderBookClearTable = handler.getRequiredString("OrderBookClearTable");
  87. this.orderBookSnapshotTable = handler.getRequiredString("OrderBookSnapshotTable");
  88.  
  89. handler.getRequiredString("Instrument");
  90. handler.getRequiredString("SourceVenue");
  91. this.cthEndTime = handler.getRequiredString("CTHEndTime");
  92.  
  93. this.dbConnectionName = handler.getRequiredString("DbConnection");
  94.  
  95. this.finalPreviousSnapshotTable = handler.getRequiredString("FinalPreviousSnapshotTable");
  96. this.finalCurrentSnapshotTable = handler.getRequiredString("FinalCurrentSnapshotTable");
  97.  
  98. handler.check();
  99.  
  100. this.orderBookSnapshotTempTable = orderBookSnapshotTable + "TempForDelayed";
  101. insertFromAllDataQuery = loadQuery("insertFromAllData.sql");
  102. }
  103.  
  104. private void createIndexes(Connection connection, String table)
  105. {
  106. executeQuery(connection, format("CREATE INDEX idx_orderid ON %s (Order_ID)", table));
  107. executeQuery(connection, format("CREATE INDEX idx_message_timestamp ON %s (MessageTimestamp)",
  108. table));
  109. }
  110.  
  111. private void executeQueries() throws SQLException
  112. {
  113. Connection connection = this.globalContext.getDbConnectionOrResultEx(dbConnectionName);
  114. connection.setAutoCommit(false);
  115. currentQueryParams = new HashMap<>(inputParams);
  116.  
  117. // Create tables
  118. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), AGGREGATED_DATA_TABLE)); // table for add, modify,
  119. // delete and clear table data
  120. createIndexes(connection, AGGREGATED_DATA_TABLE);
  121. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_PREVIOUS_SNAPSHOT_TABLE));
  122. createIndexes(connection, TEMP_PREVIOUS_SNAPSHOT_TABLE);
  123. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), PREVIOUS_SNAPSHOT_TABLE));
  124. createIndexes(connection, PREVIOUS_SNAPSHOT_TABLE);
  125. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_CURRENT_SNAPSHOT_TABLE));
  126. createIndexes(connection, TEMP_CURRENT_SNAPSHOT_TABLE);
  127. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), CURRENT_SNAPSHOT_TABLE));
  128. createIndexes(connection, CURRENT_SNAPSHOT_TABLE);
  129.  
  130. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), orderBookSnapshotTempTable));
  131. createIndexes(connection, orderBookSnapshotTempTable);
  132.  
  133. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalPreviousSnapshotTable));
  134. createIndexes(connection, finalPreviousSnapshotTable);
  135. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalCurrentSnapshotTable));
  136. createIndexes(connection, finalCurrentSnapshotTable);
  137.  
  138. snapshotHandler = new SnapshotHandler(connection, finalPreviousSnapshotTable, TEMP_PREVIOUS_SNAPSHOT_TABLE);
  139.  
  140. String instrument = inputParams.get("Instrument");
  141. if (instrument.trim().equalsIgnoreCase("all")) // we need to execute action for all instruments from input
  142. // tables.
  143. {
  144. worseBuyPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
  145. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' " +
  146. "AND Instrument_ID=? ORDER BY Price DESC LIMIT 5) AS distPrices", orderBookSnapshotTempTable));
  147. worseSellPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
  148. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' " +
  149. "AND Instrument_ID=? ORDER BY Price LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  150.  
  151. cleanAllTempTables(connection);
  152. // Fill allTablesData table (all instruments)
  153. executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrderAllInstruments.sql"),
  154. AGGREGATED_DATA_TABLE, addOrderTable), currentQueryParams);
  155. connection.commit();
  156. executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrderAllInstruments.sql"),
  157. AGGREGATED_DATA_TABLE, modifyOrderTable), currentQueryParams);
  158. connection.commit();
  159. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDeleteAllInstruments.sql"),
  160. AGGREGATED_DATA_TABLE, deleteOrderTable), currentQueryParams);
  161. connection.commit();
  162. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClearAllInstruments.sql"),
  163. AGGREGATED_DATA_TABLE, orderBookClearTable), currentQueryParams);
  164. connection.commit();
  165.  
  166. // Fill temp OrderBookSnapshot
  167. currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  168. executeParametrizedQuery(connection, format(loadQuery(
  169. "insertFromOrderBookSnapToTempAllInstruments" +
  170. ".sql"), orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  171. connection.commit();
  172.  
  173. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  174. finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  175. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  176. TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  177.  
  178. try(DbDataReader instrumentReader = getTableReader(connection, format("SELECT DISTINCT Instrument_ID FROM %s",
  179. orderBookSnapshotTempTable), currentQueryParams)) // Add to PrevSnapshot
  180. {
  181. instrumentReader.start();
  182. while (instrumentReader.hasMoreData())
  183. {
  184. TableRow<String, String> tableRow = instrumentReader.readRow();
  185. String currentInstrument = tableRow.getValue("Instrument_ID");
  186.  
  187. logger.debug("Current instrument (read from OrderBookSnapshot table): {}.", currentInstrument);
  188. currentQueryParams.put("Instrument", currentInstrument);
  189.  
  190. snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  191.  
  192. bestTop5OneInstrument = connection.prepareStatement(format(
  193. "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
  194. "AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  195. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  196. bestRest5OneInstrument = connection.prepareStatement(format(
  197. "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
  198. "AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  199. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  200.  
  201. bestTop5OneInstrument.setString(1, currentInstrument);
  202. bestRest5OneInstrument.setString(1, currentInstrument);
  203.  
  204. String minBuyTop5Price = getWorsePriceInTop5( true, true, currentInstrument);
  205. logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table: {}", minBuyTop5Price);
  206. bestTop5OneInstrument.setString(2, minBuyTop5Price);
  207. bestRest5OneInstrument.setString(2, minBuyTop5Price);
  208.  
  209. String maxSellTop5Price = getWorsePriceInTop5(false, true, currentInstrument);
  210. logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table: {}", maxSellTop5Price);
  211. bestTop5OneInstrument.setString(3, maxSellTop5Price);
  212. bestRest5OneInstrument.setString(3, maxSellTop5Price);
  213.  
  214. try (DbDataReader reader = new DbDataReader(bestTop5OneInstrument)) // Add to PrevSnapshot
  215. {
  216. reader.start();
  217. while (reader.hasMoreData())
  218. {
  219. addRecord(reader.readRow(), currentInstrument);
  220. }
  221. }
  222. try (DbDataReader reader = new DbDataReader(bestRest5OneInstrument)) // Add to TempPrevSnapshot
  223. {
  224. reader.start();
  225. while (reader.hasMoreData())
  226. {
  227. addRecord(reader.readRow(), currentInstrument);
  228. }
  229. }
  230. // Stop reading data from OrderBookSnapshot
  231. }
  232. }
  233. catch (IOException e)
  234. {
  235. String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  236. this.logger.error(errMessage, e);
  237. throw new ResultException(errMessage, e);
  238. }
  239. executeBatch(insertFromAllData, "insertFromAllData");
  240. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  241. Utils.closeResource(insertFromAllData);
  242. Utils.closeResource(insertAddRowToTempSnapshot);
  243. Utils.closeResource(worseBuyPriceInTop5OneInstrument);
  244. Utils.closeResource(worseSellPriceInTop5OneInstrument);
  245. Utils.closeResource(bestTop5OneInstrument);
  246. Utils.closeResource(bestRest5OneInstrument);
  247. connection.commit();
  248.  
  249. snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  250.  
  251. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  252. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  253. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  254. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  255. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  256. TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  257. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  258. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  259. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  260. "insertDeleteRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  261. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  262. "insertFromAllDataWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  263. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  264. "insertDeleteRowFromModifyToSnapshot.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  265. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?",
  266. TEMP_PREVIOUS_SNAPSHOT_TABLE));
  267. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  268. "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  269. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  270. "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  271.  
  272. currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  273. currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  274. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  275. AGGREGATED_DATA_TABLE)); // first snapshot
  276.  
  277. executeBatch(insertFromAllData, "insertFromAllData");
  278. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  279. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  280. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  281. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  282. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  283. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  284. executeBatch(deleteFromTemp, "deleteFromTemp");
  285. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  286. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  287.  
  288. Utils.closeResource(insertFromAllData);
  289. Utils.closeResource(insertAddRowToSnapshot);
  290. Utils.closeResource(insertAddRowToTempSnapshot);
  291. Utils.closeResource(insertDeleteRowToSnapshot);
  292. Utils.closeResource(insertDeleteRowWithSpecParams);
  293. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  294. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  295. Utils.closeResource(deleteFromTemp);
  296. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  297. Utils.closeResource(insertAddRowWithSpecParamsSource);
  298. connection.commit();
  299.  
  300. // transfer data from prev snapshot to current
  301. snapshotHandler.nextSnapshot(finalCurrentSnapshotTable, TEMP_CURRENT_SNAPSHOT_TABLE);
  302.  
  303. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  304. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  305. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  306. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  307. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  308. TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  309. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  310. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  311. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  312. "insertDeleteRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  313. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  314. "insertFromAllDataWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  315. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  316. "insertDeleteRowFromModifyToSnapshot.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  317. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
  318. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  319. "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  320. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  321. "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, orderBookSnapshotTempTable));
  322.  
  323. currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  324. currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  325. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  326. AGGREGATED_DATA_TABLE)); // second snapshot
  327.  
  328. executeBatch(insertFromAllData, "insertFromAllData");
  329. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  330. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  331. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  332. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  333. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  334. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  335. executeBatch(deleteFromTemp, "deleteFromTemp");
  336. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  337. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  338.  
  339. Utils.closeResource(insertFromAllData);
  340. Utils.closeResource(insertAddRowToSnapshot);
  341. Utils.closeResource(insertAddRowToTempSnapshot);
  342. Utils.closeResource(insertDeleteRowToSnapshot);
  343. Utils.closeResource(insertDeleteRowWithSpecParams);
  344. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  345. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  346. Utils.closeResource(deleteFromTemp);
  347. Utils.closeResource(insertAddRowWithSpecParamsSource);
  348. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  349. connection.commit();
  350.  
  351. //transfer result to final tables
  352. /*executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  353. finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  354. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  355. finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);*/
  356. }
  357. else
  358. {
  359. worseBuyPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
  360. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' ORDER BY Price DESC " +
  361. "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  362. worseSellPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
  363. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' ORDER BY Price " +
  364. "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  365.  
  366. String[] allInstruments = instrument.split(",");
  367. logger.debug("There are {} distinct instruments in input parameter 'Instrument'.", allInstruments.length);
  368.  
  369. for (String currentInstrument : allInstruments)
  370. {
  371. cleanAllTempTables(connection);
  372.  
  373.  
  374. currentInstrument = currentInstrument.trim();
  375. logger.debug("Current instrument: {}.", currentInstrument);
  376. currentQueryParams.put("Instrument", currentInstrument);
  377.  
  378. // Fill temp OrderBookSnapshot
  379. currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  380. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookSnapToTemp.sql"),
  381. orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  382.  
  383. bestTop5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
  384. "Message_Type='70' AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  385. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  386. bestRest5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
  387. "Message_Type='70' AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  388. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  389. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  390. finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  391. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  392. TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  393.  
  394. // Fill allTablesData table
  395. executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrder.sql"), AGGREGATED_DATA_TABLE,
  396. addOrderTable), currentQueryParams);
  397. executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrder.sql"), AGGREGATED_DATA_TABLE,
  398. modifyOrderTable), currentQueryParams);
  399. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDelete.sql"), AGGREGATED_DATA_TABLE,
  400. deleteOrderTable), currentQueryParams);
  401. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClear.sql"), AGGREGATED_DATA_TABLE,
  402. orderBookClearTable), currentQueryParams);
  403.  
  404. snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  405. try
  406. {
  407. String minBuyTop5Price = getWorsePriceInTop5( true, false, null);
  408. logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  409. currentInstrument, minBuyTop5Price);
  410. bestTop5AllInstrument.setString(1, minBuyTop5Price);
  411. bestRest5AllInstrument.setString(1, minBuyTop5Price);
  412.  
  413. String maxSellTop5Price = getWorsePriceInTop5( false, false, null);
  414. logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  415. currentInstrument, maxSellTop5Price);
  416. bestTop5AllInstrument.setString(2, maxSellTop5Price);
  417. bestRest5AllInstrument.setString(2, maxSellTop5Price);
  418.  
  419. try (DbDataReader reader = new DbDataReader(bestTop5AllInstrument)) // Add to PrevSnapshot
  420. {
  421. reader.start();
  422. while (reader.hasMoreData())
  423. {
  424. addRecord(reader.readRow(), currentInstrument);
  425. }
  426. }
  427. try (DbDataReader reader = new DbDataReader(bestRest5AllInstrument)) // Add to TempPrevSnapshot
  428. {
  429. reader.start();
  430. while (reader.hasMoreData())
  431. {
  432. addRecord(reader.readRow(), currentInstrument);
  433. }
  434. }
  435. }
  436. catch (IOException e)
  437. {
  438. String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  439. this.logger.error(errMessage, e);
  440. throw new ResultException(errMessage, e);
  441. }
  442.  
  443. executeBatch(insertFromAllData, "insertFromAllData");
  444. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  445. Utils.closeResource(insertFromAllData);
  446. Utils.closeResource(insertAddRowToTempSnapshot);
  447. Utils.closeResource(bestTop5AllInstrument);
  448. Utils.closeResource(bestRest5AllInstrument);
  449.  
  450. snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  451.  
  452. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  453. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  454. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  455. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  456. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  457. TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  458. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  459. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  460. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  461. "insertDeleteRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  462. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  463. "insertFromAllDataWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  464. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  465. "insertDeleteRowFromModifyToSnapshot.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  466. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND " +
  467. "MessageTimestamp=?", TEMP_PREVIOUS_SNAPSHOT_TABLE));
  468. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  469. "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  470. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  471. "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  472.  
  473. currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  474. currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  475. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  476. AGGREGATED_DATA_TABLE)); // first snapshot
  477.  
  478. executeBatch(insertFromAllData, "insertFromAllData");
  479. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  480. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  481. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  482. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  483. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  484. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  485. executeBatch(deleteFromTemp, "deleteFromTemp");
  486. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  487. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  488.  
  489. Utils.closeResource(insertFromAllData);
  490. Utils.closeResource(insertAddRowToSnapshot);
  491. Utils.closeResource(insertAddRowToTempSnapshot);
  492. Utils.closeResource(insertDeleteRowToSnapshot);
  493. Utils.closeResource(insertDeleteRowWithSpecParams);
  494. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  495. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  496. Utils.closeResource(deleteFromTemp);
  497. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  498. Utils.closeResource(insertAddRowWithSpecParamsSource);
  499.  
  500. // transfer data from prev snapshot to current
  501. snapshotHandler.nextSnapshot(CURRENT_SNAPSHOT_TABLE, TEMP_CURRENT_SNAPSHOT_TABLE);
  502.  
  503. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  504. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  505. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  506. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  507. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  508. TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  509. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  510. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  511. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  512. "insertDeleteRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  513. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  514. "insertFromAllDataWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  515. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  516. "insertDeleteRowFromModifyToSnapshot.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  517. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? " +
  518. "AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
  519. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  520. "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  521. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  522. "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  523.  
  524. currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  525. currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  526. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  527. AGGREGATED_DATA_TABLE)); // second snapshot
  528.  
  529. executeBatch(insertFromAllData, "insertFromAllData");
  530. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  531. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  532. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  533. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  534. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  535. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  536. executeBatch(deleteFromTemp, "deleteFromTemp");
  537. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  538. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  539.  
  540. Utils.closeResource(insertFromAllData);
  541. Utils.closeResource(insertAddRowToSnapshot);
  542. Utils.closeResource(insertAddRowToTempSnapshot);
  543. Utils.closeResource(insertDeleteRowToSnapshot);
  544. Utils.closeResource(insertDeleteRowWithSpecParams);
  545. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  546. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  547. Utils.closeResource(deleteFromTemp);
  548. Utils.closeResource(insertAddRowWithSpecParamsSource);
  549. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  550.  
  551. //transfer result to final tables
  552. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  553. finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  554. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  555. finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  556.  
  557. connection.commit();
  558. }
  559.  
  560. Utils.closeResource(worseBuyPriceInTop5AllInstrument);
  561. Utils.closeResource(worseSellPriceInTop5AllInstrument);
  562. }
  563. }
  564.  
  565. private void cleanAllTempTables(Connection connection)
  566. {
  567. executeParametrizedQuery(connection, format("DELETE FROM %s", AGGREGATED_DATA_TABLE), currentQueryParams);
  568. executeParametrizedQuery(connection, format("DELETE FROM %s", orderBookSnapshotTempTable), currentQueryParams);
  569.  
  570. executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  571. executeParametrizedQuery(connection, format("DELETE FROM %s", PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  572. executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  573. executeParametrizedQuery(connection, format("DELETE FROM %s", CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  574. }
  575.  
  576. /*
  577. private void printTableColumns(StringTableData td)
  578. {
  579. CommaBuilder commaBuilder = new CommaBuilder();
  580. Iterator<String> headerIterator = td.getHeader().iterator();
  581. String currentColumn;
  582. while(headerIterator.hasNext())
  583. {
  584. currentColumn = headerIterator.next();
  585. String newColumn = "'" + currentColumn + "'";
  586. commaBuilder.append(newColumn);
  587. }
  588. String columns = commaBuilder.toString();
  589. logger.trace("Temp allData table columns: {}", columns);
  590.  
  591. }
  592. */
  593.  
  594. private void createSnapshot(Connection connection, String selectOrderBookPartQuery) throws SQLException
  595. {
  596. long MAX_ROWS = 100_000, currentRow = 0;
  597. try (DbDataReader reader = getTableReader(connection, selectOrderBookPartQuery, currentQueryParams))
  598. {
  599. reader.start();
  600. //printTableColumns(td);
  601. while (reader.hasMoreData())
  602. {
  603. if (currentRow == MAX_ROWS)
  604. {
  605. connection.commit();
  606. currentRow = 0;
  607. }
  608. else
  609. {
  610. currentRow++;
  611. }
  612. TableRow<String, String> tr = reader.readRow();
  613. String messageType = tr.getValue(MESSAGE_TYPE);
  614. String currentInstrument = tr.getValue("Instrument_ID");
  615. switch (messageType)
  616. {
  617. case ADD_ORDER_TYPE_DEC:
  618. addRecord(tr, currentInstrument);
  619. break;
  620. case MODIFY_ORDER_TYPE_DEC:
  621. modifyRecord(tr, currentInstrument);
  622. break;
  623. case DELETE_ORDER_TYPE_DEC:
  624. deleteRecord(tr, currentInstrument);
  625. break;
  626. case CLEAR_BOOK_TYPE_DEC:
  627. deleteAllRecords(currentInstrument);
  628. break;
  629. default:
  630. throw new ResultException(format("Invalid value in column %s: %s", MESSAGE_TYPE,
  631. messageType));
  632. }
  633. }
  634. }
  635. catch (IOException e)
  636. {
  637. String errMessage = "Error while reading data";
  638. this.logger.error(errMessage, e);
  639. throw new ResultException(errMessage, e);
  640. }
  641. }
  642.  
  643. private void addRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  644. {
  645. String side = tableRow.getValue(SIDE);
  646. String orderId = tableRow.getValue(ORDER_ID);
  647. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  648.  
  649. currentQueryParams.put(SIDE, side);
  650. currentQueryParams.put("OrderId", orderId);
  651. currentQueryParams.put("Timestamp", timestamp);
  652.  
  653. String price = tableRow.getValue(PRICE);
  654. String orderType = tableRow.getValue("Order_Type");
  655. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  656. snapshotHandler.getInstrumentInfo(currentInstrument).addOrder(side, orderId, timestamp,
  657. Double.parseDouble(price), orderType, dss);
  658. }
  659.  
  660. private void modifyRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  661. {
  662. String side = tableRow.getValue(SIDE);
  663. String orderId = tableRow.getValue(ORDER_ID);
  664. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  665.  
  666. currentQueryParams.put(SIDE, side);
  667. currentQueryParams.put("OrderId", orderId);
  668. currentQueryParams.put("Timestamp", timestamp);
  669.  
  670. String oldPrice = tableRow.getValue("Old_Price");
  671. String newPrice = tableRow.getValue("Price");
  672. String newSize = tableRow.getValue("Size");
  673. String orderType = tableRow.getValue("Order_Type");
  674. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  675. snapshotHandler.getInstrumentInfo(currentInstrument).modifyOrder(side, orderId,
  676. timestamp, Double.parseDouble(oldPrice), Double.parseDouble(newPrice), newSize, orderType, dss);
  677. }
  678.  
  679. private void deleteRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  680. {
  681. String side = tableRow.getValue(SIDE);
  682. String orderId = tableRow.getValue(ORDER_ID);
  683. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  684.  
  685. currentQueryParams.put(SIDE, side);
  686. currentQueryParams.put("OrderId", orderId);
  687. currentQueryParams.put("Timestamp", timestamp);
  688.  
  689. String price = tableRow.getValue(PRICE);
  690. String orderType = tableRow.getValue("Order_Type");
  691. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  692. snapshotHandler.getInstrumentInfo(currentInstrument).deleteOrder(side, orderId,
  693. Double.parseDouble(price), orderType, timestamp, dss);
  694. }
  695.  
  696. private void deleteAllRecords(String currentInstrument) throws SQLException
  697. {
  698. snapshotHandler.getInstrumentInfo(currentInstrument).clear();
  699. }
  700.  
  701. private String loadQuery(String queryFile)
  702. {
  703. String createResultTable;
  704. try
  705. {
  706. createResultTable = SQLUtils.loadQuery(rootRelative(QUERIES_PATH + queryFile));
  707. }
  708. catch (IOException e)
  709. {
  710. String errMessage = "Error while loading query from file '" + queryFile + "'";
  711. this.logger.error(errMessage, e);
  712. throw new ResultException(errMessage, e);
  713. }
  714. return createResultTable;
  715. }
  716.  
  717. private void executeParametrizedQuery(Connection connection, String query, Map<String, String> params)
  718. {
  719. logger.trace("Query before parsing: {}", query);
  720. ParametrizedQuery paramQuery = SQLUtils.parseSQLTemplate(query);
  721. logger.trace("Parametrized query: {}", paramQuery.getQuery());
  722.  
  723. try (PreparedStatement statement = paramQuery.createPreparedStatement(connection, params))
  724. {
  725. statement.execute();
  726. }
  727. catch (SQLException e)
  728. {
  729. String errMessage = "Error while prepared statement creating";
  730. this.logger.error(errMessage, e);
  731. throw new ResultException(errMessage, e);
  732. }
  733. }
  734.  
  735. private String getWorsePriceInTop5(boolean isBuy, boolean isSpecificInstrument,
  736. String instrument) throws SQLException
  737. {
  738. PreparedStatement preparedStatement;
  739. if (isBuy)
  740. {
  741. if (isSpecificInstrument)
  742. {
  743. preparedStatement = worseBuyPriceInTop5OneInstrument;
  744. preparedStatement.setString(1, instrument);
  745. }
  746. else
  747. {
  748. preparedStatement = worseBuyPriceInTop5AllInstrument;
  749. }
  750. }
  751. else // Sell
  752. {
  753. if (isSpecificInstrument)
  754. {
  755. preparedStatement = worseSellPriceInTop5OneInstrument;
  756. preparedStatement.setString(1, instrument);
  757. }
  758. else
  759. {
  760. preparedStatement = worseSellPriceInTop5AllInstrument;
  761. }
  762. }
  763.  
  764. try (ResultSet rs = preparedStatement.executeQuery())
  765. {
  766. if(rs.next())
  767. return String.valueOf(rs.getDouble("Price"));
  768. }
  769. catch (SQLException e)
  770. {
  771. String errMessage = "Error while prepared statement creating";
  772. this.logger.error(errMessage, e);
  773. throw new ResultException(errMessage, e);
  774. }
  775. return null;
  776. }
  777.  
  778. private void executeQuery(Connection connection, String query)
  779. {
  780. logger.trace("Query to execute: {}", query);
  781.  
  782. try (Statement statement = connection.createStatement())
  783. {
  784. statement.execute(query);
  785. }
  786. catch (SQLException e)
  787. {
  788. String errMessage = format("Error while executing query '%s'.", query);
  789. this.logger.error(errMessage, e);
  790. throw new ResultException(errMessage, e);
  791. }
  792. }
  793.  
  794. private DbDataReader getTableReader(Connection connection, String sqlTemplate, Map<String, String> params)
  795. {
  796. ParametrizedQuery query;
  797. PreparedStatement statement;
  798. try
  799. {
  800. query = SQLUtils.parseSQLTemplate(sqlTemplate);
  801. statement = query.createPreparedStatement(connection, params);
  802. }
  803. catch (SQLException e)
  804. {
  805. String errMessage = "Error while prepared statement creating";
  806. this.logger.error(errMessage, e);
  807. throw new ResultException(errMessage, e);
  808. }
  809. return new DbDataReader(statement);
  810. }
  811.  
  812. private class SnapshotHandler
  813. {
  814. private final Connection connection;
  815. private String snapshotTable;
  816. private String tempSnapshotTable;
  817. private Map<String, InstrumentInfo> instrumentsInfo;
  818. private String sourceTable;
  819.  
  820. public SnapshotHandler(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  821. {
  822. connection = dbConnection;
  823. this.snapshotTable = snapshotTable;
  824. this.tempSnapshotTable = tempSnapshotTable;
  825. instrumentsInfo = new HashMap<>();
  826. }
  827.  
  828. public void setOrderDataSourceTable(String sourceTable)
  829. {
  830. this.sourceTable = sourceTable;
  831. for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  832. {
  833. InstrumentInfo ii = instrumentEntry.getValue();
  834. ii.setOrderDataSourceTable(sourceTable);
  835. instrumentEntry.setValue(ii);
  836. }
  837. }
  838.  
  839. public InstrumentInfo getInstrumentInfo(String instrument)
  840. {
  841. InstrumentInfo ii = instrumentsInfo.get(instrument);
  842. if (ii == null)
  843. {
  844. ii = new InstrumentInfo(connection, snapshotTable, tempSnapshotTable);
  845. ii.setOrderDataSourceTable(sourceTable);
  846. instrumentsInfo.put(instrument, ii);
  847. }
  848. return ii;
  849. }
  850.  
  851. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  852. {
  853. this.snapshotTable = newSnapshotTable;
  854. this.tempSnapshotTable = newTempSnapshotTable;
  855. for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  856. {
  857. instrumentEntry.getValue().nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  858. }
  859. }
  860. }
  861.  
  862. private class InstrumentInfo
  863. {
  864. private final Connection connection;
  865. private final PriceTopHandler buyHandler, sellHandler;
  866. private String snapshotTable;
  867. private String sourceTable;
  868.  
  869. public InstrumentInfo(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  870. {
  871. connection = dbConnection;
  872. buyHandler = new PriceTopHandler(true, connection, snapshotTable, tempSnapshotTable);
  873. sellHandler = new PriceTopHandler(false, connection, snapshotTable, tempSnapshotTable);
  874. this.snapshotTable = snapshotTable;
  875. }
  876.  
  877. public void setOrderDataSourceTable(String sourceTable)
  878. {
  879. this.sourceTable = sourceTable;
  880. buyHandler.setOrderDataSourceTable(sourceTable);
  881. sellHandler.setOrderDataSourceTable(sourceTable);
  882. }
  883.  
  884. public void addOrder(String side, String orderId, String timestamp, double price, String orderType,
  885. String dss) throws SQLException
  886. {
  887. if (price == 0 && StringUtils.equals(orderType, "1"))
  888. {
  889. // handle zero price
  890. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  891. }
  892. else
  893. {
  894. if ("66".equalsIgnoreCase(side))
  895. buyHandler.addOrder(orderId, timestamp, price, dss);
  896. else if ("83".equalsIgnoreCase(side))
  897. sellHandler.addOrder(orderId, timestamp, price, dss);
  898. }
  899. }
  900.  
  901. public void deleteOrder(String side, String orderId, double price, String orderType, String timestamp,
  902. String dss) throws SQLException
  903. {
  904. if (price == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  905. {
  906. // handle zero price
  907. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams",
  908. timestamp, dss, side, orderId, timestamp);
  909. }
  910. else
  911. {
  912. if ("66".equalsIgnoreCase(side))
  913. buyHandler.deleteOrder(orderId, price, timestamp, dss);
  914. else if ("83".equalsIgnoreCase(side))
  915. sellHandler.deleteOrder(orderId, price, timestamp, dss);
  916. }
  917. }
  918.  
  919. public void modifyOrder(String side, String orderId, String timestamp, double oldPrice, double newPrice,
  920. String newSize, String orderType, String dss) throws SQLException
  921. {
  922. if (oldPrice == 0 && newPrice == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  923. {
  924. // handle zero price
  925. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  926. }
  927. else
  928. {
  929. if ("66".equalsIgnoreCase(side))
  930. buyHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  931. else if ("83".equalsIgnoreCase(side))
  932. sellHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  933. }
  934. }
  935.  
  936. public void clear() throws SQLException
  937. {
  938. buyHandler.clear();
  939. sellHandler.clear();
  940. }
  941.  
  942. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  943. {
  944. buyHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  945. sellHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  946. this.snapshotTable = newSnapshotTable;
  947. }
  948. }
  949.  
  950. private class PriceTopHandler
  951. {
  952. private class OrderInfo
  953. {
  954. private final String orderId;
  955. private final String timestamp;
  956.  
  957. public OrderInfo(String orderId)
  958. {
  959. this(orderId, null);
  960. }
  961.  
  962. public OrderInfo(String orderId, String timestamp)
  963. {
  964. this.orderId = orderId;
  965. this.timestamp = timestamp;
  966. }
  967.  
  968. public String getOrderId()
  969. {
  970. return orderId;
  971. }
  972.  
  973. public String getTimestamp()
  974. {
  975. return timestamp;
  976. }
  977.  
  978. @Override
  979. public boolean equals(Object o)
  980. {
  981. if (this == o)
  982. return true;
  983.  
  984. if (o == null || getClass() != o.getClass())
  985. return false;
  986.  
  987. OrderInfo orderInfo = (OrderInfo) o;
  988. return orderId.equals(orderInfo.orderId);
  989. }
  990.  
  991. @Override
  992. public int hashCode()
  993. {
  994. return orderId.hashCode();
  995. }
  996. }
  997.  
  998. private static final int TOP_SIZE = 5;
  999. private final TreeMap<Double, Set<OrderInfo>> top;
  1000. private final TreeMap<Double, Set<OrderInfo>> rest;
  1001. private final Comparator<Double> comparator;
  1002. private String tempSnapshotTable;
  1003. private String snapshotTable;
  1004. private final Connection connection;
  1005. private String orderDataSourceTable;
  1006. private final String side;
  1007.  
  1008. public PriceTopHandler(boolean isBuy, Connection connection, String snapshotTable, String tempSnapshotTable)
  1009. {
  1010. this.connection = connection;
  1011. this.tempSnapshotTable = tempSnapshotTable;
  1012. this.snapshotTable = snapshotTable;
  1013.  
  1014. // First = worst, last = best
  1015. comparator = isBuy ? Comparator.naturalOrder() : Comparator.reverseOrder();
  1016. side = isBuy ? "66" : "83";
  1017.  
  1018. top = new TreeMap<>(comparator);
  1019. rest = new TreeMap<>(comparator);
  1020. }
  1021.  
  1022. public void setOrderDataSourceTable(String orderDataSourceTable)
  1023. {
  1024. this.orderDataSourceTable = orderDataSourceTable;
  1025. }
  1026.  
  1027. public void addOrder(String orderId, String timestamp, double price, String dss) throws SQLException
  1028. {
  1029. addOrder(orderId, timestamp, price, dss, false);
  1030. }
  1031.  
  1032. private void addOrder(String orderId, String timestamp, double price, String dss, boolean isModify)
  1033. throws SQLException
  1034. {
  1035. BiFunction<Double, Set<OrderInfo>, Set<OrderInfo>> ADD_ORDER_ID = (k, set) ->
  1036. {
  1037. if (set == null)
  1038. set = new HashSet<>();
  1039.  
  1040. set.add(new OrderInfo(orderId, timestamp));
  1041. return set;
  1042. };
  1043.  
  1044. if (top.size() == TOP_SIZE && comparator.compare(price, top.firstKey()) < 0)
  1045. { // 1.5 - worse then top-5, record go to the 'rest'
  1046. rest.compute(price, ADD_ORDER_ID);
  1047.  
  1048. if (!isModify)
  1049. {
  1050. // add to temp
  1051. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot", side, orderId, timestamp);
  1052. }
  1053. }
  1054. else
  1055. {
  1056. // 1.2, 1.3
  1057. top.compute(price, ADD_ORDER_ID);
  1058.  
  1059. if (!isModify)
  1060. {
  1061. // add AddOrder to snapshot
  1062. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  1063. }
  1064.  
  1065. if (top.size() > TOP_SIZE)
  1066. { // 1.4
  1067. Map.Entry<Double, Set<OrderInfo>> worst = top.pollFirstEntry(); // poll worse price
  1068. rest.put(worst.getKey(), worst.getValue());
  1069.  
  1070. if (!isModify)
  1071. {
  1072. // add to temp as AddOrder
  1073. // Remove from snapshot (add row to snapshot as DeleteOrder)
  1074. for (OrderInfo oi : worst.getValue())
  1075. {
  1076. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1077. side, oi.getOrderId(), oi.getTimestamp());
  1078. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams",
  1079. timestamp, dss, side, oi.getOrderId(), oi.getTimestamp());
  1080. }
  1081. }
  1082. }
  1083. }
  1084. }
  1085.  
  1086. public void deleteOrder(String orderId, double price, String timestamp, String dss) throws SQLException
  1087. {
  1088. deleteOrder(orderId, price, timestamp, dss, false);
  1089. }
  1090.  
  1091. private void deleteOrder(String orderId, double price, String timestamp, String dss, boolean isModify)
  1092. throws SQLException
  1093. {
  1094. if (top.containsKey(price))
  1095. {
  1096. if (!isModify)
  1097. {
  1098. // delete from snapshot (Add delete row)
  1099. executePreparedStatement(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot", side, orderId, timestamp);
  1100. }
  1101. Set<OrderInfo> orderIdsByPrice = top.get(price);
  1102. orderIdsByPrice.remove(new OrderInfo(orderId)); // 3.1.1, 3.1.2
  1103.  
  1104. if (orderIdsByPrice.isEmpty()) // add from Temp
  1105. { // 3.1.1
  1106. top.remove(price);
  1107.  
  1108. if (!rest.isEmpty())
  1109. {
  1110. double bestFromTemp = rest.lastKey(); // get best price from temp
  1111. top.put(bestFromTemp, rest.remove(bestFromTemp));
  1112.  
  1113. if (!isModify)
  1114. {
  1115. for (OrderInfo oi : top.get(bestFromTemp))
  1116. {
  1117. // add to snapshot - data can be either in OrderBook, or in Source
  1118. executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook,
  1119. "insertAddRowWithSpecParamsOrderBook",timestamp,
  1120. dss, side, oi.getOrderId(), oi.getTimestamp());
  1121. executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource,
  1122. "insertAddRowWithSpecParamsSource", timestamp,
  1123. dss, side, oi.getOrderId(), oi.getTimestamp());
  1124. // remove from temp
  1125. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, oi.getOrderId(), oi.getTimestamp());
  1126. }
  1127. }
  1128. }
  1129. }
  1130. }
  1131. else
  1132. {
  1133. rest.remove(price);
  1134.  
  1135. if (!isModify)
  1136. {
  1137. // HARD delete from temp
  1138. currentQueryParams.put("OrderId", String.valueOf(orderId));
  1139. currentQueryParams.put("Side", side); // in Temp only Add Order, so we do
  1140. // not need Message Timestamp
  1141. executeParametrizedQuery(connection,
  1142. format("DELETE FROM %s WHERE Side='#Side' AND Order_ID='#OrderId'",
  1143. tempSnapshotTable), currentQueryParams);
  1144. }
  1145. }
  1146. }
  1147.  
  1148. private boolean containsInTop(String orderId, double oldPrice)
  1149. {
  1150. if (!top.containsKey(oldPrice))
  1151. return false;
  1152.  
  1153. for (OrderInfo oi : top.get(oldPrice))
  1154. {
  1155. if (oi.getOrderId().equals(orderId))
  1156. return true;
  1157. }
  1158. return false;
  1159. }
  1160.  
  1161. public void modifyOrder(String orderId, String timestamp, double oldPrice, double newPrice, String newSize,
  1162. String dss) throws SQLException
  1163. {
  1164. // write modify
  1165. boolean wasTop = containsInTop(orderId, oldPrice);
  1166. Integer numberOfOrdersWithOldPrice = top.containsKey(oldPrice) ? top.get(oldPrice).size() : null;
  1167. double wasWorseTopPrice = top.isEmpty() ? 0 : top.firstKey();
  1168. double wasBestRestPrice = rest.isEmpty() ? 0 : rest.firstKey();
  1169. OrderInfo orderInfoBeforeChange = null;
  1170. if (!wasTop)
  1171. {
  1172. if (rest.containsKey(oldPrice))
  1173. {
  1174. for (OrderInfo oi : rest.get(oldPrice))
  1175. {
  1176. if (oi.getOrderId().equals(orderId))
  1177. orderInfoBeforeChange = oi;
  1178. }
  1179. }
  1180. }
  1181.  
  1182. deleteOrder(orderId, oldPrice, null, null, true);
  1183. addOrder(orderId, timestamp, newPrice, null, true);
  1184.  
  1185. boolean isTop = containsInTop(orderId, newPrice);
  1186.  
  1187. if (wasTop)
  1188. {
  1189. if (isTop)
  1190. {
  1191. // Add modify record
  1192. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  1193.  
  1194. if (numberOfOrdersWithOldPrice != null && numberOfOrdersWithOldPrice > 1 && top.get(newPrice).size() == 1) // new price push out worse price from top
  1195. {
  1196. Double restFirstKey = rest.isEmpty() ? null : rest.firstKey(); // best in rest
  1197. if (restFirstKey != null)
  1198. {
  1199. for (OrderInfo oi : rest.get(restFirstKey))
  1200. {
  1201. // add delete to snap
  1202. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams,
  1203. "insertDeleteRowWithSpecParams", timestamp, dss,
  1204. side, oi.getOrderId(), oi.getTimestamp());
  1205. // add to temp
  1206. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1207. side, oi.getOrderId(), oi.getTimestamp());
  1208. }
  1209. }
  1210. }
  1211. if (newPrice == wasBestRestPrice)
  1212. {
  1213. for (OrderInfo oi : top.get(wasBestRestPrice))
  1214. {
  1215. if (!oi.getOrderId().equals(orderId))
  1216. {
  1217. // Add record
  1218. executePreparedStatementSpecParams(insertFromAllDataWithSpecParamsSource,
  1219. "insertFromAllDataWithSpecParamsSource", timestamp,
  1220. dss, side, oi.getOrderId(), oi.getTimestamp());
  1221. }
  1222. }
  1223. }
  1224. }
  1225. else // wasTop && !isTop
  1226. {
  1227. for (OrderInfo oi : rest.get(newPrice))
  1228. {
  1229. executePreparedStatement(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap",
  1230. side, oi.getOrderId(), oi.getTimestamp());
  1231. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1232. side, oi.getOrderId(), oi.getTimestamp());
  1233. }
  1234.  
  1235. double worseTopPrice = top.firstKey(); // worse in top
  1236. if (wasWorseTopPrice != worseTopPrice)
  1237. {
  1238. for (OrderInfo oi : top.get(worseTopPrice))
  1239. {
  1240. // add to snap - data can be either in OrderBook, or in Source
  1241. executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook,
  1242. "insertAddRowWithSpecParamsOrderBook", timestamp, dss,
  1243. side, oi.getOrderId(), oi.getTimestamp());
  1244. executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource,
  1245. "insertAddRowWithSpecParamsSource", timestamp, dss, side,
  1246. oi.getOrderId(), oi.getTimestamp());
  1247. // remove hard from temp
  1248. executePreparedStatement(deleteFromTemp, "deleteFromTemp",
  1249. side, oi.getOrderId(), oi.getTimestamp());
  1250. }
  1251. }
  1252. }
  1253. }
  1254. else // !wasTop
  1255. {
  1256. if (isTop)
  1257. {
  1258. // add to snap
  1259. executePreparedStatement(insertAddRowToSnapshot, "insertAddRowToSnapshot", side, orderId, timestamp);
  1260. // remove hard from temp
  1261. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, orderInfoBeforeChange.getOrderId(),
  1262. orderInfoBeforeChange.getTimestamp());
  1263.  
  1264. if (top.get(newPrice).size() == 1) // some orders were pushed off from top by order with new price
  1265. {
  1266. Double restLastKey = rest.isEmpty() ? null : rest.lastKey(); // best in rest
  1267. if (restLastKey != null)
  1268. {
  1269. for (OrderInfo oi : rest.get(restLastKey))
  1270. {
  1271. // add delete to snap
  1272. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams,
  1273. "insertDeleteRowWithSpecParams", timestamp, dss,
  1274. side, oi.getOrderId(), oi.getTimestamp());
  1275. // add to temp
  1276. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1277. side, oi.getOrderId(), oi.getTimestamp());
  1278. }
  1279. }
  1280. }
  1281. }
  1282. else // !isTop
  1283. {
  1284. // remove hard from temp
  1285. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, orderId, timestamp);
  1286. // add to temp
  1287. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot", side, orderId, timestamp);
  1288. }
  1289. }
  1290. }
  1291.  
  1292. public void clear() throws SQLException
  1293. {
  1294. top.values().stream()
  1295. .flatMap(Collection::stream)
  1296. .forEach(orderInfo -> {
  1297. // delete from snapshot (add row as DeleteOder to snap)
  1298. try
  1299. {
  1300. executePreparedStatement(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot", side,
  1301. orderInfo.getOrderId(), orderInfo.getTimestamp());
  1302. }
  1303. catch (SQLException e)
  1304. {
  1305. throw new ResultException("Error while inserting DeleteOrder row.", e);
  1306. }
  1307. });
  1308.  
  1309. top.clear();
  1310.  
  1311. for (Iterator<Double> it = rest.descendingKeySet().iterator(); it.hasNext() && top.size() < TOP_SIZE; )
  1312. {
  1313. double price = it.next();
  1314. top.put(price, rest.get(price));
  1315. it.remove();
  1316.  
  1317. for (OrderInfo oi : top.get(price))
  1318. {
  1319. // add to snapshot
  1320. executePreparedStatement(insertFromAllData, "insertFromAllData", side, oi.getOrderId(), oi.getTimestamp());
  1321. // remove from temp
  1322. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, oi.getOrderId(), oi.getTimestamp());
  1323. }
  1324. }
  1325. }
  1326.  
  1327. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  1328. {
  1329. insertFromAllData = connection.prepareStatement(format(insertFromAllDataQuery, newTempSnapshotTable, tempSnapshotTable));
  1330. rest.values().stream()
  1331. .flatMap(Collection::stream)
  1332. .forEach(orderData -> {
  1333. // select last transaction with orderId and timestamp and insert into new table (temp
  1334. // snapshot)
  1335. try
  1336. {
  1337. executePreparedStatement(insertFromAllData, "insertFromAllData",
  1338. side, orderData.getOrderId(), orderData.getTimestamp());
  1339. }
  1340. catch (SQLException e)
  1341. {
  1342. throw new ResultException(format("Error while transferring data from %s to %s",
  1343. tempSnapshotTable, newTempSnapshotTable), e);
  1344. }
  1345. });
  1346. executeBatch(insertFromAllData, "insertFromAllData");
  1347.  
  1348. snapshotTable = newSnapshotTable;
  1349. tempSnapshotTable = newTempSnapshotTable;
  1350. }
  1351. }
  1352.  
  1353. private void executePreparedStatement(PreparedStatement ps, String psName, String side, String orderId, String timestamp)
  1354. throws SQLException
  1355. {
  1356. ps.setString(1, side);
  1357. ps.setString(2, orderId);
  1358. ps.setString(3, timestamp);
  1359. ps.addBatch();
  1360. batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
  1361. if (batchSize.get(ps) == MAX_BATCH_SIZE)
  1362. {
  1363. executeBatch(ps, psName);
  1364. }
  1365. }
  1366.  
  1367. private void executePreparedStatementSpecParams(PreparedStatement ps, String psName, String timestampToInsert,
  1368. String dss, String side, String orderId, String timestampToSelect) throws SQLException
  1369. {
  1370. ps.setString(1, timestampToInsert);
  1371. ps.setString(2, dss);
  1372. ps.setString(3, side);
  1373. ps.setString(4, orderId);
  1374. ps.setString(5, timestampToSelect);
  1375. ps.addBatch();
  1376. batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
  1377. if (batchSize.get(ps) == MAX_BATCH_SIZE)
  1378. {
  1379. executeBatch(ps, psName);
  1380. }
  1381. }
  1382.  
  1383. private void executeBatch(PreparedStatement ps, String psName) throws SQLException
  1384. {
  1385. long start = System.currentTimeMillis();
  1386. ps.executeBatch();
  1387. long end = System.currentTimeMillis();
  1388. int psBatchSize = batchSize.get(ps) == null ? 0 : batchSize.get(ps);
  1389. logger.debug("Executing batch {} (size = {}) took {} ms", psName, psBatchSize,
  1390. (end - start));
  1391. batchSize.put(ps, 0);
  1392. }
  1393. }
  1394.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement