Advertisement
ibragimova_mariam

DelayedData

Nov 2nd, 2020
41
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 54.36 KB | None | 0 0
  1. public class DelayedDataSnapshot extends Action
  2. {
  3. private static final String QUERIES_PATH = getInstance().getConfigFiles().getCfgDir() +
  4. "delayedOrderBookSnapshot/sql/";
  5. private static final String CREATE_RESULT_TABLE = "createResultTable.sql";
  6.  
  7. private static final String SELECT_ALL_MESSAGES_BY_TIME = "SELECT * FROM %s WHERE " +
  8. "'#LeftTimestamp'<MessageTimestamp AND MessageTimestamp<='#RightTimestamp' ORDER BY MessageTimestamp";
  9. private static final int MAX_BATCH_SIZE = 100;
  10.  
  11. private static final String MESSAGE_TYPE = "Message_Type";
  12. private static final String SIDE = "Side";
  13. private static final String ORDER_ID = "Order_ID";
  14. private static final String PRICE = "Price";
  15. private static final String MESSAGE_TIMESTAMP = "MessageTimestamp";
  16.  
  17. private static final String AGGREGATED_DATA_TABLE = "AggregatedData";
  18. private static final String PREVIOUS_SNAPSHOT_TABLE = "PreviousSnapshot", CURRENT_SNAPSHOT_TABLE = "CurrentSnapshot";
  19. private static final String TEMP_PREVIOUS_SNAPSHOT_TABLE = "TempPreviousSnapshot", TEMP_CURRENT_SNAPSHOT_TABLE =
  20. "TempCurrentSnapshot";
  21.  
  22. private GdlGlobalContext globalContext;
  23. private String addOrderTable, modifyOrderTable, deleteOrderTable, orderBookClearTable, orderBookSnapshotTable,
  24. orderBookSnapshotTempTable;
  25. private String cthEndTime;
  26. private String dbConnectionName;
  27. private SnapshotHandler snapshotHandler;
  28. private String previousSnapshotTime, currentSnapshotTime, orderBookSnapshotTime;
  29. private Map<String, String> currentQueryParams;
  30. private PreparedStatement insertFromAllData, insertAddRowToSnapshot, insertAddRowToTempSnapshot,
  31. insertDeleteRowToSnapshot, insertDeleteRowWithSpecParams, insertFromAllDataWithSpecParamsSource,
  32. insertDeleteRowFromModifyToSnap, deleteFromTemp, insertAddRowWithSpecParamsSource,
  33. insertAddRowWithSpecParamsOrderBook, worseBuyPriceInTop5OneInstrument, worseSellPriceInTop5OneInstrument,
  34. worseBuyPriceInTop5AllInstrument, worseSellPriceInTop5AllInstrument,
  35. bestTop5OneInstrument, bestRest5OneInstrument, bestTop5AllInstrument, bestRest5AllInstrument;
  36. private Map<PreparedStatement, Integer> batchSize;
  37.  
  38. private String finalPreviousSnapshotTable, finalCurrentSnapshotTable;
  39.  
  40. @Override
  41. protected Result run(StepContext stepContext, MatrixContext matrixContext, GlobalContext globalContext)
  42. throws ResultException
  43. {
  44. this.globalContext = (GdlGlobalContext) globalContext;
  45. initParameters();
  46.  
  47. batchSize = new HashMap<>();
  48. logger.debug("CthEndTime: {} ({})", cthEndTime, GdlTimestampUtils.nanosStringToDateWithFractions(cthEndTime,
  49. "6"));
  50. currentSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(cthEndTime, 0);
  51. previousSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(currentSnapshotTime, 1);
  52. orderBookSnapshotTime =
  53. GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(previousSnapshotTime, 1);
  54. logger.debug("CurrentSnapshotTime: {} ({})", currentSnapshotTime,
  55. GdlTimestampUtils.nanosStringToDateWithFractions(currentSnapshotTime, "6"));
  56. logger.debug("PreviousSnapshotTime: {} ({})", previousSnapshotTime,
  57. GdlTimestampUtils.nanosStringToDateWithFractions(previousSnapshotTime, "6"));
  58. logger.debug("OrderBookSnapshotTime: {} ({})", orderBookSnapshotTime,
  59. GdlTimestampUtils.nanosStringToDateWithFractions(orderBookSnapshotTime, "6"));
  60.  
  61. try
  62. {
  63. executeQueries();
  64. }
  65. catch (SQLException e)
  66. {
  67. String errMessage = "Error while committing changes";
  68. this.logger.error(errMessage, e);
  69. throw new ResultException(errMessage, e);
  70. }
  71.  
  72. return DefaultResult.passed("Two snapshots were made successfully, result in two tables - 'FinalPreviousSnapshotTable' " +
  73. "and 'FinalCurrentSnapshotTable'.");
  74. }
  75.  
  76. private void initParameters()
  77. {
  78. this.getLogger().debug("Initializing special action parameters");
  79. InputParamsHandler handler = new InputParamsHandler(this.inputParams);
  80.  
  81. this.addOrderTable = handler.getRequiredString("AddOrderTable");
  82. this.modifyOrderTable = handler.getRequiredString("ModifyOrderTable");
  83. this.deleteOrderTable = handler.getRequiredString("DeleteOrderTable");
  84. this.orderBookClearTable = handler.getRequiredString("OrderBookClearTable");
  85. this.orderBookSnapshotTable = handler.getRequiredString("OrderBookSnapshotTable");
  86.  
  87. handler.getRequiredString("Instrument");
  88. handler.getRequiredString("SourceVenue");
  89. this.cthEndTime = handler.getRequiredString("CTHEndTime");
  90.  
  91. this.dbConnectionName = handler.getRequiredString("DbConnection");
  92.  
  93. this.finalPreviousSnapshotTable = handler.getRequiredString("FinalPreviousSnapshotTable");
  94. this.finalCurrentSnapshotTable = handler.getRequiredString("FinalCurrentSnapshotTable");
  95.  
  96. handler.check();
  97.  
  98. this.orderBookSnapshotTempTable = orderBookSnapshotTable + "TempForDelayed";
  99.  
  100. }
  101.  
  102. private void executeQueries() throws SQLException
  103. {
  104. Connection connection = this.globalContext.getDbConnectionOrResultEx(dbConnectionName);
  105. connection.setAutoCommit(false);
  106. currentQueryParams = new HashMap<>(inputParams);
  107.  
  108. // Create tables
  109. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), AGGREGATED_DATA_TABLE)); // table for add, modify,
  110. // delete and clear table data
  111. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_PREVIOUS_SNAPSHOT_TABLE));
  112. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), PREVIOUS_SNAPSHOT_TABLE));
  113. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_CURRENT_SNAPSHOT_TABLE));
  114. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), CURRENT_SNAPSHOT_TABLE));
  115.  
  116. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), orderBookSnapshotTempTable));
  117.  
  118. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalPreviousSnapshotTable));
  119. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalCurrentSnapshotTable));
  120.  
  121. snapshotHandler = new SnapshotHandler(connection, finalPreviousSnapshotTable, TEMP_PREVIOUS_SNAPSHOT_TABLE);
  122.  
  123. String instrument = inputParams.get("Instrument");
  124. if (instrument.trim().equalsIgnoreCase("all")) // we need to execute action for all instruments from input
  125. // tables.
  126. {
  127. worseBuyPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
  128. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' " +
  129. "AND Instrument_ID=? ORDER BY Price DESC LIMIT 5) AS distPrices", orderBookSnapshotTempTable));
  130. worseSellPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
  131. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' " +
  132. "AND Instrument_ID=? ORDER BY Price LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  133.  
  134. cleanAllTempTables(connection);
  135. // Fill allTablesData table (all instruments)
  136. executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrderAllInstruments.sql"),
  137. AGGREGATED_DATA_TABLE, addOrderTable), currentQueryParams);
  138. connection.commit();
  139. executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrderAllInstruments.sql"),
  140. AGGREGATED_DATA_TABLE, modifyOrderTable), currentQueryParams);
  141. connection.commit();
  142. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDeleteAllInstruments.sql"),
  143. AGGREGATED_DATA_TABLE, deleteOrderTable), currentQueryParams);
  144. connection.commit();
  145. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClearAllInstruments.sql"),
  146. AGGREGATED_DATA_TABLE, orderBookClearTable), currentQueryParams);
  147. connection.commit();
  148.  
  149. // Fill temp OrderBookSnapshot
  150. currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  151. executeParametrizedQuery(connection, format(loadQuery(
  152. "insertFromOrderBookSnapToTempAllInstruments" +
  153. ".sql"), orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  154. connection.commit();
  155.  
  156. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  157. finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  158. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  159. TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  160.  
  161. try(DbDataReader instrumentReader = getTableReader(connection, format("SELECT DISTINCT Instrument_ID FROM %s",
  162. orderBookSnapshotTempTable), currentQueryParams)) // Add to PrevSnapshot
  163. {
  164. instrumentReader.start();
  165. while (instrumentReader.hasMoreData())
  166. {
  167. TableRow<String, String> tableRow = instrumentReader.readRow();
  168. String currentInstrument = tableRow.getValue("Instrument_ID");
  169.  
  170. logger.trace("Current instrument (read from OrderBookSnapshot table): {}.", currentInstrument);
  171. currentQueryParams.put("Instrument", currentInstrument);
  172.  
  173. snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  174.  
  175. bestTop5OneInstrument = connection.prepareStatement(format(
  176. "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
  177. "AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  178. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  179. bestRest5OneInstrument = connection.prepareStatement(format(
  180. "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
  181. "AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  182. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  183.  
  184. bestTop5OneInstrument.setString(1, currentInstrument);
  185. bestRest5OneInstrument.setString(1, currentInstrument);
  186.  
  187. String minBuyTop5Price = getWorsePriceInTop5( true, true, currentInstrument);
  188. logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table: {}", minBuyTop5Price);
  189. bestTop5OneInstrument.setString(2, minBuyTop5Price);
  190. bestRest5OneInstrument.setString(2, minBuyTop5Price);
  191.  
  192. String maxSellTop5Price = getWorsePriceInTop5(false, true, currentInstrument);
  193. logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table: {}", maxSellTop5Price);
  194. bestTop5OneInstrument.setString(3, maxSellTop5Price);
  195. bestRest5OneInstrument.setString(3, maxSellTop5Price);
  196.  
  197. try (DbDataReader reader = new DbDataReader(bestTop5OneInstrument)) // Add to PrevSnapshot
  198. {
  199. reader.start();
  200. while (reader.hasMoreData())
  201. {
  202. addRecord(reader.readRow(), currentInstrument);
  203. }
  204. }
  205. try (DbDataReader reader = new DbDataReader(bestRest5OneInstrument)) // Add to TempPrevSnapshot
  206. {
  207. reader.start();
  208. while (reader.hasMoreData())
  209. {
  210. addRecord(reader.readRow(), currentInstrument);
  211. }
  212. }
  213. // Stop reading data from OrderBookSnapshot
  214. }
  215. }
  216. catch (IOException e)
  217. {
  218. String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  219. this.logger.error(errMessage, e);
  220. throw new ResultException(errMessage, e);
  221. }
  222. insertFromAllData.executeBatch();
  223. insertAddRowToTempSnapshot.executeBatch();
  224. Utils.closeResource(insertFromAllData);
  225. Utils.closeResource(insertAddRowToTempSnapshot);
  226. Utils.closeResource(worseBuyPriceInTop5OneInstrument);
  227. Utils.closeResource(worseSellPriceInTop5OneInstrument);
  228. Utils.closeResource(bestTop5OneInstrument);
  229. Utils.closeResource(bestRest5OneInstrument);
  230. connection.commit();
  231.  
  232. snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  233.  
  234. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  235. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  236. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  237. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  238. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  239. TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  240. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  241. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  242. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  243. "insertDeleteRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  244. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  245. "insertFromAllDataWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  246. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  247. "insertDeleteRowFromModifyToSnapshot.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  248. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?",
  249. TEMP_PREVIOUS_SNAPSHOT_TABLE));
  250. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  251. "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  252. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  253. "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  254.  
  255. currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  256. currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  257. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  258. AGGREGATED_DATA_TABLE)); // first snapshot
  259.  
  260. insertFromAllData.executeBatch();
  261. insertAddRowToSnapshot.executeBatch();
  262. insertAddRowToTempSnapshot.executeBatch();
  263. insertDeleteRowToSnapshot.executeBatch();
  264. insertDeleteRowWithSpecParams.executeBatch();
  265. insertFromAllDataWithSpecParamsSource.executeBatch();
  266. insertDeleteRowFromModifyToSnap.executeBatch();
  267. deleteFromTemp.executeBatch();
  268. insertAddRowWithSpecParamsOrderBook.executeBatch();
  269. insertAddRowWithSpecParamsSource.executeBatch();
  270.  
  271. Utils.closeResource(insertFromAllData);
  272. Utils.closeResource(insertAddRowToSnapshot);
  273. Utils.closeResource(insertAddRowToTempSnapshot);
  274. Utils.closeResource(insertDeleteRowToSnapshot);
  275. Utils.closeResource(insertDeleteRowWithSpecParams);
  276. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  277. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  278. Utils.closeResource(deleteFromTemp);
  279. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  280. Utils.closeResource(insertAddRowWithSpecParamsSource);
  281. connection.commit();
  282.  
  283. // transfer data from prev snapshot to current
  284. snapshotHandler.nextSnapshot(finalCurrentSnapshotTable, TEMP_CURRENT_SNAPSHOT_TABLE);
  285.  
  286. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  287. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  288. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  289. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  290. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  291. TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  292. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  293. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  294. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  295. "insertDeleteRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  296. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  297. "insertFromAllDataWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  298. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  299. "insertDeleteRowFromModifyToSnapshot.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  300. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
  301. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  302. "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  303. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  304. "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, orderBookSnapshotTempTable));
  305.  
  306. currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  307. currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  308. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  309. AGGREGATED_DATA_TABLE)); // second snapshot
  310.  
  311. insertFromAllData.executeBatch();
  312. insertAddRowToSnapshot.executeBatch();
  313. insertAddRowToTempSnapshot.executeBatch();
  314. insertDeleteRowToSnapshot.executeBatch();
  315. insertDeleteRowWithSpecParams.executeBatch();
  316. insertFromAllDataWithSpecParamsSource.executeBatch();
  317. insertDeleteRowFromModifyToSnap.executeBatch();
  318. deleteFromTemp.executeBatch();
  319. insertAddRowWithSpecParamsSource.executeBatch();
  320. insertAddRowWithSpecParamsOrderBook.executeBatch();
  321.  
  322. Utils.closeResource(insertFromAllData);
  323. Utils.closeResource(insertAddRowToSnapshot);
  324. Utils.closeResource(insertAddRowToTempSnapshot);
  325. Utils.closeResource(insertDeleteRowToSnapshot);
  326. Utils.closeResource(insertDeleteRowWithSpecParams);
  327. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  328. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  329. Utils.closeResource(deleteFromTemp);
  330. Utils.closeResource(insertAddRowWithSpecParamsSource);
  331. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  332. connection.commit();
  333.  
  334. //transfer result to final tables
  335. /*executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  336. finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  337. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  338. finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);*/
  339. }
  340. else
  341. {
  342. worseBuyPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
  343. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' ORDER BY Price DESC " +
  344. "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  345. worseSellPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
  346. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' ORDER BY Price " +
  347. "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  348.  
  349. String[] allInstruments = instrument.split(",");
  350. logger.debug("There are {} distinct instruments in input parameter 'Instrument'.", allInstruments.length);
  351.  
  352. for (String currentInstrument : allInstruments)
  353. {
  354. cleanAllTempTables(connection);
  355.  
  356.  
  357. currentInstrument = currentInstrument.trim();
  358. logger.debug("Current instrument: {}.", currentInstrument);
  359. currentQueryParams.put("Instrument", currentInstrument);
  360.  
  361. // Fill temp OrderBookSnapshot
  362. currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  363. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookSnapToTemp.sql"),
  364. orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  365.  
  366. bestTop5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
  367. "Message_Type='70' AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  368. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  369. bestRest5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
  370. "Message_Type='70' AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  371. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  372. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  373. finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  374. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  375. TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  376.  
  377. // Fill allTablesData table
  378. executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrder.sql"), AGGREGATED_DATA_TABLE,
  379. addOrderTable), currentQueryParams);
  380. executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrder.sql"), AGGREGATED_DATA_TABLE,
  381. modifyOrderTable), currentQueryParams);
  382. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDelete.sql"), AGGREGATED_DATA_TABLE,
  383. deleteOrderTable), currentQueryParams);
  384. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClear.sql"), AGGREGATED_DATA_TABLE,
  385. orderBookClearTable), currentQueryParams);
  386.  
  387. snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  388. try
  389. {
  390. String minBuyTop5Price = getWorsePriceInTop5( true, false, null);
  391. logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  392. currentInstrument, minBuyTop5Price);
  393. bestTop5AllInstrument.setString(1, minBuyTop5Price);
  394. bestRest5AllInstrument.setString(1, minBuyTop5Price);
  395.  
  396. String maxSellTop5Price = getWorsePriceInTop5( false, false, null);
  397. logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  398. currentInstrument, maxSellTop5Price);
  399. bestTop5AllInstrument.setString(2, maxSellTop5Price);
  400. bestRest5AllInstrument.setString(2, maxSellTop5Price);
  401.  
  402. try (DbDataReader reader = new DbDataReader(bestTop5AllInstrument)) // Add to PrevSnapshot
  403. {
  404. reader.start();
  405. while (reader.hasMoreData())
  406. {
  407. addRecord(reader.readRow(), currentInstrument);
  408. }
  409. }
  410. try (DbDataReader reader = new DbDataReader(bestRest5AllInstrument)) // Add to TempPrevSnapshot
  411. {
  412. reader.start();
  413. while (reader.hasMoreData())
  414. {
  415. addRecord(reader.readRow(), currentInstrument);
  416. }
  417. }
  418. }
  419. catch (IOException e)
  420. {
  421. String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  422. this.logger.error(errMessage, e);
  423. throw new ResultException(errMessage, e);
  424. }
  425.  
  426. insertFromAllData.executeBatch();
  427. insertAddRowToTempSnapshot.executeBatch();
  428. Utils.closeResource(insertFromAllData);
  429. Utils.closeResource(insertAddRowToTempSnapshot);
  430. Utils.closeResource(bestTop5AllInstrument);
  431. Utils.closeResource(bestRest5AllInstrument);
  432.  
  433. snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  434.  
  435. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  436. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  437. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  438. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  439. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  440. TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  441. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  442. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  443. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  444. "insertDeleteRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  445. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  446. "insertFromAllDataWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  447. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  448. "insertDeleteRowFromModifyToSnapshot.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  449. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND " +
  450. "MessageTimestamp=?", TEMP_PREVIOUS_SNAPSHOT_TABLE));
  451. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  452. "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  453. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  454. "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  455.  
  456. currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  457. currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  458. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  459. AGGREGATED_DATA_TABLE)); // first snapshot
  460.  
  461. insertFromAllData.executeBatch();
  462. insertAddRowToSnapshot.executeBatch();
  463. insertAddRowToTempSnapshot.executeBatch();
  464. insertDeleteRowToSnapshot.executeBatch();
  465. insertDeleteRowWithSpecParams.executeBatch();
  466. insertFromAllDataWithSpecParamsSource.executeBatch();
  467. insertDeleteRowFromModifyToSnap.executeBatch();
  468. deleteFromTemp.executeBatch();
  469. insertAddRowWithSpecParamsOrderBook.executeBatch();
  470. insertAddRowWithSpecParamsSource.executeBatch();
  471.  
  472. Utils.closeResource(insertFromAllData);
  473. Utils.closeResource(insertAddRowToSnapshot);
  474. Utils.closeResource(insertAddRowToTempSnapshot);
  475. Utils.closeResource(insertDeleteRowToSnapshot);
  476. Utils.closeResource(insertDeleteRowWithSpecParams);
  477. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  478. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  479. Utils.closeResource(deleteFromTemp);
  480. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  481. Utils.closeResource(insertAddRowWithSpecParamsSource);
  482.  
  483. // transfer data from prev snapshot to current
  484. snapshotHandler.nextSnapshot(CURRENT_SNAPSHOT_TABLE, TEMP_CURRENT_SNAPSHOT_TABLE);
  485.  
  486. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  487. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  488. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  489. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  490. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  491. TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  492. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  493. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  494. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  495. "insertDeleteRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  496. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  497. "insertFromAllDataWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  498. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  499. "insertDeleteRowFromModifyToSnapshot.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  500. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? " +
  501. "AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
  502. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  503. "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  504. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  505. "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  506.  
  507. currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  508. currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  509. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  510. AGGREGATED_DATA_TABLE)); // second snapshot
  511.  
  512. insertFromAllData.executeBatch();
  513. insertAddRowToSnapshot.executeBatch();
  514. insertAddRowToTempSnapshot.executeBatch();
  515. insertDeleteRowToSnapshot.executeBatch();
  516. insertDeleteRowWithSpecParams.executeBatch();
  517. insertFromAllDataWithSpecParamsSource.executeBatch();
  518. insertDeleteRowFromModifyToSnap.executeBatch();
  519. deleteFromTemp.executeBatch();
  520. insertAddRowWithSpecParamsSource.executeBatch();
  521. insertAddRowWithSpecParamsOrderBook.executeBatch();
  522.  
  523. Utils.closeResource(insertFromAllData);
  524. Utils.closeResource(insertAddRowToSnapshot);
  525. Utils.closeResource(insertAddRowToTempSnapshot);
  526. Utils.closeResource(insertDeleteRowToSnapshot);
  527. Utils.closeResource(insertDeleteRowWithSpecParams);
  528. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  529. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  530. Utils.closeResource(deleteFromTemp);
  531. Utils.closeResource(insertAddRowWithSpecParamsSource);
  532. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  533.  
  534. //transfer result to final tables
  535. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  536. finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  537. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  538. finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  539.  
  540. connection.commit();
  541. }
  542.  
  543. Utils.closeResource(worseBuyPriceInTop5AllInstrument);
  544. Utils.closeResource(worseSellPriceInTop5AllInstrument);
  545. }
  546. }
  547.  
  548. private void cleanAllTempTables(Connection connection)
  549. {
  550. executeParametrizedQuery(connection, format("DELETE FROM %s", AGGREGATED_DATA_TABLE), currentQueryParams);
  551. executeParametrizedQuery(connection, format("DELETE FROM %s", orderBookSnapshotTempTable), currentQueryParams);
  552.  
  553. executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  554. executeParametrizedQuery(connection, format("DELETE FROM %s", PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  555. executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  556. executeParametrizedQuery(connection, format("DELETE FROM %s", CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  557. }
  558.  
  559. /*
  560. private void printTableColumns(StringTableData td)
  561. {
  562. CommaBuilder commaBuilder = new CommaBuilder();
  563. Iterator<String> headerIterator = td.getHeader().iterator();
  564. String currentColumn;
  565. while(headerIterator.hasNext())
  566. {
  567. currentColumn = headerIterator.next();
  568. String newColumn = "'" + currentColumn + "'";
  569. commaBuilder.append(newColumn);
  570. }
  571. String columns = commaBuilder.toString();
  572. logger.trace("Temp allData table columns: {}", columns);
  573.  
  574. }
  575. */
  576.  
  577. private void createSnapshot(Connection connection, String selectOrderBookPartQuery) throws SQLException
  578. {
  579. long MAX_ROWS = 100_000, currentRow = 0;
  580. try (DbDataReader reader = getTableReader(connection, selectOrderBookPartQuery, currentQueryParams))
  581. {
  582. reader.start();
  583. //printTableColumns(td);
  584. while (reader.hasMoreData())
  585. {
  586. if (currentRow == MAX_ROWS)
  587. {
  588. connection.commit();
  589. currentRow = 0;
  590. }
  591. else
  592. {
  593. currentRow++;
  594. }
  595. TableRow<String, String> tr = reader.readRow();
  596. String messageType = tr.getValue(MESSAGE_TYPE);
  597. String currentInstrument = tr.getValue("Instrument_ID");
  598. switch (messageType)
  599. {
  600. case ADD_ORDER_TYPE_DEC:
  601. addRecord(tr, currentInstrument);
  602. break;
  603. case MODIFY_ORDER_TYPE_DEC:
  604. modifyRecord(tr, currentInstrument);
  605. break;
  606. case DELETE_ORDER_TYPE_DEC:
  607. deleteRecord(tr, currentInstrument);
  608. break;
  609. case CLEAR_BOOK_TYPE_DEC:
  610. deleteAllRecords(currentInstrument);
  611. break;
  612. default:
  613. throw new ResultException(format("Invalid value in column %s: %s", MESSAGE_TYPE,
  614. messageType));
  615. }
  616. }
  617. }
  618. catch (IOException e)
  619. {
  620. String errMessage = "Error while reading data";
  621. this.logger.error(errMessage, e);
  622. throw new ResultException(errMessage, e);
  623. }
  624. }
  625.  
  626. private void addRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  627. {
  628. String side = tableRow.getValue(SIDE);
  629. String orderId = tableRow.getValue(ORDER_ID);
  630. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  631.  
  632. currentQueryParams.put(SIDE, side);
  633. currentQueryParams.put("OrderId", orderId);
  634. currentQueryParams.put("Timestamp", timestamp);
  635.  
  636. String price = tableRow.getValue(PRICE);
  637. String orderType = tableRow.getValue("Order_Type");
  638. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  639. snapshotHandler.getInstrumentInfo(currentInstrument).addOrder(side, orderId, timestamp,
  640. Double.parseDouble(price), orderType, dss);
  641. }
  642.  
  643. private void modifyRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  644. {
  645. String side = tableRow.getValue(SIDE);
  646. String orderId = tableRow.getValue(ORDER_ID);
  647. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  648.  
  649. currentQueryParams.put(SIDE, side);
  650. currentQueryParams.put("OrderId", orderId);
  651. currentQueryParams.put("Timestamp", timestamp);
  652.  
  653. String oldPrice = tableRow.getValue("Old_Price");
  654. String newPrice = tableRow.getValue("Price");
  655. String newSize = tableRow.getValue("Size");
  656. String orderType = tableRow.getValue("Order_Type");
  657. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  658. snapshotHandler.getInstrumentInfo(currentInstrument).modifyOrder(side, orderId,
  659. timestamp, Double.parseDouble(oldPrice), Double.parseDouble(newPrice), newSize, orderType, dss);
  660. }
  661.  
  662. private void deleteRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  663. {
  664. String side = tableRow.getValue(SIDE);
  665. String orderId = tableRow.getValue(ORDER_ID);
  666. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  667.  
  668. currentQueryParams.put(SIDE, side);
  669. currentQueryParams.put("OrderId", orderId);
  670. currentQueryParams.put("Timestamp", timestamp);
  671.  
  672. String price = tableRow.getValue(PRICE);
  673. String orderType = tableRow.getValue("Order_Type");
  674. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  675. snapshotHandler.getInstrumentInfo(currentInstrument).deleteOrder(side, orderId,
  676. Double.parseDouble(price), orderType, timestamp, dss);
  677. }
  678.  
  679. private void deleteAllRecords(String currentInstrument) throws SQLException
  680. {
  681. snapshotHandler.getInstrumentInfo(currentInstrument).clear();
  682. }
  683.  
  684. private String loadQuery(String queryFile)
  685. {
  686. String createResultTable;
  687. try
  688. {
  689. createResultTable = SQLUtils.loadQuery(rootRelative(QUERIES_PATH + queryFile));
  690. }
  691. catch (IOException e)
  692. {
  693. String errMessage = "Error while loading query from file '" + queryFile + "'";
  694. this.logger.error(errMessage, e);
  695. throw new ResultException(errMessage, e);
  696. }
  697. return createResultTable;
  698. }
  699.  
  700. private void executeParametrizedQuery(Connection connection, String query, Map<String, String> params)
  701. {
  702. logger.trace("Query before parsing: {}", query);
  703. ParametrizedQuery paramQuery = SQLUtils.parseSQLTemplate(query);
  704. logger.trace("Parametrized query: {}", paramQuery.getQuery());
  705.  
  706. try (PreparedStatement statement = paramQuery.createPreparedStatement(connection, params))
  707. {
  708. statement.execute();
  709. }
  710. catch (SQLException e)
  711. {
  712. String errMessage = "Error while prepared statement creating";
  713. this.logger.error(errMessage, e);
  714. throw new ResultException(errMessage, e);
  715. }
  716. }
  717.  
  718. private String getWorsePriceInTop5(boolean isBuy, boolean isSpecificInstrument,
  719. String instrument) throws SQLException
  720. {
  721. PreparedStatement preparedStatement;
  722. if (isBuy)
  723. {
  724. if (isSpecificInstrument)
  725. {
  726. preparedStatement = worseBuyPriceInTop5OneInstrument;
  727. preparedStatement.setString(1, instrument);
  728. }
  729. else
  730. {
  731. preparedStatement = worseBuyPriceInTop5AllInstrument;
  732. }
  733. }
  734. else // Sell
  735. {
  736. if (isSpecificInstrument)
  737. {
  738. preparedStatement = worseSellPriceInTop5OneInstrument;
  739. preparedStatement.setString(1, instrument);
  740. }
  741. else
  742. {
  743. preparedStatement = worseSellPriceInTop5AllInstrument;
  744. }
  745. }
  746.  
  747. try (ResultSet rs = preparedStatement.executeQuery())
  748. {
  749. if(rs.next())
  750. return String.valueOf(rs.getDouble("Price"));
  751. }
  752. catch (SQLException e)
  753. {
  754. String errMessage = "Error while prepared statement creating";
  755. this.logger.error(errMessage, e);
  756. throw new ResultException(errMessage, e);
  757. }
  758. return null;
  759. }
  760.  
  761. private void executeQuery(Connection connection, String query)
  762. {
  763. logger.trace("Query to execute: {}", query);
  764.  
  765. try (Statement statement = connection.createStatement())
  766. {
  767. statement.execute(query);
  768. }
  769. catch (SQLException e)
  770. {
  771. String errMessage = format("Error while executing query '%s'.", query);
  772. this.logger.error(errMessage, e);
  773. throw new ResultException(errMessage, e);
  774. }
  775. }
  776.  
  777. private DbDataReader getTableReader(Connection connection, String sqlTemplate, Map<String, String> params)
  778. {
  779. ParametrizedQuery query;
  780. PreparedStatement statement;
  781. try
  782. {
  783. query = SQLUtils.parseSQLTemplate(sqlTemplate);
  784. statement = query.createPreparedStatement(connection, params);
  785. }
  786. catch (SQLException e)
  787. {
  788. String errMessage = "Error while prepared statement creating";
  789. this.logger.error(errMessage, e);
  790. throw new ResultException(errMessage, e);
  791. }
  792. return new DbDataReader(statement);
  793. }
  794.  
  795. private class SnapshotHandler
  796. {
  797. private final Connection connection;
  798. private String snapshotTable;
  799. private String tempSnapshotTable;
  800. private Map<String, InstrumentInfo> instrumentsInfo;
  801. private String sourceTable;
  802.  
  803. public SnapshotHandler(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  804. {
  805. connection = dbConnection;
  806. this.snapshotTable = snapshotTable;
  807. this.tempSnapshotTable = tempSnapshotTable;
  808. instrumentsInfo = new HashMap<>();
  809. }
  810.  
  811. public void setOrderDataSourceTable(String sourceTable)
  812. {
  813. this.sourceTable = sourceTable;
  814. for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  815. {
  816. InstrumentInfo ii = instrumentEntry.getValue();
  817. ii.setOrderDataSourceTable(sourceTable);
  818. instrumentEntry.setValue(ii);
  819. }
  820. }
  821.  
  822. public InstrumentInfo getInstrumentInfo(String instrument)
  823. {
  824. InstrumentInfo ii = instrumentsInfo.get(instrument);
  825. if (ii == null)
  826. {
  827. ii = new InstrumentInfo(connection, snapshotTable, tempSnapshotTable);
  828. ii.setOrderDataSourceTable(sourceTable);
  829. instrumentsInfo.put(instrument, ii);
  830. }
  831. return ii;
  832. }
  833.  
  834. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  835. {
  836. this.snapshotTable = newSnapshotTable;
  837. this.tempSnapshotTable = newTempSnapshotTable;
  838. for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  839. {
  840. instrumentEntry.getValue().nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  841. }
  842. }
  843. }
  844.  
  845. private class InstrumentInfo
  846. {
  847. private final Connection connection;
  848. private final PriceTopHandler buyHandler, sellHandler;
  849. private String snapshotTable;
  850. private String sourceTable;
  851.  
  852. public InstrumentInfo(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  853. {
  854. connection = dbConnection;
  855. buyHandler = new PriceTopHandler(true, connection, snapshotTable, tempSnapshotTable);
  856. sellHandler = new PriceTopHandler(false, connection, snapshotTable, tempSnapshotTable);
  857. this.snapshotTable = snapshotTable;
  858. }
  859.  
  860. public void setOrderDataSourceTable(String sourceTable)
  861. {
  862. this.sourceTable = sourceTable;
  863. buyHandler.setOrderDataSourceTable(sourceTable);
  864. sellHandler.setOrderDataSourceTable(sourceTable);
  865. }
  866.  
  867. public void addOrder(String side, String orderId, String timestamp, double price, String orderType,
  868. String dss) throws SQLException
  869. {
  870. if (price == 0 && StringUtils.equals(orderType, "1"))
  871. {
  872. // handle zero price
  873. executePreparedStatement(insertFromAllData, side, orderId, timestamp);
  874. }
  875. else
  876. {
  877. if ("66".equalsIgnoreCase(side))
  878. buyHandler.addOrder(orderId, timestamp, price, dss);
  879. else if ("83".equalsIgnoreCase(side))
  880. sellHandler.addOrder(orderId, timestamp, price, dss);
  881. }
  882. }
  883.  
  884. public void deleteOrder(String side, String orderId, double price, String orderType, String timestamp,
  885. String dss) throws SQLException
  886. {
  887. if (price == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  888. {
  889. // handle zero price
  890. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, timestamp, dss, side, orderId, timestamp);
  891. }
  892. else
  893. {
  894. if ("66".equalsIgnoreCase(side))
  895. buyHandler.deleteOrder(orderId, price, timestamp, dss);
  896. else if ("83".equalsIgnoreCase(side))
  897. sellHandler.deleteOrder(orderId, price, timestamp, dss);
  898. }
  899. }
  900.  
  901. public void modifyOrder(String side, String orderId, String timestamp, double oldPrice, double newPrice,
  902. String newSize, String orderType, String dss) throws SQLException
  903. {
  904. if (oldPrice == 0 && newPrice == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  905. {
  906. // handle zero price
  907. executePreparedStatement(insertFromAllData, side, orderId, timestamp);
  908. }
  909. else
  910. {
  911. if ("66".equalsIgnoreCase(side))
  912. buyHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  913. else if ("83".equalsIgnoreCase(side))
  914. sellHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  915. }
  916. }
  917.  
  918. public void clear() throws SQLException
  919. {
  920. buyHandler.clear();
  921. sellHandler.clear();
  922. }
  923.  
  924. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  925. {
  926. buyHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  927. sellHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  928. this.snapshotTable = newSnapshotTable;
  929. }
  930. }
  931.  
  932. private class PriceTopHandler
  933. {
  934. private class OrderInfo
  935. {
  936. private final String orderId;
  937. private final String timestamp;
  938.  
  939. public OrderInfo(String orderId)
  940. {
  941. this(orderId, null);
  942. }
  943.  
  944. public OrderInfo(String orderId, String timestamp)
  945. {
  946. this.orderId = orderId;
  947. this.timestamp = timestamp;
  948. }
  949.  
  950. public String getOrderId()
  951. {
  952. return orderId;
  953. }
  954.  
  955. public String getTimestamp()
  956. {
  957. return timestamp;
  958. }
  959.  
  960. @Override
  961. public boolean equals(Object o)
  962. {
  963. if (this == o)
  964. return true;
  965.  
  966. if (o == null || getClass() != o.getClass())
  967. return false;
  968.  
  969. OrderInfo orderInfo = (OrderInfo) o;
  970. return orderId.equals(orderInfo.orderId);
  971. }
  972.  
  973. @Override
  974. public int hashCode()
  975. {
  976. return orderId.hashCode();
  977. }
  978. }
  979.  
  980. private static final int TOP_SIZE = 5;
  981. private final TreeMap<Double, Set<OrderInfo>> top;
  982. private final TreeMap<Double, Set<OrderInfo>> rest;
  983. private final Comparator<Double> comparator;
  984. private String tempSnapshotTable;
  985. private String snapshotTable;
  986. private final Connection connection;
  987. private String orderDataSourceTable;
  988. private final String side;
  989.  
  990. public PriceTopHandler(boolean isBuy, Connection connection, String snapshotTable, String tempSnapshotTable)
  991. {
  992. this.connection = connection;
  993. this.tempSnapshotTable = tempSnapshotTable;
  994. this.snapshotTable = snapshotTable;
  995.  
  996. // First = worst, last = best
  997. comparator = isBuy ? Comparator.naturalOrder() : Comparator.reverseOrder();
  998. side = isBuy ? "66" : "83";
  999.  
  1000. top = new TreeMap<>(comparator);
  1001. rest = new TreeMap<>(comparator);
  1002. }
  1003.  
  1004. public void setOrderDataSourceTable(String orderDataSourceTable)
  1005. {
  1006. this.orderDataSourceTable = orderDataSourceTable;
  1007. }
  1008.  
  1009. public void addOrder(String orderId, String timestamp, double price, String dss) throws SQLException
  1010. {
  1011. addOrder(orderId, timestamp, price, dss, false);
  1012. }
  1013.  
  1014. private void addOrder(String orderId, String timestamp, double price, String dss, boolean isModify)
  1015. throws SQLException
  1016. {
  1017. BiFunction<Double, Set<OrderInfo>, Set<OrderInfo>> ADD_ORDER_ID = (k, set) ->
  1018. {
  1019. if (set == null)
  1020. set = new HashSet<>();
  1021.  
  1022. set.add(new OrderInfo(orderId, timestamp));
  1023. return set;
  1024. };
  1025.  
  1026. if (top.size() == TOP_SIZE && comparator.compare(price, top.firstKey()) < 0)
  1027. { // 1.5 - worse then top-5, record go to the 'rest'
  1028. rest.compute(price, ADD_ORDER_ID);
  1029.  
  1030. if (!isModify)
  1031. {
  1032. // add to temp
  1033. executePreparedStatement(insertAddRowToTempSnapshot, side, orderId, timestamp);
  1034. }
  1035. }
  1036. else
  1037. {
  1038. // 1.2, 1.3
  1039. top.compute(price, ADD_ORDER_ID);
  1040.  
  1041. if (!isModify)
  1042. {
  1043. // add AddOrder to snapshot
  1044. executePreparedStatement(insertFromAllData, side, orderId, timestamp);
  1045. }
  1046.  
  1047. if (top.size() > TOP_SIZE)
  1048. { // 1.4
  1049. Map.Entry<Double, Set<OrderInfo>> worst = top.pollFirstEntry(); // poll worse price
  1050. rest.put(worst.getKey(), worst.getValue());
  1051.  
  1052. if (!isModify)
  1053. {
  1054. // add to temp as AddOrder
  1055. // Remove from snapshot (add row to snapshot as DeleteOrder)
  1056. for (OrderInfo oi : worst.getValue())
  1057. {
  1058. executePreparedStatement(insertAddRowToTempSnapshot, side, oi.getOrderId(), oi.getTimestamp());
  1059. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, timestamp, dss, side,
  1060. oi.getOrderId(), oi.getTimestamp());
  1061. }
  1062. }
  1063. }
  1064. }
  1065. }
  1066.  
  1067. public void deleteOrder(String orderId, double price, String timestamp, String dss) throws SQLException
  1068. {
  1069. deleteOrder(orderId, price, timestamp, dss, false);
  1070. }
  1071.  
  1072. private void deleteOrder(String orderId, double price, String timestamp, String dss, boolean isModify)
  1073. throws SQLException
  1074. {
  1075. if (top.containsKey(price))
  1076. {
  1077. if (!isModify)
  1078. {
  1079. // delete from snapshot (Add delete row)
  1080. executePreparedStatement(insertDeleteRowToSnapshot, side, orderId, timestamp);
  1081. }
  1082. Set<OrderInfo> orderIdsByPrice = top.get(price);
  1083. orderIdsByPrice.remove(new OrderInfo(orderId)); // 3.1.1, 3.1.2
  1084.  
  1085. if (orderIdsByPrice.isEmpty()) // add from Temp
  1086. { // 3.1.1
  1087. top.remove(price);
  1088.  
  1089. if (!rest.isEmpty())
  1090. {
  1091. double bestFromTemp = rest.lastKey(); // get best price from temp
  1092. top.put(bestFromTemp, rest.remove(bestFromTemp));
  1093.  
  1094. if (!isModify)
  1095. {
  1096. for (OrderInfo oi : top.get(bestFromTemp))
  1097. {
  1098. // add to snapshot - data can be either in OrderBook, or in Source
  1099. executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook, timestamp,
  1100. dss, side, oi.getOrderId(), oi.getTimestamp());
  1101. executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource, timestamp,
  1102. dss, side, oi.getOrderId(), oi.getTimestamp());
  1103. // remove from temp
  1104. executePreparedStatement(deleteFromTemp, side, oi.getOrderId(), oi.getTimestamp());
  1105. }
  1106. }
  1107. }
  1108. }
  1109. }
  1110. else
  1111. {
  1112. rest.remove(price);
  1113.  
  1114. if (!isModify)
  1115. {
  1116. // HARD delete from temp
  1117. currentQueryParams.put("OrderId", String.valueOf(orderId));
  1118. currentQueryParams.put("Side", side); // in Temp only Add Order, so we do
  1119. // not need Message Timestamp
  1120. executeParametrizedQuery(connection,
  1121. format("DELETE FROM %s WHERE Side='#Side' AND Order_ID='#OrderId'",
  1122. tempSnapshotTable), currentQueryParams);
  1123. }
  1124. }
  1125. }
  1126.  
  1127. private boolean containsInTop(String orderId, double oldPrice)
  1128. {
  1129. if (!top.containsKey(oldPrice))
  1130. return false;
  1131.  
  1132. for (OrderInfo oi : top.get(oldPrice))
  1133. {
  1134. if (oi.getOrderId().equals(orderId))
  1135. return true;
  1136. }
  1137. return false;
  1138. }
  1139.  
  1140. public void modifyOrder(String orderId, String timestamp, double oldPrice, double newPrice, String newSize,
  1141. String dss) throws SQLException
  1142. {
  1143. // write modify
  1144. boolean wasTop = containsInTop(orderId, oldPrice);
  1145. Integer numberOfOrdersWithOldPrice = top.containsKey(oldPrice) ? top.get(oldPrice).size() : null;
  1146. double wasWorseTopPrice = top.isEmpty() ? 0 : top.firstKey();
  1147. double wasBestRestPrice = rest.isEmpty() ? 0 : rest.firstKey();
  1148. OrderInfo orderInfoBeforeChange = null;
  1149. if (!wasTop)
  1150. {
  1151. if (rest.containsKey(oldPrice))
  1152. {
  1153. for (OrderInfo oi : rest.get(oldPrice))
  1154. {
  1155. if (oi.getOrderId().equals(orderId))
  1156. orderInfoBeforeChange = oi;
  1157. }
  1158. }
  1159. }
  1160.  
  1161. deleteOrder(orderId, oldPrice, null, null, true);
  1162. addOrder(orderId, timestamp, newPrice, null, true);
  1163.  
  1164. boolean isTop = containsInTop(orderId, newPrice);
  1165.  
  1166. if (wasTop)
  1167. {
  1168. if (isTop)
  1169. {
  1170. // Add modify record
  1171. executePreparedStatement(insertFromAllData, side, orderId, timestamp);
  1172.  
  1173. if (numberOfOrdersWithOldPrice != null && numberOfOrdersWithOldPrice > 1 && top.get(newPrice).size() == 1) // new price push out worse price from top
  1174. {
  1175. Double restFirstKey = rest.isEmpty() ? null : rest.firstKey(); // best in rest
  1176. if (restFirstKey != null)
  1177. {
  1178. for (OrderInfo oi : rest.get(restFirstKey))
  1179. {
  1180. // add delete to snap
  1181. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, timestamp, dss,
  1182. side, oi.getOrderId(), oi.getTimestamp());
  1183. // add to temp
  1184. executePreparedStatement(insertAddRowToTempSnapshot, side, oi.getOrderId(), oi.getTimestamp());
  1185. }
  1186. }
  1187. }
  1188. if (newPrice == wasBestRestPrice)
  1189. {
  1190. for (OrderInfo oi : top.get(wasBestRestPrice))
  1191. {
  1192. if (!oi.getOrderId().equals(orderId))
  1193. {
  1194. // Add record
  1195. executePreparedStatementSpecParams(insertFromAllDataWithSpecParamsSource, timestamp,
  1196. dss, side, oi.getOrderId(), oi.getTimestamp());
  1197. }
  1198. }
  1199. }
  1200. }
  1201. else // wasTop && !isTop
  1202. {
  1203. for (OrderInfo oi : rest.get(newPrice))
  1204. {
  1205. executePreparedStatement(insertDeleteRowFromModifyToSnap, side, oi.getOrderId(), oi.getTimestamp());
  1206. executePreparedStatement(insertAddRowToTempSnapshot, side, oi.getOrderId(), oi.getTimestamp());
  1207. }
  1208.  
  1209. double worseTopPrice = top.firstKey(); // worse in top
  1210. if (wasWorseTopPrice != worseTopPrice)
  1211. {
  1212. for (OrderInfo oi : top.get(worseTopPrice))
  1213. {
  1214. // add to snap - data can be either in OrderBook, or in Source
  1215. executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook, timestamp, dss,
  1216. side, oi.getOrderId(), oi.getTimestamp());
  1217. executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource, timestamp, dss, side,
  1218. oi.getOrderId(), oi.getTimestamp());
  1219. // remove hard from temp
  1220. executePreparedStatement(deleteFromTemp, side, oi.getOrderId(), oi.getTimestamp());
  1221. }
  1222. }
  1223. }
  1224. }
  1225. else // !wasTop
  1226. {
  1227. if (isTop)
  1228. {
  1229. // add to snap
  1230. executePreparedStatement(insertAddRowToSnapshot, side, orderId, timestamp);
  1231. // remove hard from temp
  1232. executePreparedStatement(deleteFromTemp, side, orderInfoBeforeChange.getOrderId(),
  1233. orderInfoBeforeChange.getTimestamp());
  1234.  
  1235. if (top.get(newPrice).size() == 1) // some orders were pushed off from top by order with new price
  1236. {
  1237. Double restLastKey = rest.isEmpty() ? null : rest.lastKey(); // best in rest
  1238. if (restLastKey != null)
  1239. {
  1240. for (OrderInfo oi : rest.get(restLastKey))
  1241. {
  1242. // add delete to snap
  1243. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, timestamp, dss,
  1244. side, oi.getOrderId(), oi.getTimestamp());
  1245. // add to temp
  1246. executePreparedStatement(insertAddRowToTempSnapshot, side, oi.getOrderId(), oi.getTimestamp());
  1247. }
  1248. }
  1249. }
  1250. }
  1251. else // !isTop
  1252. {
  1253. // remove hard from temp
  1254. executePreparedStatement(deleteFromTemp, side, orderId, timestamp);
  1255. // add to temp
  1256. executePreparedStatement(insertAddRowToTempSnapshot, side, orderId, timestamp);
  1257. }
  1258. }
  1259. }
  1260.  
  1261. public void clear() throws SQLException
  1262. {
  1263. top.values().stream()
  1264. .flatMap(Collection::stream)
  1265. .forEach(orderInfo -> {
  1266. // delete from snapshot (add row as DeleteOder to snap)
  1267. try
  1268. {
  1269. executePreparedStatement(insertDeleteRowToSnapshot, side, orderInfo.getOrderId(),
  1270. orderInfo.getTimestamp());
  1271. }
  1272. catch (SQLException e)
  1273. {
  1274. throw new ResultException("Error while inserting DeleteOrder row.", e);
  1275. }
  1276. });
  1277.  
  1278. top.clear();
  1279.  
  1280. for (Iterator<Double> it = rest.descendingKeySet().iterator(); it.hasNext() && top.size() < TOP_SIZE; )
  1281. {
  1282. double price = it.next();
  1283. top.put(price, rest.get(price));
  1284. it.remove();
  1285.  
  1286. for (OrderInfo oi : top.get(price))
  1287. {
  1288. // add to snapshot
  1289. executePreparedStatement(insertFromAllData, side, oi.getOrderId(), oi.getTimestamp());
  1290. // remove from temp
  1291. executePreparedStatement(deleteFromTemp, side, oi.getOrderId(), oi.getTimestamp());
  1292. }
  1293. }
  1294. }
  1295.  
  1296. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  1297. {
  1298. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  1299. newTempSnapshotTable, tempSnapshotTable));
  1300. rest.values().stream()
  1301. .flatMap(Collection::stream)
  1302. .forEach(orderData -> {
  1303. // select last transaction with orderId and timestamp and insert into new table (temp
  1304. // snapshot)
  1305. try
  1306. {
  1307. executePreparedStatement(insertFromAllData, side, orderData.getOrderId(), orderData.getTimestamp());
  1308. }
  1309. catch (SQLException e)
  1310. {
  1311. throw new ResultException(format("Error while transferring data from %s to %s",
  1312. tempSnapshotTable, newTempSnapshotTable), e);
  1313. }
  1314. });
  1315. insertFromAllData.executeBatch();
  1316.  
  1317. snapshotTable = newSnapshotTable;
  1318. tempSnapshotTable = newTempSnapshotTable;
  1319. }
  1320. }
  1321.  
  1322. private void executePreparedStatement(PreparedStatement ps, String side, String orderId, String timestamp)
  1323. throws SQLException
  1324. {
  1325. ps.setString(1, side);
  1326. ps.setString(2, orderId);
  1327. ps.setString(3, timestamp);
  1328. ps.addBatch();
  1329. batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
  1330. if (batchSize.get(ps) == MAX_BATCH_SIZE)
  1331. {
  1332. ps.executeBatch();
  1333. batchSize.remove(ps);
  1334. }
  1335. }
  1336.  
  1337. private void executePreparedStatementSpecParams(PreparedStatement ps, String timestampToInsert, String dss,
  1338. String side, String orderId, String timestampToSelect) throws SQLException
  1339. {
  1340. ps.setString(1, timestampToInsert);
  1341. ps.setString(2, dss);
  1342. ps.setString(3, side);
  1343. ps.setString(4, orderId);
  1344. ps.setString(5, timestampToSelect);
  1345. ps.addBatch();
  1346. batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
  1347. if (batchSize.get(ps) == MAX_BATCH_SIZE)
  1348. {
  1349. ps.executeBatch();
  1350. batchSize.remove(ps);
  1351. }
  1352. }
  1353. }
  1354.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement