Last active
July 6, 2023 00:19
-
-
Save gvolpe/40b1f38ebbcbb76266dc40cad587c469 to your computer and use it in GitHub Desktop.
CSV file reader using the fs2 streaming library.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect._ | |
import cats.syntax.functor._ | |
import fs2._ | |
import java.nio.file.Paths | |
import java.util.concurrent.Executors | |
import scala.concurrent.ExecutionContext | |
import scala.util.Try | |
/** Streams a CSV file with fs2, parses each data row into a [[CsvApp.Sample]] and prints every
  * row that parses successfully.
  *
  * File reads run on `blockingExecutionContext`, which `program` shuts down once the stream ends.
  */
object CsvApp extends IOApp {

  // Two-thread pool handed to `io.file.readAll` for its blocking reads; shut down by `program`.
  val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  /** One parsed CSV row; fields are declared in the same order as the CSV columns. */
  final case class Sample(
      policyID: Long,
      stateCode: String,
      county: String,
      eqSiteLimit: Double,
      huSiteLimit: Double,
      flSiteLimit: Double,
      frSiteLimit: Double,
      tiv2011: Double,
      tiv2012: Double,
      eqSiteDeductible: Double,
      huSiteDeductible: Double,
      flSiteDeductible: Double,
      frSiteDeductible: Double,
      pointLatitude: Double,
      pointLongitude: Double,
      line: String,
      construction: String,
      pointGranularity: Int
  )

  /** Converts the columns of one CSV row into a [[Sample]].
    *
    * @return `Some(sample)` when the row has exactly 18 columns and every numeric column parses;
    *         `None` when the column count differs or any `toLong`/`toDouble`/`toInt` throws.
    */
  val parseSample: List[String] => Option[Sample] = {
    case (id :: code :: county :: esl :: hsl :: fll :: frl :: tv1 :: tv2 :: esd :: hsd :: fld :: frd :: plat :: plon :: line :: cons :: pg :: Nil) =>
      Try(
        Sample(
          policyID = id.toLong,
          stateCode = code,
          county = county,
          eqSiteLimit = esl.toDouble,
          huSiteLimit = hsl.toDouble,
          flSiteLimit = fll.toDouble,
          frSiteLimit = frl.toDouble,
          tiv2011 = tv1.toDouble,
          tiv2012 = tv2.toDouble,
          eqSiteDeductible = esd.toDouble,
          huSiteDeductible = hsd.toDouble,
          flSiteDeductible = fld.toDouble,
          frSiteDeductible = frd.toDouble,
          pointLatitude = plat.toDouble,
          pointLongitude = plon.toDouble,
          line = line,
          construction = cons,
          // only the last column is trimmed (e.g. stray whitespace at end of line)
          pointGranularity = pg.trim.toInt
        )
      ).toOption
    case _ => None
  }

  /** Decodes UTF-8 bytes into lines, skips the header line and splits each line on commas.
    *
    * NOTE(review): this is a naive split; quoted fields containing commas are not supported.
    */
  def csvParser[F[_]]: Pipe[F, Byte, List[String]] =
    _.through(text.utf8Decode)
      .through(text.lines)
      .drop(1) // remove headers
      // limit -1 keeps trailing empty columns; plain split(',') silently drops them,
      // which would make rows ending in empty fields fail the 18-column match
      .map(_.split(",", -1).toList)

  // Get file from https://support.spatialkey.com/spatialkey-sample-csv-data/ and convert it to UTF-8
  /** Reads the sample file, parses every row and prints each successfully parsed [[Sample]]. */
  val parser: Stream[IO, Unit] =
    io.file
      .readAll[IO](Paths.get("/home/gvolpe/sample.csv"), blockingExecutionContext, 4096)
      .through(csvParser)
      .map(parseSample) // parse each line into a valid sample
      // skip rows that fail to parse; `unNoneTerminate` would end the whole stream at the
      // first malformed row and silently drop everything after it
      .unNone
      .evalMap(x => IO(println(x)))

  /** Runs `parser` to completion, always shutting down the blocking thread pool afterwards. */
  val program: IO[Unit] =
    parser.compile.drain.guarantee(IO(blockingExecutionContext.shutdown()))

  override def run(args: List[String]): IO[ExitCode] =
    program.as(ExitCode.Success)
}
haha
?
Nice! Thanks for this awesome gist — talk about local reasoning, this is it. As of this writing, there's not much better than fs2 and cats-effect when it comes to streaming and effects. Of course, this comment will probably go stale within the next few months.
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment
haha