Created
February 15, 2019 03:07
-
-
Save seykron/a5f093f81f1f168d8a7004c393556100 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.borak.support.csv | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import kotlinx.coroutines.channels.produce | |
import org.slf4j.Logger | |
import org.slf4j.LoggerFactory | |
import java.io.BufferedReader | |
/**
 * Line-oriented CSV parser built on coroutines.
 *
 * A single producer coroutine reads lines from the input on [Dispatchers.IO] and publishes
 * them on a channel; [PARSER_JOBS] consumer coroutines receive from that channel, split each
 * line into fields and hand the result to the caller's callback.
 *
 * Limitations visible in this implementation:
 * - every physical line is treated as one record, so a quoted field that spans several
 *   lines is not supported;
 * - fields are returned verbatim: enclosing double quotes and escape characters are kept.
 */
class CSVParser {

    private val logger: Logger = LoggerFactory.getLogger(CSVParser::class.java)

    /**
     * Reads [csvReader] until it is exhausted and invokes [callback] once per line with the
     * fields of that line.
     *
     * The call blocks the current thread until every line has been handled. Lines are shared
     * between several consumer coroutines and no ordering between consumers is enforced.
     * The reader is not closed here; that stays the caller's responsibility.
     *
     * @param csvReader source of CSV lines.
     * @param callback receives the list of fields of each parsed line.
     */
    fun parse(
        csvReader: BufferedReader,
        callback: (List<String>) -> Unit
    ) = runBlocking {
        logger.info("parsing csv started")

        val parserChannel = parseCsv(reader = csvReader)

        // Fan-out: all workers receive from the same channel; each line goes to exactly one.
        val workers = (0 until PARSER_JOBS).map { index ->
            parseRecordAsync(index, parserChannel, callback)
        }
        workers.forEach { worker -> worker.await() }

        logger.info("parsing csv finished")
    }

    /**
     * Starts one consumer coroutine that parses every line it receives from [parserChannel]
     * and passes the fields to [callback]. The coroutine completes when the channel is closed.
     *
     * @param id worker number, used in the coroutine name and in the final log line.
     */
    private inline fun CoroutineScope.parseRecordAsync(
        id: Int,
        parserChannel: ReceiveChannel<String>,
        crossinline callback: (List<String>) -> Unit
    ) = async(CoroutineName("CSV record parser $id")) {
        var count = 0
        logger.info("record parser listening for records")
        for (line in parserChannel) {
            callback(parseRecord(line))
            count += 1
        }
        logger.info("record parser $id processed $count elements")
    }

    /**
     * Splits one CSV line into its fields.
     *
     * A [SEPARATOR] ends the current field unless it appears between double quotes or is
     * preceded by [ESCAPE]. The character following an [ESCAPE] is never interpreted.
     * Quotes and escape characters are left in the returned text.
     *
     * The result always contains at least one element: the text after the last separator
     * (possibly empty) is the final field, so `"a,b,c"` yields three fields and an empty
     * line yields a single empty field.
     *
     * @param rawRecord one line of the CSV input, without its line terminator.
     * @return the fields of the line, in order.
     */
    private fun parseRecord(rawRecord: String): List<String> {
        val record = mutableListOf<String>()
        var withinField = false
        var escape = false
        var startIndex = 0

        for (index in rawRecord.indices) {
            val char = rawRecord[index]
            when {
                // The previous character was an escape: consume this one literally.
                escape -> escape = false
                char == ESCAPE -> escape = true
                char == DOUBLE_QUOTE -> withinField = !withinField
                char == SEPARATOR && !withinField -> {
                    record.add(rawRecord.substring(startIndex, index))
                    startIndex = index + 1
                }
            }
        }

        // The last field has no trailing separator, so it must be added explicitly.
        record.add(rawRecord.substring(startIndex))
        return record
    }

    /**
     * Starts the producer coroutine: reads [reader] line by line on [Dispatchers.IO] and
     * sends every line to the returned channel. When the reader reports end of input the
     * number of lines and the accumulated line length are logged and the coroutine finishes,
     * which closes the channel.
     */
    private fun CoroutineScope.parseCsv(reader: BufferedReader): ReceiveChannel<String> =
        produce<String>(Dispatchers.IO) {
            var size: Long = 0
            var count = 0
            logger.info("csv line parser ready for sending records")
            while (true) {
                // readLine() returns null at end of input.
                val line = reader.readLine() ?: break
                size += line.length
                count += 1
                send(line)
            }
            logger.info("Items: {}, size: {}", count, size)
        }

    companion object {
        private const val SEPARATOR: Char = ','
        private const val DOUBLE_QUOTE: Char = '"'
        private const val ESCAPE: Char = '\\'

        // Number of consumer coroutines receiving from the line channel.
        private const val PARSER_JOBS: Int = 10
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.borak.support.csv | |
import org.junit.Test | |
import org.slf4j.Logger | |
import org.slf4j.LoggerFactory | |
import java.io.File | |
import java.nio.charset.Charset | |
/**
 * Smoke test that runs [CSVParser] over the file `huge.csv`, resolved against the working
 * directory. It makes no assertions; it only checks that parsing completes without throwing.
 */
class CSVParserTest {

    private val logger: Logger = LoggerFactory.getLogger(CSVParserTest::class.java)
    private val source: File = File("huge.csv")

    /**
     * Parses the whole source file and discards every record.
     * The reader is wrapped in `use` so the file handle is released even if parsing fails.
     */
    @Test
    fun parse() {
        source.bufferedReader(
            charset = Charset.defaultCharset(),
            bufferSize = BUFFER_SIZE
        ).use { reader ->
            CSVParser().parse(
                csvReader = reader
            ) { record ->
                //logger.info(record.joinToString(" AND "))
            }
        }
    }

    companion object {
        // Read buffer handed to the BufferedReader: 150 MiB.
        private const val BUFFER_SIZE: Int = 1024 * 1024 * 150
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment