Created
October 7, 2016 14:18
-
-
Save dwhitney/e767a13da564ff9efcd988085bcf4ea5 to your computer and use it in GitHub Desktop.
Quick example showing fs2 and Freek together. I ran it over ~8 GB of data with constant memory usage.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scripts | |
import java.io.File | |
import scala.concurrent.{Await, ExecutionContext, Future} | |
import scala.concurrent.duration._ | |
import scala.language.higherKinds | |
import fs2._ | |
import fs2.Task._ | |
import fs2.interop.cats._ | |
import fs2.util.{Attempt, Catchable, Suspendable} | |
import cats.free.Free | |
import cats.data._ | |
import cats._ | |
import cats.implicits._ | |
import freek._ | |
/** Instruction set of the embedded DSL.
  *
  * Sealed so that a `match` over the instructions can be checked against the
  * full list of cases declared here.
  *
  * @tparam A the type of value an instruction yields once it is interpreted
  */
sealed trait EDSL[A]

/** Instruction carrying a failure message.
  *
  * @param msg description of the failure
  */
final case class Fail[A](msg: String) extends EDSL[A]

/** Instruction yielding a stream of `File`s for the given directory.
  *
  * @param dir the directory to list
  * @tparam F the effect type of the resulting stream
  */
final case class ListFiles[F[_]](dir: File) extends EDSL[Stream[F, File]]

/** Instruction yielding a stream of `String`s for a stream of files.
  *
  * @param file the stream of files to read
  * @tparam F the effect type of both the input and the resulting stream
  */
final case class ReadLines[F[_]](file: Stream[F, File]) extends EDSL[Stream[F, String]]
/** Natural transformation that interprets [[EDSL]] instructions into `Task`s.
  *
  * @tparam F the effect type of the streams produced by `listFiles` and `readLines`
  */
trait EDSLInterpreter[F[_]] extends (EDSL ~> Task) {

  /** `Suspendable` evidence for `F`; left abstract so each concrete interpreter supplies it. */
  implicit val suspendable: Suspendable[F]

  /** Prints the failure message to standard output.
    *
    * The print is wrapped in `Task.delay` so it happens when the task is run,
    * not when the task value is constructed.
    *
    * @param msg the failure message to report
    */
  def fail[A](msg: String): Task[Unit] =
    // NOTE(review): the "File: " prefix may be a typo for "Fail: " — confirm before changing.
    Task.delay(println(s"File: $msg"))

  /** Builds a stream of the entries of `dir`.
    *
    * `java.io.File.listFiles` returns `null` when the path is not a directory
    * or an I/O error occurs; that case becomes a failed `Task` with a
    * descriptive `IOException` instead of a `NullPointerException`.
    *
    * @param dir the directory to list
    * @return a task yielding a stream of the directory's entries
    */
  def listFiles(dir: File): Task[Stream[F, File]] =
    Task.delay(Option(dir.listFiles)).flatMap {
      case Some(entries) =>
        Task.now(Stream.emits[F, File](entries.toList))
      case None =>
        Task.fail(new java.io.IOException(
          s"Cannot list files in '$dir': not a directory or an I/O error occurred"))
    }

  /** Builds a stream of text lines from every file emitted by `files`.
    *
    * Each file is read with `fs2.io.file.readAll` (passing 4096 as the chunk
    * size), decoded as UTF-8 and split into lines. Only the stream description
    * is built here; nothing is read until the stream is run.
    *
    * @param files the files to read, in order
    * @return a task yielding the concatenated line stream
    */
  def readLines(files: Stream[F, File]): Task[Stream[F, String]] =
    Task.now(files.flatMap { file =>
      fs2.io.file.readAll[F](file.toPath, 4096)
        .through(text.utf8Decode)
        .through(text.lines)
    })

  /** Interprets a single instruction.
    *
    * A `Fail` instruction is first reported through `fail` and then surfaces
    * as a failed `Task`. No value of type `A` exists for a failure, so the
    * task must not complete successfully.
    */
  def apply[A](edsl: EDSL[A]): Task[A] = edsl match {
    case Fail(msg) =>
      fail[A](msg).flatMap(_ => Task.fail(new RuntimeException(msg)))
    case ListFiles(dir) => listFiles(dir)
    // The type arguments in this pattern are erased at runtime, so the
    // ascription is not checked; it only guides the compiler.
    case ReadLines(files: Stream[F, File]) => readLines(files)
  }
}
/** Wires the [[EDSL]] into a freek program type and provides the word-count
  * program together with the interpreter used to run it.
  */
object EDSLService {

  /** The DSL combination handled by this service (only `EDSL`). */
  type PRG = EDSL :|: NilDSL
  val PRG = DSL.Make[PRG]

  /** Free program over the coproduct of `PRG`. */
  type PRGF[A] = Free[PRG.Cop, A]

  /** `Catchable` instance for `PRGF`; `wordCount` runs its stream in `PRGF`.
    *
    * The type is stated explicitly so implicit resolution does not depend on
    * an inferred type or on where the value sits in the file.
    */
  implicit val catchableFree: Catchable[PRGF] = new Catchable[PRGF] {

    /** Turns a `Throwable` into a `Fail` instruction.
      *
      * `getMessage` may be `null`, so it falls back to the throwable's
      * `toString` to keep the message informative.
      */
    def fail[A](err: Throwable): PRGF[A] = {
      val message = Option(err.getMessage).getOrElse(err.toString)
      Fail[A](message).upcast[EDSL[A]].freek[PRG]
    }

    // NOTE(review): this only wraps the produced value with `Attempt(_)`;
    // a `Fail` instruction inside `fa` is presumably not turned into a
    // failed `Attempt` — confirm.
    def attempt[A](fa: PRGF[A]): PRGF[Attempt[A]] = fa.map(Attempt(_))

    def pure[A](a: A): PRGF[A] = Free.pure[PRG.Cop, A](a)

    def flatMap[A, B](a: PRGF[A])(f: A => PRGF[B]): PRGF[B] = a.flatMap(f)
  }

  /** Program that counts the lines of every file listed in `dir`.
    *
    * The running count is printed each time it is a multiple of 10000
    * (including 0) as a progress indicator.
    *
    * @param dir directory whose entries are read
    * @return a free program yielding the total number of lines
    */
  def wordCount(dir: File): Free[PRG.Cop, Long] =
    for {
      fileStream <- ListFiles[PRGF](dir).freek[PRG] //.map(_.take(1))
      lines      <- ReadLines[PRGF](fileStream).freek[PRG]
      count      <- lines.runFold(0L) { (num, _) =>
                      if (num % 10000 == 0) println(num)
                      num + 1
                    }
    } yield count

  /** Interpreter whose streams run in `PRGF`. */
  object TaskInterpreter extends EDSLInterpreter[PRGF] {
    implicit val suspendable: Suspendable[PRGF] = new Suspendable[PRGF] {
      // Defers evaluation of `value` by placing it behind a flatMap.
      def suspend[A](value: => PRGF[A]): PRGF[A] =
        Free.pure[PRG.Cop, Unit](()).flatMap(_ => value)

      def pure[A](a: A): PRGF[A] = Free.pure[PRG.Cop, A](a)

      def flatMap[A, B](a: PRGF[A])(f: A => PRGF[B]): PRGF[B] = a.flatMap(f)
    }
  }
}
/** Entry point: runs the word-count program and prints the result.
  *
  * The directory to scan is taken from the first command-line argument. When
  * no argument is given, the original hard-coded path is used so existing
  * invocations behave as before.
  */
object Experiment extends App {
  import ExecutionContext.Implicits.global
  import cats.implicits._

  // Explicitly typed so implicit resolution does not rely on an inferred type.
  implicit val tailrec: RecursiveTailRecM[Task] = RecursiveTailRecM.create[Task]

  /** Directory whose files are counted. */
  val inputDir: File =
    new File(args.headOption.getOrElse("/Users/dustinwhitney/Desktop/linkshare"))

  val program = EDSLService.wordCount(inputDir)

  // Interpret the free program into a Task, run it, and print the line count.
  println(program.foldMap(EDSLService.TaskInterpreter.nat).unsafeRun)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment