Created
April 16, 2024 13:55
-
-
Save scf37/4071839f25e197e38e4b070cfbed977b to your computer and use it in GitHub Desktop.
Emulated time on Twitter Futures
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.util.Await | |
import com.twitter.util.Closable | |
import com.twitter.util.Duration | |
import com.twitter.util.Future | |
import com.twitter.util.Promise | |
import com.twitter.util.Time | |
import scala.collection.mutable | |
/** | |
* Timer instance that emulates time flow. | |
* Very useful for testing asynchronous code. | |
* | |
* @param startTime start timestamp for this timer | |
*/ | |
class MockTimer(startTime: Long = System.currentTimeMillis()) extends Timer {

  /**
   * Upper bound on how much emulated time `tickAll` / `result` may consume, in milliseconds.
   * Must be computed in Long: the same product in Int arithmetic overflows and wraps
   * to roughly 17 days instead of one year.
   */
  private[this] val OneYearMillis: Long = 365L * 24 * 60 * 60 * 1000

  // Written by close(); nothing else in this class reads it.
  private[this] var closed = false

  // Current emulated time, in milliseconds.
  private[this] var now: Long = startTime

  // Pending tasks. NOTE(review): assumes the Ordering[Task] declared alongside Task puts the
  // task with the smallest startAt at the head of the queue - confirm against Task's definition.
  private[this] val tasks = mutable.PriorityQueue.empty[Task]

  /**
   * Execute next scheduled task, advancing internal clock as needed.
   *
   * The clock never moves backwards: a task whose start time is already in the past
   * (e.g. scheduled with a negative delay) runs at the current emulated time.
   *
   * @return true if a task was executed, false if the queue was empty
   */
  def tick(): Boolean = synchronized {
    if (tasks.isEmpty) false
    else {
      val next = tasks.dequeue()
      now = math.max(now, next.startAt)
      next.task()
      true
    }
  }

  /**
   * Consume all scheduled tasks or fail if there is no end to them.
   *
   * Throws RuntimeException once more than one year of emulated time has been consumed
   * by this call.
   */
  def tickAll(): Unit = {
    val startedAt = nowMillis
    while (tick()) {
      if (nowMillis - startedAt > OneYearMillis) {
        throw new RuntimeException("One year of emulated time has passed. Unable to consume all scheduled tasks.")
      }
    }
  }

  /**
   * Return completed value of this future. This method behaves like Await.result with one year timeout
   * except that it uses emulated time instead of real time, i.e. it never waits on the wall clock.
   *
   * Throws RuntimeException if the task queue runs dry, or if more than one year of emulated
   * time passes, before the future completes.
   *
   * @param a future to wait for
   * @tparam T type of the future's value
   * @return the value of the completed future (a failed future rethrows via Await.result)
   */
  def result[T](a: Future[T]): T = {
    val startedAt = nowMillis
    while (!a.isDefined) {
      if (!tick()) {
        throw new RuntimeException("This Future will never complete")
      }
      if (nowMillis - startedAt > OneYearMillis) {
        throw new RuntimeException("One year of emulated time have passed. This Future will probably never complete.")
      }
    }
    // The future is already defined here, so this returns (or throws) immediately.
    Await.result(a)
  }

  /**
   * Execute all tasks scheduled for interval from now to now + [delay] (both ends inclusive),
   * then set the clock to now + [delay]. A zero or negative delay is a no-op.
   *
   * Tasks that become due within the interval while it is being processed are executed too,
   * so a task that keeps rescheduling itself with zero delay will never let this method return.
   *
   * @param delay amount of emulated time to advance
   */
  def waitFor(delay: Duration): Unit = synchronized {
    val until = now + delay.inMillis
    if (now < until) {
      // Drain by deadline rather than by clock value: several tasks may be due at exactly
      // `until`, and stopping as soon as the clock reaches it would leave all but one queued.
      while (tasks.nonEmpty && tasks.head.startAt <= until) {
        tick()
      }
      now = until
    }
  }

  /**
   * Execute callback after specified delay and return execution result.
   *
   * The callback is evaluated only when the task is run by `tick` (directly or via
   * `tickAll` / `result` / `waitFor`).
   */
  override def schedule[T](delay: Duration)(task: => Future[T]): Future[T] = synchronized {
    val p = Promise[T]()
    val t: Task = Task(now + delay.inMillis, () => {
      p.become(task)
    }, periodic = false)
    tasks += t
    p
  }

  /**
   * Execute callback periodically.
   * 'period' is delay between task finish (callback future completes) and next task start.
   *
   * @param delay  delay before the first execution
   * @param period delay between completion of one execution and start of the next
   * @return handle that stops further executions; closing it twice fails the `require`
   */
  override def schedule(delay: Duration, period: Duration)(task: => Future[Unit]): Closable = {
    // Named differently from the timer-level `closed` field to avoid shadowing it.
    @volatile var cancelled = false

    // Queue one run of `task` after `wait`; when that run's future completes, queue the next
    // run after `period`. Cancellation is checked both when the run fires and before re-arming.
    def runAfter(wait: Duration): Unit = {
      schedule(wait) {
        if (!cancelled) {
          task.ensure {
            if (!cancelled) runAfter(period)
          }
        }
        Future.Done
      }
    }

    runAfter(delay)

    new Closable {
      override def close(deadline: Time): Future[Unit] = {
        require(!cancelled, "already closed")
        cancelled = true
        Future.Done
      }
    }
  }

  /**
   * @return current emulated time, in nanoseconds (the millisecond clock scaled by 10^6)
   */
  override def nowNano: Long = now * 1000000

  /**
   * @return current emulated time, in milliseconds
   */
  override def nowMillis: Long = now

  /**
   * Mark this timer as closed. Closing twice fails the `require`.
   * NOTE(review): already queued tasks are not discarded and `schedule` / `tick`
   * do not check this flag - confirm that is intended.
   */
  override def close(deadline: Time): Future[Unit] = synchronized {
    require(!closed, "Already closed")
    closed = true
    Future.Done
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment