Created
June 1, 2024 20:31
-
-
Save outofmbufs/1cffcdc3d781b300370a4c1baed4c67d to your computer and use it in GitHub Desktop.
Python Adaptive delay - tracks cumulative delay and adjusts the sleep interval accordingly
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from collections import deque | |
from itertools import pairwise, cycle | |
class AdaptiveDelay: | |
def __init__(self, interval, /, *, window_seconds=5): | |
self.interval = interval | |
self.window_seconds = window_seconds | |
# approximate (i.e., "expected") number of measurements. Minimum 2. | |
self.n = max(round(self.window_seconds / self.interval), 2) | |
# fixed-length FIFO of post-sleep timestamps | |
self.tq = deque([], self.n) | |
# these parameters are arbitrary and shouldn't need tweaking, but, | |
# well, here they are if they need to be tweaked | |
# _min_measures: number of measured intervals before any | |
# adjustment will be done (startup conditions) | |
# | |
# _makeup_n: number of intervals used to catch up | |
# | |
# _mindelay: absolute minimum delay to ever sleep (no matter | |
# how far behind the timing is) | |
self._min_measures = 10 | |
self._makeup_n = 5 | |
self._mindelay = 0.01 | |
# Having this as a method makes it possible to override for | |
# testing porpoises or other specialized requirements. | |
def _sleep(self, seconds): | |
time.sleep(seconds) | |
def delay(self): | |
"""Sleep for an interval, but adjusted based on real timing.""" | |
self._sleep(self._compute_delay()) | |
self.tq.append(time.perf_counter()) | |
def _compute_delay(self): | |
"""Return the adaptive amount to delay based on history.""" | |
# determine total delay over the measurement window | |
delays = list(map(lambda t: t[1] - t[0], pairwise(self.tq))) | |
n = len(delays) | |
if n < self._min_measures: | |
return self.interval | |
totaldelay = sum(delays) | |
# and the difference from what that should have been | |
# positive is: been running fast; negative is running slow | |
diff = (self.interval * n) - totaldelay | |
# make that up over the _makeup_n number of intervals, | |
# but never delay less than _mindelay | |
adj = diff / self._makeup_n | |
return max(self.interval + adj, self._mindelay) | |
if __name__ == "__main__":
    import unittest
    import random

    class TestMethods(unittest.TestCase):
        """Timing tests that run AdaptiveDelay against a distorted sleep.

        Each test replaces the sleep hook with one that sleeps for a
        multiple ("fuzz") of the requested time and then checks that the
        total elapsed wall-clock time still lands near the target.
        """

        # empirically the tests get to within this percentage of target
        # This is fundamentally fragile and there is a tradeoff between
        # how close the AdaptiveDelay will get and how long the test
        # has to run to get there. This seems to be a sweet spot.
        TEST_TOLERANCE = 0.1
        TEST_DURATION = 30      # for EACH test that does this

        def _distorted(self, interval, totaltime, fuzzes):
            """Run delay() for about totaltime seconds with a fuzzed sleep.

            fuzzes is cycled through endlessly; each requested sleep is
            multiplied by the next fuzz factor. Returns a tuple of
            (elapsed seconds, deque of the sleep amounts requested).
            """
            a = AdaptiveDelay(interval)

            # crude, but effective
            t_record = deque([], 1000)   # useful for debugging / analysis
            g = cycle(fuzzes)

            def fuzzysleep(t):
                t_record.append(t)
                time.sleep(next(g) * t)

            # instance attribute shadows the class's _sleep method
            a._sleep = fuzzysleep

            t0 = time.perf_counter()
            for _ in range(round(totaltime / interval)):
                a.delay()
            return time.perf_counter() - t0, t_record

        def _distorted_testlogic(self, fuzzes):
            """Assert a fuzzed run finishes within TEST_TOLERANCE of target."""
            elapsed, _requested = self._distorted(
                0.1, self.TEST_DURATION, fuzzes)
            offby = (elapsed - self.TEST_DURATION) / self.TEST_DURATION
            # assertLess (rather than assertTrue on a comparison) reports
            # the offending value when the test fails.
            self.assertLess(
                abs(offby), self.TEST_TOLERANCE,
                msg=f"elapsed {elapsed:.3f}s vs target {self.TEST_DURATION}s")

        def test_toolong(self):
            self._distorted_testlogic([1.5])

        def test_waytoolong(self):
            self._distorted_testlogic([3.0])

        # the "too short" conditions don't mesh well with the algorithm
        # (which really "should be" a full-on PID controller) but in
        # practice the delays are "never" too short so the robustness in
        # that direction is less important. These pass...
        def test_tooshort(self):
            self._distorted_testlogic([0.9])

        def test_moretooshort(self):
            self._distorted_testlogic([0.6])

        def test_varying_longs(self):
            for fuzzes in (
                    (1.1, 1.2, 1.3, 1.4, 1.5),
                    (1.2, 1.0, 1.2, 1.0, 1.2),
                    (2.0, 1.0, 1.0, 1.0),
                    (1.0, 1.0, 1.1, 3.0)):
                with self.subTest(fuzzes=fuzzes):
                    self._distorted_testlogic(fuzzes)

        def test_random(self):
            for _ in range(5):
                # 20 fuzz factors drawn uniformly from [0.5, 1.5)
                fuzzes = [0.5 + random.random() for _ in range(20)]
                with self.subTest(fuzzes=fuzzes):
                    self._distorted_testlogic(fuzzes)

    # NOTE(review): there are 13 timed runs in total (4 single tests plus
    # 4 and 5 subtests) at TEST_DURATION seconds each, which is closer to
    # 6.5 minutes than the figure printed below; message left unchanged.
    print("NOTE: RUNNING THE COMPLETE TEST SUITE TAKES 4-5 MINUTES")
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment