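// Groovy script: streams XML event logs (the current log plus gzipped archives) with StAX
// and bulk-loads them into HSQLDB. The parsing thread pushes row data onto per-table
// queues; one consumer thread per table batches the INSERTs.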
import java.sql.DriverManager
import java.util.concurrent.LinkedBlockingQueue
import java.util.zip.GZIPInputStream
import javax.xml.stream.XMLInputFactory
import javax.xml.stream.XMLStreamReader

class StaxCategory {
    static Object get(XMLStreamReader self, String key) {
        return self.getAttributeValue(null, key)
    }
    static String name(XMLStreamReader self) {
        return self.name.toString()
    }
    static String text(XMLStreamReader self) {
        return self.elementText
    }
}

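// Connect to a running HSQLDB server; the commented-out URLs switch to a file- or
// memory-backed database instead. For the server URL, a server must already be listening,
// started along these lines (illustrative; main class and option syntax vary between
// HSQLDB versions):
//   java -cp hsqldb.jar org.hsqldb.Server -database.0 file:xdb -dbname.0 xdb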
url = "jdbc:hsqldb:hsql://localhost/xdb"
//url = "jdbc:hsqldb:file:event_db"
//url = "jdbc:hsqldb:mem:event_db"

Class.forName "org.hsqldb.jdbcDriver"

con = DriverManager.getConnection(url, "sa", "")
con.createStatement().execute """
    CREATE CACHED TABLE log_event (
        id bigint primary key,
        attr_name varchar,
        attr_type varchar,
        attr_timestamp numeric,
        attr_id bigint
    );

    CREATE CACHED TABLE log_event_argument (
        id bigint primary key,
        attr_value varchar,
        attr_type varchar,
        attr_id bigint,
        log_event_id bigint
    );

    CREATE CACHED TABLE log_batch (
        id bigint primary key,
        attr_name varchar
    );

    CREATE CACHED TABLE log_transaction (
        id bigint primary key,
        attr_id bigint
    );

    CREATE CACHED TABLE log_batch_transaction (
        log_batch_id bigint,
        log_transaction_id bigint
    );

    CREATE CACHED TABLE log_event_batch (
        log_event_id bigint,
        log_batch_id bigint
    );
"""

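// Parameterised INSERT statements, one per table; each statement is serviced by its own
// consumer thread reading from a matching queue.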
logTransactionStmt = "INSERT INTO log_transaction (id,attr_id) VALUES (?,?)"
logBatchStmt = "INSERT INTO log_batch (id,attr_name) VALUES (?,?)"
logEventStmt = "INSERT INTO log_event (id,attr_name,attr_type,attr_timestamp,attr_id) VALUES (?,?,?,?,?)"
logEventArgumentStmt = "INSERT INTO log_event_argument (id,attr_value,attr_type,attr_id,log_event_id) VALUES (?,?,?,?,?)"
logEventBatchStmt = "INSERT INTO log_event_batch (log_event_id,log_batch_id) VALUES (?,?)"
logBatchTransactionStmt = "INSERT INTO log_batch_transaction (log_batch_id,log_transaction_id) VALUES (?,?)"

logTransactionId = 0
logBatchId = 0
logEventId = 0
logArgumentId = 0

inTransaction = false
inBatch = false

logTransactionQueue = new LinkedBlockingQueue()
logBatchQueue = new LinkedBlockingQueue()
logEventQueue = new LinkedBlockingQueue()
logEventArgumentQueue = new LinkedBlockingQueue()
logEventBatchQueue = new LinkedBlockingQueue()
logBatchTransactionQueue = new LinkedBlockingQueue()

batchsize = 10000

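// Consumer: opens its own connection, takes argument lists from the queue, adds them to a
// JDBC batch and commits every `batchsize` rows. A falsy value on the queue (the sentinel
// pushed at the end of the script) ends the loop; the final partial batch is then flushed.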
def consumer = { stmt, queue ->
    def con = DriverManager.getConnection(url, "sa", "")
    con.autoCommit = false
    def ps = con.prepareStatement(stmt)
    def stmtCount = 0
    def argsList
    while (argsList = queue.take()) {
        def i = 1
        argsList.each {
            ps.setObject(i++, it == '' ? null : it)
        }
        ps.addBatch()
        stmtCount++
        if (stmtCount == batchsize) {
            ps.executeBatch()
            con.commit()
            ps.clearBatch()
            stmtCount = 0
            print " [Commit]"
        }
    }
    ps.executeBatch()
    con.commit()
}

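// One consumer thread per table keeps the JDBC work off the parsing thread.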
Thread.start { consumer(logTransactionStmt, logTransactionQueue) }
Thread.start { consumer(logBatchStmt, logBatchQueue) }
Thread.start { consumer(logEventStmt, logEventQueue) }
Thread.start { consumer(logEventArgumentStmt, logEventArgumentQueue) }
Thread.start { consumer(logEventBatchStmt, logEventBatchQueue) }
Thread.start { consumer(logBatchTransactionStmt, logBatchTransactionQueue) }

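// Progress monitor: prints the queue depths every three seconds so a producer/consumer
// imbalance is easy to spot.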
Thread.start {
    while (true) {
        println """
=== Queue sizes ===
Transaction: ${logTransactionQueue.size()}
Batch: ${logBatchQueue.size()}
Event: ${logEventQueue.size()}
Argument: ${logEventArgumentQueue.size()}
"""
        Thread.sleep 3000
    }
}

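// Translate start elements into rows; the nesting flags (inTransaction/inBatch) drive the
// join tables. Judging from the element and attribute names used below, the input is
// assumed to look roughly like:
//   <transaction id="..."><batch name="..."><event name="..." type="..." timestamp="..." id="...">
//     <argument value="..." type="..." id="..."/></event></batch></transaction>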
def processStartElement(element) {
    switch (element.name()) {

        case "transaction":
            inTransaction = true
            logTransactionId++
            logTransactionQueue.add([logTransactionId, element.id])
            break

        case "batch":
            inBatch = true
            logBatchId++
            logBatchQueue.add([logBatchId, element.name])
            if (inTransaction) {
                logBatchTransactionQueue.add([logBatchId, logTransactionId])
            }
            break

        case "event":
            logEventId++
            logEventQueue.add([logEventId, element.name, element.type, element.timestamp, element.id])
            if (inBatch) {
                logEventBatchQueue.add([logEventId, logBatchId])
            }
            break

        case "argument":
            logArgumentId++
            logEventArgumentQueue.add([logArgumentId, element.value, element.type, element.id, logEventId])
            break
    }
    //println "Start ${element.name()}"
}

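// Leaving a transaction or batch clears the corresponding nesting flag.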
def processEndElement(element) {
    switch (element.name()) {
        case "transaction":
            inTransaction = false
            break
        case "batch":
            inBatch = false
            break
    }
    //println "End ${element.name()}"
}

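// Pull-parse one source with StAX, dispatching start and end element events; all other
// event types (characters, comments, ...) are skipped.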
def insertEvents(source, desc) {
    println "Processing ${desc}..."

    reader = inputFactory.createXMLStreamReader(source)
    while (reader.hasNext()) {
        if (reader.startElement) {
            processStartElement reader
        }
        if (reader.endElement) {
            processEndElement reader
        }
        reader.next()
    }
    reader.close()
}

dir = "/Users/peter/jobb/assa_abloy/stoorn/"

inputFactory = XMLInputFactory.newInstance()

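// Process the current log first, then every gzipped archive in the directory. The category
// makes attribute access on the XMLStreamReader read like property access (element.id etc.).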
use(StaxCategory) {
    currentlog = new File(dir, "events/currentlog.dat")
    insertEvents(currentlog.newInputStream(), "currentlog.dat")

    gzipFiles = new File(dir).listFiles({ file, name -> name.endsWith ".gz" } as FilenameFilter)
    gzipFiles.each {
        stream = new GZIPInputStream(it.newInputStream())
        if (it.size() < 100000 || true) {   // '|| true' disables the size limit, so every archive is processed
            insertEvents(stream, "${it.name} (${it.size() / 1000} k)")
        } else {
            println "Skipping ${it.name} (too large)"
        }
    }
}

// Push a falsy sentinel onto every queue so each consumer flushes its last batch and exits.
[logTransactionQueue, logBatchQueue, logEventQueue,
 logEventArgumentQueue, logEventBatchQueue, logBatchTransactionQueue].each { it.add(false) }

println "DONE"