Last active
October 13, 2021 05:18
-
-
Save ChristopherDavenport/cea3274057780cd3170bae60a19b4e17 to your computer and use it in GitHub Desktop.
Advanced KeyPool with additional features and abilities.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.syntax.all._ | |
import org.typelevel.keypool._ | |
import io.chrisdavenport.mapref._ | |
import scala.concurrent.duration._ | |
import cats.effect.kernel._ | |
import scala.collection.immutable.Queue | |
object KeypoolAdvanced { | |
private[KeypoolAdvanced] object Implementation { | |
/**
  * A resource that has already been created; it can be on loan or sitting in the pool.
  *
  * @param resource         the underlying value
  * @param created          timestamp associated with the creation of the resource
  * @param resourceIdentity unique token identifying this particular resource
  * @param shutdown         effect paired with the resource for shutting it down
  */
case class ExtantResource[F[_], A](
  resource: A,
  created: FiniteDuration,
  resourceIdentity: Unique.Token,
  shutdown: F[Unit]
)

object ExtantResource {

  /**
    * Wraps the value held by `extantResource` in a `Managed`.
    *
    * A fresh `Ref` seeded with `defaultReuseable` is allocated for every call and handed
    * to the `Managed` together with the `isReused` flag.
    */
  def toManaged[F[_], A](
    extantResource: ExtantResource[F, A],
    defaultReuseable: Reusable,
    isReused: Boolean
  )(implicit F: Concurrent[F]): F[Managed[F, A]] =
    for {
      reusableRef <- Ref[F].of(defaultReuseable)
    } yield new Managed[F, A](extantResource.resource, isReused, reusableRef)
}
/**
  * An existing resource held in the pool.
  *
  * @param insertedAt timestamp stored alongside the resource (presumably when it entered the pool)
  * @param resource   the pooled resource
  */
case class InPool[F[_], A](
  insertedAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
/**
  * An existing resource that has been handed out.
  *
  * Holding a reference to the resource here means an overdue loan can be detected and
  * the resource killed. It is uncertain whether that is desirable.
  *
  * @param takenAt  timestamp stored alongside the loan (presumably when it was taken)
  * @param resource the resource on loan
  */
case class LoanedResource[F[_], A](
  takenAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
/**
  * Pairs the `Managed` view of a resource with the `ExtantResource` record it belongs to.
  *
  * @param managed the `Managed` wrapper
  * @param extant  the corresponding resource record
  */
case class PairedLoaner[F[_], A](
  managed: Managed[F, A],
  extant: ExtantResource[F, A]
)
/** State tracked by the pool for a single key. */
sealed trait KeyState[F[_], A]

object KeyState {

  /** State that carries no loans, pooled resources, or waiters. */
  case class Empty[F[_], A]() extends KeyState[F, A]

  object Empty {
    // One shared instance is reused for every `F` and `A`: `Empty` has no fields,
    // so the cast in `as` only changes the static type parameters.
    private val instance = Empty()

    /** The shared `Empty` instance viewed at the requested type parameters. */
    def as[F[_], A]: Empty[F, A] =
      instance.asInstanceOf[Empty[F, A]]
  }

  /**
    * @param loaned    resources currently on loan
    * @param available resources sitting in the pool
    */
  case class Available[F[_], A](
    loaned: List[LoanedResource[F, A]],
    available: Queue[InPool[F, A]]
  ) extends KeyState[F, A]

  /**
    * Waiters are used when the pool is currently fully loaned out to its maximum.
    *
    * @param waiterIdentity unique token identifying the waiter
    * @param deferred       completed with either an error or the loan for this waiter
    */
  case class Waiter[F[_], A](
    waiterIdentity: Unique.Token,
    deferred: Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
  )

  object Waiter {

    /** Allocates a waiter with a fresh identity token and an empty `Deferred`. */
    def create[F[_]: Concurrent, A]: F[Waiter[F, A]] =
      for {
        token <- Unique[F].unique
        slot <- Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
      } yield Waiter(token, slot)
  }

  /**
    * @param loaned  resources currently on loan
    * @param waiters queued waiters
    */
  case class Waiting[F[_], A](
    loaned: List[LoanedResource[F, A]],
    waiters: Queue[Waiter[F, A]]
  ) extends KeyState[F, A]
}
/** Actions that can be applied to the pool. */
sealed trait PoolActions[F[_], A]

object PoolActions {

  /**
    * @param managed        the `Managed` wrapper being returned
    * @param extantResource the resource record that goes with it
    */
  case class Return[F[_], A](
    managed: Managed[F, A],
    extantResource: ExtantResource[F, A]
  ) extends PoolActions[F, A]
}
/** Policy describing how to handle overdue loans. */
sealed trait OnOverdueLoans[F[_], K, V]

object OnOverdueLoans {

  /** @param additionalActions extra effect taking the key and the value */
  case class KillResource[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]

  /** @param additionalActions extra effect taking the key and the value */
  case class RemoveButLeaveAlive[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]
}
/**
  * Key pool that tracks the time resources are being used.
  *
  * `take` is still a scaffold: every branch of its state transition is `???`.
  */
class Advanced[F[_]: Temporal, K, V](
  poolState: MapRef[F, K, KeyState[F, V]],
  // Can either take keys or take the whole state to do as a single atomic.
  // Single atomic is much simpler but not relevant for systems with distributed keying mechanisms.
  // Opting for generic for iteration.
  keys: F[List[K]],
  // ---- Tools required for knowing how and when to create new resources ---- //
  createResource: K => Resource[F, V],
  defaultReuseState: Reusable,
  // Max total per key, both in the pool and loaned simultaneously, before queueing waiters.
  maxPerKey: K => Int,
  // ---- For managing backgrounds and resource shutdowns ---- //
  // Max time this resource can sit in the pool unused.
  maxIdleLifetime: Duration,
  // Max time the resource can be alive. If it is in the pool after this time it is killed.
  maxResourceLifetime: Duration,
  // Max time this resource can be loaned before it will no longer be valid in the pool.
  maxLoanTime: Duration,
  // How to handle overdue loans.
  onOverdueLoans: OnOverdueLoans[F, K, V],
  // How we can check if a resource is valid or not while in the pool.
  resourceValid: V => F[Boolean]
) extends KeyPool[F, K, V] {

  /**
    * Allocates a resource for `k` and records the wall-clock time and a fresh identity token.
    *
    * The whole effect is uncancelable except for the allocation itself, which is polled.
    */
  def createExtantResourceF(k: K): F[ExtantResource[F, V]] =
    Concurrent[F].uncancelable { poll =>
      for {
        allocated <- poll(createResource(k).allocated)
        now <- Temporal[F].realTime
        token <- Unique[F].unique
      } yield {
        val (value, shutdown) = allocated
        ExtantResource(value, now, token, shutdown)
      }
    }

  /** Wraps `resource` in a `Managed` seeded with the pool's default reuse state. */
  def createManaged(resource: ExtantResource[F, V], isReused: Boolean): F[Managed[F, V]] =
    ExtantResource.toManaged(resource, defaultReuseState, isReused)

  /** Not implemented yet: the state transition for every `KeyState` is `???`. */
  def take(k: K): Resource[F, Managed[F, V]] = {
    val maxForKey: Int = maxPerKey(k)
    val newWaiter = Resource.eval(KeyState.Waiter.create[F, V])
    newWaiter.flatMap { (waiter: KeyState.Waiter[F, V]) =>
      Resource.uncancelable { (poll: Poll[Resource[F, *]]) =>
        Resource.eval(
          poolState(k).modify {
            case KeyState.Empty() => ???
            case KeyState.Available(loaned, available) => ???
            case KeyState.Waiting(loaned, waiters) => ???
          }
        )
      }
    }
  }

  /**
    * Reports the number of pooled (available) resources: the total and the count per key.
    *
    * Keys in the `Empty` or `Waiting` state count as zero.
    */
  def state: F[(Int, Map[K, Int])] =
    for {
      allKeys <- keys
      counts <- allKeys.traverse { key =>
        poolState(key).get.map {
          case KeyState.Available(_, available) => key -> available.size
          case KeyState.Empty() | KeyState.Waiting(_, _) => key -> 0
        }
      }
    } yield (counts.map(_._2).sum, counts.toMap)
}
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.syntax.all._ | |
import org.typelevel.keypool._ | |
import io.chrisdavenport.mapref._ | |
import scala.concurrent.duration._ | |
import cats.effect.kernel._ | |
import scala.collection.immutable.Queue | |
import cats.Applicative | |
import cats.effect.syntax.all._ | |
import java.awt.RenderingHints.Key | |
object KeypoolAdvanced { | |
private[KeypoolAdvanced] object Implementation { | |
/**
  * A resource that has already been created; it can be on loan or sitting in the pool.
  *
  * @param resource         the underlying value
  * @param created          wall-clock time captured when the resource was allocated
  * @param resourceIdentity unique token identifying this particular resource
  * @param shutdown         finalizer obtained when the resource was allocated
  */
case class ExtantResource[F[_], A](
  resource: A,
  created: FiniteDuration,
  resourceIdentity: Unique.Token,
  shutdown: F[Unit]
)

object ExtantResource {

  /**
    * Wraps the value held by `extantResource` in a `Managed`.
    *
    * A fresh `Ref` seeded with `defaultReuseable` is allocated for every call and handed
    * to the `Managed` together with the `isReused` flag.
    */
  def toManaged[F[_], A](
    extantResource: ExtantResource[F, A],
    defaultReuseable: Reusable,
    isReused: Boolean
  )(implicit F: Concurrent[F]): F[Managed[F, A]] =
    for {
      reusableRef <- Ref[F].of(defaultReuseable)
    } yield new Managed[F, A](extantResource.resource, isReused, reusableRef)
}
/**
  * An existing resource held in the pool.
  *
  * @param insertedAt timestamp stored alongside the resource (presumably when it entered the pool)
  * @param resource   the pooled resource
  */
case class InPool[F[_], A](
  insertedAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
/**
  * An existing resource that has been handed out.
  *
  * Holding a reference to the resource here means an overdue loan can be detected and
  * the resource killed. It is uncertain whether that is desirable.
  *
  * @param takenAt  wall-clock time at which the loan was recorded
  * @param resource the resource on loan
  */
case class LoanedResource[F[_], A](
  takenAt: FiniteDuration,
  resource: ExtantResource[F, A]
)
/**
  * Pairs the `Managed` view of a resource with the `ExtantResource` record it belongs to.
  *
  * @param managed the `Managed` wrapper
  * @param extant  the corresponding resource record
  */
case class PairedLoaner[F[_], A](
  managed: Managed[F, A],
  extant: ExtantResource[F, A]
)
/** State tracked by the pool for a single key. */
sealed trait KeyState[F[_], A]

object KeyState {

  /**
    * Waiters are used when the pool is currently fully loaned out to its maximum.
    * The same type also identifies creations that are still in flight (`creating`).
    *
    * @param waiterIdentity unique token identifying the waiter
    * @param deferred       completed with either an error or the loan for this waiter
    */
  case class Waiter[F[_], A](
    waiterIdentity: Unique.Token,
    deferred: Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
  )

  object Waiter {

    /** Allocates a waiter with a fresh identity token and an empty `Deferred`. */
    def create[F[_]: Concurrent, A]: F[Waiter[F, A]] =
      for {
        token <- Unique[F].unique
        slot <- Deferred[F, Either[Throwable, PairedLoaner[F, A]]]
      } yield Waiter(token, slot)
  }

  /** State that carries no loans, pooled resources, or waiters. */
  case class Empty[F[_], A]() extends KeyState[F, A]

  object Empty {
    // One shared instance is reused for every `F` and `A`: `Empty` has no fields,
    // so the cast in `empty` only changes the static type parameters.
    private val instance = Empty()

    /** The shared `Empty` instance viewed at the requested type parameters. */
    def empty[F[_], A]: Empty[F, A] =
      instance.asInstanceOf[Empty[F, A]]
  }

  /**
    * @param loaned    resources currently on loan
    * @param available resources sitting in the pool
    * @param creating  waiters whose resource is still being produced
    */
  case class Available[F[_], A](
    loaned: List[LoanedResource[F, A]],
    available: Queue[InPool[F, A]],
    creating: List[Waiter[F, A]]
  ) extends KeyState[F, A]

  /**
    * @param loaned   resources currently on loan
    * @param waiters  queued waiters
    * @param creating waiters whose resource is still being produced
    */
  case class Waiting[F[_], A](
    loaned: List[LoanedResource[F, A]],
    waiters: Queue[Waiter[F, A]],
    creating: List[Waiter[F, A]]
  ) extends KeyState[F, A]
}
/** Actions that can be applied to the pool. */
sealed trait PoolActions[F[_], A]

object PoolActions {

  /**
    * @param managed        the `Managed` wrapper being returned
    * @param extantResource the resource record that goes with it
    */
  case class Return[F[_], A](
    managed: Managed[F, A],
    extantResource: ExtantResource[F, A]
  ) extends PoolActions[F, A]
}
/** Policy describing how to handle overdue loans. */
sealed trait OnOverdueLoans[F[_], K, V]

object OnOverdueLoans {

  /** @param additionalActions extra effect taking the key and the value */
  case class KillResource[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]

  /** @param additionalActions extra effect taking the key and the value */
  case class RemoveButLeaveAlive[F[_], K, V](
    additionalActions: (K, V) => F[Unit]
  ) extends OnOverdueLoans[F, K, V]
}
/** Messages placed on the pool's command queue. */
sealed trait Command

object Command {

  /** Offered by the pool when a creation finishes while the key is in the `Waiting` state. */
  case class CreateNewForWaiter() extends Command
}
/**
  * Key pool that tracks the time resources are being used.
  *
  * Per-key state is kept in `poolState`. A `take` registers a waiter in the key's state
  * (as an in-flight creation or as a queued waiter) and then either creates a resource,
  * reuses a pooled one, or blocks on the waiter's `Deferred`.
  */
class Advanced[F[_], K, V](
  poolState: MapRef[F, K, KeyState[F, V]],
  // Can either take keys or take the whole state to do as a single atomic.
  // Single atomic is much simpler but not relevant for systems with distributed keying mechanisms.
  // Opting for generic for iteration.
  keys: F[List[K]],
  // A processing queue is used for spawning background creations for other tasks;
  // otherwise we could iterate infinitely trying to create if the resource is dead.
  commands: cats.effect.std.Queue[F, Command],
  // ---- Tools required for knowing how and when to create new resources ---- //
  createResource: K => Resource[F, V],
  defaultReuseState: Reusable,
  // Max total per key, both in the pool and loaned simultaneously, before queueing waiters.
  maxPerKey: K => Int,
  // ---- For managing backgrounds and resource shutdowns ---- //
  // Max time this resource can sit in the pool unused.
  maxIdleLifetime: Duration,
  // Max time the resource can be alive. If it is in the pool after this time it is killed.
  maxResourceLifetime: Duration,
  // Max time this resource can be loaned before it will no longer be valid in the pool.
  maxLoanTime: Duration,
  // How to handle overdue loans.
  onOverdueLoans: OnOverdueLoans[F, K, V],
  // How we can check if a resource is valid or not while in the pool.
  resourceValid: V => F[Boolean]
)(implicit F: Temporal[F]) extends KeyPool[F, K, V] {

  /**
    * Allocates a resource for `k` and records the wall-clock time and a fresh identity token.
    * Only the allocation itself is run through `poll`.
    */
  def createExtantResourceF(k: K, poll: Poll[F]): F[ExtantResource[F, V]] =
    (poll(createResource(k).allocated), Temporal[F].realTime, Unique[F].unique).mapN {
      case ((a, shutdown), now, token) => ExtantResource(a, now, token, shutdown)
    }

  /** Wraps `resource` in a `Managed` seeded with the pool's default reuse state. */
  def createManaged(resource: ExtantResource[F, V], isReused: Boolean): F[Managed[F, V]] =
    ExtantResource.toManaged(resource, defaultReuseState, isReused)

  /** Creates a brand-new resource for `k` together with its (not reused) `Managed` wrapper. */
  def create(k: K, poll: Poll[F]): F[PairedLoaner[F, V]] =
    createExtantResourceF(k, poll).flatMap { resource =>
      poll(createManaged(resource, false)).map { managed =>
        PairedLoaner(managed, resource)
      }
    }

  /** Creates a new resource for `waiter` and updates the key's bookkeeping when done. */
  def createNew(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F]): F[PairedLoaner[F, V]] =
    createGuarantees(k, waiter, poll, create(k, poll))

  /** Hands a pooled resource to `waiter` (marked as reused) and updates the bookkeeping. */
  def createFromPool(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F], inpool: InPool[F, V]): F[PairedLoaner[F, V]] = {
    val action = poll(createManaged(inpool.resource, true)).map(PairedLoaner(_, inpool.resource))
    createGuarantees(k, waiter, poll, action)
  }

  /**
    * Runs `create` and, whatever its outcome, removes `waiter` from the key's `creating` list.
    *
    * On success the new loan is recorded in the key's state. On error or cancelation only
    * the `creating` entry is removed.
    *
    * @param k      key the resource belongs to
    * @param waiter waiter on whose behalf the resource is produced
    * @param poll   currently unused by this method
    * @param create the effect producing the loan
    */
  def createGuarantees(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F], create: F[PairedLoaner[F, V]]): F[PairedLoaner[F, V]] = {
    // `creating` with this waiter's entry removed.
    def withoutWaiter(creating: List[KeyState.Waiter[F, V]]): List[KeyState.Waiter[F, V]] =
      creating.filter(_.waiterIdentity != waiter.waiterIdentity)

    // Offers a `CreateNewForWaiter` command only if this waiter was registered as creating.
    def commandIfCreating(creating: List[KeyState.Waiter[F, V]]): F[Unit] =
      if (creating.exists(_.waiterIdentity === waiter.waiterIdentity)) commands.offer(Command.CreateNewForWaiter())
      else F.unit

    create.guaranteeCase {
      case Outcome.Succeeded(fLoaner) =>
        fLoaner.flatMap { loaner =>
          F.realTime.flatMap { now =>
            val newLoan = LoanedResource(now, loaner.extant)
            poolState(k).modify[F[Unit]] {
              case KeyState.Empty() =>
                (KeyState.Available(newLoan :: Nil, Queue(), List()), F.unit)
              case KeyState.Available(loans, available, creating) =>
                (KeyState.Available(newLoan :: loans, available, withoutWaiter(creating)), F.unit)
              case KeyState.Waiting(loans, waiters, creating) =>
                // The existing loans are bound to `loans` so that `newLoan` is actually
                // recorded here, exactly as in the `Available` case.
                // NOTE(review): a command is offered on success as well as on failure;
                // confirm the command consumer expects that.
                (KeyState.Waiting(newLoan :: loans, waiters, withoutWaiter(creating)), commandIfCreating(creating))
            }.flatten
          }
        }
      case _ =>
        // Errored or canceled: nothing was loaned, just release the `creating` slot.
        poolState(k).modify[F[Unit]] {
          case KeyState.Empty() =>
            (KeyState.Empty(), F.unit)
          case KeyState.Available(loans, available, creating) =>
            (KeyState.Available(loans, available, withoutWaiter(creating)), F.unit)
          case KeyState.Waiting(loans, waiters, creating) =>
            (KeyState.Waiting(loans, waiters, withoutWaiter(creating)), commandIfCreating(creating))
        }.flatten
    }
  }

  /**
    * Registers `waiter` in the state of `k` and returns the effect that yields its loan.
    *
    * - `Empty`: starts a creation, moving to `Available` when the cap allows more than one
    *   resource, or to `Waiting` when the cap is exactly one. A cap below one raises an error.
    * - `Available`: reuses a pooled resource if there is one, otherwise starts a creation,
    *   moving to `Waiting` when that creation reaches the cap.
    * - `Waiting`: enqueues the waiter and blocks on its `Deferred`.
    */
  def takeMake(k: K, waiter: KeyState.Waiter[F, V], poll: Poll[F]): F[PairedLoaner[F, V]] = {
    val maxForKey: Int = maxPerKey(k)
    poolState(k).modify[F[PairedLoaner[F, V]]] {
      case ks @ KeyState.Empty() =>
        if (maxForKey > 1) {
          // While the waiter is present in `creating` it is treated as a member of the pool,
          // preventing additional resources from being created past the cap.
          (KeyState.Available(List(), Queue(), waiter :: Nil), createNew(k, waiter, poll))
        } else if (maxForKey == 1) {
          // This single creation already fills the cap, so later takers must queue.
          (KeyState.Waiting(List(), Queue(), waiter :: Nil), createNew(k, waiter, poll))
        } else {
          (ks, F.raiseError[PairedLoaner[F, V]](new RuntimeException("Key Requested that will never receive a value")))
        }
      case KeyState.Available(loaned, available, creating) =>
        available.dequeueOption match {
          case None =>
            // Size of the key once this creation is counted.
            val next = loaned.size + creating.size + 1
            if (next >= maxForKey) {
              (KeyState.Waiting(loaned, Queue(), waiter :: creating), createNew(k, waiter, poll))
            } else {
              (KeyState.Available(loaned, available, waiter :: creating), createNew(k, waiter, poll))
            }
          case Some((inpool, rest)) =>
            (KeyState.Available(loaned, rest, waiter :: creating), createFromPool(k, waiter, poll, inpool))
        }
      case KeyState.Waiting(loaned, waiters, creating) =>
        (KeyState.Waiting(loaned, waiters.enqueue(waiter), creating), waiter.deferred.get.rethrow)
    }.flatten
  }

  /**
    * Takes a `Managed` resource for `k`.
    *
    * NOTE(review): the release step is currently a no-op. How to handle resource shutdown
    * (returning to the pool or shutting down) is still open.
    */
  def take(k: K): Resource[F, Managed[F, V]] =
    Resource.eval(KeyState.Waiter.create[F, V]).flatMap { (waiter: KeyState.Waiter[F, V]) =>
      Resource.makeCaseFull { (poll: Poll[F]) =>
        takeMake(k, waiter, poll)
      } {
        // How do we handle Resource shutdown
        case (_, _) => Applicative[F].unit
      }.map(_.managed)
    }

  /**
    * Reports the number of pooled (available) resources: the total and the count per key.
    *
    * Keys in the `Empty` or `Waiting` state count as zero.
    */
  def state: F[(Int, Map[K, Int])] = keys.flatMap { ks =>
    ks.traverse { k =>
      poolState(k).get.map {
        case KeyState.Empty() => k -> 0
        case KeyState.Available(_, available, _) => k -> available.size
        case KeyState.Waiting(_, _, _) => k -> 0
      }
    }.map { counts =>
      val total = counts.foldLeft(0) { case (acc, (_, i)) => acc + i }
      total -> counts.toMap
    }
  }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment