Guest User

Untitled

a guest
Feb 16th, 2019
110
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.82 KB | None | 0 0
  1. package net.borak.support.csv
  2.  
  3. import kotlinx.coroutines.*
  4. import kotlinx.coroutines.channels.ReceiveChannel
  5. import kotlinx.coroutines.channels.produce
  6. import org.slf4j.Logger
  7. import org.slf4j.LoggerFactory
  8. import java.io.BufferedReader
  9.  
  10. class CSVParser {
  11.  
  12. companion object {
  13. // Comma
  14. private const val SEPARATOR: Char = 44.toChar()
  15. private const val DOUBLE_QUOTE: Char = 34.toChar()
  16. private const val ESCAPE: Char = 92.toChar()
  17. private const val PARSER_JOBS: Int = 10
  18. }
  19.  
  20. private val logger: Logger = LoggerFactory.getLogger(CSVParser::class.java)
  21.  
  22. fun parse(
  23. csvReader: BufferedReader,
  24. callback: (List<String>) -> Unit
  25. ) = runBlocking {
  26.  
  27. logger.info("parsing csv started")
  28.  
  29. val parserChannel = parseCsv(
  30. reader = csvReader
  31. )
  32.  
  33. (0 until PARSER_JOBS).map { index ->
  34. parseRecordAsync(index, parserChannel, callback)
  35. }.forEach { future ->
  36. future.await()
  37. }
  38.  
  39. logger.info("parsing csv finished")
  40. }
  41.  
  42. private inline fun CoroutineScope.parseRecordAsync(
  43. id: Int,
  44. parserChannel: ReceiveChannel<String>,
  45. crossinline callback: (List<String>) -> Unit
  46. ) = async(CoroutineName("CSV record parser $id")) {
  47.  
  48. var count = 0
  49.  
  50. logger.info("record parser listening for records")
  51.  
  52. for (line in parserChannel) {
  53. callback(parseRecord(line))
  54. count += 1
  55. }
  56.  
  57. logger.info("record parser $id processed $count elements")
  58. }
  59.  
  60. private fun parseRecord(rawRecord: String): List<String> {
  61. var withinField = false
  62. var escape = false
  63. var startIndex = 0
  64. val record: MutableList<String> = mutableListOf()
  65.  
  66. for (index in 0 until rawRecord.length) {
  67. val char = rawRecord[index]
  68.  
  69. when {
  70. !escape && !withinField && char == SEPARATOR -> {
  71. record.add(rawRecord.slice(startIndex until index))
  72. startIndex = index + 1
  73. }
  74. !escape && char == DOUBLE_QUOTE ->
  75. withinField = !withinField
  76. !escape && char == ESCAPE ->
  77. escape = true
  78. escape -> escape = false
  79. }
  80. }
  81.  
  82. return record
  83. }
  84.  
  85. private fun CoroutineScope.parseCsv(reader: BufferedReader) = produce(Dispatchers.IO) {
  86.  
  87. var size: Long = 0
  88. var count = 0
  89.  
  90. logger.info("csv line parser ready for sending records")
  91.  
  92. while(true) {
  93.  
  94. val line = reader.readLine()
  95.  
  96. if (line != null) {
  97. size += line.length
  98. count += 1
  99. send(line)
  100. } else {
  101. logger.info("Items: {}, size: {}", count, size)
  102. break
  103. }
  104. }
  105. }
  106. }
Add Comment
Please, Sign In to add comment