daily pastebin goal
53%
SHARE
TWEET

Untitled

a guest Feb 16th, 2019 78 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package net.borak.support.csv
  2.  
  3. import kotlinx.coroutines.*
  4. import kotlinx.coroutines.channels.ReceiveChannel
  5. import kotlinx.coroutines.channels.produce
  6. import org.slf4j.Logger
  7. import org.slf4j.LoggerFactory
  8. import java.io.BufferedReader
  9.  
  /**
   * Line-oriented CSV parser that reads lines from a [BufferedReader] on the IO
   * dispatcher and splits them into records in a fixed pool of coroutines.
   *
   * Every physical line is treated as exactly one record; quoted fields that span
   * several lines are not supported. Field values are returned verbatim: enclosing
   * double quotes and backslash escape characters are kept, not stripped.
   */
  class CSVParser {

      private val logger: Logger = LoggerFactory.getLogger(CSVParser::class.java)

      /**
       * Parses every line available from [csvReader] and passes each resulting record
       * to [callback].
       *
       * The call blocks the calling thread until the reader is exhausted and all
       * records have been handed to [callback]. The reader is not closed.
       *
       * NOTE(review): records are distributed over [PARSER_JOBS] worker coroutines, so
       * [callback] should not rely on being invoked in input order.
       *
       * @param csvReader source of CSV lines.
       * @param callback invoked once per line with the fields of that line.
       */
      fun parse(
          csvReader: BufferedReader,
          callback: (List<String>) -> Unit
      ) = runBlocking {
          logger.info("parsing csv started")

          val parserChannel = parseCsv(reader = csvReader)

          // Start all workers first, then wait for every one of them to drain the channel.
          (0 until PARSER_JOBS)
              .map { index -> parseRecordAsync(index, parserChannel, callback) }
              .awaitAll()

          logger.info("parsing csv finished")
      }

      /**
       * Starts a worker coroutine that receives raw lines from [parserChannel] until
       * the channel is closed, parses each one and forwards the record to [callback].
       *
       * @param id worker number, used for the coroutine name and log output.
       * @return a [Deferred] that completes when the channel has been fully consumed.
       */
      private inline fun CoroutineScope.parseRecordAsync(
          id: Int,
          parserChannel: ReceiveChannel<String>,
          crossinline callback: (List<String>) -> Unit
      ): Deferred<Unit> = async(CoroutineName("CSV record parser $id")) {
          var count = 0

          logger.info("record parser listening for records")

          for (line in parserChannel) {
              callback(parseRecord(line))
              count += 1
          }

          logger.info("record parser $id processed $count elements")
      }

      /**
       * Splits a single CSV line into its fields.
       *
       * A [SEPARATOR] ends a field unless it appears between double quotes. A character
       * preceded by [ESCAPE] is never interpreted as a quote, separator or escape.
       * Fields are raw slices of [rawRecord], so quotes and escape characters are kept.
       *
       * @param rawRecord one line of CSV text without its line terminator.
       * @return the fields of the line; an empty line yields an empty list.
       */
      private fun parseRecord(rawRecord: String): List<String> {
          // Keep the historic result for blank lines instead of reporting one empty field.
          if (rawRecord.isEmpty()) return emptyList()

          var withinField = false
          var escape = false
          var startIndex = 0
          val record = mutableListOf<String>()

          for (index in rawRecord.indices) {
              val char = rawRecord[index]

              when {
                  // The previous character was an escape: take this one literally.
                  escape -> escape = false
                  char == ESCAPE -> escape = true
                  char == DOUBLE_QUOTE -> withinField = !withinField
                  char == SEPARATOR && !withinField -> {
                      record.add(rawRecord.substring(startIndex, index))
                      startIndex = index + 1
                  }
              }
          }

          // The last field has no trailing separator, so it must be added explicitly.
          record.add(rawRecord.substring(startIndex))

          return record
      }

      /**
       * Starts a producer on [Dispatchers.IO] that sends every line of [reader] to the
       * returned channel. The channel is closed once the reader reports end of input.
       *
       * @param reader source of lines; it is read to the end but not closed.
       * @return channel delivering the lines in the order they were read.
       */
      private fun CoroutineScope.parseCsv(reader: BufferedReader): ReceiveChannel<String> =
          produce(Dispatchers.IO) {
              var size = 0L
              var count = 0L

              logger.info("csv line parser ready for sending records")

              for (line in reader.lineSequence()) {
                  size += line.length
                  count += 1
                  send(line)
              }

              logger.info("Items: {}, size: {}", count, size)
          }

      companion object {
          /** Field delimiter. */
          private const val SEPARATOR: Char = ','

          /** Toggles quoted mode, inside which separators do not split fields. */
          private const val DOUBLE_QUOTE: Char = '"'

          /** Makes the following character lose any special meaning. */
          private const val ESCAPE: Char = '\\'

          /** Number of worker coroutines that parse records concurrently. */
          private const val PARSER_JOBS: Int = 10
      }
  }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top