Last active
December 25, 2015 21:39
-
-
Save sirmax/7043591 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.concurrent.{Offer, Broker} | |
import com.twitter.conversions.time._ | |
import com.twitter.finagle._ | |
import com.twitter.util._ | |
object Reeval {

  /** Wraps a by-name `Var` factory in a `Var[T]` that re-evaluates the factory periodically.
    *
    * The returned `Var`:
    *  - starts out holding the current value of a freshly evaluated `mkVar`;
    *  - while it has at least one observer, forwards every update of the underlying `Var` to them;
    *  - each time `reevalInterval` elapses, closes its observation of the current underlying `Var`,
    *    evaluates `mkVar` again and observes the new result.
    *
    * Intended for `Var`s that hold on to whatever they were first evaluated to although the
    * underlying source may change. The original author's example: as of Finagle 6.7
    * `Resolver.eval(name).bind()` yields a set of addresses that does not change even when the
    * network, the DNS, etc. do.
    *
    * @param mkVar          by-name factory; evaluated once at construction and again on every (re)attach
    * @param timer          timer used to schedule the re-evaluation timeout
    * @param reevalInterval time between re-evaluations while observed
    */
  def aVar[T](mkVar: => Var[T])(implicit timer: Timer, reevalInterval: Duration = 5.seconds): Var[T] = new Var[T] {
    // Relay that downstream observers subscribe to; seeded with the current value of `mkVar`.
    private[this] val relayVar = Var(mkVar())
    // Carries +1 / -1 observer-count deltas into the Offer loop.
    private[this] val countDeltas = new Broker[Int]

    // The mutable fields below are only read and written from inside the Offer loop.
    private[this] var liveObservers = 0
    private[this] var upstreamObserver = Closable.nop
    private[this] var reevalTick: Offer[Unit] = Offer.never

    // Evaluate `mkVar`, pipe its updates into the relay, and arm the next re-evaluation timeout.
    private[this] def attachUpstream(): Unit = {
      upstreamObserver = mkVar.observe(relayVar.update)
      reevalTick = Offer.timeout(reevalInterval)
    }

    // One iteration: react either to an observer-count change or to the re-evaluation timeout,
    // then schedule the next iteration.
    private def loop(): Unit = {
      val countChanged = countDeltas.recv map { delta =>
        val before = liveObservers
        liveObservers += delta
        if (before == 0 && liveObservers == 1) {
          // First observer arrived: start following the underlying Var.
          attachUpstream()
        } else if (before == 1 && liveObservers == 0) {
          // Last observer left: stop following and disarm the timeout.
          upstreamObserver.close()
          reevalTick = Offer.never
        }
        // Any other transition needs no action.
      }
      val intervalElapsed = reevalTick const {
        upstreamObserver.close()
        attachUpstream()
      }
      Offer.select(countChanged, intervalElapsed).ensure(loop())
    }

    protected def observe(depth: Int, callback: T => Unit) = {
      countDeltas ! 1
      val relayObservation = relayVar.observe(callback)
      val releaseCount = Closable make { _ => countDeltas ! -1 }
      // Detach the callback first, then report the departure to the loop.
      Closable.sequence(relayObservation, releaseCount)
    }

    // Kick off the loop once all fields above are initialised.
    loop()
  }

  /** Wraps `n` in a `Name` whose `bind()` returns a periodically re-evaluated `Var`.
    *
    * Each (re-)evaluation calls `n.bind()` again; see [[aVar]] for the timing behaviour.
    */
  def name(n: Name)(implicit timer: Timer, reevalInterval: Duration = 5.seconds): Name = new Name {
    def bind() = aVar(n.bind())(timer, reevalInterval)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest.FreeSpec | |
import com.twitter.conversions.time._ | |
import com.twitter.util.{MockTimer, Time, Var} | |
/** Exercises `Reeval.aVar` with a frozen clock and a `MockTimer`. */
class ReevalTest extends FreeSpec {
  "reeval while observing" in Time.withCurrentTimeFrozen { tc =>
    val timer = new MockTimer
    // `countVar` is reassigned below to simulate the factory yielding a different Var.
    var countVar = Var(1)
    def currentCountVar = countVar
    var observedValue = 0
    val observer = Reeval.aVar(currentCountVar)(timer, 1.second).observe(observedValue = _)

    // initial value
    assert(observedValue === 1)

    // update the var value
    countVar() = 2
    assert(observedValue === 2)

    // changed the var reference, but the reeval interval hasn't passed yet
    countVar = Var(3)
    tc.advance(500.millis)
    timer.tick()
    assert(observedValue === 2)

    // over the reeval interval
    tc.advance(1.second)
    timer.tick()
    assert(observedValue === 3)

    // no updates after observer is closed
    observer.close()
    // In-place update of the var that was being followed (must be `=`, not `===`,
    // otherwise nothing is updated and the assertion below proves nothing).
    countVar() = 4
    assert(observedValue === 3)

    // Replacing the var and letting the interval elapse must not be observed either.
    countVar = Var(5)
    tc.advance(1.second)
    timer.tick()
    assert(observedValue === 3)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment