Instantly share code, notes, and snippets.
Created
September 27, 2019 18:40
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
Save Daenyth/5cad4e68ed9804ca3c855328f4e0b0ad to your computer and use it in GitHub Desktop.
PauseButton for cats-effect / fs2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.Instant | |
import cats.effect.concurrent.Deferred | |
import cats.effect.implicits._ | |
import cats.effect.{Concurrent, Timer, Clock} | |
import cats.implicits._ | |
import fs2.concurrent.{Signal, SignallingRef} | |
import scala.concurrent.duration._ | |
/** A pause switch: it tracks whether it is currently "paused", and a pause can be requested
  * with a deadline at which the button un-pauses by itself.
  */
trait PauseButton[F[_]] {

  /** Requests a pause lasting until `time`; the effect itself returns immediately.
    *
    * The button only enters the "paused" state when `time` lies in the future. It becomes
    * un-paused automatically once `time` has passed, unless a later call to `pauseUntil`
    * asks for a deadline even further in the future.
    *
    * @param time the instant at which the pause should end
    * @return a `Deferred` that is completed when the button becomes un-paused
    */
  def pauseUntil(time: Instant): F[Deferred[F, Unit]]

  /** The current pause state, exposed as a `Signal`. */
  def isPaused: Signal[F, Boolean]
}
object PauseButton {

  /** Creates a new [[PauseButton]] whose initial state is un-paused (`None`). */
  def create[F[_]: Concurrent: Timer]: F[PauseButton[F]] =
    SignallingRef[F, Option[(Instant, Deferred[F, Unit])]](None)
      .map(new PauseButtonImpl[F](_))

  /** [[PauseButton]] backed by a `SignallingRef`.
    *
    * The state is `None` while un-paused and `Some((deadline, gate))` while paused, where
    * `gate` is completed when the pause ending at `deadline` is lifted.
    */
  private class PauseButtonImpl[F[_]](
      signal: SignallingRef[F, Option[(Instant, Deferred[F, Unit])]]
  )(implicit F: Concurrent[F], timer: Timer[F])
      extends PauseButton[F] {

    /** Pauses until `time` when `time` is not in the past.
      *
      * @param time the instant at which the pause should end
      * @return the gate that is completed when the button becomes un-paused; if the button
      *         is not paused and `time` has already passed, an already-completed gate
      */
    override def pauseUntil(time: Instant): F[Deferred[F, Unit]] =
      getNow.flatMap { now =>
        if (now.isAfter(time))
          // `time` has already passed, so the state is left untouched. If a pause is still
          // active, hand back its gate so the result completes when the button really
          // un-pauses (same outcome as asking for an earlier-but-future deadline below).
          // Only an un-paused button yields an already-completed gate.
          signal.get.flatMap {
            case Some((_, activeGate)) => activeGate.pure[F]
            case None                  => Deferred[F, Unit].flatTap(_.complete(()))
          }
        else
          signal
            .modify[F[Deferred[F, Unit]]] {
              case None =>
                // Not paused yet: install a fresh gate and schedule its release.
                val gate = Deferred.unsafe[F, Unit]
                (time, gate).some -> resetAt(time).as(gate)
              case Some((oldTimeout, oldGate)) =>
                if (oldTimeout.isBefore(time)) {
                  // Extending the pause: a new gate replaces the old one in the state.
                  val newGate = Deferred.unsafe[F, Unit]
                  // This completes the old gate when the new gate completes
                  val closeOldGateLater =
                    (newGate.get >> oldGate.complete(())).start.void
                  val next = time -> newGate
                  val action = resetAt(time) >> closeOldGateLater
                  next.some -> action.as(newGate)
                } else
                  // The active pause already lasts at least until `time`: reuse its gate.
                  (oldTimeout, oldGate).some -> oldGate.pure[F]
            }
            .flatten
      }

    /** Starts a background fiber that sleeps until `time` and then, if the state still
      * carries exactly that deadline, clears the state and completes its gate.
      *
      * If the pause was extended in the meantime the stored deadline differs from `time`
      * and the fiber leaves the state alone.
      */
    private def resetAt(time: Instant): F[Unit] =
      getNow.flatMap { now =>
        val remaining = java.time.Duration.between(now, time)
        // The clock is read again here, so `time` may have just passed; clamp to zero
        // rather than handing the timer a negative duration.
        val sleep: FiniteDuration =
          if (remaining.isNegative) Duration.Zero
          else Duration.fromNanos(remaining.toNanos)
        val resetLater =
          timer.sleep(sleep) >>
            signal
              .modify[F[Unit]] {
                case Some((resetTime, gate)) if resetTime == time =>
                  none[(Instant, Deferred[F, Unit])] -> gate.complete(())
                case s =>
                  s -> F.unit
              }
              .flatten
        resetLater.start.void
      }

    /** Reads the real-time clock (millisecond precision) as an `Instant`. */
    private val getNow =
      Clock[F]
        .realTime(MILLISECONDS)
        .map(Instant.ofEpochMilli)

    /** `true` exactly while a pause deadline is stored in the state. */
    override val isPaused: Signal[F, Boolean] =
      signal.map(_.isDefined)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.implicits._ | |
import org.scalatest.{AsyncFunSpec, Inspectors, Matchers, Assertion, AsyncTestSuite} | |
import cats.effect.laws.util.TestContext | |
import cats.effect._ | |
import org.scalactic.source.Position | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.concurrent.duration._ | |
/** Behavioural tests for `PauseButton`, covering `pauseUntil` and `isPaused`. */
class PauseButtonSpec
    extends AsyncFunSpec
    with Matchers
    with TestContextShiftTest
    with Inspectors {

  // Ordering for `Instant`, needed by the `>=` comparisons in the assertions below.
  private implicit val javaInstantOrdering: Order[Instant] =
    Order.by(instant => (instant.getEpochSecond, instant.getNano))

  // Effect that reads the current time; shared by every test case.
  private val instantNow = teikametrics.clock.instantNow[IO]

  describe("PauseButton") {

    describe("pauseUntil") {

      it("returns a Deferred that completes after the time specified") {
        for {
          start <- instantNow
          later = start.plusSeconds(10)
          button <- PauseButton.create[IO]
          unpaused <- button.pauseUntil(later)
          _ <- unpaused.get
          end <- instantNow
        } yield {
          assert(end >= later)
        }
      }

      describe("multiple calls") {

        it("extends (sooner, later) calls to (later, later)") {
          for {
            start <- instantNow
            sooner = start.plusSeconds(10)
            later = sooner.plusSeconds(10)
            button <- PauseButton.create[IO]
            firstGate <- button.pauseUntil(sooner)
            _ <- button.pauseUntil(later)
            // The gate from the first call must only open at the extended deadline.
            _ <- firstGate.get
            end <- instantNow
          } yield assert(end >= later)
        }

        it("extends (later, sooner) calls to (later, later)") {
          for {
            start <- instantNow
            sooner = start.plusSeconds(10)
            later = sooner.plusSeconds(10)
            button <- PauseButton.create[IO]
            _ <- button.pauseUntil(later)
            secondGate <- button.pauseUntil(sooner)
            // The gate from the second call must still wait for the later deadline.
            _ <- secondGate.get
            end <- instantNow
          } yield assert(end >= later)
        }
      }

      it("doesn't pause when 'until' is in the past") {
        for {
          startTime <- instantNow
          button <- PauseButton.create[IO]
          _ <- timer.sleep(1.second)
          unpaused <- button.pauseUntil(startTime)
          afterPause <- instantNow
          _ <- unpaused.get
          afterGate <- instantNow
        } yield {
          // Waiting on the gate must not let any time pass.
          assert(afterPause == afterGate)
        }
      }
    }

    describe("isPaused") {

      it("emits pause status that updates when paused") {
        for {
          start <- instantNow
          later = start.plusSeconds(10)
          button <- PauseButton.create[IO]
          p1 <- button.isPaused.get
          unpaused <- button.pauseUntil(later)
          p2 <- button.isPaused.get
          _ <- unpaused.get
          p3 <- button.isPaused.get
        } yield {
          p1 shouldBe false
          p2 shouldBe true
          p3 shouldBe false
        }
      }
    }
  }
}
/** Mix-in for async suites that runs `IO`-based tests on a cats-effect `TestContext`,
  * supplying the `ContextShift` and `Timer` instances derived from it.
  */
trait TestContextShiftTest { this: AsyncTestSuite =>

  final protected val ctx = TestContext()

  final protected implicit val CS: ContextShift[IO] =
    ctx.contextShift[IO](IO.ioEffect)

  final protected implicit val timer: Timer[IO] =
    ctx.timer[IO]

  /** Implicitly converts a test written as `IO[Assertion]` into the `Future[Assertion]`
    * expected by the async suite.
    *
    * The `IO` is started, the test context is ticked forward by 1000 days, and the test is
    * failed if the resulting future still has no value afterwards.
    */
  final implicit def extremelyUnsafeIOAssertionToFuture(
      test: IO[Assertion]
  )(implicit pos: Position): Future[Assertion] = {
    val outcome: Future[Assertion] = test.unsafeToFuture()
    // Advance the clock
    ctx.tick(1000.day)
    outcome.value match {
      case Some(_) =>
        outcome
      case None =>
        fail(
          s"""Test probably deadlocked. Test `IO` didn't resolve after simulating 1000 days of time.
             | Remaining tasks: ${ctx.state.tasks}""".stripMargin
        )(pos)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment