Advertisement
ibragimova_mariam

Untitled

Nov 3rd, 2020
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 58.71 KB | None | 0 0
  1. public class DelayedDataSnapshot extends Action
  2. {
  3. private static final String QUERIES_PATH = getInstance().getConfigFiles().getCfgDir() +
  4. "delayedOrderBookSnapshot/sql/";
  5. private static final String CREATE_RESULT_TABLE = "createResultTable.sql";
  6.  
  7. private static final String SELECT_ALL_MESSAGES_BY_TIME = "SELECT * FROM %s WHERE " +
  8. "'#LeftTimestamp'<MessageTimestamp AND MessageTimestamp<='#RightTimestamp' ORDER BY MessageTimestamp";
  9. private static final int MAX_BATCH_SIZE = 100;
  10.  
  11. private static final String MESSAGE_TYPE = "Message_Type";
  12. private static final String SIDE = "Side";
  13. private static final String ORDER_ID = "Order_ID";
  14. private static final String PRICE = "Price";
  15. private static final String MESSAGE_TIMESTAMP = "MessageTimestamp";
  16.  
  17. private static final String AGGREGATED_DATA_TABLE = "AggregatedData";
  18. private static final String PREVIOUS_SNAPSHOT_TABLE = "PreviousSnapshot", CURRENT_SNAPSHOT_TABLE = "CurrentSnapshot";
  19. private static final String TEMP_PREVIOUS_SNAPSHOT_TABLE = "TempPreviousSnapshot", TEMP_CURRENT_SNAPSHOT_TABLE =
  20. "TempCurrentSnapshot";
  21.  
  22. private static String insertFromAllDataQuery;
  23.  
  24. private GdlGlobalContext globalContext;
  25. private String addOrderTable, modifyOrderTable, deleteOrderTable, orderBookClearTable, orderBookSnapshotTable,
  26. orderBookSnapshotTempTable;
  27. private String cthEndTime;
  28. private String dbConnectionName;
  29. private SnapshotHandler snapshotHandler;
  30. private String previousSnapshotTime, currentSnapshotTime, orderBookSnapshotTime;
  31. private Map<String, String> currentQueryParams;
  32. private PreparedStatement insertFromAllData, insertAddRowToSnapshot, insertAddRowToTempSnapshot,
  33. insertDeleteRowToSnapshot, insertDeleteRowWithSpecParams, insertFromAllDataWithSpecParamsSource,
  34. insertDeleteRowFromModifyToSnap, deleteFromTemp, insertAddRowWithSpecParamsSource,
  35. insertAddRowWithSpecParamsOrderBook, worseBuyPriceInTop5OneInstrument, worseSellPriceInTop5OneInstrument,
  36. worseBuyPriceInTop5AllInstrument, worseSellPriceInTop5AllInstrument,
  37. bestTop5OneInstrument, bestRest5OneInstrument, bestTop5AllInstrument, bestRest5AllInstrument;
  38. private Map<PreparedStatement, Integer> batchSize;
  39.  
  40. private String finalPreviousSnapshotTable, finalCurrentSnapshotTable;
  41.  
  42. @Override
  43. protected Result run(StepContext stepContext, MatrixContext matrixContext, GlobalContext globalContext)
  44. throws ResultException
  45. {
  46. this.globalContext = (GdlGlobalContext) globalContext;
  47. initParameters();
  48.  
  49. batchSize = new HashMap<>();
  50. logger.debug("CthEndTime: {} ({})", cthEndTime, GdlTimestampUtils.nanosStringToDateWithFractions(cthEndTime,
  51. "6"));
  52. currentSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(cthEndTime, 0);
  53. previousSnapshotTime = GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(currentSnapshotTime, 1);
  54. orderBookSnapshotTime =
  55. GdlTimestampUtils.timestampNanosToRoundedTimestampMinusMinutesStr(previousSnapshotTime, 1);
  56. logger.debug("CurrentSnapshotTime: {} ({})", currentSnapshotTime,
  57. GdlTimestampUtils.nanosStringToDateWithFractions(currentSnapshotTime, "6"));
  58. logger.debug("PreviousSnapshotTime: {} ({})", previousSnapshotTime,
  59. GdlTimestampUtils.nanosStringToDateWithFractions(previousSnapshotTime, "6"));
  60. logger.debug("OrderBookSnapshotTime: {} ({})", orderBookSnapshotTime,
  61. GdlTimestampUtils.nanosStringToDateWithFractions(orderBookSnapshotTime, "6"));
  62.  
  63. try
  64. {
  65. executeQueries();
  66. }
  67. catch (SQLException e)
  68. {
  69. String errMessage = "Error while committing changes";
  70. this.logger.error(errMessage, e);
  71. throw new ResultException(errMessage, e);
  72. }
  73.  
  74. return DefaultResult.passed("Two snapshots were made successfully, result in two tables - 'FinalPreviousSnapshotTable' " +
  75. "and 'FinalCurrentSnapshotTable'.");
  76. }
  77.  
  78. private void initParameters()
  79. {
  80. this.getLogger().debug("Initializing special action parameters");
  81. InputParamsHandler handler = new InputParamsHandler(this.inputParams);
  82.  
  83. this.addOrderTable = handler.getRequiredString("AddOrderTable");
  84. this.modifyOrderTable = handler.getRequiredString("ModifyOrderTable");
  85. this.deleteOrderTable = handler.getRequiredString("DeleteOrderTable");
  86. this.orderBookClearTable = handler.getRequiredString("OrderBookClearTable");
  87. this.orderBookSnapshotTable = handler.getRequiredString("OrderBookSnapshotTable");
  88.  
  89. handler.getRequiredString("Instrument");
  90. handler.getRequiredString("SourceVenue");
  91. this.cthEndTime = handler.getRequiredString("CTHEndTime");
  92.  
  93. this.dbConnectionName = handler.getRequiredString("DbConnection");
  94.  
  95. this.finalPreviousSnapshotTable = handler.getRequiredString("FinalPreviousSnapshotTable");
  96. this.finalCurrentSnapshotTable = handler.getRequiredString("FinalCurrentSnapshotTable");
  97.  
  98. handler.check();
  99.  
  100. this.orderBookSnapshotTempTable = orderBookSnapshotTable + "TempForDelayed";
  101. insertFromAllDataQuery = loadQuery("insertFromAllData.sql");
  102. }
  103.  
  104. private void createIndexes(Connection connection, String table)
  105. {
  106. String orderIdIndex = format("CREATE INDEX idx_orderid%1$s ON %1$s (Order_ID)", table);
  107. String messageTimestampIndex = format("CREATE INDEX idx_message_timestamp%1$s ON %1$s (MessageTimestamp)",
  108. table);
  109. executeQuery(connection, orderIdIndex);
  110. executeQuery(connection, messageTimestampIndex);
  111. }
  112.  
  113. private void dropIndexes(Connection connection, String table)
  114. {
  115. String orderIdIndex = format("DROP INDEX idx_orderid%1$s ON %1$s", table);
  116. String messageTimestampIndex = format("DROP INDEX idx_message_timestamp%1$s ON %1$s",
  117. table);
  118. executeQuery(connection, orderIdIndex);
  119. executeQuery(connection, messageTimestampIndex);
  120. }
  121.  
  122. private void executeQueries() throws SQLException
  123. {
  124. Connection connection = this.globalContext.getDbConnectionOrResultEx(dbConnectionName);
  125. connection.setAutoCommit(false);
  126. currentQueryParams = new HashMap<>(inputParams);
  127.  
  128. // Create tables
  129. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), AGGREGATED_DATA_TABLE)); // table for add, modify,
  130. // delete and clear table data
  131. createIndexes(connection, AGGREGATED_DATA_TABLE);
  132. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_PREVIOUS_SNAPSHOT_TABLE));
  133. createIndexes(connection, TEMP_PREVIOUS_SNAPSHOT_TABLE);
  134. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), PREVIOUS_SNAPSHOT_TABLE));
  135. createIndexes(connection, PREVIOUS_SNAPSHOT_TABLE);
  136. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), TEMP_CURRENT_SNAPSHOT_TABLE));
  137. createIndexes(connection, TEMP_CURRENT_SNAPSHOT_TABLE);
  138. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), CURRENT_SNAPSHOT_TABLE));
  139. createIndexes(connection, CURRENT_SNAPSHOT_TABLE);
  140.  
  141. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), orderBookSnapshotTempTable));
  142. createIndexes(connection, orderBookSnapshotTempTable);
  143.  
  144. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalPreviousSnapshotTable));
  145. createIndexes(connection, finalPreviousSnapshotTable);
  146. executeQuery(connection, format(loadQuery(CREATE_RESULT_TABLE), finalCurrentSnapshotTable));
  147. createIndexes(connection, finalCurrentSnapshotTable);
  148.  
  149. snapshotHandler = new SnapshotHandler(connection, finalPreviousSnapshotTable, TEMP_PREVIOUS_SNAPSHOT_TABLE);
  150.  
  151. String instrument = inputParams.get("Instrument");
  152. if (instrument.trim().equalsIgnoreCase("all")) // we need to execute action for all instruments from input
  153. // tables.
  154. {
  155. worseBuyPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
  156. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' " +
  157. "AND Instrument_ID=? ORDER BY Price DESC LIMIT 5) AS distPrices", orderBookSnapshotTempTable));
  158. worseSellPriceInTop5OneInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
  159. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' " +
  160. "AND Instrument_ID=? ORDER BY Price LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  161.  
  162. cleanAllTempTables(connection);
  163. // Fill allTablesData table (all instruments)
  164. executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrderAllInstruments.sql"),
  165. AGGREGATED_DATA_TABLE, addOrderTable), currentQueryParams);
  166. connection.commit();
  167. executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrderAllInstruments.sql"),
  168. AGGREGATED_DATA_TABLE, modifyOrderTable), currentQueryParams);
  169. connection.commit();
  170. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDeleteAllInstruments.sql"),
  171. AGGREGATED_DATA_TABLE, deleteOrderTable), currentQueryParams);
  172. connection.commit();
  173. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClearAllInstruments.sql"),
  174. AGGREGATED_DATA_TABLE, orderBookClearTable), currentQueryParams);
  175. connection.commit();
  176.  
  177. // Fill temp OrderBookSnapshot
  178. currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  179. executeParametrizedQuery(connection, format(loadQuery(
  180. "insertFromOrderBookSnapToTempAllInstruments" +
  181. ".sql"), orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  182. connection.commit();
  183.  
  184. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  185. finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  186. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  187. TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  188.  
  189. try(DbDataReader instrumentReader = getTableReader(connection, format("SELECT DISTINCT Instrument_ID FROM %s",
  190. orderBookSnapshotTempTable), currentQueryParams)) // Add to PrevSnapshot
  191. {
  192. instrumentReader.start();
  193. while (instrumentReader.hasMoreData())
  194. {
  195. TableRow<String, String> tableRow = instrumentReader.readRow();
  196. String currentInstrument = tableRow.getValue("Instrument_ID");
  197.  
  198. logger.debug("Current instrument (read from OrderBookSnapshot table): {}.", currentInstrument);
  199. currentQueryParams.put("Instrument", currentInstrument);
  200.  
  201. snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  202.  
  203. bestTop5OneInstrument = connection.prepareStatement(format(
  204. "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
  205. "AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  206. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  207. bestRest5OneInstrument = connection.prepareStatement(format(
  208. "SELECT * from %s WHERE Message_Type='70' AND Instrument_ID=? " +
  209. "AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  210. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  211.  
  212. bestTop5OneInstrument.setString(1, currentInstrument);
  213. bestRest5OneInstrument.setString(1, currentInstrument);
  214.  
  215. String minBuyTop5Price = getWorsePriceInTop5( true, true, currentInstrument);
  216. logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table: {}", minBuyTop5Price);
  217. bestTop5OneInstrument.setString(2, minBuyTop5Price);
  218. bestRest5OneInstrument.setString(2, minBuyTop5Price);
  219.  
  220. String maxSellTop5Price = getWorsePriceInTop5(false, true, currentInstrument);
  221. logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table: {}", maxSellTop5Price);
  222. bestTop5OneInstrument.setString(3, maxSellTop5Price);
  223. bestRest5OneInstrument.setString(3, maxSellTop5Price);
  224.  
  225. try (DbDataReader reader = new DbDataReader(bestTop5OneInstrument)) // Add to PrevSnapshot
  226. {
  227. reader.start();
  228. while (reader.hasMoreData())
  229. {
  230. addRecord(reader.readRow(), currentInstrument);
  231. }
  232. }
  233. try (DbDataReader reader = new DbDataReader(bestRest5OneInstrument)) // Add to TempPrevSnapshot
  234. {
  235. reader.start();
  236. while (reader.hasMoreData())
  237. {
  238. addRecord(reader.readRow(), currentInstrument);
  239. }
  240. }
  241. // Stop reading data from OrderBookSnapshot
  242. }
  243. }
  244. catch (IOException e)
  245. {
  246. String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  247. this.logger.error(errMessage, e);
  248. throw new ResultException(errMessage, e);
  249. }
  250. executeBatch(insertFromAllData, "insertFromAllData");
  251. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  252. Utils.closeResource(insertFromAllData);
  253. Utils.closeResource(insertAddRowToTempSnapshot);
  254. Utils.closeResource(worseBuyPriceInTop5OneInstrument);
  255. Utils.closeResource(worseSellPriceInTop5OneInstrument);
  256. Utils.closeResource(bestTop5OneInstrument);
  257. Utils.closeResource(bestRest5OneInstrument);
  258. connection.commit();
  259.  
  260. snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  261.  
  262. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  263. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  264. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  265. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  266. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  267. TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  268. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  269. finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  270. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  271. "insertDeleteRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  272. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  273. "insertFromAllDataWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  274. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  275. "insertDeleteRowFromModifyToSnapshot.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  276. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?",
  277. TEMP_PREVIOUS_SNAPSHOT_TABLE));
  278. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  279. "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, AGGREGATED_DATA_TABLE));
  280. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  281. "insertAddRowWithSpecificParams.sql"), finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  282.  
  283. currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  284. currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  285. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  286. AGGREGATED_DATA_TABLE)); // first snapshot
  287.  
  288. executeBatch(insertFromAllData, "insertFromAllData");
  289. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  290. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  291. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  292. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  293. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  294. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  295. executeBatch(deleteFromTemp, "deleteFromTemp");
  296. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  297. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  298.  
  299. Utils.closeResource(insertFromAllData);
  300. Utils.closeResource(insertAddRowToSnapshot);
  301. Utils.closeResource(insertAddRowToTempSnapshot);
  302. Utils.closeResource(insertDeleteRowToSnapshot);
  303. Utils.closeResource(insertDeleteRowWithSpecParams);
  304. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  305. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  306. Utils.closeResource(deleteFromTemp);
  307. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  308. Utils.closeResource(insertAddRowWithSpecParamsSource);
  309. connection.commit();
  310.  
  311. // transfer data from prev snapshot to current
  312. snapshotHandler.nextSnapshot(finalCurrentSnapshotTable, TEMP_CURRENT_SNAPSHOT_TABLE);
  313.  
  314. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  315. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  316. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  317. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  318. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  319. TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  320. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  321. finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  322. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  323. "insertDeleteRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  324. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  325. "insertFromAllDataWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  326. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  327. "insertDeleteRowFromModifyToSnapshot.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  328. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
  329. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  330. "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, AGGREGATED_DATA_TABLE));
  331. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  332. "insertAddRowWithSpecificParams.sql"), finalCurrentSnapshotTable, orderBookSnapshotTempTable));
  333.  
  334. currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  335. currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  336. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  337. AGGREGATED_DATA_TABLE)); // second snapshot
  338.  
  339. executeBatch(insertFromAllData, "insertFromAllData");
  340. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  341. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  342. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  343. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  344. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  345. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  346. executeBatch(deleteFromTemp, "deleteFromTemp");
  347. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  348. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  349.  
  350. Utils.closeResource(insertFromAllData);
  351. Utils.closeResource(insertAddRowToSnapshot);
  352. Utils.closeResource(insertAddRowToTempSnapshot);
  353. Utils.closeResource(insertDeleteRowToSnapshot);
  354. Utils.closeResource(insertDeleteRowWithSpecParams);
  355. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  356. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  357. Utils.closeResource(deleteFromTemp);
  358. Utils.closeResource(insertAddRowWithSpecParamsSource);
  359. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  360. connection.commit();
  361.  
  362. dropIndexes(connection, AGGREGATED_DATA_TABLE);
  363. dropIndexes(connection, TEMP_PREVIOUS_SNAPSHOT_TABLE);
  364. dropIndexes(connection, PREVIOUS_SNAPSHOT_TABLE);
  365. dropIndexes(connection, TEMP_CURRENT_SNAPSHOT_TABLE);
  366. dropIndexes(connection, CURRENT_SNAPSHOT_TABLE);
  367. dropIndexes(connection, orderBookSnapshotTempTable);
  368. dropIndexes(connection, finalPreviousSnapshotTable);
  369. dropIndexes(connection, finalCurrentSnapshotTable);
  370.  
  371. connection.commit();
  372.  
  373. //transfer result to final tables
  374. /*executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  375. finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  376. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  377. finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);*/
  378. }
  379. else
  380. {
  381. worseBuyPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MIN(Price) AS Price FROM " +
  382. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='66' ORDER BY Price DESC " +
  383. "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  384. worseSellPriceInTop5AllInstrument = connection.prepareStatement(format("SELECT MAX(Price) AS Price FROM " +
  385. "(SELECT DISTINCT Price FROM %s WHERE Message_Type='70' AND Side='83' ORDER BY Price " +
  386. "LIMIT 5) as distPrices", orderBookSnapshotTempTable));
  387.  
  388. String[] allInstruments = instrument.split(",");
  389. logger.debug("There are {} distinct instruments in input parameter 'Instrument'.", allInstruments.length);
  390.  
  391. for (String currentInstrument : allInstruments)
  392. {
  393. cleanAllTempTables(connection);
  394.  
  395.  
  396. currentInstrument = currentInstrument.trim();
  397. logger.debug("Current instrument: {}.", currentInstrument);
  398. currentQueryParams.put("Instrument", currentInstrument);
  399.  
  400. // Fill temp OrderBookSnapshot
  401. currentQueryParams.put("Timestamp", orderBookSnapshotTime);
  402. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookSnapToTemp.sql"),
  403. orderBookSnapshotTempTable, orderBookSnapshotTable), currentQueryParams);
  404.  
  405. bestTop5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
  406. "Message_Type='70' AND ((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  407. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  408. bestRest5AllInstrument = connection.prepareStatement(format("SELECT * from %s WHERE " +
  409. "Message_Type='70' AND NOT((Side='66' AND Price>=?) OR (Side='83' AND Price<=?) " +
  410. "OR (Order_Type='1' AND Price='0'))", orderBookSnapshotTempTable));
  411. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  412. finalPreviousSnapshotTable, orderBookSnapshotTempTable));
  413. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  414. TEMP_PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  415.  
  416. // Fill allTablesData table
  417. executeParametrizedQuery(connection, format(loadQuery("insertFromAddOrder.sql"), AGGREGATED_DATA_TABLE,
  418. addOrderTable), currentQueryParams);
  419. executeParametrizedQuery(connection, format(loadQuery("insertFromModifyOrder.sql"), AGGREGATED_DATA_TABLE,
  420. modifyOrderTable), currentQueryParams);
  421. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderDelete.sql"), AGGREGATED_DATA_TABLE,
  422. deleteOrderTable), currentQueryParams);
  423. executeParametrizedQuery(connection, format(loadQuery("insertFromOrderBookClear.sql"), AGGREGATED_DATA_TABLE,
  424. orderBookClearTable), currentQueryParams);
  425.  
  426. snapshotHandler.setOrderDataSourceTable(orderBookSnapshotTempTable);
  427. try
  428. {
  429. String minBuyTop5Price = getWorsePriceInTop5( true, false, null);
  430. logger.trace("Min Buy Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  431. currentInstrument, minBuyTop5Price);
  432. bestTop5AllInstrument.setString(1, minBuyTop5Price);
  433. bestRest5AllInstrument.setString(1, minBuyTop5Price);
  434.  
  435. String maxSellTop5Price = getWorsePriceInTop5( false, false, null);
  436. logger.trace("Max Sell Top-5 Price in OrderBookSnapshot table for instrument {}: {}",
  437. currentInstrument, maxSellTop5Price);
  438. bestTop5AllInstrument.setString(2, maxSellTop5Price);
  439. bestRest5AllInstrument.setString(2, maxSellTop5Price);
  440.  
  441. try (DbDataReader reader = new DbDataReader(bestTop5AllInstrument)) // Add to PrevSnapshot
  442. {
  443. reader.start();
  444. while (reader.hasMoreData())
  445. {
  446. addRecord(reader.readRow(), currentInstrument);
  447. }
  448. }
  449. try (DbDataReader reader = new DbDataReader(bestRest5AllInstrument)) // Add to TempPrevSnapshot
  450. {
  451. reader.start();
  452. while (reader.hasMoreData())
  453. {
  454. addRecord(reader.readRow(), currentInstrument);
  455. }
  456. }
  457. }
  458. catch (IOException e)
  459. {
  460. String errMessage = "Error while select rows from OrderBookSnapshotTempTable";
  461. this.logger.error(errMessage, e);
  462. throw new ResultException(errMessage, e);
  463. }
  464.  
  465. executeBatch(insertFromAllData, "insertFromAllData");
  466. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  467. Utils.closeResource(insertFromAllData);
  468. Utils.closeResource(insertAddRowToTempSnapshot);
  469. Utils.closeResource(bestTop5AllInstrument);
  470. Utils.closeResource(bestRest5AllInstrument);
  471.  
  472. snapshotHandler.setOrderDataSourceTable(AGGREGATED_DATA_TABLE);
  473.  
  474. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  475. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  476. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  477. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  478. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  479. TEMP_PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  480. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  481. PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  482. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  483. "insertDeleteRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  484. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  485. "insertFromAllDataWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  486. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  487. "insertDeleteRowFromModifyToSnapshot.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  488. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? AND " +
  489. "MessageTimestamp=?", TEMP_PREVIOUS_SNAPSHOT_TABLE));
  490. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  491. "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  492. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  493. "insertAddRowWithSpecificParams.sql"), PREVIOUS_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  494.  
  495. currentQueryParams.put("LeftTimestamp", orderBookSnapshotTime);
  496. currentQueryParams.put("RightTimestamp", previousSnapshotTime);
  497. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  498. AGGREGATED_DATA_TABLE)); // first snapshot
  499.  
  500. executeBatch(insertFromAllData, "insertFromAllData");
  501. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  502. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  503. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  504. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  505. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  506. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  507. executeBatch(deleteFromTemp, "deleteFromTemp");
  508. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  509. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  510.  
  511. Utils.closeResource(insertFromAllData);
  512. Utils.closeResource(insertAddRowToSnapshot);
  513. Utils.closeResource(insertAddRowToTempSnapshot);
  514. Utils.closeResource(insertDeleteRowToSnapshot);
  515. Utils.closeResource(insertDeleteRowWithSpecParams);
  516. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  517. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  518. Utils.closeResource(deleteFromTemp);
  519. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  520. Utils.closeResource(insertAddRowWithSpecParamsSource);
  521.  
  522. // transfer data from prev snapshot to current
  523. snapshotHandler.nextSnapshot(CURRENT_SNAPSHOT_TABLE, TEMP_CURRENT_SNAPSHOT_TABLE);
  524.  
  525. insertFromAllData = connection.prepareStatement(format(loadQuery("insertFromAllData.sql"),
  526. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  527. insertAddRowToSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  528. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  529. insertAddRowToTempSnapshot = connection.prepareStatement(format(loadQuery("insertAddRowToSnapshot.sql"),
  530. TEMP_CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  531. insertDeleteRowToSnapshot = connection.prepareStatement(format(loadQuery("insertDeleteRowToSnapshot.sql"),
  532. CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  533. insertDeleteRowWithSpecParams = connection.prepareStatement(format(loadQuery(
  534. "insertDeleteRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  535. insertFromAllDataWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  536. "insertFromAllDataWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  537. insertDeleteRowFromModifyToSnap = connection.prepareStatement(format(loadQuery(
  538. "insertDeleteRowFromModifyToSnapshot.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  539. deleteFromTemp = connection.prepareStatement(format("DELETE FROM %s WHERE Side=? AND Order_ID=? " +
  540. "AND MessageTimestamp=?", TEMP_CURRENT_SNAPSHOT_TABLE));
  541. insertAddRowWithSpecParamsSource = connection.prepareStatement(format(loadQuery(
  542. "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, AGGREGATED_DATA_TABLE));
  543. insertAddRowWithSpecParamsOrderBook = connection.prepareStatement(format(loadQuery(
  544. "insertAddRowWithSpecificParams.sql"), CURRENT_SNAPSHOT_TABLE, orderBookSnapshotTempTable));
  545.  
  546. currentQueryParams.put("LeftTimestamp", previousSnapshotTime);
  547. currentQueryParams.put("RightTimestamp", currentSnapshotTime);
  548. createSnapshot(connection, String.format(SELECT_ALL_MESSAGES_BY_TIME,
  549. AGGREGATED_DATA_TABLE)); // second snapshot
  550.  
  551. executeBatch(insertFromAllData, "insertFromAllData");
  552. executeBatch(insertAddRowToSnapshot, "insertAddRowToSnapshot");
  553. executeBatch(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot");
  554. executeBatch(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot");
  555. executeBatch(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams");
  556. executeBatch(insertFromAllDataWithSpecParamsSource, "insertFromAllDataWithSpecParamsSource");
  557. executeBatch(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap");
  558. executeBatch(deleteFromTemp, "deleteFromTemp");
  559. executeBatch(insertAddRowWithSpecParamsSource, "insertAddRowWithSpecParamsSource");
  560. executeBatch(insertAddRowWithSpecParamsOrderBook, "insertAddRowWithSpecParamsOrderBook");
  561.  
  562. Utils.closeResource(insertFromAllData);
  563. Utils.closeResource(insertAddRowToSnapshot);
  564. Utils.closeResource(insertAddRowToTempSnapshot);
  565. Utils.closeResource(insertDeleteRowToSnapshot);
  566. Utils.closeResource(insertDeleteRowWithSpecParams);
  567. Utils.closeResource(insertFromAllDataWithSpecParamsSource);
  568. Utils.closeResource(insertDeleteRowFromModifyToSnap);
  569. Utils.closeResource(deleteFromTemp);
  570. Utils.closeResource(insertAddRowWithSpecParamsSource);
  571. Utils.closeResource(insertAddRowWithSpecParamsOrderBook);
  572.  
  573. //transfer result to final tables
  574. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  575. finalPreviousSnapshotTable, PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  576. executeParametrizedQuery(connection, format(loadQuery("insertToFinalSnapshot.sql"),
  577. finalCurrentSnapshotTable, CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  578.  
  579. connection.commit();
  580. }
  581.  
  582. Utils.closeResource(worseBuyPriceInTop5AllInstrument);
  583. Utils.closeResource(worseSellPriceInTop5AllInstrument);
  584. }
  585. }
  586.  
  587. private void cleanAllTempTables(Connection connection)
  588. {
  589. executeParametrizedQuery(connection, format("DELETE FROM %s", AGGREGATED_DATA_TABLE), currentQueryParams);
  590. executeParametrizedQuery(connection, format("DELETE FROM %s", orderBookSnapshotTempTable), currentQueryParams);
  591.  
  592. executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  593. executeParametrizedQuery(connection, format("DELETE FROM %s", PREVIOUS_SNAPSHOT_TABLE), currentQueryParams);
  594. executeParametrizedQuery(connection, format("DELETE FROM %s", TEMP_CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  595. executeParametrizedQuery(connection, format("DELETE FROM %s", CURRENT_SNAPSHOT_TABLE), currentQueryParams);
  596. }
  597.  
  598. /*
  599. private void printTableColumns(StringTableData td)
  600. {
  601. CommaBuilder commaBuilder = new CommaBuilder();
  602. Iterator<String> headerIterator = td.getHeader().iterator();
  603. String currentColumn;
  604. while(headerIterator.hasNext())
  605. {
  606. currentColumn = headerIterator.next();
  607. String newColumn = "'" + currentColumn + "'";
  608. commaBuilder.append(newColumn);
  609. }
  610. String columns = commaBuilder.toString();
  611. logger.trace("Temp allData table columns: {}", columns);
  612.  
  613. }
  614. */
  615.  
  616. private void createSnapshot(Connection connection, String selectOrderBookPartQuery) throws SQLException
  617. {
  618. long MAX_ROWS = 100_000, currentRow = 0;
  619. try (DbDataReader reader = getTableReader(connection, selectOrderBookPartQuery, currentQueryParams))
  620. {
  621. reader.start();
  622. //printTableColumns(td);
  623. while (reader.hasMoreData())
  624. {
  625. if (currentRow == MAX_ROWS)
  626. {
  627. connection.commit();
  628. currentRow = 0;
  629. }
  630. else
  631. {
  632. currentRow++;
  633. }
  634. TableRow<String, String> tr = reader.readRow();
  635. String messageType = tr.getValue(MESSAGE_TYPE);
  636. String currentInstrument = tr.getValue("Instrument_ID");
  637. switch (messageType)
  638. {
  639. case ADD_ORDER_TYPE_DEC:
  640. addRecord(tr, currentInstrument);
  641. break;
  642. case MODIFY_ORDER_TYPE_DEC:
  643. modifyRecord(tr, currentInstrument);
  644. break;
  645. case DELETE_ORDER_TYPE_DEC:
  646. deleteRecord(tr, currentInstrument);
  647. break;
  648. case CLEAR_BOOK_TYPE_DEC:
  649. deleteAllRecords(currentInstrument);
  650. break;
  651. default:
  652. throw new ResultException(format("Invalid value in column %s: %s", MESSAGE_TYPE,
  653. messageType));
  654. }
  655. }
  656. }
  657. catch (IOException e)
  658. {
  659. String errMessage = "Error while reading data";
  660. this.logger.error(errMessage, e);
  661. throw new ResultException(errMessage, e);
  662. }
  663. }
  664.  
  665. private void addRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  666. {
  667. String side = tableRow.getValue(SIDE);
  668. String orderId = tableRow.getValue(ORDER_ID);
  669. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  670.  
  671. currentQueryParams.put(SIDE, side);
  672. currentQueryParams.put("OrderId", orderId);
  673. currentQueryParams.put("Timestamp", timestamp);
  674.  
  675. String price = tableRow.getValue(PRICE);
  676. String orderType = tableRow.getValue("Order_Type");
  677. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  678. snapshotHandler.getInstrumentInfo(currentInstrument).addOrder(side, orderId, timestamp,
  679. Double.parseDouble(price), orderType, dss);
  680. }
  681.  
  682. private void modifyRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  683. {
  684. String side = tableRow.getValue(SIDE);
  685. String orderId = tableRow.getValue(ORDER_ID);
  686. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  687.  
  688. currentQueryParams.put(SIDE, side);
  689. currentQueryParams.put("OrderId", orderId);
  690. currentQueryParams.put("Timestamp", timestamp);
  691.  
  692. String oldPrice = tableRow.getValue("Old_Price");
  693. String newPrice = tableRow.getValue("Price");
  694. String newSize = tableRow.getValue("Size");
  695. String orderType = tableRow.getValue("Order_Type");
  696. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  697. snapshotHandler.getInstrumentInfo(currentInstrument).modifyOrder(side, orderId,
  698. timestamp, Double.parseDouble(oldPrice), Double.parseDouble(newPrice), newSize, orderType, dss);
  699. }
  700.  
  701. private void deleteRecord(TableRow<String, String> tableRow, String currentInstrument) throws SQLException
  702. {
  703. String side = tableRow.getValue(SIDE);
  704. String orderId = tableRow.getValue(ORDER_ID);
  705. String timestamp = tableRow.getValue(MESSAGE_TIMESTAMP);
  706.  
  707. currentQueryParams.put(SIDE, side);
  708. currentQueryParams.put("OrderId", orderId);
  709. currentQueryParams.put("Timestamp", timestamp);
  710.  
  711. String price = tableRow.getValue(PRICE);
  712. String orderType = tableRow.getValue("Order_Type");
  713. String dss = tableRow.getValue("dss_rt_bus_seq_no");
  714. snapshotHandler.getInstrumentInfo(currentInstrument).deleteOrder(side, orderId,
  715. Double.parseDouble(price), orderType, timestamp, dss);
  716. }
  717.  
  718. private void deleteAllRecords(String currentInstrument) throws SQLException
  719. {
  720. snapshotHandler.getInstrumentInfo(currentInstrument).clear();
  721. }
  722.  
  723. private String loadQuery(String queryFile)
  724. {
  725. String createResultTable;
  726. try
  727. {
  728. createResultTable = SQLUtils.loadQuery(rootRelative(QUERIES_PATH + queryFile));
  729. }
  730. catch (IOException e)
  731. {
  732. String errMessage = "Error while loading query from file '" + queryFile + "'";
  733. this.logger.error(errMessage, e);
  734. throw new ResultException(errMessage, e);
  735. }
  736. return createResultTable;
  737. }
  738.  
  739. private void executeParametrizedQuery(Connection connection, String query, Map<String, String> params)
  740. {
  741. logger.trace("Query before parsing: {}", query);
  742. ParametrizedQuery paramQuery = SQLUtils.parseSQLTemplate(query);
  743. logger.trace("Parametrized query: {}", paramQuery.getQuery());
  744.  
  745. try (PreparedStatement statement = paramQuery.createPreparedStatement(connection, params))
  746. {
  747. statement.execute();
  748. }
  749. catch (SQLException e)
  750. {
  751. String errMessage = "Error while prepared statement creating";
  752. this.logger.error(errMessage, e);
  753. throw new ResultException(errMessage, e);
  754. }
  755. }
  756.  
  757. private String getWorsePriceInTop5(boolean isBuy, boolean isSpecificInstrument,
  758. String instrument) throws SQLException
  759. {
  760. PreparedStatement preparedStatement;
  761. if (isBuy)
  762. {
  763. if (isSpecificInstrument)
  764. {
  765. preparedStatement = worseBuyPriceInTop5OneInstrument;
  766. preparedStatement.setString(1, instrument);
  767. }
  768. else
  769. {
  770. preparedStatement = worseBuyPriceInTop5AllInstrument;
  771. }
  772. }
  773. else // Sell
  774. {
  775. if (isSpecificInstrument)
  776. {
  777. preparedStatement = worseSellPriceInTop5OneInstrument;
  778. preparedStatement.setString(1, instrument);
  779. }
  780. else
  781. {
  782. preparedStatement = worseSellPriceInTop5AllInstrument;
  783. }
  784. }
  785.  
  786. try (ResultSet rs = preparedStatement.executeQuery())
  787. {
  788. if(rs.next())
  789. return String.valueOf(rs.getDouble("Price"));
  790. }
  791. catch (SQLException e)
  792. {
  793. String errMessage = "Error while prepared statement creating";
  794. this.logger.error(errMessage, e);
  795. throw new ResultException(errMessage, e);
  796. }
  797. return null;
  798. }
  799.  
  800. private void executeQuery(Connection connection, String query)
  801. {
  802. logger.trace("Query to execute: {}", query);
  803.  
  804. try (Statement statement = connection.createStatement())
  805. {
  806. statement.execute(query);
  807. }
  808. catch (SQLException e)
  809. {
  810. String errMessage = format("Error while executing query '%s'.", query);
  811. this.logger.error(errMessage, e);
  812. throw new ResultException(errMessage, e);
  813. }
  814. }
  815.  
  816. private DbDataReader getTableReader(Connection connection, String sqlTemplate, Map<String, String> params)
  817. {
  818. ParametrizedQuery query;
  819. PreparedStatement statement;
  820. try
  821. {
  822. query = SQLUtils.parseSQLTemplate(sqlTemplate);
  823. statement = query.createPreparedStatement(connection, params);
  824. }
  825. catch (SQLException e)
  826. {
  827. String errMessage = "Error while prepared statement creating";
  828. this.logger.error(errMessage, e);
  829. throw new ResultException(errMessage, e);
  830. }
  831. return new DbDataReader(statement);
  832. }
  833.  
  834. private class SnapshotHandler
  835. {
  836. private final Connection connection;
  837. private String snapshotTable;
  838. private String tempSnapshotTable;
  839. private Map<String, InstrumentInfo> instrumentsInfo;
  840. private String sourceTable;
  841.  
  842. public SnapshotHandler(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  843. {
  844. connection = dbConnection;
  845. this.snapshotTable = snapshotTable;
  846. this.tempSnapshotTable = tempSnapshotTable;
  847. instrumentsInfo = new HashMap<>();
  848. }
  849.  
  850. public void setOrderDataSourceTable(String sourceTable)
  851. {
  852. this.sourceTable = sourceTable;
  853. for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  854. {
  855. InstrumentInfo ii = instrumentEntry.getValue();
  856. ii.setOrderDataSourceTable(sourceTable);
  857. instrumentEntry.setValue(ii);
  858. }
  859. }
  860.  
  861. public InstrumentInfo getInstrumentInfo(String instrument)
  862. {
  863. InstrumentInfo ii = instrumentsInfo.get(instrument);
  864. if (ii == null)
  865. {
  866. ii = new InstrumentInfo(connection, snapshotTable, tempSnapshotTable);
  867. ii.setOrderDataSourceTable(sourceTable);
  868. instrumentsInfo.put(instrument, ii);
  869. }
  870. return ii;
  871. }
  872.  
  873. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  874. {
  875. this.snapshotTable = newSnapshotTable;
  876. this.tempSnapshotTable = newTempSnapshotTable;
  877. for (Map.Entry<String, InstrumentInfo> instrumentEntry : instrumentsInfo.entrySet())
  878. {
  879. instrumentEntry.getValue().nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  880. }
  881. }
  882. }
  883.  
  884. private class InstrumentInfo
  885. {
  886. private final Connection connection;
  887. private final PriceTopHandler buyHandler, sellHandler;
  888. private String snapshotTable;
  889. private String sourceTable;
  890.  
  891. public InstrumentInfo(Connection dbConnection, String snapshotTable, String tempSnapshotTable)
  892. {
  893. connection = dbConnection;
  894. buyHandler = new PriceTopHandler(true, connection, snapshotTable, tempSnapshotTable);
  895. sellHandler = new PriceTopHandler(false, connection, snapshotTable, tempSnapshotTable);
  896. this.snapshotTable = snapshotTable;
  897. }
  898.  
  899. public void setOrderDataSourceTable(String sourceTable)
  900. {
  901. this.sourceTable = sourceTable;
  902. buyHandler.setOrderDataSourceTable(sourceTable);
  903. sellHandler.setOrderDataSourceTable(sourceTable);
  904. }
  905.  
  906. public void addOrder(String side, String orderId, String timestamp, double price, String orderType,
  907. String dss) throws SQLException
  908. {
  909. if (price == 0 && StringUtils.equals(orderType, "1"))
  910. {
  911. // handle zero price
  912. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  913. }
  914. else
  915. {
  916. if ("66".equalsIgnoreCase(side))
  917. buyHandler.addOrder(orderId, timestamp, price, dss);
  918. else if ("83".equalsIgnoreCase(side))
  919. sellHandler.addOrder(orderId, timestamp, price, dss);
  920. }
  921. }
  922.  
  923. public void deleteOrder(String side, String orderId, double price, String orderType, String timestamp,
  924. String dss) throws SQLException
  925. {
  926. if (price == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  927. {
  928. // handle zero price
  929. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams",
  930. timestamp, dss, side, orderId, timestamp);
  931. }
  932. else
  933. {
  934. if ("66".equalsIgnoreCase(side))
  935. buyHandler.deleteOrder(orderId, price, timestamp, dss);
  936. else if ("83".equalsIgnoreCase(side))
  937. sellHandler.deleteOrder(orderId, price, timestamp, dss);
  938. }
  939. }
  940.  
  941. public void modifyOrder(String side, String orderId, String timestamp, double oldPrice, double newPrice,
  942. String newSize, String orderType, String dss) throws SQLException
  943. {
  944. if (oldPrice == 0 && newPrice == 0 && (orderType == null || StringUtils.equals(orderType, "1")))
  945. {
  946. // handle zero price
  947. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  948. }
  949. else
  950. {
  951. if ("66".equalsIgnoreCase(side))
  952. buyHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  953. else if ("83".equalsIgnoreCase(side))
  954. sellHandler.modifyOrder(orderId, timestamp, oldPrice, newPrice, newSize, dss);
  955. }
  956. }
  957.  
  958. public void clear() throws SQLException
  959. {
  960. buyHandler.clear();
  961. sellHandler.clear();
  962. }
  963.  
  964. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  965. {
  966. buyHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  967. sellHandler.nextSnapshot(newSnapshotTable, newTempSnapshotTable);
  968. this.snapshotTable = newSnapshotTable;
  969. }
  970. }
  971.  
  972. private class PriceTopHandler
  973. {
  974. private class OrderInfo
  975. {
  976. private final String orderId;
  977. private final String timestamp;
  978.  
  979. public OrderInfo(String orderId)
  980. {
  981. this(orderId, null);
  982. }
  983.  
  984. public OrderInfo(String orderId, String timestamp)
  985. {
  986. this.orderId = orderId;
  987. this.timestamp = timestamp;
  988. }
  989.  
  990. public String getOrderId()
  991. {
  992. return orderId;
  993. }
  994.  
  995. public String getTimestamp()
  996. {
  997. return timestamp;
  998. }
  999.  
  1000. @Override
  1001. public boolean equals(Object o)
  1002. {
  1003. if (this == o)
  1004. return true;
  1005.  
  1006. if (o == null || getClass() != o.getClass())
  1007. return false;
  1008.  
  1009. OrderInfo orderInfo = (OrderInfo) o;
  1010. return orderId.equals(orderInfo.orderId);
  1011. }
  1012.  
  1013. @Override
  1014. public int hashCode()
  1015. {
  1016. return orderId.hashCode();
  1017. }
  1018. }
  1019.  
  1020. private static final int TOP_SIZE = 5;
  1021. private final TreeMap<Double, Set<OrderInfo>> top;
  1022. private final TreeMap<Double, Set<OrderInfo>> rest;
  1023. private final Comparator<Double> comparator;
  1024. private String tempSnapshotTable;
  1025. private String snapshotTable;
  1026. private final Connection connection;
  1027. private String orderDataSourceTable;
  1028. private final String side;
  1029.  
  1030. public PriceTopHandler(boolean isBuy, Connection connection, String snapshotTable, String tempSnapshotTable)
  1031. {
  1032. this.connection = connection;
  1033. this.tempSnapshotTable = tempSnapshotTable;
  1034. this.snapshotTable = snapshotTable;
  1035.  
  1036. // First = worst, last = best
  1037. comparator = isBuy ? Comparator.naturalOrder() : Comparator.reverseOrder();
  1038. side = isBuy ? "66" : "83";
  1039.  
  1040. top = new TreeMap<>(comparator);
  1041. rest = new TreeMap<>(comparator);
  1042. }
  1043.  
  1044. public void setOrderDataSourceTable(String orderDataSourceTable)
  1045. {
  1046. this.orderDataSourceTable = orderDataSourceTable;
  1047. }
  1048.  
  1049. public void addOrder(String orderId, String timestamp, double price, String dss) throws SQLException
  1050. {
  1051. addOrder(orderId, timestamp, price, dss, false);
  1052. }
  1053.  
  1054. private void addOrder(String orderId, String timestamp, double price, String dss, boolean isModify)
  1055. throws SQLException
  1056. {
  1057. BiFunction<Double, Set<OrderInfo>, Set<OrderInfo>> ADD_ORDER_ID = (k, set) ->
  1058. {
  1059. if (set == null)
  1060. set = new HashSet<>();
  1061.  
  1062. set.add(new OrderInfo(orderId, timestamp));
  1063. return set;
  1064. };
  1065.  
  1066. if (top.size() == TOP_SIZE && comparator.compare(price, top.firstKey()) < 0)
  1067. { // 1.5 - worse then top-5, record go to the 'rest'
  1068. rest.compute(price, ADD_ORDER_ID);
  1069.  
  1070. if (!isModify)
  1071. {
  1072. // add to temp
  1073. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot", side, orderId, timestamp);
  1074. }
  1075. }
  1076. else
  1077. {
  1078. // 1.2, 1.3
  1079. top.compute(price, ADD_ORDER_ID);
  1080.  
  1081. if (!isModify)
  1082. {
  1083. // add AddOrder to snapshot
  1084. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  1085. }
  1086.  
  1087. if (top.size() > TOP_SIZE)
  1088. { // 1.4
  1089. Map.Entry<Double, Set<OrderInfo>> worst = top.pollFirstEntry(); // poll worse price
  1090. rest.put(worst.getKey(), worst.getValue());
  1091.  
  1092. if (!isModify)
  1093. {
  1094. // add to temp as AddOrder
  1095. // Remove from snapshot (add row to snapshot as DeleteOrder)
  1096. for (OrderInfo oi : worst.getValue())
  1097. {
  1098. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1099. side, oi.getOrderId(), oi.getTimestamp());
  1100. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams, "insertDeleteRowWithSpecParams",
  1101. timestamp, dss, side, oi.getOrderId(), oi.getTimestamp());
  1102. }
  1103. }
  1104. }
  1105. }
  1106. }
  1107.  
  1108. public void deleteOrder(String orderId, double price, String timestamp, String dss) throws SQLException
  1109. {
  1110. deleteOrder(orderId, price, timestamp, dss, false);
  1111. }
  1112.  
  1113. private void deleteOrder(String orderId, double price, String timestamp, String dss, boolean isModify)
  1114. throws SQLException
  1115. {
  1116. if (top.containsKey(price))
  1117. {
  1118. if (!isModify)
  1119. {
  1120. // delete from snapshot (Add delete row)
  1121. executePreparedStatement(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot", side, orderId, timestamp);
  1122. }
  1123. Set<OrderInfo> orderIdsByPrice = top.get(price);
  1124. orderIdsByPrice.remove(new OrderInfo(orderId)); // 3.1.1, 3.1.2
  1125.  
  1126. if (orderIdsByPrice.isEmpty()) // add from Temp
  1127. { // 3.1.1
  1128. top.remove(price);
  1129.  
  1130. if (!rest.isEmpty())
  1131. {
  1132. double bestFromTemp = rest.lastKey(); // get best price from temp
  1133. top.put(bestFromTemp, rest.remove(bestFromTemp));
  1134.  
  1135. if (!isModify)
  1136. {
  1137. for (OrderInfo oi : top.get(bestFromTemp))
  1138. {
  1139. // add to snapshot - data can be either in OrderBook, or in Source
  1140. executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook,
  1141. "insertAddRowWithSpecParamsOrderBook",timestamp,
  1142. dss, side, oi.getOrderId(), oi.getTimestamp());
  1143. executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource,
  1144. "insertAddRowWithSpecParamsSource", timestamp,
  1145. dss, side, oi.getOrderId(), oi.getTimestamp());
  1146. // remove from temp
  1147. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, oi.getOrderId(), oi.getTimestamp());
  1148. }
  1149. }
  1150. }
  1151. }
  1152. }
  1153. else
  1154. {
  1155. rest.remove(price);
  1156.  
  1157. if (!isModify)
  1158. {
  1159. // HARD delete from temp
  1160. currentQueryParams.put("OrderId", String.valueOf(orderId));
  1161. currentQueryParams.put("Side", side); // in Temp only Add Order, so we do
  1162. // not need Message Timestamp
  1163. executeParametrizedQuery(connection,
  1164. format("DELETE FROM %s WHERE Side='#Side' AND Order_ID='#OrderId'",
  1165. tempSnapshotTable), currentQueryParams);
  1166. }
  1167. }
  1168. }
  1169.  
  1170. private boolean containsInTop(String orderId, double oldPrice)
  1171. {
  1172. if (!top.containsKey(oldPrice))
  1173. return false;
  1174.  
  1175. for (OrderInfo oi : top.get(oldPrice))
  1176. {
  1177. if (oi.getOrderId().equals(orderId))
  1178. return true;
  1179. }
  1180. return false;
  1181. }
  1182.  
  1183. public void modifyOrder(String orderId, String timestamp, double oldPrice, double newPrice, String newSize,
  1184. String dss) throws SQLException
  1185. {
  1186. // write modify
  1187. boolean wasTop = containsInTop(orderId, oldPrice);
  1188. Integer numberOfOrdersWithOldPrice = top.containsKey(oldPrice) ? top.get(oldPrice).size() : null;
  1189. double wasWorseTopPrice = top.isEmpty() ? 0 : top.firstKey();
  1190. double wasBestRestPrice = rest.isEmpty() ? 0 : rest.firstKey();
  1191. OrderInfo orderInfoBeforeChange = null;
  1192. if (!wasTop)
  1193. {
  1194. if (rest.containsKey(oldPrice))
  1195. {
  1196. for (OrderInfo oi : rest.get(oldPrice))
  1197. {
  1198. if (oi.getOrderId().equals(orderId))
  1199. orderInfoBeforeChange = oi;
  1200. }
  1201. }
  1202. }
  1203.  
  1204. deleteOrder(orderId, oldPrice, null, null, true);
  1205. addOrder(orderId, timestamp, newPrice, null, true);
  1206.  
  1207. boolean isTop = containsInTop(orderId, newPrice);
  1208.  
  1209. if (wasTop)
  1210. {
  1211. if (isTop)
  1212. {
  1213. // Add modify record
  1214. executePreparedStatement(insertFromAllData, "insertFromAllData", side, orderId, timestamp);
  1215.  
  1216. if (numberOfOrdersWithOldPrice != null && numberOfOrdersWithOldPrice > 1 && top.get(newPrice).size() == 1) // new price push out worse price from top
  1217. {
  1218. Double restFirstKey = rest.isEmpty() ? null : rest.firstKey(); // best in rest
  1219. if (restFirstKey != null)
  1220. {
  1221. for (OrderInfo oi : rest.get(restFirstKey))
  1222. {
  1223. // add delete to snap
  1224. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams,
  1225. "insertDeleteRowWithSpecParams", timestamp, dss,
  1226. side, oi.getOrderId(), oi.getTimestamp());
  1227. // add to temp
  1228. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1229. side, oi.getOrderId(), oi.getTimestamp());
  1230. }
  1231. }
  1232. }
  1233. if (newPrice == wasBestRestPrice)
  1234. {
  1235. for (OrderInfo oi : top.get(wasBestRestPrice))
  1236. {
  1237. if (!oi.getOrderId().equals(orderId))
  1238. {
  1239. // Add record
  1240. executePreparedStatementSpecParams(insertFromAllDataWithSpecParamsSource,
  1241. "insertFromAllDataWithSpecParamsSource", timestamp,
  1242. dss, side, oi.getOrderId(), oi.getTimestamp());
  1243. }
  1244. }
  1245. }
  1246. }
  1247. else // wasTop && !isTop
  1248. {
  1249. for (OrderInfo oi : rest.get(newPrice))
  1250. {
  1251. executePreparedStatement(insertDeleteRowFromModifyToSnap, "insertDeleteRowFromModifyToSnap",
  1252. side, oi.getOrderId(), oi.getTimestamp());
  1253. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1254. side, oi.getOrderId(), oi.getTimestamp());
  1255. }
  1256.  
  1257. double worseTopPrice = top.firstKey(); // worse in top
  1258. if (wasWorseTopPrice != worseTopPrice)
  1259. {
  1260. for (OrderInfo oi : top.get(worseTopPrice))
  1261. {
  1262. // add to snap - data can be either in OrderBook, or in Source
  1263. executePreparedStatementSpecParams(insertAddRowWithSpecParamsOrderBook,
  1264. "insertAddRowWithSpecParamsOrderBook", timestamp, dss,
  1265. side, oi.getOrderId(), oi.getTimestamp());
  1266. executePreparedStatementSpecParams(insertAddRowWithSpecParamsSource,
  1267. "insertAddRowWithSpecParamsSource", timestamp, dss, side,
  1268. oi.getOrderId(), oi.getTimestamp());
  1269. // remove hard from temp
  1270. executePreparedStatement(deleteFromTemp, "deleteFromTemp",
  1271. side, oi.getOrderId(), oi.getTimestamp());
  1272. }
  1273. }
  1274. }
  1275. }
  1276. else // !wasTop
  1277. {
  1278. if (isTop)
  1279. {
  1280. // add to snap
  1281. executePreparedStatement(insertAddRowToSnapshot, "insertAddRowToSnapshot", side, orderId, timestamp);
  1282. // remove hard from temp
  1283. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, orderInfoBeforeChange.getOrderId(),
  1284. orderInfoBeforeChange.getTimestamp());
  1285.  
  1286. if (top.get(newPrice).size() == 1) // some orders were pushed off from top by order with new price
  1287. {
  1288. Double restLastKey = rest.isEmpty() ? null : rest.lastKey(); // best in rest
  1289. if (restLastKey != null)
  1290. {
  1291. for (OrderInfo oi : rest.get(restLastKey))
  1292. {
  1293. // add delete to snap
  1294. executePreparedStatementSpecParams(insertDeleteRowWithSpecParams,
  1295. "insertDeleteRowWithSpecParams", timestamp, dss,
  1296. side, oi.getOrderId(), oi.getTimestamp());
  1297. // add to temp
  1298. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot",
  1299. side, oi.getOrderId(), oi.getTimestamp());
  1300. }
  1301. }
  1302. }
  1303. }
  1304. else // !isTop
  1305. {
  1306. // remove hard from temp
  1307. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, orderId, timestamp);
  1308. // add to temp
  1309. executePreparedStatement(insertAddRowToTempSnapshot, "insertAddRowToTempSnapshot", side, orderId, timestamp);
  1310. }
  1311. }
  1312. }
  1313.  
  1314. public void clear() throws SQLException
  1315. {
  1316. top.values().stream()
  1317. .flatMap(Collection::stream)
  1318. .forEach(orderInfo -> {
  1319. // delete from snapshot (add row as DeleteOder to snap)
  1320. try
  1321. {
  1322. executePreparedStatement(insertDeleteRowToSnapshot, "insertDeleteRowToSnapshot", side,
  1323. orderInfo.getOrderId(), orderInfo.getTimestamp());
  1324. }
  1325. catch (SQLException e)
  1326. {
  1327. throw new ResultException("Error while inserting DeleteOrder row.", e);
  1328. }
  1329. });
  1330.  
  1331. top.clear();
  1332.  
  1333. for (Iterator<Double> it = rest.descendingKeySet().iterator(); it.hasNext() && top.size() < TOP_SIZE; )
  1334. {
  1335. double price = it.next();
  1336. top.put(price, rest.get(price));
  1337. it.remove();
  1338.  
  1339. for (OrderInfo oi : top.get(price))
  1340. {
  1341. // add to snapshot
  1342. executePreparedStatement(insertFromAllData, "insertFromAllData", side, oi.getOrderId(), oi.getTimestamp());
  1343. // remove from temp
  1344. executePreparedStatement(deleteFromTemp, "deleteFromTemp", side, oi.getOrderId(), oi.getTimestamp());
  1345. }
  1346. }
  1347. }
  1348.  
  1349. public void nextSnapshot(String newSnapshotTable, String newTempSnapshotTable) throws SQLException
  1350. {
  1351. insertFromAllData = connection.prepareStatement(format(insertFromAllDataQuery, newTempSnapshotTable, tempSnapshotTable));
  1352. rest.values().stream()
  1353. .flatMap(Collection::stream)
  1354. .forEach(orderData -> {
  1355. // select last transaction with orderId and timestamp and insert into new table (temp
  1356. // snapshot)
  1357. try
  1358. {
  1359. executePreparedStatement(insertFromAllData, "insertFromAllData",
  1360. side, orderData.getOrderId(), orderData.getTimestamp());
  1361. }
  1362. catch (SQLException e)
  1363. {
  1364. throw new ResultException(format("Error while transferring data from %s to %s",
  1365. tempSnapshotTable, newTempSnapshotTable), e);
  1366. }
  1367. });
  1368. executeBatch(insertFromAllData, "insertFromAllData");
  1369.  
  1370. snapshotTable = newSnapshotTable;
  1371. tempSnapshotTable = newTempSnapshotTable;
  1372. }
  1373. }
  1374.  
  1375. private void executePreparedStatement(PreparedStatement ps, String psName, String side, String orderId, String timestamp)
  1376. throws SQLException
  1377. {
  1378. ps.setString(1, side);
  1379. ps.setString(2, orderId);
  1380. ps.setString(3, timestamp);
  1381. ps.addBatch();
  1382. batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
  1383. if (batchSize.get(ps) == MAX_BATCH_SIZE)
  1384. {
  1385. executeBatch(ps, psName);
  1386. }
  1387. }
  1388.  
  1389. private void executePreparedStatementSpecParams(PreparedStatement ps, String psName, String timestampToInsert,
  1390. String dss, String side, String orderId, String timestampToSelect) throws SQLException
  1391. {
  1392. ps.setString(1, timestampToInsert);
  1393. ps.setString(2, dss);
  1394. ps.setString(3, side);
  1395. ps.setString(4, orderId);
  1396. ps.setString(5, timestampToSelect);
  1397. ps.addBatch();
  1398. batchSize.compute(ps, (k, v) -> v == null ? 0 : v + 1);
  1399. if (batchSize.get(ps) == MAX_BATCH_SIZE)
  1400. {
  1401. executeBatch(ps, psName);
  1402. }
  1403. }
  1404.  
  1405. private void executeBatch(PreparedStatement ps, String psName) throws SQLException
  1406. {
  1407. long start = System.currentTimeMillis();
  1408. ps.executeBatch();
  1409. long end = System.currentTimeMillis();
  1410. int psBatchSize = batchSize.get(ps) == null ? 0 : batchSize.get(ps);
  1411. logger.debug("Executing batch {} (size = {}) took {} ms", psName, psBatchSize,
  1412. (end - start));
  1413. batchSize.put(ps, 0);
  1414. }
  1415. }
  1416.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement