Created
June 25, 2019 21:15
-
-
Save BalmungSan/d4a5d524cab529e18fbf05f100ec3296 to your computer and use it in GitHub Desktop.
10 code snippets to introducing cats.effect.IO & fs2.Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// IO: A Monad for side-effects. | |
import $ivy.`org.typelevel::cats-effect:1.3.1` | |
import cats.effect.IO | |
import scala.concurrent.ExecutionContext | |
implicit val IOTimer = IO.timer(ExecutionContext.global) | |
implicit val IOShift = IO.contextShift(ExecutionContext.global) | |
// ---------------------------------------------- | |
// Program 1: Printing to console. | |
val ioa = IO { println("Hello, World!") } | |
val program1: IO[Unit] = | |
for { | |
_ <- ioa | |
_ <- ioa | |
} yield () | |
program1.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 2: Reading from console. | |
def putStrlLn(value: String) = IO { println(value) } | |
val readLn = IO(scala.io.StdIn.readLine) | |
val program2 = for { | |
_ <- putStrlLn("What's your name?") | |
n <- readLn | |
_ <- putStrlLn(s"Hello, $n!") | |
} yield () | |
program2.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 3: Async & Cancellabele operations. | |
import cats.effect.{SyncIO, CancelToken} | |
import java.util.concurrent.ScheduledExecutorService | |
import java.util.concurrent.Executors | |
import scala.concurrent.duration._ // Provides duration units. | |
def delayedTick(d: FiniteDuration) | |
(sc: ScheduledExecutorService): SyncIO[CancelToken[IO]] = { | |
val tick: IO[Unit] = IO.cancelable { cb => | |
val r = new Runnable { def run() = cb(Right(())) } | |
val f = sc.schedule(r, d.length, d.unit) | |
// Returning the cancellation token needed to | |
// cancel the scheduling and release resources early | |
IO { | |
println("Canceled!") | |
f.cancel(false) | |
} | |
} | |
tick.runCancelable(_ => IO { println("Tick!") }) | |
} | |
val sc = Executors.newSingleThreadScheduledExecutor() | |
val program3 = delayedTick(10 seconds)(sc) | |
val token = program3.unsafeRunSync() | |
token.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 4: Parallel operations. | |
import cats.syntax.apply._ // Provides the *> operator. | |
import cats.syntax.parallel._ // Provided the parMapN & parTraverse extension methods. | |
val ioA = IO.sleep(1 second) *> IO(println("Running ioA")) | |
val ioB = IO.sleep(1 second) *> IO(println("Running ioB")) | |
val ioC = IO.sleep(1 second) *> IO(println("Running ioC")) | |
val program4_1 = (ioA, ioB, ioC).parMapN { (_, _, _) => () } | |
program4_1.unsafeRunSync() | |
import cats.data.NonEmptyList | |
val program4_2 = NonEmptyList.of(1, 2, 3).parTraverse { i => | |
IO.sleep(1 second) *> IO { | |
println(i) | |
(i * 5) + 3 | |
} | |
} | |
program4_2.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 5: Error handling and resource managment. | |
import cats.effect.Resource | |
import scala.io.Source | |
def fahrenheitToCelsius(f: Double): Double = | |
(f - 32.0) * (5.0 / 9.0) | |
def parser(data: IO[List[String]]): IO[List[Double]] = data.map { lines => | |
for { | |
line <- lines | |
if (!line.trim.isEmpty && !line.startsWith("//")) | |
fahrenheit = line.toDouble | |
celsius = fahrenheitToCelsius(fahrenheit) | |
} yield celsius | |
} | |
// File at: https://github.com/functional-streams-for-scala/fs2/blob/series/1.1/testdata/fahrenheit.txt | |
val file = Resource.fromAutoCloseable(IO(Source.fromFile("fahrenheit.txt"))) | |
val parsed = parser( | |
data = file.use(source => IO(source.getLines.toList)) | |
) | |
val program5 = parsed.attempt.flatMap { | |
case Right(data) => | |
IO { | |
println(s"Output: ${data.take(5).mkString("[", ", ", ", ...]")}") | |
} | |
case Left(ex) => | |
IO { | |
println(s"Error: ${ex.getMessage}") | |
} | |
} | |
program5.unsafeRunSync() | |
// ---------------------------------------------- | |
// Stream: A Monad for effectual data streams. | |
import $ivy.`co.fs2::fs2-core:1.0.5` | |
import $ivy.`co.fs2::fs2-io:1.0.5` | |
import fs2.Stream | |
// ---------------------------------------------- | |
// Program 6: Streaming I/O. | |
import fs2.{io, text} | |
import java.nio.file.Paths | |
val blockingExecutionContext = | |
Resource.make( | |
IO( | |
ExecutionContext.fromExecutorService( | |
Executors.newFixedThreadPool(2) | |
) | |
) | |
) { ec => IO(ec.shutdown()) } | |
val converter = | |
Stream.resource(blockingExecutionContext).flatMap { blockingEC => | |
io.file.readAll[IO](Paths.get("fahrenheit.txt"), blockingEC, 4096) | |
.through(text.utf8Decode) | |
.through(text.lines) | |
.filter(line => !line.trim.isEmpty && !line.startsWith("//")) | |
.map(line => fahrenheitToCelsius(line.toDouble).toString) | |
.intersperse("\n") | |
.through(text.utf8Encode) | |
.through(io.file.writeAll(Paths.get("celsius.txt"), blockingEC)) | |
} | |
val program6 = converter.compile.drain | |
program6.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 7: Concurrency. | |
def greet(from: String): IO[Unit] = IO { | |
println(s"Hello from: ${from}") | |
} | |
def timedGreets(from: String, times: Int): Stream[IO, Unit] = { | |
val greetStream = Stream.repeatEval(greet(from)).take(times) | |
val tickStream = Stream.repeatEval(IO.sleep(1 second)) | |
greetStream interleave tickStream | |
} | |
val a = timedGreets(from = "A", times = 5) | |
val b = timedGreets(from = "B", times = 3) | |
val serial = a ++ b | |
val program7_1 = serial.compile.drain | |
program7_1.unsafeRunSync() | |
val concurrent = a merge b | |
val program7_2 = concurrent.compile.drain | |
program7_2.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 8: Interruptions. | |
final case object Err extends Throwable("Err") | |
val program8_1 = (Stream(1) ++ (throw Err)).take(1).toList | |
val program8_2 = (Stream(1) ++ Stream.raiseError[IO](Err)).take(2).compile.toList | |
program8_2.unsafeRunSync() | |
val program8_3 = | |
Stream(1, 2, 3) | |
.covary[IO] | |
.onFinalize(IO(println("finalized!"))) | |
.take(1) | |
.compile | |
.toList | |
program8_3.unsafeRunSync() | |
val program8_4 = | |
Stream(1, 2, 3) | |
.append(Stream.raiseError[IO](Err)) | |
.append(Stream(4, 5, 6)) | |
.onFinalize(IO(println("finalized!"))) | |
.take(5) | |
.compile | |
.toList | |
program8_4.unsafeRunSync() | |
val program8_5 = | |
(Stream(1, 2) ++ Stream.raiseError[IO](Err) ++ Stream(3)).mask.compile.toList | |
program8_5.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 9: Error handling. | |
final case object Fatal extends Throwable("Fatal") | |
val program9_1 = | |
Stream(1, 2, 3) | |
.append(Stream.raiseError[IO](Err)) | |
.append(Stream(4, 5, 6)) | |
.attempt | |
.map { | |
case Right(i) => s"${i}!" | |
case Left(_) => "" | |
}.compile | |
.toList | |
program9_1.unsafeRunSync() | |
val program9_2 = | |
Stream(1, 2, 3) | |
.append(Stream.raiseError[IO](Fatal)) | |
.attempt | |
.map { | |
case Right(i) => Right(s"${i}!") | |
case Left(Err) => Right("") | |
case Left(ex) => Left(ex) | |
}.rethrow | |
.compile | |
.toList | |
program9_2.unsafeRunSync() | |
val program9_3 = | |
Stream(1, 2, 3) | |
.append(Stream.raiseError[IO](Fatal)) | |
.handleErrorWith { | |
case Err => Stream(0) | |
case ex => Stream.eval_(IO(println(s"Errro: ${ex.getMessage}"))) | |
}.append(Stream(4, 5, 6)) | |
.compile | |
.toList | |
program9_3.unsafeRunSync() | |
// ---------------------------------------------- | |
// Program 10: Control Flow. | |
import scala.io.StdIn | |
import scala.util.Try | |
val program10_1 = | |
Stream.unfold(0)(i => if (i <= 5) Some(i -> (i + 1)) else None).toList | |
val readNumberFromConsole: IO[Option[Double]] = IO { | |
Option(StdIn.readLine()).flatMap { input => | |
Try(input.toDouble).toOption | |
} | |
} | |
val consoleNumbers: Stream[IO, Double] = | |
Stream.repeatEval(readNumberFromConsole).unNoneTerminate | |
val program10_2 = consoleNumbers.compile.toList | |
program10_2.unsafeRunSync() | |
// ---------------------------------------------- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment