Skip to content

Instantly share code, notes, and snippets.

@outofmbufs
Created August 30, 2024 21:48
Show Gist options
  • Save outofmbufs/c39201ff01189161eb37ba68a2c95a5f to your computer and use it in GitHub Desktop.
Save outofmbufs/c39201ff01189161eb37ba68a2c95a5f to your computer and use it in GitHub Desktop.
Dynamically calculate delay needed to execute code at a desired constant interval
# MIT License
#
# Copyright (c) 2024 Neil Webber
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# A DriftAdjuster computes an adjusted delay for a desired/fixed repeating
# interval, compensating for overhead over a series of such intervals.
#
# For example, in a loop like this:
#     fixed_interval = 0.01         # 10 msecs
#     while True:
#         ... do something here ...
#         time.sleep(fixed_interval)
#
# If this loop executes N times, it will certainly take longer than
# N * fixed_interval
# seconds of real time, because of the overhead in the "do something here"
# code and other general system/python/etc overhead and variabilities.
#
# If the goal was to execute the loop once per fixed_interval in true time,
# the application needs to measure that overhead and compensate for it when
# calling sleep(). A DriftAdjuster does exactly that.
#
# For example:
#     adj = DriftAdjuster(0.01)     # interval = 10 msecs
#     while True:
#         ... do something here ...
#         dt = adj.adjusted_dt()
#         time.sleep(dt)
#
# In contrast to the fixed_interval code, the requested sleep() will now
# be for a variable amount of time as determined by adjusted_dt(). That
# adjustment is done using a PI controller (PID w/Kd=0) looking at the
# difference between the measured TOTAL elapsed time since the first interval
# and the expected total elapsed time. The controller attempts to gradually
# drive that difference to zero.
#
# The original use case for this was a tkinter simulation using .after()
# events and wanting them to run in near-accurate real time despite the
# graphics overhead. In any similar application with fairly consistent
# loop overhead the integral term in the PI controller will dynamically
# discover that overhead and quickly converge on nulling it out.
#
# However, if the overhead varies wildly and inconsistently from interval
# to interval, the DriftAdjuster idea may not be applicable at all.
#
# Between those extremes (it works well / it won't work at all) there may
# be some cases where it can work but needs a more elaborate PID controller.
# An application can supply an alternate pid object if so.
#
# Lastly, note that of course python is not a good language for implementing
# applications with "hard" real-time requirements.
#
import time
class DriftAdjuster:
    """Compute per-interval delays that cancel accumulated timing drift.

    Each call to adjusted_dt() compares the monotonic time elapsed since
    start() against (number of calls so far) * desired_interval, and feeds
    that difference (the "drift") to a PID controller. The controller
    output is added to desired_interval to produce the delay the caller
    should wait before the next interval.
    """

    # Gains. Determined ad-hoc, with a low-ish Kp to reduce oscillation
    # behavior and let the integral do the work.
    PID_Kp = 0.3
    PID_Ki = 0.12          # i.e., Ti = 2.5 (seconds)
    PID_Kd = 0             # D term not necessary here

    # Lower clamp applied to the dt returned by adjusted_dt().
    # Note that if using with tkinter, less than 2msec can't
    # really be compensated very well because resolution is 1msec
    MIN_INTERVAL = 0.002   # 2 msec, arbitrary
    MAX_FACTOR = 3         # N x desired_interval longest delay

    def __init__(self, desired_interval, /, *, pid=None, max_excursion=None):
        """Create an adjuster and start it immediately.

        desired_interval -- target seconds between loop iterations
        pid              -- optional controller object exposing pid(pv);
                            if None, self.DPID is instantiated with the
                            class-level gains and dt=desired_interval
        max_excursion    -- drift magnitude (seconds) beyond which a sample
                            is discarded by _datafilter(); defaults to
                            10 * desired_interval; zero (or negative)
                            disables excursion filtering
        """
        # Test against None explicitly: a caller-supplied controller must
        # be honored even if the object happens to evaluate as falsy.
        if pid is None:
            pid = self.DPID(Kp=self.PID_Kp,
                            Ki=self.PID_Ki,
                            Kd=self.PID_Kd,
                            dt=desired_interval)
        self.pid = pid
        self.desired_interval = desired_interval

        # If the application does not specify max_excursion, it defaults...
        if max_excursion is None:
            max_excursion = 10 * desired_interval    # see _datafilter()
        self.max_excursion = max_excursion

        self.start()       # users can also (re)start it themselves later

    def start(self):
        """(Re)start timing: reset the time origin and the tick count."""
        self.t0 = time.monotonic()
        self.ticknum = 0
        # Seed the value _datafilter() falls back on during an excursion,
        # so it is always defined (and is reset on a restart).
        self.previous_dt = self.desired_interval

    def adjusted_dt(self):
        """Return appropriate dt (delay time) until next interval."""
        dt = self._adj()
        self.previous_dt = dt      # remembered (unclamped) for _datafilter
        return min(max(dt, self.MIN_INTERVAL),
                   self.desired_interval * self.MAX_FACTOR)

    def _adj(self):
        """Return the unclamped delay for the next interval."""
        drift = self._compute_drift()

        # some situations are filtered out
        x = self._datafilter(drift)
        if x is not None:
            return x

        return self.desired_interval + self.pid.pid(drift)

    def _compute_drift(self):
        """Return true elapsed minus expected elapsed; counts one tick."""
        true_elapsed = time.monotonic() - self.t0
        expected_elapsed = self.ticknum * self.desired_interval
        self.ticknum += 1
        return true_elapsed - expected_elapsed    # note: positive is slow

    # The datafilter prevents atypical delays from poisoning the control
    # algorithm. Two things are filtered out here:
    #    * Startup. Nothing happens until there are "enough" data points.
    #    * Excursions beyond max_excursion (if it is non-zero) are ignored.
    #
    # Drifts exceeding max_excursion in either direction are ignored.
    # Their time duration is "baked in" to an updated t0 so it is as if
    # they never happened. Typically such excursions are either laptop
    # sleep/resume or else something GUI related. For example: a window
    # resizing driven by the human usually pauses an application's event
    # loop; this can easily introduce a large one-time delay that really
    # should not attempt to be compensated for in subsequent intervals.
    #
    def _datafilter(self, drift):
        """Returns a filter-forced dt value, or None."""
        if self.ticknum < 2:
            return self.desired_interval

        if self.max_excursion > 0 and abs(drift) > self.max_excursion:
            self.t0 += drift           # absorb the excursion into t0
            return self.previous_dt

        return None

    # The DriftAdjuster expects class attribute DPID to be a PID controller
    # class following the PID/PID convention from
    #      https://github.com/outofmbufs/pidcontrol
    #
    # By default, __BuiltinDPID is the pid controller.
    # SEE THE "DPID = __BuiltinDPID" statement after this nested class.
    #
    # To use a different PID object, replace .DPID or override it in a
    # subclass.
    class __BuiltinDPID:
        """Built-in PID control for DriftAdjuster."""

        def __init__(self, /, *, Kp=0, Ki=0, Kd=0, dt=None):
            """Create a PID controller with the given parameters.

            Kp, Ki, Kd   -- gains for P/I/D control signals
            dt           -- default time step used by pid() when the call
                            does not supply one
            """
            self.Kp = Kp
            self.Ki = Ki
            self.Kd = Kd
            self.dt = dt
            self.integration = 0
            self.previous_e = 0

        def pid(self, pv, dt=None):
            """Return the new commanded control value for given pv/dt.

            If dt is given it replaces the stored time step (and stays in
            effect for later calls).

            NOTE: Because in this trivial PID implementation there is no
                  setpoint, the pv is actually the error (i.e., difference
                  in pv from setpoint is just pv).

            NOTE: POSITIVE pv means running slow (i.e., taking too long)
            """
            if dt is not None:
                self.dt = dt

            # the setpoint is implicitly zero, so the error is:
            e = -pv

            # P (proportional) term
            p = e

            # I (integral) term
            self.integration += (e * self.dt)
            i = self.integration

            # D (derivative) term: rate of change of the error, de/dt.
            # (current minus previous; the reverse order flips the sign)
            d = (e - self.previous_e) / self.dt
            self.previous_e = e

            u = (p * self.Kp) + (i * self.Ki) + (d * self.Kd)
            return u

    # To use a different underlying PID controller, either override this
    # attribute in a subclass, or just provide the alternate controller
    # when instantiating the DriftAdjuster (via pid=foo)
    DPID = __BuiltinDPID
if __name__ == "__main__":
    # Self-test. Requires the third-party "pid" module (PIDPlus/PIDHistory)
    # in addition to the standard library.
    import unittest
    from pid import PIDPlus, PIDHistory

    class TestMethods(unittest.TestCase):
        # these simple tests could have been written as a loop over
        # the underlying subclasses (and subTest()) but the advantage
        # of doing it this way is to get "progress report" dots because
        # each test takes about two seconds.

        def test_simple1(self):
            self._simpletest(DriftAdjuster)

        def test_simple2(self):
            # Same as simple1 but use a PIDPlus as the underlying control.
            # This shows one simple way to do that, with a trivial modifier
            # list (just PIDHistory).
            class SubKlass(DriftAdjuster):
                def DPID(self, *args, **kwargs):
                    return PIDPlus(*args, modifiers=PIDHistory(), **kwargs)

            self._simpletest(SubKlass)

        def test_simple3(self):
            # This is another way to do it, overriding __init__ instead
            # of subclassing. It's a little less elegant but might seem
            # more "obvious" of a way to do it. Also, in a real application
            # there'd probably be no need to actually subclass; just build
            # the pid and pass it directly into DriftAdjuster
            class SubKlass(DriftAdjuster):
                def __init__(self, desired_interval, *args, **kwargs):
                    z = PIDPlus(Kp=self.PID_Kp,
                                Ki=self.PID_Ki,
                                Kd=self.PID_Kd,
                                dt=desired_interval,
                                modifiers=PIDHistory())
                    super().__init__(desired_interval, *args, pid=z, **kwargs)

            self._simpletest(SubKlass)

        def _simpletest(self, klass):
            # the actual test logic for the test variations above
            interval = 0.01
            z = klass(interval, max_excursion=0)    # no excursion
            n = 200

            # Measure with the monotonic clock (the same clock the
            # DriftAdjuster uses) so a wall-clock adjustment during the
            # run cannot distort the measured elapsed time.
            t0 = time.monotonic()
            for _ in range(n):
                dt = z.adjusted_dt()
                time.sleep(dt)
            elapsed = time.monotonic() - t0

            diff = abs(elapsed - (n * interval))

            # it should certainly be able to converge to less than
            # one full interval of error over the entire loop
            self.assertLess(diff, interval)

    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment