Last active
September 23, 2019 12:18
-
-
Save yasuabe/d5f5b3809eeef71950478fac0e51c406 to your computer and use it in GitHub Desktop.
fs2 queue sample in which stdin and timer streams enqueue data and dequeuer side prints to stdout
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package qiita | |
import cats.effect._ | |
import cats.syntax.functor._ | |
import fs2.concurrent.Queue | |
import fs2.{Pipe, Stream, io, text} | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
/** Syntax helpers for turning effectful values into fs2 streams. */
object concurrency {

  /**
   * Extension syntax that adds `.eval` to any value of shape `F[O]`.
   *
   * @param fo the effect to be lifted into a stream
   * @tparam F the effect type constructor
   * @tparam O the type of the value produced by the effect
   */
  implicit class StreamOps[F[_], O](val fo: F[O]) extends AnyVal {

    /** Wraps `fo` with `Stream.eval`, so it can be written postfix as `fo.eval`. */
    def eval: Stream[F, O] = {
      Stream.eval[F, O](fo)
    }
  }
}
import concurrency._ | |
/**
 * Base trait for the demo applications: implementors describe their program as an
 * effect-polymorphic stream and `run` executes it in `IO`.
 */
trait ConcurrencyDemoApp extends IOApp {

  /**
   * The program to execute, expressed for an arbitrary effect type `F`.
   *
   * @param bl the blocker made available by `run`
   * @return the stream that `run` compiles and drains
   */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]

  /**
   * Acquires a `Blocker` as a stream resource, runs `stream[IO]` with it in implicit
   * scope, discards all output and reports `ExitCode.Success`.
   *
   * @param args command-line arguments (not used)
   */
  def run(args: List[String]): IO[ExitCode] = {
    val program: Stream[IO, Unit] =
      Stream.resource(Blocker[IO]).flatMap { blocker =>
        // Expose the acquired blocker implicitly, as `stream` expects.
        implicit val bl: Blocker = blocker
        stream[IO]
      }

    program.compile.drain.as(ExitCode.Success)
  }
}
/*
  <stdin> >>> [string manipulation] >>>>>> [enqueue]>>─┐
                                                       ├>> [dequeue] >> [stdout] >>─┐
  <timer> >>> [retrieve current second] >> [enqueue]>>─┘                            ├>> end
                                                                [sleep 15 sec] >>>>─┘
*/
/**
 * Queue demo: a stdin stream and a timer stream both enqueue strings into one
 * shared queue, while a third stream dequeues them and writes them to stdout.
 * The whole pipeline runs alongside a 15 second sleep.
 */
object QueueTestApp extends ConcurrencyDemoApp {

  /**
   * Lines read from standard input, trimmed, with empty lines dropped.
   *
   * @param bl blocker handed to `io.stdin` for the read
   */
  def stdinStream[F[_] : Sync : ContextShift](bl: Blocker): Stream[F, String] = {
    val rawBytes: Stream[F, Byte] = io.stdin[F](4096, bl)
    val rawLines: Stream[F, String] = rawBytes.through(text.utf8Decode).through(text.lines)
    rawLines.map(_.trim).filter(_.nonEmpty)
  }

  /**
   * On every tick of a 2 second fixed-delay stream, emits the seconds-within-the-minute
   * (0 to 59) derived from `System.currentTimeMillis`.
   */
  def timerStream[F[_] : Timer](implicit F: Concurrent[F]): Stream[F, Long] = {
    val ticks: Stream[F, Unit] = Stream.fixedDelay[F](2.seconds)
    val nowMillis: Stream[F, Long] = Stream.eval(F.delay(System.currentTimeMillis())).repeat
    // Keep only the clock reading for each tick, then reduce millis to second-of-minute.
    ticks.zipRight(nowMillis).map(millis => millis / 1000 % 60)
  }

  /**
   * Wires the two producers and the single consumer around `q` and runs all three
   * concurrently.
   *
   * @param q  the queue shared by the producers and the consumer
   * @param bl blocker used for the stdin and stdout operations
   */
  def join[F[_]: ContextShift : Timer](q: Queue[F, String])
                                      (implicit F: Concurrent[F], bl: Blocker): Stream[F, Unit] = {
    // Producer 1: timer readings, tagged with their origin.
    val fromTimer: Stream[F, Unit] =
      timerStream[F].evalMap(n => q.enqueue1(s"$n from timer\n"))

    // Producer 2: cleaned-up stdin lines, tagged with their origin.
    val fromStdin: Stream[F, Unit] =
      stdinStream[F](bl).evalMap(s => q.enqueue1(s"$s from stdin\n"))

    // Consumer: everything dequeued is UTF-8 encoded and written to stdout.
    val toStdout: Stream[F, Unit] =
      q.dequeue.through(text.utf8Encode[F]).through(io.stdout[F](bl))

    Stream(fromTimer, fromStdin, toStdout).parJoin(3)
  }

  /**
   * Creates a bounded queue (capacity 100) and runs `join` on it concurrently with a
   * 15 second sleep; the sleep is the foreground stream.
   */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] =
    Stream.eval(Queue.bounded[F, String](100)).flatMap { q =>
      Stream.sleep_[F](15.seconds)
        .concurrently(join(q).drain)
        .map(_ => ())
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment