Skip to content

Instantly share code, notes, and snippets.

Created September 29, 2021 06:50
Show Gist options
  • Save adrobisch/f530ab482c35a0a8dcd395355eea5c2d to your computer and use it in GitHub Desktop.
Save adrobisch/f530ab482c35a0a8dcd395355eea5c2d to your computer and use it in GitHub Desktop.
handle pending work after a stop signal with fs2 and cats-effect
package stop
import cats.effect.std.Queue
import cats.effect.{ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.SignallingRef
import sun.misc.{Signal, SignalHandler}
import java.time.LocalDateTime
import scala.concurrent.duration.DurationInt
// libraryDependencies += "co.fs2" %% "fs2-core" % "3.1.1"
// libraryDependencies += "org.typelevel" %% "cats-effect" % "3.2.9"
/** Demonstrates draining pending work after a stop signal using fs2 and cats-effect.
  *
  * A producer pushes messages into a bounded queue until `stopSignal` becomes `true`
  * (set by a SIGINT/SIGTERM handler). The consumer keeps running after the producer
  * stopped and only terminates once it has processed everything that was already
  * enqueued, which is signalled by a terminating `None` element in the queue.
  */
object StopOnSignal extends IOApp {

  /** Emits a message every 250 milliseconds and offers it to `queue` until `stopSignal` is set.
    *
    * @param queue      work queue shared with the consumer; `None` marks the end of the work
    * @param stopSignal when it becomes `true` the producer is interrupted
    * @return a stream that enqueues messages as a side effect and emits `Unit` per message
    */
  def producer(queue: Queue[IO, Option[String]],
               stopSignal: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
    Stream
      .awakeEvery[IO](250.millis)
      // stop producing as soon as the signal is raised
      .interruptWhen(stopSignal)
      .evalMap(timeSinceStart => IO.pure(s"MSG: ${timeSinceStart.toMillis}"))
      .evalTap(message => IO(println(s"produced $message")))
      .evalMap(message => queue.offer(Some(message)))
      // tell the consumer that no more work will arrive, so it can finish after draining
      .onFinalize(queue.offer(None))

  /** Installs the signal handlers, then runs the consumer with the producer as a child stream.
    *
    * @param args command line arguments (unused)
    * @return `ExitCode.Success` once all pending work has been processed
    */
  override def run(args: List[String]): IO[ExitCode] = for {
    stopSignal <- SignallingRef.of[IO, Boolean](initial = false)
    _ <- IO {
      val stopIt: SignalHandler = (signal: Signal) => {
        println(s"stopping, got $signal ...")
        // the handler runs on a JVM signal thread outside of IO, so the effect has to be run unsafely
        stopSignal.set(true).unsafeRunSync()(runtime)
      }
      Signal.handle(new Signal("INT"), stopIt)
      Signal.handle(new Signal("TERM"), stopIt)
    }
    // optional timed stop after 10 seconds
    // NOTE(review): not composed into the pipeline below; add `.concurrently(sleepAndStop)` to enable it
    sleepAndStop =
      Stream.sleep[IO](10.seconds) >>
        Stream.eval(IO(println("stopping..."))) >>
        Stream.eval(stopSignal.set(true))
    // enqueue will block on full capacity
    workQueue <- Queue.bounded[IO, Option[String]](capacity = 1000)
    processQueue =
      Stream
        .fromQueueNoneTerminated(workQueue)
        .parEvalMap(maxConcurrent = 2) { message =>
          // the processing will be slower than the producer, so we will have pending work after the stop
          IO(println(s"processing $message (time: ${LocalDateTime.now()})")) >> IO.sleep(1.seconds)
        }
    _ <-
      // note: the order matters in this composition,
      // producer(...).concurrently(processQueue) would stop at the signal and not finish the work
      // the passed streams can be seen as child streams which stop with the parent
      processQueue
        .concurrently(producer(workQueue, stopSignal))
        .compile
        .drain
  } yield ExitCode.Success
}
//produced MSG: 257
//processing MSG: 257 (time: 2021-09-29T08:46:12.485133353)
//produced MSG: 501
//processing MSG: 501 (time: 2021-09-29T08:46:12.654272077)
//produced MSG: 751
//produced MSG: 1001
//produced MSG: 1251
//processing MSG: 751 (time: 2021-09-29T08:46:13.534405469)
//produced MSG: 1501
//processing MSG: 1001 (time: 2021-09-29T08:46:13.660407026)
//produced MSG: 1751
//produced MSG: 2001
//produced MSG: 2251
//processing MSG: 1251 (time: 2021-09-29T08:46:14.540848768)
//produced MSG: 2501
//processing MSG: 1501 (time: 2021-09-29T08:46:14.666566666)
//produced MSG: 2750
//produced MSG: 3000
//produced MSG: 3250
//processing MSG: 1751 (time: 2021-09-29T08:46:15.546715572)
//produced MSG: 3501
//processing MSG: 2001 (time: 2021-09-29T08:46:15.670774071)
//stopping, got SIGINT ...
//processing MSG: 2251 (time: 2021-09-29T08:46:16.551449078)
//processing MSG: 2501 (time: 2021-09-29T08:46:16.676261631)
//processing MSG: 2750 (time: 2021-09-29T08:46:17.555131663)
//processing MSG: 3000 (time: 2021-09-29T08:46:17.680603397)
//processing MSG: 3250 (time: 2021-09-29T08:46:18.560474060)
//processing MSG: 3501 (time: 2021-09-29T08:46:18.686133482)
//Process finished with exit code 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment