Last active
September 22, 2019 17:10
-
-
Save yasuabe/3bda54acbf91d54e8509f0946ce0e3e7 to your computer and use it in GitHub Desktop.
fs2 concurrency Topic demo. To run it, uncomment ConcurrencyDemoApp and the package object, then move them to an appropriate place.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package qiita | |
import fs2.concurrent.{SignallingRef, Topic} | |
import fs2.{INothing, io, text} | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
/** Messages published on the demo topic and consumed by every subscriber. */
sealed trait Event

/** Initial value the topic is created with. */
case object Start extends Event

/**
 * A line of user input to be processed by the subscribers.
 *
 * @param value the text carried by this event
 */
final case class Text(value: String) extends Event

/** Published when the user asks to end the session. */
case object Quit extends Event
import java.util.concurrent.{ExecutorService, Executors} | |
import cats.effect._ | |
import cats.syntax.functor._ | |
import fs2.Stream | |
import scala.concurrent.ExecutionContext | |
/** Syntax helpers used by the concurrency demos. */
object concurrency {

  /**
   * Adds an `eval` method to any effect value `F[O]`.
   *
   * @param fo the effect to be lifted into a stream
   */
  implicit class StreamOps[F[_], O](val fo: F[O]) extends AnyVal {

    /** Wraps the underlying effect with `Stream.eval`. */
    def eval: Stream[F, O] = {
      Stream.eval[F, O](fo)
    }
  }
}
import concurrency._ | |
/**
 * Base trait for the concurrency demos.
 *
 * It acquires a two-thread executor, exposes it to the demo as an implicit
 * `Blocker`, runs the stream returned by [[stream]] to completion and then
 * exits with `ExitCode.Success`.
 */
trait ConcurrencyDemoApp extends IOApp {

  // Fixed pool of two threads, wrapped as an execution context; the pool is
  // shut down when the resource is released.
  private val blockingEC =
    Resource.make(
      IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)))
    )(pool => IO(pool.shutdown()))

  /** Entry point: runs the demo stream with `IO` as the effect type. */
  def run(args: List[String]): IO[ExitCode] = {
    val program = Stream.resource(blockingEC).flatMap { pool =>
      // Made implicit so that `stream` picks it up as its `Blocker`.
      implicit val blocker: Blocker = Blocker.liftExecutionContext(pool)
      stream[IO]
    }
    program.compile.drain.map(_ => ExitCode.Success)
  }

  /**
   * The demo to execute, supplied by the concrete application.
   *
   * @param bl blocker built from the pool managed by this trait
   */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]
}
/**
 * Topic demo: lines typed on stdin are published as events to a topic, and
 * three subscribers with different delays print what they receive.
 */
object TopicDemoApp extends ConcurrencyDemoApp {

  /** Input line that ends the session. */
  private val QuitCommand = ":q"

  /**
   * Reads stdin as trimmed, non-empty lines.
   *
   * The stream stops after emitting the first line equal to `:q`
   * (`takeFailure = true` keeps that line in the output).
   */
  def stdinStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] =
    io.stdin[F](4096, bl)
      .through(text.utf8Decode)
      .through(text.lines)
      .map(_.trim)
      .filter(_.nonEmpty)
      .takeWhile(_ != QuitCommand, takeFailure = true)

  /**
   * Writes `line` followed by a newline to stdout.
   *
   * @param line the text to print
   */
  def printlnStream[F[_]: Sync : ContextShift](line: String)(implicit bl: Blocker): Stream[F, Unit] =
    Stream(s"$line\n")
      .through(text.utf8Encode)
      .through(io.stdout[F](bl))

  /**
   * Wires the stdin publisher and the printing subscribers around one topic.
   *
   * @param topic        topic the events are published to and subscribed from
   * @param haltWhenTrue signal that interrupts the publisher once it is `true`
   */
  class EventService[F[_] : ContextShift : Timer : Concurrent]
  (topic: Topic[F, Event], haltWhenTrue: SignallingRef[F, Boolean])(implicit ec: Blocker) {

    /** Publishes `Quit` for the quit command and `Text` for any other line. */
    def publisher: Stream[F, Unit] =
      stdinStream[F]
        .flatMap {
          case QuitCommand => topic.publish1(Quit).eval
          case line        => topic.publish1(Text(line)).eval
        }
        .interruptWhen(haltWhenTrue)

    /**
     * Subscribes to the topic and handles each event in turn.
     *
     * @param id label used in the printed messages
     * @param d  how long to sleep before printing a `Text` event
     */
    def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] =
      topic.subscribe(maxQueued = 1).flatMap {
        case Start      => printlnStream[F](s"#$id started")
        case Text(body) => Stream.sleep_[F](d) ++ printlnStream[F](s"#$id processing text event: $body")
        case Quit       => haltWhenTrue.set(true).eval
      }

    /**
     *                                        / [subscriberA] >>> [stdout]
     * [stdin] >>> [publisher] >>> [topic] -- [subscriberB] >>> [stdout]
     *                                        \ [subscriberC] >>> [stdout]
     *
     * Runs the publisher in the foreground with all subscribers in the
     * background, discarding the output.
     */
    def start: Stream[F, INothing] = {
      val delaysById = List("A" -> 1.second, "B" -> 2.seconds, "C" -> 3.seconds)
      val subscribers: Stream[F, Unit] =
        Stream.emits(delaysById)
          .map { case (id, delay) => subscriber(id, delay) }
          // One slot per subscriber so that all of them run at the same time.
          .parJoin(delaysById.size)
      publisher.concurrently(subscribers).drain
    }
  }

  /** Creates the topic (seeded with `Start`) and the halt signal, then runs the service. */
  override def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] =
    for {
      topic  <- Topic[F, Event](Start).eval
      signal <- SignallingRef[F, Boolean](false).eval
      _      <- new EventService[F](topic, signal).start
    } yield ()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment