Created
August 30, 2024 21:48
-
-
Save outofmbufs/c39201ff01189161eb37ba68a2c95a5f to your computer and use it in GitHub Desktop.
Dynamically calculate delay needed to execute code at a desired constant interval
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MIT License | |
# | |
# Copyright (c) 2024 Neil Webber | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included | |
# in all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
# | |
# A DriftAdjuster computes an adjusted delay for a desired/fixed repeating | |
# interval, compensating for overhead over a series of such intervals. | |
# | |
# For example, in a loop like this:
#       fixed_interval = 0.01     # 10 msecs
#       while True:
#           ... do something here ...
#           time.sleep(fixed_interval)
# | |
# If this loop executes N times, it will certainly take longer than | |
# N * fixed_interval | |
# seconds of real time, because of the overhead in the "do something here" | |
# code and other general system/python/etc overhead and variabilities. | |
# | |
# If the goal was to execute the loop once per fixed_interval in true time, | |
# the application needs to measure that overhead and compensate for it when | |
# calling sleep(). A DriftAdjuster does exactly that. | |
# | |
# For example:
#       adj = DriftAdjuster(0.01)    # interval = 10 msecs
#       while True:
#           ... do something here ...
#           dt = adj.adjusted_dt()
#           time.sleep(dt)
# | |
# In contrast to the fixed_interval code, the requested sleep() will now | |
# be for a variable amount of time as determined by adjusted_dt(). That | |
# adjustment is done using a PI controller (PID w/Kd=0) looking at the | |
# difference between the measured TOTAL elapsed time since the first interval | |
# and the expected total elapsed time. The controller attempts to gradually | |
# drive that difference to zero. | |
# | |
# The original use case for this was a tkinter simulation using .after()
# events and wanting them to run in near-accurate real time despite the | |
# graphics overhead. In any similar application with fairly consistent | |
# loop overhead the integral term in the PI controller will dynamically | |
# discover that overhead and quickly converge on nulling it out. | |
# | |
# However, if the overhead varies wildly and inconsistently from interval | |
# to interval, the DriftAdjuster idea may not be applicable at all. | |
# | |
# Between those extremes (it works well / it won't work at all) there may | |
# be some cases where it can work but needs a more elaborate PID controller. | |
# An application can supply an alternate pid object if so. | |
# | |
# Lastly, note that of course python is not a good language for implementing | |
# applications with "hard" real-time requirements. | |
# | |
import time | |
class DriftAdjuster:
    """Compute per-iteration delays that keep a loop on a fixed schedule.

    Each call to adjusted_dt() compares the real time elapsed since
    start() against the time that should have elapsed after that many
    intervals, and feeds the difference (the "drift") to a PID controller
    whose output is added to the nominal interval.
    """

    # Gains. Determined ad-hoc, with a low-ish Kp to reduce oscillation
    # behavior and let the integral do the work.
    PID_Kp = 0.3
    PID_Ki = 0.12         # i.e., Ti = Kp/Ki = 2.5 (seconds)
    PID_Kd = 0            # D term not necessary here

    # The minimum dt that will ever be returned.
    # Note that if using with tkinter, less than 2msec can't
    # really be compensated very well because resolution is 1msec
    MIN_INTERVAL = 0.002  # 2 msec, arbitrary
    MAX_FACTOR = 3        # N x desired_interval longest delay

    def __init__(self, desired_interval, /, *, pid=None, max_excursion=None):
        """Create an adjuster for a loop that should run every interval.

        desired_interval -- target time between iterations, in seconds
        pid -- optional controller object providing a pid(drift) method.
               If None, self.DPID is instantiated with the class gains.
        max_excursion -- drifts larger than this (in either direction)
               are ignored rather than corrected; see _datafilter().
               None selects 10 * desired_interval; zero (or negative)
               turns the excursion filter off.
        """
        # Test against None explicitly so that a caller-supplied controller
        # is never discarded merely because it evaluates as false.
        if pid is None:
            pid = self.DPID(Kp=self.PID_Kp,
                            Ki=self.PID_Ki,
                            Kd=self.PID_Kd,
                            dt=desired_interval)
        self.pid = pid
        self.desired_interval = desired_interval

        # If the application does not specify max_excursion, it defaults...
        if max_excursion is None:
            max_excursion = 10 * desired_interval   # see _datafilter()
        self.max_excursion = max_excursion

        self.start()        # users can also (re)start it themselves later

    def start(self):
        """(Re)start the schedule: elapsed time is measured from now."""
        self.t0 = time.monotonic()
        self.ticknum = 0
        # Seed the value _datafilter() falls back on, so it is defined
        # (and not stale from an earlier run) before the first tick.
        self.previous_dt = self.desired_interval

    def adjusted_dt(self):
        """Return appropriate dt (delay time) until next interval.

        The result is clamped to the range
        [MIN_INTERVAL, desired_interval * MAX_FACTOR].
        """
        dt = self._adj()
        self.previous_dt = dt       # remembered unclamped; see _datafilter()
        return min(max(dt, self.MIN_INTERVAL),
                   self.desired_interval * self.MAX_FACTOR)

    def _adj(self):
        """Return the unclamped delay for the tick being started."""
        drift = self._compute_drift()

        # some situations are filtered out
        forced = self._datafilter(drift)
        if forced is not None:
            return forced

        return self.desired_interval + self.pid.pid(drift)

    def _compute_drift(self):
        """Return real-minus-expected elapsed time and count one tick."""
        true_elapsed = time.monotonic() - self.t0
        expected_elapsed = self.ticknum * self.desired_interval
        self.ticknum += 1
        return true_elapsed - expected_elapsed    # note: positive is slow

    # The datafilter prevents atypical delays from poisoning the control
    # algorithm. Two things are filtered out here:
    #    * Startup. Nothing happens until there are "enough" data points.
    #    * Excursions beyond max_excursion (if it is non-zero) are ignored.
    #
    # Drifts exceeding max_excursion in either direction are ignored.
    # Their time duration is "baked in" to an updated t0 so it is as if
    # they never happened. Typically such excursions are either laptop
    # sleep/resume or else something GUI related. For example: a window
    # resizing driven by the human usually pauses an application's event
    # loop; this can easily introduce a large one-time delay that really
    # should not attempt to be compensated for in subsequent intervals.
    #
    def _datafilter(self, drift):
        """Returns a filter-forced dt value, or None."""
        # NOTE: ticknum has already been incremented by _compute_drift(),
        # so this branch is taken for the first tick only.
        if self.ticknum < 2:
            return self.desired_interval

        if self.max_excursion > 0 and abs(drift) > self.max_excursion:
            self.t0 += drift            # absorb the excursion into t0
            return self.previous_dt

        return None

    # The DriftAdjuster expects class attribute DPID to be a PID controller
    # class following the PID/PID convention from
    #      https://github.com/outofmbufs/pidcontrol
    #
    # By default, __BuiltinDPID is the pid controller.
    # SEE THE "DPID = __BuiltinDPID" statement after this class definition.
    #
    # To use a different PID object, bash .DPID or override it in a subclass.
    class __BuiltinDPID:
        """Built-in PID control for DriftAdjuster."""

        def __init__(self, /, *, Kp=0, Ki=0, Kd=0, dt=None):
            """Create a PID controller with the given parameters.

            Kp, Ki, Kd -- gains for P/I/D control signals
            dt         -- default time step used by pid() when it is
                          called without an explicit dt
            """
            self.Kp = Kp
            self.Ki = Ki
            self.Kd = Kd
            self.dt = dt
            self.integration = 0
            self.previous_e = 0

        def pid(self, pv, dt=None):
            """Return the new commanded control value for given pv/dt.

            If dt is given it also replaces the stored default time step.

            NOTE: Because in this trivial implementation there is no
                  setpoint, the pv is actually the error (i.e., difference
                  in pv from setpoint is just pv).

            NOTE: POSITIVE pv means running slow (i.e., taking too long)
            """
            if dt is not None:
                self.dt = dt

            # the setpoint is implicitly zero, so the error is:
            e = -pv

            # P (proportional) term
            p = e

            # I (integral) term
            self.integration += (e * self.dt)
            i = self.integration

            # D (derivative) term: de/dt, i.e. newest error minus previous.
            d = (e - self.previous_e) / self.dt
            self.previous_e = e

            return (p * self.Kp) + (i * self.Ki) + (d * self.Kd)

    # To use a different underlying PID controller, either override this
    # attribute in a subclass, or just provide the alternate controller
    # when instantiating the DriftAdjuster (via pid=foo)
    DPID = __BuiltinDPID
if __name__ == "__main__":
    # Self-test. Needs the third-party "pid" module (PIDPlus, PIDHistory)
    # to be importable; each test sleeps for roughly two seconds in total.
    import unittest
    from pid import PIDPlus, PIDHistory

    class TestMethods(unittest.TestCase):
        # these simple tests could have been written as a loop over
        # the underlying subclasses (and subTest()) but the advantage
        # of doing it this way is to get "progress report" dots because
        # each test takes about two seconds.

        def test_simple1(self):
            self._simpletest(DriftAdjuster)

        def test_simple2(self):
            # Same as simple1 but use a PIDPlus as the underlying control.
            # This shows one simple way to do that, with a trivial modifier
            # list (just PIDHistory).
            class SubKlass(DriftAdjuster):
                def DPID(self, *args, **kwargs):
                    return PIDPlus(*args, modifiers=PIDHistory(), **kwargs)

            self._simpletest(SubKlass)

        def test_simple3(self):
            # This is another way to do it, overriding __init__ instead
            # of overriding DPID. It's a little less elegant but might seem
            # more "obvious" of a way to do it. Also, in a real application
            # there'd probably be no need to actually subclass; just build
            # the pid and pass it directly into DriftAdjuster
            class SubKlass(DriftAdjuster):
                def __init__(self, desired_interval, *args, **kwargs):
                    z = PIDPlus(Kp=self.PID_Kp,
                                Ki=self.PID_Ki,
                                Kd=self.PID_Kd,
                                dt=desired_interval,
                                modifiers=PIDHistory())
                    super().__init__(desired_interval, *args, pid=z, **kwargs)

            self._simpletest(SubKlass)

        def _simpletest(self, klass):
            """Run n paced iterations and check the total elapsed time."""
            interval = 0.01
            # max_excursion=0 switches the excursion filter off
            z = klass(interval, max_excursion=0)
            n = 200

            # Use the monotonic clock so that a wall-clock adjustment
            # during the run cannot distort the measurement.
            t0 = time.monotonic()
            for _ in range(n):
                time.sleep(z.adjusted_dt())
            elapsed = time.monotonic() - t0

            diff = abs(elapsed - (n * interval))

            # it should certainly be able to converge to less than
            # one full interval of error over the entire loop
            self.assertLess(diff, interval)

    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment