Last active
May 3, 2020 08:46
-
-
Save yasuabe/6fcbbdd77beec0a6f2b85a5b18d2dc59 to your computer and use it in GitHub Desktop.
fs2.concurrent Balance and Broadcast demo. To run these, uncomment ConcurrencyDemoApp.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package qiita | |
import cats.effect._ | |
import cats.syntax.functor._ | |
import fs2.{Pipe, Stream, io, text} | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
/**
 * Base trait for the runnable demos in this file.
 *
 * Implementors supply [[stream]]; [[run]] instantiates it at `IO`, hands it a
 * `Blocker` acquired as a resource, drains it, and reports success.
 */
trait ConcurrencyDemoApp extends IOApp {

  /**
   * The demo pipeline to execute.
   *
   * @param bl blocker passed on to the blocking stdin/stdout helpers
   * @return a stream whose `Unit` outputs are discarded by [[run]]
   */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]

  /** Runs [[stream]] to completion in `IO` and yields `ExitCode.Success`. */
  def run(args: List[String]): IO[ExitCode] = {
    val program: Stream[IO, Unit] =
      Stream.resource(Blocker[IO]).flatMap { blocker =>
        // Expose the acquired blocker implicitly so `stream` can pick it up.
        implicit val bl: Blocker = blocker
        stream[IO]
      }

    for {
      _ <- program.compile.drain
    } yield ExitCode.Success
  }
}
/** Stream building blocks shared by the balance and broadcast demos. */
object BalanceAndBroadcastDemo {

  /**
   * Lines read from standard input (4096-byte read buffer), UTF-8 decoded,
   * trimmed, and with empty lines removed.
   *
   * The stream stops after the first line equal to `":q"`; that line is
   * still emitted downstream before the stream ends.
   */
  def readlineStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] = {
    val decodedLines =
      io.stdin[F](4096, bl)
        .through(text.utf8Decode)
        .through(text.lines)

    decodedLines
      .map(line => line.trim)
      .filter(line => line.nonEmpty)
      // takeThrough keeps the first element that fails the predicate,
      // i.e. the ":q" terminator itself is passed on.
      .takeThrough(line => line != ":q")
  }

  /** Pipe that writes each incoming string to standard output followed by a newline. */
  def printlnSink[F[_]: Sync : ContextShift](implicit bl: Blocker): Pipe[F, String, Unit] = { in =>
    val stdoutPipe = io.stdout[F](bl)
    in.map(line => s"$line\n")
      .through(text.utf8Encode)
      .through(stdoutPipe)
  }

  /**
   * Pipe simulating a slow worker: for every input string it sleeps `sec`
   * seconds, prints a progress message, and then emits the string's length.
   *
   * @param id  label used in the progress message
   * @param sec how long to sleep per element, in seconds
   */
  def worker[F[_]: Sync: ContextShift: Timer](id: String, sec: Int)
                                             (implicit bl: Blocker): Pipe[F, String, Int] = { input =>
    input.flatMap { line =>
      val delay   = Stream.sleep_[F](sec.seconds)
      val message = s"worker#$id processing '$line' (${sec}s)"
      val report  = Stream.emit(message).covary[F].through(printlnSink[F])
      val length  = Stream.emit(line).map(_.length)

      // `delay` emits nothing and `report` emits once, so the zip yields the
      // length only after the sleep and the print have both happened.
      (delay ++ report).zipRight(length)
    }
  }
}
import BalanceAndBroadcastDemo._ | |
/**
 * Demo of `balanceThrough`: stdin lines are distributed (chunk size 1) across
 * three workers with different delays, and each resulting length is printed.
 */
object BalanceDemoApp extends ConcurrencyDemoApp {
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] = {
    // Workers A, B and C sleep 1, 2 and 3 seconds per element respectively.
    val workers: List[Pipe[F, String, Int]] =
      List(worker[F]("A", 1), worker[F]("B", 2), worker[F]("C", 3))

    val lengths = readlineStream[F].balanceThrough(1)(workers: _*)

    lengths
      .map(len => s"length: $len")
      .through(printlnSink[F])
  }
}
/**
 * Demo of `broadcastThrough`: stdin lines are passed through three workers
 * with different delays, and each resulting length is printed.
 */
object BroadcastDemoApp extends ConcurrencyDemoApp {
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] = {
    // Workers A, B and C sleep 1, 2 and 3 seconds per element respectively.
    val workers: List[Pipe[F, String, Int]] =
      List(worker[F]("A", 1), worker[F]("B", 2), worker[F]("C", 3))

    val lengths = readlineStream[F].broadcastThrough(workers: _*)

    lengths
      .map(len => s"length: $len")
      .through(printlnSink[F])
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment