Created
September 29, 2021 06:50
-
-
Save adrobisch/f530ab482c35a0a8dcd395355eea5c2d to your computer and use it in GitHub Desktop.
handle pending work after a stop signal with fs2 and cats-effect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package stop

import cats.effect.std.Queue
import cats.effect.{ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.SignallingRef
import sun.misc.{Signal, SignalHandler}

import java.time.LocalDateTime
import scala.concurrent.duration.{DurationInt, FiniteDuration}
// libraryDependencies += "co.fs2" %% "fs2-core" % "3.1.1" | |
// libraryDependencies += "org.typelevel" %% "cats-effect" % "3.2.9" | |
/** Demo application: a producer fills a bounded queue until a stop signal is raised
  * (SIGINT, SIGTERM or an internal timeout), while a slower consumer keeps running
  * until every message that was already enqueued has been processed.
  */
object StopOnSignal extends IOApp {

  /** Pause between two produced messages. */
  private val ProduceInterval: FiniteDuration = 250.millis

  /** Simulated duration of processing a single message. */
  private val ProcessingTime: FiniteDuration = 1.seconds

  /** Delay after which the stop signal is raised automatically. */
  private val AutoStopAfter: FiniteDuration = 10.seconds

  /** Capacity of the bounded work queue. */
  private val QueueCapacity: Int = 1000

  /** Number of messages processed concurrently. */
  private val MaxConcurrentWorkers: Int = 2

  /** Emits a message every [[ProduceInterval]] into `queue` until `stopSignal` becomes `true`.
    *
    * When the stream ends, a terminating `None` is enqueued (`enqueueNoneTerminated`) so that
    * a consumer reading with `fromQueueNoneTerminated` knows no more messages will follow.
    *
    * @param queue      destination for the produced messages, `None` marks the end
    * @param stopSignal interrupts the production once it is set to `true`
    * @return a stream that performs the enqueueing as its effect
    */
  def producer(queue: Queue[IO, Option[String]],
               stopSignal: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
    Stream
      .awakeEvery[IO](ProduceInterval)
      .interruptWhen(stopSignal)
      .map(timeSinceStart => s"MSG: ${timeSinceStart.toMillis}")
      .evalTap(message => IO(println(s"produced $message")))
      .enqueueNoneTerminated(queue)

  /** Registers handlers for the INT and TERM signals that set `stopSignal` to `true`. */
  private def installSignalHandlers(stopSignal: SignallingRef[IO, Boolean]): IO[Unit] =
    IO {
      val stopIt: SignalHandler = (signal: Signal) => {
        println(s"stopping, got $signal ...")
        // the handler is invoked outside of IO, so the effect has to be run explicitly
        stopSignal.set(true).unsafeRunSync()(runtime)
      }
      List("INT", "TERM").foreach(name => Signal.handle(new Signal(name), stopIt))
    }

  /** Waits for `delay`, then sets `stopSignal` to `true`. */
  private def stopAfter(delay: FiniteDuration,
                        stopSignal: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
    Stream.sleep[IO](delay) >>
      Stream.eval(IO(println("stopping..."))) >>
      Stream.eval(stopSignal.set(true))

  /** Consumes `workQueue` until the terminating `None`, handling up to
    * [[MaxConcurrentWorkers]] messages at the same time.
    */
  private def processQueue(workQueue: Queue[IO, Option[String]]): Stream[IO, Unit] =
    Stream
      .fromQueueNoneTerminated(workQueue)
      .parEvalMap(maxConcurrent = MaxConcurrentWorkers) { message =>
        // processing is slower than the producer, so there will be pending work after the stop
        IO(println(s"processing $message (time: ${LocalDateTime.now()})")) >>
          IO.sleep(ProcessingTime)
      }

  /** Wires signal handling, producer and consumer together and runs until the queue is drained. */
  override def run(args: List[String]): IO[ExitCode] =
    for {
      stopSignal <- SignallingRef.of[IO, Boolean](initial = false)
      _          <- installSignalHandlers(stopSignal)
      // enqueue will block on full capacity
      workQueue  <- Queue.bounded[IO, Option[String]](capacity = QueueCapacity)
      _ <-
        // note: the order matters in this composition,
        // producer(...).concurrently(processQueue) would stop at the signal and not finish the work
        // the passed streams can be seen as child streams which stop with the parent
        processQueue(workQueue)
          .concurrently(producer(workQueue, stopSignal))
          .concurrently(stopAfter(AutoStopAfter, stopSignal))
          .compile
          .drain
          // marked uncancelable so that cancelling the app fiber cannot cut the pending work short
          .uncancelable
    } yield ExitCode.Success
}
//produced MSG: 257 | |
//processing MSG: 257 (time: 2021-09-29T08:46:12.485133353) | |
//produced MSG: 501 | |
//processing MSG: 501 (time: 2021-09-29T08:46:12.654272077) | |
//produced MSG: 751 | |
//produced MSG: 1001 | |
//produced MSG: 1251 | |
//processing MSG: 751 (time: 2021-09-29T08:46:13.534405469) | |
//produced MSG: 1501 | |
//processing MSG: 1001 (time: 2021-09-29T08:46:13.660407026) | |
//produced MSG: 1751 | |
//produced MSG: 2001 | |
//produced MSG: 2251 | |
//processing MSG: 1251 (time: 2021-09-29T08:46:14.540848768) | |
//produced MSG: 2501 | |
//processing MSG: 1501 (time: 2021-09-29T08:46:14.666566666) | |
//produced MSG: 2750 | |
//produced MSG: 3000 | |
//produced MSG: 3250 | |
//processing MSG: 1751 (time: 2021-09-29T08:46:15.546715572) | |
//produced MSG: 3501 | |
//processing MSG: 2001 (time: 2021-09-29T08:46:15.670774071) | |
//stopping, got SIGINT ... | |
//processing MSG: 2251 (time: 2021-09-29T08:46:16.551449078) | |
//processing MSG: 2501 (time: 2021-09-29T08:46:16.676261631) | |
//processing MSG: 2750 (time: 2021-09-29T08:46:17.555131663) | |
//processing MSG: 3000 (time: 2021-09-29T08:46:17.680603397) | |
//processing MSG: 3250 (time: 2021-09-29T08:46:18.560474060) | |
//processing MSG: 3501 (time: 2021-09-29T08:46:18.686133482) | |
// | |
//Process finished with exit code 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment