Created February 13, 2019 04:04
Save seykron/9edc50762a164a83f5221ff75ea7bcd3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.borak.support.csv

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.BufferedReader
import java.io.File
import java.nio.charset.Charset
/**
 * Streams the lines of a CSV file through a channel and fans them out to a pool of consumer
 * coroutines.
 *
 * A single producer coroutine reads the file line by line on [Dispatchers.IO]. [parserJobs]
 * consumer coroutines receive lines from the shared channel, and each line is received by exactly
 * one consumer. The consumers currently only count the lines they receive (see [parseRecord]).
 *
 * @param bufferSize size of the [BufferedReader] buffer, in chars. Must be positive.
 * @param parserJobs number of consumer coroutines draining the line channel. Must be positive:
 *   with no consumer, the producer would suspend forever on its first send.
 */
class CSVParser(
    private val bufferSize: Int = BUFFER_SIZE,
    private val parserJobs: Int = PARSER_JOBS
) {

    private val logger: Logger = LoggerFactory.getLogger(CSVParser::class.java)

    init {
        require(bufferSize > 0) { "bufferSize must be positive: $bufferSize" }
        require(parserJobs > 0) { "parserJobs must be positive: $parserJobs" }
    }

    /**
     * Reads [csvFile] to the end and distributes its lines across the consumer coroutines.
     *
     * Blocks the calling thread (via [runBlocking]) until the producer and every consumer have
     * finished. For that reason it should not be called from inside a coroutine.
     *
     * @param csvFile file to read. If it cannot be opened, the I/O exception raised while opening
     *   it propagates out of this call.
     * @param charset charset used to decode [csvFile]. Defaults to the platform default charset,
     *   as before.
     */
    @JvmOverloads
    fun parse(csvFile: File, charset: Charset = Charset.defaultCharset()): Unit = runBlocking {
        logger.info("parsing started")
        val parserChannel = loadRecords(csvFile, charset)
        (0 until parserJobs).map { index ->
            parseRecord(index, parserChannel)
        }.awaitAll()
        logger.info("parsing finished")
    }

    /**
     * Starts one consumer that drains [parserChannel] until the producer closes it.
     *
     * The consumer runs on [Dispatchers.Default] so that consumers really execute in parallel.
     * Had it inherited the [runBlocking] dispatcher, all of them would share the single calling
     * thread.
     *
     * @param id index of this consumer. It is used only in the log message.
     * @param parserChannel channel of raw lines, shared by all consumers.
     * @return a deferred that completes once the channel has been closed and drained.
     */
    private fun CoroutineScope.parseRecord(
        id: Int,
        parserChannel: ReceiveChannel<String>
    ) = async(Dispatchers.Default) {
        var count = 0
        // NOTE(review): placeholder. Lines are only counted; they are not yet split into fields.
        for (line in parserChannel) {
            count += 1
        }
        logger.info("Parser $id has $count elements")
    }

    /**
     * Starts a producer that reads [csvFile] line by line on [Dispatchers.IO] and sends each line,
     * without its terminator, to the returned channel. The channel is closed when the end of the
     * file is reached.
     *
     * The file is opened inside the producer, and `use` closes it whether reading finishes
     * normally, fails, or the producer is cancelled. The item count and total size (in chars) are
     * logged only after the whole file has been read.
     *
     * @param csvFile file to read.
     * @param charset charset used to decode the file.
     * @return the channel of lines.
     */
    private fun CoroutineScope.loadRecords(
        csvFile: File,
        charset: Charset
    ) = produce(Dispatchers.IO) {
        var size = 0L
        var count = 0
        csvFile.bufferedReader(charset = charset, bufferSize = bufferSize).use { reader ->
            for (line in reader.lineSequence()) {
                size += line.length
                count += 1
                send(line)
            }
        }
        logger.info("Items: {}, size: {}", count, size)
    }

    companion object {
        /**
         * Default read buffer size, in chars: 150 * 1024 * 1024 chars. A JVM `char` is 2 bytes,
         * so this buffer takes roughly 300 MiB of heap, not 150MB.
         */
        private const val BUFFER_SIZE: Int = 1024 * 1024 * 150

        /** Default number of consumer coroutines. */
        private const val PARSER_JOBS: Int = 10
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.