Skip to content

Instantly share code, notes, and snippets.

@ChristopherDavenport
Last active October 13, 2021 05:18
Show Gist options
  • Save ChristopherDavenport/cea3274057780cd3170bae60a19b4e17 to your computer and use it in GitHub Desktop.
Save ChristopherDavenport/cea3274057780cd3170bae60a19b4e17 to your computer and use it in GitHub Desktop.
Advanced KeyPool with additional features and abilities.
import cats.syntax.all._
import org.typelevel.keypool._
import io.chrisdavenport.mapref._
import scala.concurrent.duration._
import cats.effect.kernel._
import scala.collection.immutable.Queue
object KeypoolAdvanced {
private[KeypoolAdvanced] object Implementation {
// A resource that has already been created; it is either out on loan or sitting in the pool.
case class ExtantResource[F[_], A](
  resource: A, // the pooled value itself
  created: FiniteDuration, // timestamp supplied by whoever constructs this record
  resourceIdentity: Unique.Token, // token that tells one resource apart from another
  shutdown: F[Unit] // effect stored next to the value; presumably its finalizer — confirm at call sites
)
object ExtantResource {
  /**
   * Wraps an existing resource in a `Managed` handle.
   *
   * A fresh `Ref` holding `defaultReuseable` is allocated on every call and handed to the
   * handle together with the resource value and the `isReused` flag.
   *
   * @param extantResource   the resource whose value is exposed through the handle
   * @param defaultReuseable initial content of the handle's reuse `Ref`
   * @param isReused         passed through to the handle unchanged
   */
  def toManaged[F[_], A](
    extantResource: ExtantResource[F, A],
    defaultReuseable: Reusable,
    isReused: Boolean
  )(implicit F: Concurrent[F]): F[Managed[F, A]] =
    for {
      reuseRef <- Ref.of[F, Reusable](defaultReuseable)
    } yield new Managed[F, A](extantResource.resource, isReused, reuseRef)
}
// A resource resting in the pool, tagged with the time it was inserted.
case class InPool[F[_], A](
  insertedAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
// A resource that is currently handed out, tagged with the time it was taken.
// Holding a reference to the resource here makes it possible to detect an overdue loan and
// shut the resource down; whether that is a good idea is still undecided.
case class LoanedResource[F[_], A](
  takenAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
// The `Managed` handle paired with the resource record it was built from.
case class PairedLoaner[F[_], A](
  managed: Managed[F, A],
  extant: ExtantResource[F, A]
)
// Bookkeeping kept for a single key of the pool.
sealed trait KeyState[F[_], A]
object KeyState {
  // Nothing is recorded for the key.
  case class Empty[F[_], A]() extends KeyState[F, A]
  object Empty {
    // A single shared value, re-typed on demand; Empty carries no fields, so the cast
    // never exposes a value of the wrong type.
    private val singleton = Empty()
    def as[F[_], A]: Empty[F, A] = singleton.asInstanceOf[Empty[F, A]]
  }

  // Resources on loan plus the queue of resources idling in the pool.
  case class Available[F[_], A](
    loaned: List[LoanedResource[F, A]],
    available: Queue[InPool[F, A]]
  ) extends KeyState[F, A]

  // Waiters are used when the pool is currently fully loaned out to its maximum.
  // Each waiter has its own identity token and a Deferred that is completed with either
  // an error or the loan.
  case class Waiter[F[_], A](
    waiterIdentity: Unique.Token,
    deferred: Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
  )
  object Waiter {
    /** Allocates a fresh identity token and an empty Deferred, in that order. */
    def create[F[_]: Concurrent, A]: F[Waiter[F, A]] =
      for {
        token <- Unique[F].unique
        signal <- Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
      } yield Waiter(token, signal)
  }

  // Resources on loan plus the queue of callers waiting for one.
  case class Waiting[F[_], A](
    loaned: List[LoanedResource[F, A]],
    waiters: Queue[Waiter[F, A]]
  ) extends KeyState[F, A]
}
// Actions that can be applied to the pool.
// NOTE(review): nothing in this file consumes these yet.
sealed trait PoolActions[F[_], A]
object PoolActions {
  // Carries a handle together with the resource record it belongs to;
  // presumably "give this resource back to the pool" — confirm once a consumer exists.
  case class Return[F[_], A](
    managed: Managed[F, A],
    extantResource: ExtantResource[F, A]
  ) extends PoolActions[F, A]
}
// Strategy selector for loans that have been out too long.
// NOTE(review): no code in this file interprets these cases yet; the descriptions below
// are inferred from the names and should be confirmed when the handling is written.
sealed trait OnOverdueLoans[F[_], K, V]
object OnOverdueLoans {
  // Presumably: shut the resource down, then run `additionalActions` for the key/value.
  case class KillResource[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]

  // Presumably: stop tracking the resource but leave it running, then run `additionalActions`.
  case class RemoveButLeaveAlive[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]
}
/**
* This tracks time resources are being used
**/
class Advanced[F[_]: Temporal, K, V](
poolState: MapRef[F, K, KeyState[F, V]],
// Can Either take keys or take the whole state to do as a single atomic.
// Single atomic is much simpler but not relevant for systems with distributed keying mechanisms.
// Opting for Generic for iteration.
keys: F[List[K]],
// ---- Tools required for knowing how and when to create new resources ----//
createResource: K => Resource[F, V],
defaultReuseState: Reusable,
maxPerKey: K => Int, // Max total per key both in the pool and loaned simultaneously before queueing waiters..
// ---- For Managing Backgrounds and resource shutdowns ---- //
maxIdleLifetime: Duration, // Max time this resource can sit in the pool unused.
maxResourceLifetime: Duration, // Max time the resource can be alive. If its in the pool after this time its killed.
maxLoanTime: Duration, // Max time this resource can be loaned before it will no longer be valid in the pool.
onOverdueLoans: OnOverdueLoans[F, K, V], // How to handle overdue loans
resourceValid: V => F[Boolean], // How we can check if a resource is valid or not while in the pool.
) extends KeyPool[F, K, V]{
/**
 * Allocates a brand-new resource for `k` and wraps it in an `ExtantResource`.
 *
 * The whole operation runs inside `uncancelable`; only the allocation itself is wrapped
 * in `poll`. The wall-clock time and a unique token are read after the allocation and
 * stored in the record together with the value and its release effect.
 */
def createExtantResourceF(k: K): F[ExtantResource[F, V]] =
  Concurrent[F].uncancelable { poll =>
    for {
      allocation <- poll(createResource(k).allocated)
      createdAt <- Temporal[F].realTime
      identity <- Unique[F].unique
    } yield {
      val (value, release) = allocation
      ExtantResource(value, createdAt, identity, release)
    }
  }
/** Builds a `Managed` handle for `resource`, seeding it with this pool's `defaultReuseState`. */
def createManaged(resource: ExtantResource[F, V], isReused: Boolean): F[Managed[F, V]] =
  ExtantResource.toManaged(
    extantResource = resource,
    defaultReuseable = defaultReuseState,
    isReused = isReused
  )
/**
 * Borrows a resource for key `k`.
 *
 * NOTE(review): unfinished. Every branch of the state transition below is `???`, so
 * running the returned Resource throws `scala.NotImplementedError` as soon as the
 * transition function is applied.
 */
def take(k: K): Resource[F,Managed[F,V]] = {
// Cap for this key; computed eagerly but not consulted by the stubbed branches yet.
val maxForKey: Int = maxPerKey(k)
// A fresh waiter (identity token + Deferred) is created for every call.
Resource.eval(KeyState.Waiter.create[F, V]).flatMap{(waiter: KeyState.Waiter[F, V]) =>
// The state transition is wrapped in an uncancelable Resource; `poll` is not used yet.
Resource.uncancelable{(poll: Poll[Resource[F, *]]) =>
Resource.eval(poolState(k).modify{
// TODO: one transition per key state still has to be written.
case KeyState.Empty() => ???
case KeyState.Available(loaned, available) => ???
case KeyState.Waiting(loaned, waiters) => ???
})
}
}
}
/**
 * Snapshot of the idle resources: the overall total and a per-key breakdown.
 *
 * Only resources sitting in an `Available` queue are counted; keys whose state is
 * `Empty` or `Waiting` report 0. Each key is read separately, so the result is not
 * one atomic view across keys.
 */
def state: F[(Int, Map[K, Int])] = {
  def idleCount(keyState: KeyState[F, V]): Int = keyState match {
    case KeyState.Available(_, available) => available.size
    case KeyState.Empty() | KeyState.Waiting(_, _) => 0
  }

  for {
    allKeys <- keys
    counts <- allKeys.traverse(k => poolState(k).get.map(keyState => k -> idleCount(keyState)))
  } yield (counts.map(_._2).sum, counts.toMap)
}
}
}
}
import cats.syntax.all._
import org.typelevel.keypool._
import io.chrisdavenport.mapref._
import scala.concurrent.duration._
import cats.effect.kernel._
import scala.collection.immutable.Queue
import cats.Applicative
import cats.effect.syntax.all._
import java.awt.RenderingHints.Key
object KeypoolAdvanced {
private[KeypoolAdvanced] object Implementation {
// A resource that has already been created; it is either out on loan or sitting in the pool.
case class ExtantResource[F[_], A](
  resource: A, // the pooled value itself
  created: FiniteDuration, // timestamp supplied by whoever constructs this record
  resourceIdentity: Unique.Token, // token that tells one resource apart from another
  shutdown: F[Unit] // effect stored next to the value; presumably its finalizer — confirm at call sites
)
object ExtantResource {
  /**
   * Wraps an existing resource in a `Managed` handle.
   *
   * A fresh `Ref` holding `defaultReuseable` is allocated on every call and handed to the
   * handle together with the resource value and the `isReused` flag.
   *
   * @param extantResource   the resource whose value is exposed through the handle
   * @param defaultReuseable initial content of the handle's reuse `Ref`
   * @param isReused         passed through to the handle unchanged
   */
  def toManaged[F[_], A](
    extantResource: ExtantResource[F, A],
    defaultReuseable: Reusable,
    isReused: Boolean
  )(implicit F: Concurrent[F]): F[Managed[F, A]] =
    for {
      reuseRef <- Ref.of[F, Reusable](defaultReuseable)
    } yield new Managed[F, A](extantResource.resource, isReused, reuseRef)
}
// A resource resting in the pool, tagged with the time it was inserted.
case class InPool[F[_], A](
  insertedAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
// A resource that is currently handed out, tagged with the time it was taken.
// Holding a reference to the resource here makes it possible to detect an overdue loan and
// shut the resource down; whether that is a good idea is still undecided.
case class LoanedResource[F[_], A](
  takenAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
// The `Managed` handle paired with the resource record it was built from.
case class PairedLoaner[F[_], A](
  managed: Managed[F, A],
  extant: ExtantResource[F, A]
)
// Bookkeeping kept for a single key of the pool.
sealed trait KeyState[F[_], A]
object KeyState {
  // Waiters are used when the pool is currently fully loaned out to its maximum.
  // Each waiter has its own identity token and a Deferred that is completed with either
  // an error or the loan.
  case class Waiter[F[_], A](
    waiterIdentity: Unique.Token,
    deferred: Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
  )
  object Waiter {
    /** Allocates a fresh identity token and an empty Deferred, in that order. */
    def create[F[_]: Concurrent, A]: F[Waiter[F, A]] =
      for {
        token <- Unique[F].unique
        signal <- Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
      } yield Waiter(token, signal)
  }

  // Nothing is recorded for the key.
  case class Empty[F[_], A]() extends KeyState[F, A]
  object Empty {
    // A single shared value, re-typed on demand; Empty carries no fields, so the cast
    // never exposes a value of the wrong type.
    private val singleton = Empty()
    def empty[F[_], A]: Empty[F, A] = singleton.asInstanceOf[Empty[F, A]]
  }

  // Resources on loan, resources idling in the pool, and the callers whose resource is
  // still being prepared (`creating`).
  case class Available[F[_], A](
    loaned: List[LoanedResource[F, A]],
    available: Queue[InPool[F, A]],
    creating: List[Waiter[F, A]]
  ) extends KeyState[F, A]

  // Resources on loan, callers queued for a resource, and the callers whose resource is
  // still being prepared (`creating`).
  case class Waiting[F[_], A](
    loaned: List[LoanedResource[F, A]],
    waiters: Queue[Waiter[F, A]],
    creating: List[Waiter[F, A]]
  ) extends KeyState[F, A]
}
// Actions that can be applied to the pool.
// NOTE(review): nothing in this file consumes these yet.
sealed trait PoolActions[F[_], A]
object PoolActions {
  // Carries a handle together with the resource record it belongs to;
  // presumably "give this resource back to the pool" — confirm once a consumer exists.
  case class Return[F[_], A](
    managed: Managed[F, A],
    extantResource: ExtantResource[F, A]
  ) extends PoolActions[F, A]
}
// Strategy selector for loans that have been out too long.
// NOTE(review): no code in this file interprets these cases yet; the descriptions below
// are inferred from the names and should be confirmed when the handling is written.
sealed trait OnOverdueLoans[F[_], K, V]
object OnOverdueLoans {
  // Presumably: shut the resource down, then run `additionalActions` for the key/value.
  case class KillResource[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]

  // Presumably: stop tracking the resource but leave it running, then run `additionalActions`.
  case class RemoveButLeaveAlive[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]
}
// Messages placed on the pool's command queue for background processing.
sealed trait Command
object Command {
  // Offered when a caller that was preparing a resource finishes while the key is in the
  // Waiting state. NOTE(review): the consumer of this command is not part of this file.
  case class CreateNewForWaiter() extends Command
}
/**
* This tracks time resources are being used
**/
class Advanced[F[_], K, V](
poolState: MapRef[F, K, KeyState[F, V]],
// Can Either take keys or take the whole state to do as a single atomic.
// Single atomic is much simpler but not relevant for systems with distributed keying mechanisms.
// Opting for Generic for iteration.
keys: F[List[K]],
// I chose to use a processing queue for spawning background creations for other tasks.
// otherwise we could iterate infinitely trying to create if the resource is dead.
commands: cats.effect.std.Queue[F, Command],
// ---- Tools required for knowing how and when to create new resources ----//
createResource: K => Resource[F, V],
defaultReuseState: Reusable,
maxPerKey: K => Int, // Max total per key both in the pool and loaned simultaneously before queueing waiters..
// ---- For Managing Backgrounds and resource shutdowns ---- //
maxIdleLifetime: Duration, // Max time this resource can sit in the pool unused.
maxResourceLifetime: Duration, // Max time the resource can be alive. If its in the pool after this time its killed.
maxLoanTime: Duration, // Max time this resource can be loaned before it will no longer be valid in the pool.
onOverdueLoans: OnOverdueLoans[F, K, V], // How to handle overdue loans
resourceValid: V => F[Boolean], // How we can check if a resource is valid or not while in the pool.
)(implicit F: Temporal[F]) extends KeyPool[F, K, V]{
/**
 * Allocates a brand-new resource for `k` and wraps it in an `ExtantResource`.
 *
 * Only the allocation is wrapped in the supplied `poll`; the wall-clock time and the
 * unique token are read afterwards and stored with the value and its release effect.
 *
 * @param poll the `Poll` handed down by the caller — presumably from an enclosing
 *             uncancelable region; confirm at call sites
 */
def createExtantResourceF(k: K, poll: Poll[F]): F[ExtantResource[F, V]] =
  for {
    allocation <- poll(createResource(k).allocated)
    createdAt <- Temporal[F].realTime
    identity <- Unique[F].unique
  } yield {
    val (value, release) = allocation
    ExtantResource(value, createdAt, identity, release)
  }
/** Builds a `Managed` handle for `resource`, seeding it with this pool's `defaultReuseState`. */
def createManaged(resource: ExtantResource[F, V], isReused: Boolean): F[Managed[F, V]] =
  ExtantResource.toManaged(
    extantResource = resource,
    defaultReuseable = defaultReuseState,
    isReused = isReused
  )
/**
 * Allocates a new resource for `k` and pairs it with a fresh, not-reused `Managed` handle.
 *
 * @param poll forwarded to the allocation step, which is the only part that may be
 *             interrupted. Assumes the caller runs this inside an uncancelable region
 *             (the `Poll` is expected to come from one) — confirm at call sites.
 */
def create(k: K, poll: Poll[F]): F[PairedLoaner[F, V]] =
  for {
    resource <- createExtantResourceF(k, poll)
    // Deliberately NOT wrapped in `poll`: once the resource has been allocated, a
    // cancelation observed here would drop it without ever running its `shutdown`.
    // Building the handle only allocates a Ref, so keeping it masked costs nothing.
    managed <- createManaged(resource, isReused = false)
  } yield PairedLoaner(managed, resource)
/** Creates a brand-new resource for `k` and runs the pool-state bookkeeping around it. */
def createNew(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F]): F[PairedLoaner[F, V]] = {
  val freshLoan: F[PairedLoaner[F, V]] = create(k, poll)
  createGuarantees(k, waiter, poll, freshLoan)
}
/**
 * Hands out a resource that was just dequeued from the pool, wrapped in a `Managed`
 * handle flagged as reused, and runs the pool-state bookkeeping around it.
 *
 * @param poll   passed on to `createGuarantees` only
 * @param inpool the pooled entry the caller has already removed from the key's state
 */
def createFromPool(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F], inpool: InPool[F, V]): F[PairedLoaner[F, V]] = {
  // Deliberately NOT wrapped in `poll`: the entry has already been taken out of the pool
  // state, so a cancelation observed while building the handle would lose the resource
  // (neither returned to the pool nor shut down). Building the handle only allocates a Ref.
  val action = createManaged(inpool.resource, isReused = true).map(PairedLoaner(_, inpool.resource))
  createGuarantees(k, waiter, poll, action)
}
/**
 * Runs `create` and afterwards updates the state of key `k`, whatever the outcome.
 *
 *  - On success the new loan (stamped with the current time) is recorded in the key's
 *    `loaned` list and this call's waiter is removed from `creating`.
 *  - On error or cancelation only the waiter is removed from `creating`.
 *  - In both cases, if the key is in the `Waiting` state and the waiter was listed under
 *    `creating`, a `CreateNewForWaiter` command is offered to the command queue.
 *
 * @param waiter identifies this call inside the `creating` lists
 * @param poll   currently unused; kept for signature compatibility
 * @param create the effect producing the loan
 * @return the result of `create`, after the bookkeeping above has run
 */
def createGuarantees(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F], create: F[PairedLoaner[F, V]]): F[PairedLoaner[F, V]] = {
  // `creating` with this call's waiter removed.
  def withoutWaiter(creating: List[KeyState.Waiter[F, V]]): List[KeyState.Waiter[F, V]] =
    creating.filter(_.waiterIdentity =!= waiter.waiterIdentity)

  // Command to issue when leaving the `creating` list while the key is in the Waiting state.
  // NOTE(review): this is also issued after a *successful* creation, as in the original
  // code; confirm that requesting another creation is intended in that case.
  def commandIfCreating(creating: List[KeyState.Waiter[F, V]]): F[Unit] =
    if (creating.exists(_.waiterIdentity === waiter.waiterIdentity)) commands.offer(Command.CreateNewForWaiter())
    else F.unit

  // State transition after a successful creation.
  def recordLoan(loan: LoanedResource[F, V])(current: KeyState[F, V]): (KeyState[F, V], F[Unit]) =
    current match {
      case KeyState.Empty() =>
        (KeyState.Available(loan :: Nil, Queue(), List()), F.unit)
      case KeyState.Available(loans, available, creating) =>
        (KeyState.Available(loan :: loans, available, withoutWaiter(creating)), F.unit)
      case KeyState.Waiting(loans, waiters, creating) =>
        // Fix: the new loan is now recorded here as well. Previously the pattern variable
        // shadowed the new loan, so the resource was handed out without being tracked.
        (KeyState.Waiting(loan :: loans, waiters, withoutWaiter(creating)), commandIfCreating(creating))
    }

  // State transition after a failed or canceled creation.
  def forgetWaiter(current: KeyState[F, V]): (KeyState[F, V], F[Unit]) =
    current match {
      case KeyState.Empty() =>
        (KeyState.Empty(), F.unit)
      case KeyState.Available(loans, available, creating) =>
        (KeyState.Available(loans, available, withoutWaiter(creating)), F.unit)
      case KeyState.Waiting(loans, waiters, creating) =>
        (KeyState.Waiting(loans, waiters, withoutWaiter(creating)), commandIfCreating(creating))
    }

  create.guaranteeCase {
    case Outcome.Succeeded(fLoaner) =>
      for {
        loaner <- fLoaner
        now <- F.realTime
        _ <- poolState(k).modify(recordLoan(LoanedResource(now, loaner.extant))(_)).flatten
      } yield ()
    case _ =>
      poolState(k).modify(forgetWaiter(_)).flatten
  }
}
/**
 * Atomically updates the state of key `k` for a new borrower and returns the effect that
 * obtains the loan.
 *
 *  - `Empty`: with a cap above 1 the key becomes `Available`; with a cap of exactly 1 it
 *    becomes `Waiting` (the one permitted resource is the one being created); in both
 *    cases a new resource is created. With a cap below 1 the state is left untouched and
 *    the returned effect fails with a `RuntimeException`.
 *  - `Available`: an idle resource is reused when there is one; otherwise a new one is
 *    created, switching the key to `Waiting` when this creation reaches the cap.
 *  - `Waiting`: the waiter is queued and the returned effect waits on its Deferred.
 *
 * @param waiter this caller's waiter; listed under `creating` while its resource is prepared
 * @param poll   forwarded to the creation helpers
 */
def takeMake(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F]): F[PairedLoaner[F, V]] = {
  val maxForKey: Int = maxPerKey(k)
  poolState(k).modify{
    // Fix: the pattern binder was garbled in the source; `ks` is needed by the last branch.
    case ks @ KeyState.Empty() =>
      // Fix: this used to test `maxForKey > 0`, which made the `== 1` branch unreachable
      // and let a key capped at 1 start out as Available, allowing a second creation.
      if (maxForKey > 1) {
        // Swap to Available this is so while the waiter is present we treat it as a member of the pool
        // preventing additional resources from being created past the cap.
        (KeyState.Available(List(), Queue(), waiter :: Nil): KeyState[F, V], createNew(k, waiter, poll))
      } else if (maxForKey == 1) {
        // The only permitted resource is being created right now: the key is already at its cap.
        (KeyState.Waiting(List(), Queue(), waiter :: Nil), createNew(k, waiter, poll))
      } else {
        (ks, F.raiseError[PairedLoaner[F, V]](new RuntimeException("Key Requested that will never receive a value")))
      }
    case KeyState.Available(loaned, available, creating) =>
      available.dequeueOption match {
        case None =>
          // Loans + creations in flight + the one about to start.
          val next = loaned.size + creating.size + 1
          if (next >= maxForKey) {
            (KeyState.Waiting(loaned, Queue(), waiter :: creating), createNew(k, waiter, poll))
          } else {
            (KeyState.Available(loaned, available, waiter :: creating), createNew(k, waiter, poll))
          }
        case Some((inpool, rest)) =>
          (KeyState.Available(loaned, rest, waiter :: creating), createFromPool(k, waiter, poll, inpool))
      }
    case KeyState.Waiting(loaned, waiters, creating) =>
      // At the cap: queue up and wait until someone completes this waiter's Deferred.
      (KeyState.Waiting(loaned, waiters.enqueue(waiter), creating), waiter.deferred.get.rethrow)
  }.flatten
}
/**
 * Borrows a resource for key `k`, exposed as a `Resource` of the `Managed` handle.
 *
 * A fresh waiter is created for each call and passed to `takeMake`, which performs the
 * acquisition.
 *
 * TODO: the release step is still a no-op ("How do we handle Resource shutdown"), so a
 * borrowed resource is neither returned to the pool nor shut down when the Resource closes.
 */
def take(k: K): Resource[F, Managed[F, V]] =
  for {
    waiter <- Resource.eval(KeyState.Waiter.create[F, V])
    loaner <- Resource.makeCaseFull[F, PairedLoaner[F, V]](poll => takeMake(k, waiter, poll))(
      (_, _) => F.unit
    )
  } yield loaner.managed
/**
 * Snapshot of the idle resources: the overall total and a per-key breakdown.
 *
 * Only resources sitting in an `Available` queue are counted; keys whose state is
 * `Empty` or `Waiting` report 0. Each key is read separately, so the result is not
 * one atomic view across keys.
 */
def state: F[(Int, Map[K, Int])] = {
  def idleCount(keyState: KeyState[F, V]): Int = keyState match {
    case KeyState.Available(_, available, _) => available.size
    case KeyState.Empty() | KeyState.Waiting(_, _, _) => 0
  }

  for {
    allKeys <- keys
    counts <- allKeys.traverse(k => poolState(k).get.map(keyState => k -> idleCount(keyState)))
  } yield (counts.map(_._2).sum, counts.toMap)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment