Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package net.borak.support.csv
- import kotlinx.coroutines.*
- import kotlinx.coroutines.channels.ReceiveChannel
- import kotlinx.coroutines.channels.produce
- import org.slf4j.Logger
- import org.slf4j.LoggerFactory
- import java.io.BufferedReader
/**
 * Line-oriented CSV parser.
 *
 * A producer coroutine reads the input line by line and publishes every raw line on a channel;
 * [PARSER_JOBS] consumer coroutines take lines from that channel, split them into fields and
 * pass each resulting record to a caller-supplied callback.
 *
 * Every physical line is treated as exactly one record, so quoted fields that contain line
 * breaks are not supported.
 */
class CSVParser {

    private val logger: Logger = LoggerFactory.getLogger(CSVParser::class.java)

    /**
     * Reads [csvReader] until end of input and invokes [callback] once per line with the
     * fields of that line.
     *
     * The call blocks the current thread until every line has been processed. Reading happens
     * on [Dispatchers.IO]; the record parsers are started in the `runBlocking` scope and
     * therefore inherit its context, which means [callback] is executed by the `runBlocking`
     * event loop of the calling thread.
     *
     * The reader is not closed here; that stays the caller's responsibility.
     *
     * @param csvReader source of the CSV text.
     * @param callback receives the fields of one record per invocation.
     */
    fun parse(
        csvReader: BufferedReader,
        callback: (List<String>) -> Unit
    ) = runBlocking {
        logger.info("parsing csv started")
        val parserChannel = parseCsv(reader = csvReader)
        (0 until PARSER_JOBS)
            .map { index -> parseRecordAsync(index, parserChannel, callback) }
            .awaitAll()
        logger.info("parsing csv finished")
    }

    /**
     * Starts one consumer that parses every line it receives from [parserChannel] and forwards
     * the record to [callback]. The returned deferred completes once the channel is closed and
     * drained.
     *
     * @param id number of this consumer, used for the coroutine name and the summary log line.
     */
    private fun CoroutineScope.parseRecordAsync(
        id: Int,
        parserChannel: ReceiveChannel<String>,
        callback: (List<String>) -> Unit
    ) = async(CoroutineName("CSV record parser $id")) {
        var count = 0
        logger.info("record parser listening for records")
        for (line in parserChannel) {
            callback(parseRecord(line))
            count += 1
        }
        logger.info("record parser $id processed $count elements")
    }

    /**
     * Splits one raw CSV line into its fields.
     *
     * A [SEPARATOR] ends a field unless it sits between two [DOUBLE_QUOTE] characters or directly
     * follows an [ESCAPE] character. Fields are returned verbatim: surrounding quotes and escape
     * characters are kept in the field text.
     *
     * @return the fields of the line, including the last one; an empty line yields an empty list.
     */
    private fun parseRecord(rawRecord: String): List<String> {
        if (rawRecord.isEmpty()) return emptyList()

        var withinField = false
        var escape = false
        var startIndex = 0
        val record = mutableListOf<String>()
        for (index in rawRecord.indices) {
            val char = rawRecord[index]
            when {
                // The character right after an escape is taken literally.
                escape -> escape = false
                char == ESCAPE -> escape = true
                char == DOUBLE_QUOTE -> withinField = !withinField
                char == SEPARATOR && !withinField -> {
                    record.add(rawRecord.substring(startIndex, index))
                    startIndex = index + 1
                }
            }
        }
        // No separator follows the last field, so it has to be added after the scan.
        record.add(rawRecord.substring(startIndex))
        return record
    }

    /**
     * Starts the producer that reads [reader] line by line on [Dispatchers.IO] and sends every
     * line to the returned channel. When the input is exhausted it logs the number of lines and
     * the total number of characters read (line terminators excluded) and the channel is closed.
     */
    private fun CoroutineScope.parseCsv(reader: BufferedReader): ReceiveChannel<String> =
        produce(Dispatchers.IO) {
            var size: Long = 0
            var count = 0
            logger.info("csv line parser ready for sending records")
            for (line in reader.lineSequence()) {
                size += line.length
                count += 1
                send(line)
            }
            logger.info("Items: {}, size: {}", count, size)
        }

    companion object {
        /** Character that separates two fields. */
        private const val SEPARATOR: Char = ','

        /** Character that opens and closes a quoted field. */
        private const val DOUBLE_QUOTE: Char = '"'

        /** Character that makes the following character literal. */
        private const val ESCAPE: Char = '\\'

        /** Number of record-parsing coroutines started by [parse]. */
        private const val PARSER_JOBS: Int = 10
    }
}
Add Comment
Please, Sign In to add comment