Created August 12, 2022 13:49
resetTimeout fs2
/** Execute `onTimeout` every time the stream goes `timeout` duration with no
* elements
def resetTimeout[F[_]: Temporal, A](
timeout: FiniteDuration,
onTimeout: F[Unit]
): fs2.Pipe[F, A, A] = {
def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
timedPull.timeout(timeout) >>
timedPull.uncons.flatMap {
case Some((Left(_), rest)) => Pull.eval(onTimeout) >> go(rest)
case Some((Right(chunk), rest)) => Pull.output(chunk) >> go(rest)
case None => Pull.done
