Advertisement
Guest User

Untitled

a guest
Apr 6th, 2016
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.31 KB | None | 0 0
  1. import java.sql.*;
  2. import java.util.*;
  3. import java.util.concurrent.*;
  4.  
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7.  
  8. public class FetchRows {
  9.  
  10. private static final Logger log = LoggerFactory.getLogger(FetchRows.class);
  11.  
  12. public static void main(String[] args) {
  13.  
  14. try {
  15. new FetchRows().print();
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. }
  20.  
  21. void print() throws Exception {
  22.  
  23. Class.forName("com.mysql.jdbc.Driver").newInstance();
  24. Properties dbProps = new Properties();
  25. dbProps.setProperty("user", "test");
  26. dbProps.setProperty("password", "test");
  27.  
  28. try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", dbProps)) {
  29. try (Statement st = conn.createStatement()) {
  30. prepareTestData(st);
  31. }
  32. // http://stackoverflow.com/a/2448019/3080094
  33. try (Statement st = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,
  34. java.sql.ResultSet.CONCUR_READ_ONLY)) {
  35. st.setFetchSize(Integer.MIN_VALUE);
  36. fetchAndPrintTestData(st);
  37. }
  38. }
  39. }
  40.  
  41. boolean refreshTestData = true;
  42. int maxRecords = 5_555;
  43.  
  44. void prepareTestData(Statement st) throws SQLException {
  45.  
  46. int recordCount = 0;
  47. if (refreshTestData) {
  48. st.execute("drop table if exists fetchrecords");
  49. st.execute("create table fetchrecords (id mediumint not null auto_increment primary key, created timestamp default current_timestamp)");
  50. for (int i = 0; i < maxRecords; i++) {
  51. st.addBatch("insert into fetchrecords () values ()");
  52. if (i % 500 == 0) {
  53. st.executeBatch();
  54. log.debug("{} records available.", i);
  55. }
  56. }
  57. st.executeBatch();
  58. recordCount = maxRecords;
  59. } else {
  60. try (ResultSet rs = st.executeQuery("select count(*) from fetchrecords")) {
  61. rs.next();
  62. recordCount = rs.getInt(1);
  63. }
  64. }
  65. log.info("{} records available for testing.", recordCount);
  66. }
  67.  
  68. int batchSize = 1_000;
  69. int maxBatchesInMem = 3;
  70. int printFinishTimeoutS = 5;
  71.  
  72. void fetchAndPrintTestData(Statement st) throws SQLException, InterruptedException {
  73.  
  74. final BlockingQueue<List<FetchRecordBean>> printQueue = new LinkedBlockingQueue<List<FetchRecordBean>>(maxBatchesInMem);
  75. final PrintToConsole printTask = new PrintToConsole(printQueue);
  76. new Thread(printTask).start();
  77. try (ResultSet rs = st.executeQuery("select * from fetchrecords")) {
  78. List<FetchRecordBean> l = new LinkedList<>();
  79. while (rs.next()) {
  80. FetchRecordBean bean = new FetchRecordBean();
  81. bean.setId(rs.getInt("id"));
  82. bean.setCreated(new java.util.Date(rs.getTimestamp("created").getTime()));
  83. l.add(bean);
  84. if (l.size() % batchSize == 0) {
  85. /*
  86. * The printTask can stop itself when this producer is too slow to put records on the print-queue.
  87. * Therefor, also check printTask.isStopping() to break the while-loop.
  88. */
  89. if (printTask.isStopping()) {
  90. throw new TimeoutException("Print task has stopped.");
  91. }
  92. enqueue(printQueue, l);
  93. l = new LinkedList<>();
  94. }
  95. }
  96. if (l.size() > 0) {
  97. enqueue(printQueue, l);
  98. }
  99. } catch (TimeoutException | InterruptedException e) {
  100. log.error("Unable to finish printing records to console: {}", e.getMessage());
  101. printTask.stop();
  102. } finally {
  103. log.info("Reading records finished.");
  104. if (!printTask.isStopping()) {
  105. try {
  106. enqueue(printQueue, Collections.<FetchRecordBean> emptyList());
  107. } catch (Exception e) {
  108. log.error("Unable to signal last record to print.", e);
  109. printTask.stop();
  110. }
  111. }
  112. if (!printTask.await(printFinishTimeoutS, TimeUnit.SECONDS)) {
  113. log.error("Print to console task did not finish.");
  114. }
  115. }
  116. }
  117.  
  118. int enqueueTimeoutS = 5;
  119. // To test a slow printer, see also Thread.sleep statement in PrintToConsole.print.
  120. // int enqueueTimeoutS = 1;
  121.  
  122. void enqueue(BlockingQueue<List<FetchRecordBean>> printQueue, List<FetchRecordBean> l) throws InterruptedException, TimeoutException {
  123.  
  124. log.debug("Adding {} records to print-queue.", l.size());
  125. if (!printQueue.offer(l, enqueueTimeoutS, TimeUnit.SECONDS)) {
  126. throw new TimeoutException("Unable to put print data on queue within " + enqueueTimeoutS + " seconds.");
  127. }
  128. }
  129.  
  130. int dequeueTimeoutS = 5;
  131.  
  132. class PrintToConsole implements Runnable {
  133.  
  134. private final BlockingQueue<List<FetchRecordBean>> q;
  135. private final CountDownLatch finishedLock = new CountDownLatch(1);
  136. private volatile boolean stop;
  137.  
  138. public PrintToConsole(BlockingQueue<List<FetchRecordBean>> q) {
  139. this.q = q;
  140. }
  141.  
  142. @Override
  143. public void run() {
  144.  
  145. try {
  146. while (!stop) {
  147. List<FetchRecordBean> l = q.poll(dequeueTimeoutS, TimeUnit.SECONDS);
  148. if (l == null) {
  149. log.error("Unable to get print data from queue within {} seconds.", dequeueTimeoutS);
  150. break;
  151. }
  152. if (l.isEmpty()) {
  153. break;
  154. }
  155. print(l);
  156. }
  157. if (stop) {
  158. log.error("Printing to console was stopped.");
  159. }
  160. } catch (Exception e) {
  161. log.error("Unable to print records to console.", e);
  162. } finally {
  163. if (!stop) {
  164. stop = true;
  165. log.info("Printing to console finished.");
  166. }
  167. finishedLock.countDown();
  168. }
  169. }
  170.  
  171. void print(List<FetchRecordBean> l) {
  172.  
  173. log.info("Got list with {} records from print-queue.", l.size());
  174. // To test a slow printer, see also enqueueTimeoutS.
  175. // try { Thread.sleep(1500L); } catch (Exception ignored) {}
  176. }
  177.  
  178. public void stop() {
  179. stop = true;
  180. }
  181.  
  182. public boolean isStopping() {
  183. return stop;
  184. }
  185.  
  186. public void await() throws InterruptedException {
  187. finishedLock.await();
  188. }
  189.  
  190. public boolean await(long timeout, TimeUnit tunit) throws InterruptedException {
  191. return finishedLock.await(timeout, tunit);
  192. }
  193.  
  194. }
  195.  
  196. class FetchRecordBean {
  197.  
  198. private int id;
  199. private java.util.Date created;
  200.  
  201. public int getId() {
  202. return id;
  203. }
  204. public void setId(int id) {
  205. this.id = id;
  206. }
  207. public java.util.Date getCreated() {
  208. return created;
  209. }
  210. public void setCreated(java.util.Date created) {
  211. this.created = created;
  212. }
  213.  
  214. }
  215. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement