Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created February 10, 2022 18:27
Show Gist options
  • Select an option

  • Save djspiewak/ef290d1dcfc4aa90069ebfd1ee290678 to your computer and use it in GitHub Desktop.

Select an option

Save djspiewak/ef290d1dcfc4aa90069ebfd1ee290678 to your computer and use it in GitHub Desktop.
/**
 * A bounded [[Queue]] backed by an `UnsafeBounded` buffer, with suspension of
 * fibers implemented through `Async#async` callbacks.
 *
 * Operations first attempt the buffer directly. When the buffer signals failure
 * (by throwing `FailureSignal`), the fiber registers a callback in `waiters`,
 * re-attempts the operation once (to close the race between the failed attempt
 * and the registration) and otherwise suspends until some other operation calls
 * `notifyOne()`.
 *
 * Offerers and takers share the single `waiters` queue, so a notification may
 * wake a fiber of either kind; a woken fiber simply retries its operation.
 *
 * @param capacity maximum number of buffered elements; must be strictly positive
 */
private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F]) extends Queue[F, A] {
  require(capacity > 0)

  // Backing storage. `put`/`take` throw `FailureSignal` when they cannot proceed
  // (presumably "full" / "empty" respectively -- UnsafeBounded is defined elsewhere).
  private[this] val buffer = new UnsafeBounded[A](capacity)

  // Callbacks of fibers currently suspended in `offer` or `take`.
  private[this] val waiters = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()

  /**
   * Enqueues `a`, suspending the calling fiber while the buffer rejects the element.
   *
   * @param a the element to enqueue
   * @return an effect that completes once `a` has been placed into the buffer exactly once
   */
  def offer(a: A): F[Unit] = F defer {
    try {
      // fast path: the buffer accepted the element immediately
      buffer.put(a)
      // wake one suspended fiber, if any, so it can retry its operation
      notifyOne()
      F.unit
    } catch {
      case FailureSignal =>
        // Records whether the retry inside the registration below already enqueued `a`.
        // Without it the continuation would call `offer(a)` again and enqueue `a` twice.
        var succeeded = false

        val wait = F.async[Unit] { k =>
          F delay {
            // register first, then retry, so a notification between the failed put
            // above and this registration cannot be missed
            val clear = waiters.put(k)
            try {
              buffer.put(a)
              // the retry worked: withdraw our registration and remember the success
              clear()
              succeeded = true
              // complete our own callback so the continuation below can run
              k(EitherUnit)
              // pass a wake-up along to one other suspended fiber, if any
              notifyOne()
              // already complete, so no cancelation finalizer is needed
              None
            } catch {
              case FailureSignal =>
                // still rejected: stay registered; on cancelation withdraw the registration
                Some(F.delay(clear()))
            }
          }
        }

        // If the retry already enqueued `a` we are done; otherwise we were woken by
        // another operation and must attempt the whole offer again.
        wait *> F.defer(if (succeeded) F.unit else offer(a))
    }
  }

  /**
   * Attempts to enqueue `a` without suspending.
   *
   * @param a the element to enqueue
   * @return `true` if the buffer accepted the element, `false` if it signalled failure
   */
  def tryOffer(a: A): F[Boolean] = F delay {
    try {
      buffer.put(a)
      notifyOne()
      true
    } catch {
      case FailureSignal =>
        false
    }
  }

  /** The value reported by the buffer's `size()` at the time the effect runs. */
  val size: F[Int] = F.delay(buffer.size())

  /**
   * Dequeues one element, suspending the calling fiber while the buffer has
   * nothing to hand out.
   */
  val take: F[A] = F defer {
    try {
      // fast path: an element was available immediately
      val result = buffer.take()
      // wake one suspended fiber, if any, so it can retry its operation
      notifyOne()
      F.pure(result)
    } catch {
      case FailureSignal =>
        // Set by the retry inside the registration when it obtains an element itself.
        var received = false
        // placeholder only; read solely when `received` is true
        var result: A = null.asInstanceOf[A]

        val wait = F.async[Unit] { k =>
          F delay {
            // register first, then retry, so a notification between the failed take
            // above and this registration cannot be missed
            val clear = waiters.put(k)
            try {
              result = buffer.take()
              // the retry worked: withdraw our registration and remember the element
              clear()
              received = true
              // complete our own callback so the continuation below can run
              k(EitherUnit)
              // pass a wake-up along to one other suspended fiber, if any
              notifyOne()
              // already complete, so no cancelation finalizer is needed
              None
            } catch {
              case FailureSignal =>
                // still nothing available: stay registered; on cancelation withdraw the registration
                Some(F.delay(clear()))
            }
          }
        }

        // Either the retry already produced the element, or we were woken by
        // another operation and must attempt the whole take again.
        wait *> F.defer(if (received) F.pure(result) else take)
    }
  }

  /**
   * Attempts to dequeue one element without suspending.
   *
   * Yields `Some(element)` if the buffer handed one out, `None` if it signalled failure.
   */
  val tryTake: F[Option[A]] = F delay {
    try {
      val back = buffer.take()
      notifyOne()
      Some(back)
    } catch {
      case FailureSignal =>
        None
    }
  }

  // TODO could optimize notifications by checking if buffer is completely empty on put
  /**
   * Removes one callback from `waiters`, if there is one, and completes it
   * successfully. A `FailureSignal` from `waiters.take()` (no callback available)
   * is deliberately ignored.
   */
  private[this] def notifyOne(): Unit =
    try {
      waiters.take()(EitherUnit)
    } catch {
      case FailureSignal => ()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment