Skip to content

Instantly share code, notes, and snippets.

@dwhitney
Created October 7, 2016 14:18
Show Gist options
  • Save dwhitney/e767a13da564ff9efcd988085bcf4ea5 to your computer and use it in GitHub Desktop.
Save dwhitney/e767a13da564ff9efcd988085bcf4ea5 to your computer and use it in GitHub Desktop.
Quick example combining fs2 and Freek. Ran it over ~8 GB of data in constant memory.
package scripts
import java.io.File
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.higherKinds
import fs2._
import fs2.Task._
import fs2.interop.cats._
import fs2.util.{Attempt, Catchable, Suspendable}
import cats.free.Free
import cats.data._
import cats._
import cats.implicits._
import freek._
/**
 * Instruction set of the file-processing DSL.
 *
 * Each case only describes a step; an interpreter (e.g. `EDSLInterpreter`)
 * gives it meaning.
 *
 * @tparam A the value the instruction yields once interpreted
 */
sealed trait EDSL[A]

/**
 * Signals a failure.
 *
 * @param msg human-readable description of what went wrong
 */
final case class Fail[A](msg: String) extends EDSL[A]

/**
 * Requests the entries of a directory as a stream.
 *
 * @param dir directory whose entries are listed
 * @tparam F effect type of the resulting stream (not stored in the instruction itself)
 */
final case class ListFiles[F[_]](dir: File) extends EDSL[Stream[F, File]]

/**
 * Requests the lines of a set of files as a single stream.
 *
 * @param file the files to read -- despite the singular name, a stream of files
 * @tparam F effect type of both the input and the resulting stream
 */
final case class ReadLines[F[_]](file: Stream[F, File]) extends EDSL[Stream[F, String]]
/**
 * Interprets [[EDSL]] instructions into fs2 `Task`s.
 *
 * The streams handed back to the program are parameterised over a separate
 * effect `F`, so the stream pipeline can run in a different effect than the
 * `Task` used to interpret the instructions themselves.
 *
 * @tparam F effect type of the streams returned by `listFiles` / `readLines`
 */
trait EDSLInterpreter[F[_]] extends (EDSL ~> Task) {

  /** Suspension capability for `F`; picked up implicitly by `fs2.io.file.readAll` in `readLines`. */
  implicit val suspendable: Suspendable[F]

  /**
   * Prints `msg` (prefixed with "File: ") and fails the task with a `RuntimeException`.
   *
   * Failing the task -- instead of returning `()` cast to an arbitrary `A` -- keeps a
   * reported failure from resurfacing later as an unrelated `ClassCastException`
   * wherever the bogus `()` is finally used as an `A`.
   *
   * @param msg description of the failure
   * @return a task that prints the message when run and then fails
   */
  def fail[A](msg: String): Task[A] =
    Task.delay(println(s"File: $msg")).flatMap(_ => Task.fail(new RuntimeException(msg)))

  /**
   * Lists the entries of `dir` as a stream.
   *
   * The listing is deferred with `Task.delay`, so the filesystem is only touched
   * when the task runs. `File.listFiles` returns `null` when `dir` is not a
   * directory or an I/O error occurs; that case becomes a failed task with a
   * descriptive `IOException` rather than a `NullPointerException`.
   *
   * @param dir directory to list (non-recursively)
   */
  def listFiles(dir: File): Task[Stream[F, File]] =
    Task.delay(Option(dir.listFiles)).flatMap[Stream[F, File]] {
      case Some(entries) => Task.now(Stream.emits[F, File](entries.toList))
      case None          => Task.fail(new java.io.IOException(s"Cannot list directory: $dir"))
    }

  /**
   * Reads every file in `files` in 4096-byte chunks, decodes it as UTF-8 and
   * splits it into lines.
   *
   * Building the stream performs no I/O (hence `Task.now`); the files are read
   * when the returned stream is run.
   *
   * @param files the files to read, in stream order
   * @return a single stream of all lines of all files
   */
  def readLines(files: Stream[F, File]): Task[Stream[F, String]] =
    Task.now(files.flatMap { file =>
      fs2.io.file.readAll[F](file.toPath, 4096)
        .through(text.utf8Decode)
        .through(text.lines)
    })

  /** Dispatches each instruction to its handler. */
  def apply[A](edsl: EDSL[A]): Task[A] = edsl match {
    case Fail(msg)      => fail[A](msg)
    case ListFiles(dir) => listFiles(dir)
    // The `Stream[F, File]` type pattern is unchecked (erased at runtime); it only
    // tells the compiler which stream type `readLines` receives.
    case ReadLines(files: Stream[F, File]) => readLines(files)
  }
}
/**
 * Wiring for the line-counting program: the Freek DSL definition, the
 * `Catchable` instance fs2 needs to run streams whose effect is the Free
 * program, the program itself and its `Task` interpreter.
 */
object EDSLService {

  /** The program's DSL: just [[EDSL]]. */
  type PRG = EDSL :|: NilDSL
  val PRG = DSL.Make[PRG]

  /** A Free program over the DSL's coproduct; also used as the effect of the streams. */
  type PRGF[A] = Free[PRG.Cop, A]

  /**
   * `Catchable` for the Free program, so fs2 can run streams whose effect is `PRGF`.
   *
   * The explicit type keeps implicit resolution independent of inference.
   *
   * NOTE(review): `attempt` never yields a `Left`. The Free program has no error
   * channel; a failure raised via `fail` only materialises when the interpreter
   * runs the program. Any fs2 logic that reacts to a failed `attempt`
   * (presumably including resource cleanup) therefore never triggers.
   */
  implicit val catchableFree: Catchable[PRGF] = new Catchable[PRGF] {
    def fail[A](err: Throwable): PRGF[A] = {
      // getMessage may be null (e.g. a bare NullPointerException); fall back to toString.
      val msg = Option(err.getMessage).getOrElse(err.toString)
      Fail[A](msg).upcast[EDSL[A]].freek[PRG]
    }
    def attempt[A](fa: PRGF[A]): PRGF[Attempt[A]] = fa.map(Attempt(_))
    def pure[A](a: A): PRGF[A] = Free.pure[PRG.Cop, A](a)
    def flatMap[A, B](a: PRGF[A])(f: A => PRGF[B]): PRGF[B] = a.flatMap(f)
  }

  /**
   * Counts the lines of all files listed in `dir`.
   *
   * Despite its name, this counts lines, not words. Whenever the running count is a
   * multiple of 10,000 (including the initial 0), the count is printed to stdout.
   *
   * @param dir directory whose files are read
   * @return a program that yields the total number of lines
   */
  def wordCount(dir: File): PRGF[Long] =
    for {
      fileStream <- ListFiles[PRGF](dir).freek[PRG]
      lines      <- ReadLines[PRGF](fileStream).freek[PRG]
      count      <- lines.runFold(0L) { (num, _) =>
                      if (num % 10000 == 0) println(num)
                      num + 1
                    }
    } yield count

  /** Interpreter that runs the program as a `Task`; its streams use `PRGF` as their effect. */
  object TaskInterpreter extends EDSLInterpreter[PRGF] {
    implicit val suspendable: Suspendable[PRGF] = new Suspendable[PRGF] {
      // `value` is by-name and only referenced inside the flatMap continuation,
      // so it is not built until the program reaches this step.
      def suspend[A](value: => PRGF[A]): PRGF[A] =
        Free.pure[PRG.Cop, Unit](()).flatMap(_ => value)
      def pure[A](a: A): PRGF[A] = Free.pure[PRG.Cop, A](a)
      def flatMap[A, B](a: PRGF[A])(f: A => PRGF[B]): PRGF[B] = a.flatMap(f)
    }
  }
}
/**
 * Entry point: runs `EDSLService.wordCount` over a directory and prints the total line count.
 *
 * The directory is taken from the first command-line argument. Without one, it
 * falls back to the path that used to be hard-coded here.
 */
object Experiment {

  def main(args: Array[String]): Unit = {
    // NOTE(review): possibly unused; kept in case an implicit instance for Task
    // is derived from the global ExecutionContext -- confirm before removing.
    import ExecutionContext.Implicits.global
    import cats.implicits._

    // Presumably required by Free.foldMap in this cats version, as evidence that
    // tailRecM on Task is stack-safe.
    implicit val tailrec: RecursiveTailRecM[Task] = RecursiveTailRecM.create[Task]

    val dir = new File(args.headOption.getOrElse("/Users/dustinwhitney/Desktop/linkshare"))
    val program = EDSLService.wordCount(dir)
    println(program.foldMap(EDSLService.TaskInterpreter.nat).unsafeRun)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment