Skip to content

Instantly share code, notes, and snippets.

@batakpout
Created February 15, 2023 07:03
Show Gist options
  • Save batakpout/ad0b7186dd140a1aaeb0279954bb9eea to your computer and use it in GitHub Desktop.
Save batakpout/ad0b7186dd140a1aaeb0279954bb9eea to your computer and use it in GitHub Desktop.
import java.nio.file.{Files, Paths}
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import scala.collection.mutable
import scala.io.Source
/** A single humidity reading parsed from one CSV row.
  *
  * @param sensorId identifier of the sensor that produced the reading
  * @param humidity measured value; `Double.NaN` marks a failed measurement
  */
final case class SensorData(sensorId: String, humidity: Double)
/** Aggregates humidity readings from every `*.csv` file in a directory and prints, per sensor,
  * the minimum, average and maximum humidity, ordered by descending average.
  *
  * Each file is read as one header line followed by `sensor-id,humidity` rows, where the
  * humidity column is either a number or the literal `NaN` (a failed measurement).
  */
object Main extends IOApp {
  import scala.jdk.CollectionConverters._

  /** Directory scanned when no command-line argument is supplied. */
  private val DefaultInputDir = "/Users/awani/Documents/deletetest"

  /** Reads one CSV file into memory.
    *
    * The first line is treated as a header and dropped; blank lines are skipped.
    *
    * @param file path of the CSV file to read
    * @return the parsed readings in file order; the effect fails if the file cannot be read,
    *         a row does not have exactly two columns, or the humidity is not a number
    */
  def parseFile(file: String): IO[List[SensorData]] =
    Resource
      .fromAutoCloseable(IO(Source.fromFile(file)))
      .use { source =>
        IO {
          source
            .getLines()
            .drop(1)
            .map(_.trim)
            .filter(_.nonEmpty)
            .map(parseLine(file, _))
            .toList
        }
      }

  /** Parses a single non-blank data row, failing with a message that names the file and row. */
  private def parseLine(file: String, line: String): SensorData =
    line.split(",").map(_.trim) match {
      case Array(sensorId, "NaN")    => SensorData(sensorId, Double.NaN)
      case Array(sensorId, humidity) => SensorData(sensorId, humidity.toDouble)
      case _ =>
        throw new IllegalArgumentException(
          s"Malformed line in $file: expected 'sensor-id,humidity' but got '$line'"
        )
    }

  /** Folds one reading into the per-sensor accumulator, mutating and returning the same map.
    *
    * The accumulator tuple is `(min, sum, max, count)`; the second slot is a running sum that
    * [[calculateStats]] later divides by the count. NaN readings leave the map untouched.
    *
    * @param processedData accumulator keyed by sensor id (updated in place)
    * @param data          the reading to incorporate
    * @return `processedData`, so the method can be used directly as a fold step
    */
  def processData(
      processedData: mutable.Map[String, (Double, Double, Double, Int)],
      data: SensorData
  ): mutable.Map[String, (Double, Double, Double, Int)] = {
    if (!data.humidity.isNaN) {
      // Infinite seeds are the true identities of min/max, so even an infinite reading is kept.
      val (min, sum, max, count) =
        processedData.getOrElse(
          data.sensorId,
          (Double.PositiveInfinity, 0.0, Double.NegativeInfinity, 0)
        )
      processedData.update(
        data.sensorId,
        (math.min(min, data.humidity), sum + data.humidity, math.max(max, data.humidity), count + 1)
      )
    }
    processedData
  }

  /** Converts the `(min, sum, max, count)` accumulators into `(min, avg, max)` statistics.
    *
    * @param processedData accumulator produced by folding readings with [[processData]]
    * @return one `(sensorId, (min, avg, max))` entry per sensor in the map
    */
  def calculateStats(
      processedData: mutable.Map[String, (Double, Double, Double, Int)]
  ): List[(String, (Double, Double, Double))] =
    processedData.toList.map { case (sensorId, (min, sum, max, count)) =>
      (sensorId, (min, sum / count, max))
    }

  /** Orders sensor statistics by average humidity, highest first.
    *
    * The ordering is passed explicitly because the implicit default `Ordering[Double]` is
    * deprecated in Scala 2.13; the total ordering sorts NaN keys after every other value.
    *
    * @param data `(sensorId, (min, avg, max))` entries
    * @return the same entries sorted by descending average
    */
  def sortData(data: List[(String, (Double, Double, Double))]): List[(String, (Double, Double, Double))] =
    data.sortBy { case (_, (_, avg, _)) => -avg }(Ordering.Double.TotalOrdering)

  /** Lists the `*.csv` entries of `dir`, closing the directory stream once they are collected. */
  private def listCsvFiles(dir: String): IO[List[String]] =
    Resource
      .fromAutoCloseable(IO(Files.newDirectoryStream(Paths.get(dir), "*.csv")))
      .use(stream => IO(stream.iterator().asScala.map(_.toString).toList))

  /** Processes every CSV file in `dir` and prints the summary report to standard output.
    *
    * @param dir directory containing the CSV files
    * @return an effect that completes once the report has been printed
    */
  def run(dir: String): IO[Unit] =
    for {
      filePaths <- listCsvFiles(dir)
      measurements <- filePaths.traverse(parseFile)
      readings = measurements.flatten
      processedData = readings.foldLeft(mutable.Map.empty[String, (Double, Double, Double, Int)])(processData)
      sortedData = sortData(calculateStats(processedData))
      _ <- IO(println(s"Num of processed files: ${filePaths.length}"))
      _ <- IO(println(s"Num of processed measurements: ${readings.length}"))
      // A failed measurement is a NaN reading; the accumulator only counts the valid ones.
      _ <- IO(println(s"Num of failed measurements: ${readings.count(_.humidity.isNaN)}"))
      _ <- IO(println("\nSensors with highest avg humidity:\n"))
      _ <- IO(println("sensor-id,min,avg,max"))
      _ <- sortedData.traverse_ { case (sensor, (minHumidity, avgHumidity, maxHumidity)) =>
        IO(println(s"$sensor,$minHumidity,$avgHumidity,$maxHumidity"))
      }
    } yield ()

  /** Application entry point.
    *
    * @param args optional command-line arguments; the first one, when present, is the input
    *             directory, otherwise the built-in default directory is used
    * @return `ExitCode.Success` after the report and the final marker line are printed
    */
  override def run(args: List[String]): IO[ExitCode] =
    run(args.headOption.getOrElse(DefaultInputDir)) *>
      IO(println("--Done--")).as(ExitCode.Success)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment