Skip to content

Instantly share code, notes, and snippets.

@denis-bz
Last active August 24, 2025 15:26
Show Gist options
  • Save denis-bz/1412ad31c3a6de5787859c7af520744a to your computer and use it in GitHub Desktop.
Save denis-bz/1412ad31c3a6de5787859c7af520744a to your computer and use it in GitHub Desktop.
Callback to track and stop scipy solve_ivp 24 Aug 2025

Callback to track and stop scipy solve_ivp

Keywords: scipy solve_ivp callback ODE ordinary-differential-equation IVP initial-value-problem python

Purpose

add a callback function to the scipy ODE solver solve_ivp, so that you can

  • print progress at each timestep
  • tell the solver to stop if h is too small, or KeyboardInterrupt, or ... and return the sol so far.

See solve_ivp Runge-Kutta .
This is a minimal implementation, a bird in the hand.

Description

Solve_ivp has one new parameter:

callback : callable, or default None. Called after each solver step with one argument, the locals() of _step_impl; this is a dict containing the current t y h ... If the callback returns a string such as "stop from callback ...", then solve_ivp stops, and returns sol with sol.t sol.y ... up to that point.

Why callbacks ?

Many old programs, including solve_ivp , run blind: you start it running, wait until the end, then look at sol to see what happened. Then you can vary parameters y0 atol rtol etc., run again ... pinball. Print statements inside your derivative function F(t, Y) may help (but RK calls this at intermediate steps too). And you can hit ^C to kill a runaway, but then it's dead.

Callbacks are a minimal way of seeing what a solver is doing, with good old print statements, and ^C from the keyboard to return the sol so far.

If debugging is the process of removing software bugs, then programming must be the process of putting them in.

The files in this gist

rk-callback.py : scipy rk.py (RK = Runge-Kutta) + a 10-line patch for callback= radau-callback.py : radau.py for callback0.py : a simple example callback, solve_ivp( ... callback=this )
test-callback.py : a testcase, e^(e^y) to drive h -> 0

3 utility functions, useful for callbacks --
callevery.py : a filter to call e.g. once a second and every 100 th call
kbint.py : keyboardinterrupt()
timer.py : seconds since the previous call

Possibly a logfile *.log and a plot like test-callback-Radau

How it works

rk.py and radau.py are patched to callback( locals() ). The callback unpacks this, and can stop the run, like this:

def callback0( _locals ) -> "stop msg" or None:
    t = _locals["t"]
    y = _locals["y"]
    h = _locals["h"]
    hnew = _locals["h_abs"]
	print ...
	if hnew < 1e-8:
        return "stop from callback: hnew %.3g is too small  at t %g" % (hnew, t)

Pros and cons of this approach: \

  • simple: a 10-line patch.
    Won't break anything; several people can check it; low maintenace.
  • you have to know what's in locals(), here t y h hnew.
    That's easy, and better in a separate wiki than in mainline scipy.

Man-in-the-loop, feedback from a solver or optimizer while it's running, is far better than NO feedback.

How to install rk.py and radau.py with callbacks

First install a copy of scipy in a virtual environment, see venv . Then overwrite the files rk.py and radau.py there, as follows:

  1. find rk.py on your computer:
from scipy.integrate import _ivp
print( _ivp.__path__ )
  1. In that directory, make a backup of scipy's rk.py, then copy in the one with callback:
rk=rk  # or radau
cd .../lib/python3.xx/site-packages/scipy/integrate/_ivp  # from _ivp.__path__
mv $rk.py $rk-nocallback.py
cp -p .../$rk-callback.py $rk.py
	# diff $rk.py $rk-nocallback.py -- 10 lines
  1. run a little testcase with sol = solve_ivp( ... callback=mycallback ), such as test-callback.py .

Notes

For Stiff equations, tracking / displaying Nd Jacobians is for people with experience and good stiff test cases, not me.

Pilots who have the right stuff enjoy flying blind.

See also

500 scicomp.stack questions/tagged/ode
scipy/scipy#21741 from January 2025
extensisq : a dozen scipy-compatible solvers (but no callbacks)

Comments welcome

cheers
— denis-bz-py t-online.de 2025-08-24 August

# from: test-callback.py plot=2
# 24 Aug 2025 13:57z ~bz ode/scipy-ivp/callback
▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄
test-callback.py func=e^(e^y) method=Radau y0=[1] tmax=1 dt=0 atol=1e-06 rtol=0.001
t 0 1 h 0.00591 0.00591 y 1
t 0.0059087 1 h 0.00458 0.00458 y 1.1031
t 0.01049 1 h 0.000958 0.000958 y 1.2134
t 0.011448 1 h 0.00179 0.00179 y 1.2425
t 0.01324 1 h 0.00179 0.00179 y 1.3065
t 0.017864 1 h 0.000413 0.000413 y 1.6557
t 0.018277 1 h 0.000206 0.000206 y 1.7564
t 0.018483 1 h 0.000103 0.000103 y 1.8429
t 0.018587 1 h 2.06e-05 2.06e-05 y 1.914
t 0.018607 1 h 3.71e-05 3.71e-05 y 1.9333
t 0.018644 1 h 3.71e-05 3.71e-05 y 1.9765
t 0.018681 1 h 9.55e-06 9.55e-06 y 2.0396
t 0.018717 1 h 3.32e-06 3.32e-06 y 2.161
t 0.01872 1 h 7.36e-06 7.36e-06 y 2.1824
t 0.018728 1 h 3.68e-06 3.68e-06 y 2.2532
t 0.018733 1 h 4.3e-07 4.3e-07 y 2.3909
t 0.018734 1 h 4.3e-07 4.3e-07 y 2.4185
t 0.018735 1 h 3.62e-08 3.62e-08 y 2.5392
t 0.018735 1 h 1.01e-07 1.01e-07 y 2.5516
t 0.018735 1 h 5.05e-08 5.05e-08 y 2.6033
t 0.018735 1 h 5.65e-09 5.65e-09 y 2.7107
test-callback.py func=e^(e^y) method=Radau y0=[1] tmax=1 dt=0 atol=1e-06 rtol=0.001
Nt 27 nfev 368 njev 16
y y':
[[1 1.1 1.2 1.2 1.3 ... 2.5 2.6 2.6 2.7 2.7]
[14 21 29 32 41 ... 3.3e+05 3.9e+05 8.6e+05 1.8e+06 2.6e+06]]
T ΔT:
[0 0.0059 0.01 0.011 0.013 ... 0.019 0.019 0.019 0.019 0.019]
[0.0059 0.0046 0.00096 0.0018 0.0018 ... 4.3e-07 3.6e-08 1e-07 5e-08 2.5e-08]
sol.message: stop from callback: hnew 5.65e-09 is too small at t 0.0187347
> test-callback-Radau.png
#!/usr/bin/env python3
""" example callback0.py: solve_ivp( callback=callback0 or Callbackevery( callback0 )) """
import numpy as np
from kbint import keyboardinterrupt
def callback0( _locals ) -> "stop msg" or None:
""" example callback:
$scipy/integrate/_ivp/rk.py _step_impl with callback=
callback( locals() )
-> here -> "stop msg" or None
"""
t = _locals["t"]
y = _locals["y"]
h = _locals["h"]
hnew = _locals["h_abs"]
stepok = _locals["step_accepted"] # too big or too small: try again
# rkself = _locals["self"]
# nfev = rkself.nfev
print( "t %-8.5g %d h %-6.3g %-6.3g y %s " % (
t, stepok, h, hnew, _astr( y )))
if hnew < 1e-8:
return "stop from callback: hnew %.3g is too small at t %g" % (hnew, t)
maxnorm = np.linalg.norm( y, ord=np.inf )
if maxnorm > 1e10 or np.isnan( maxnorm ):
return "stop from callback: |y| %.3g is too big at t %g" % (maxnorm, t)
if keyboardinterrupt():
return "stop, KeyboardInterrupt at t %g" % t
return None
def _astr( x: "ndarray" ) -> "%g / %g %g / min av max":
x = np.asarray( x ).squeeze()
if x.size <= 1:
return "%.5g" % x.item()
elif x.shape == (2,):
return "%10.5g %10.5g" % (x[0], x[1])
else:
return _minavmax( x )
def _minavmax( x ):
x = np.asarray( x )
return "_-¯ %10.4g %10.4g %10.4g" % ( # _-¯ a glyph
x.min(), x.mean(), x.max() )
#!/usr/bin/env python3
""" wrap a func to call it say once a second and every 100 th call """
import numpy as np
from timer import Timer # eg once a second
#...............................................................................
class Callevery( object ):
""" wrap a func to call it say once a second and every 100 th call """
def __init__( s, func, seconds=1, every=100 ):
s.func = func
s.seconds = seconds # may be 0
s.every = int( every )
s.ncall = 0
s.timer = Timer( s.seconds ) # timer.t0 = time.time() wallclock
def __call__( s, **kw ) -> "stop msg" or None:
s.ncall += 1
sec = s.timer() # time.time() - t0
if (sec > 0
or (s.every > 0) and (s.ncall % s.every == 0)):
return s.func( **kw ) # kw args only, else use partial
else:
return None
#!/usr/bin/env python3
""" keyboardinterrupt(): ^C, signal.SIGINT """
import signal
_kbint = None # global, None -> False -> True
def keyboardinterrupt() -> "True after ^C == signal.SIGINT":
global _kbint
if _kbint is None:
_kbint = False
signal.signal( signal.SIGINT, handle_sigint ) # man 3 signal
return _kbint
def handle_sigint( signo, frame ) -> "global _kbint":
global _kbint
_kbint = True
# traceback.print_stack( limit=5 )
# pdb.set_trace()
if __name__ == "__main__":
import time
for t in range( 10 ):
time.sleep( 1 )
print( t )
if keyboardinterrupt():
print( "keyboardinterrupt" )
break
# radau-callback.py: scipy/integrate/_ivp/radau.py + callback 24aug #bz
import numpy as np
from scipy.linalg import lu_factor, lu_solve
from scipy.sparse import csc_matrix, issparse, eye
from scipy.sparse.linalg import splu
from scipy.optimize._numdiff import group_columns
from .common import (validate_max_step, validate_tol, select_initial_step,
norm, num_jac, EPS, warn_extraneous,
validate_first_step)
from .base import OdeSolver, DenseOutput
S6 = 6 ** 0.5
# Butcher tableau. A is not used directly, see below.
C = np.array([(4 - S6) / 10, (4 + S6) / 10, 1])
E = np.array([-13 - 7 * S6, -13 + 7 * S6, -1]) / 3
# Eigendecomposition of A is done: A = T L T**-1. There is 1 real eigenvalue
# and a complex conjugate pair. They are written below.
MU_REAL = 3 + 3 ** (2 / 3) - 3 ** (1 / 3)
MU_COMPLEX = (3 + 0.5 * (3 ** (1 / 3) - 3 ** (2 / 3))
- 0.5j * (3 ** (5 / 6) + 3 ** (7 / 6)))
# These are transformation matrices.
T = np.array([
[0.09443876248897524, -0.14125529502095421, 0.03002919410514742],
[0.25021312296533332, 0.20412935229379994, -0.38294211275726192],
[1, 1, 0]])
TI = np.array([
[4.17871859155190428, 0.32768282076106237, 0.52337644549944951],
[-4.17871859155190428, -0.32768282076106237, 0.47662355450055044],
[0.50287263494578682, -2.57192694985560522, 0.59603920482822492]])
# These linear combinations are used in the algorithm.
TI_REAL = TI[0]
TI_COMPLEX = TI[1] + 1j * TI[2]
# Interpolator coefficients.
P = np.array([
[13/3 + 7*S6/3, -23/3 - 22*S6/3, 10/3 + 5 * S6],
[13/3 - 7*S6/3, -23/3 + 22*S6/3, 10/3 - 5 * S6],
[1/3, -8/3, 10/3]])
NEWTON_MAXITER = 6 # Maximum number of Newton iterations.
MIN_FACTOR = 0.2 # Minimum allowed decrease in a step size.
MAX_FACTOR = 10 # Maximum allowed increase in a step size.
def solve_collocation_system(fun, t, y, h, Z0, scale, tol,
LU_real, LU_complex, solve_lu):
"""Solve the collocation system.
Parameters
----------
fun : callable
Right-hand side of the system.
t : float
Current time.
y : ndarray, shape (n,)
Current state.
h : float
Step to try.
Z0 : ndarray, shape (3, n)
Initial guess for the solution. It determines new values of `y` at
``t + h * C`` as ``y + Z0``, where ``C`` is the Radau method constants.
scale : ndarray, shape (n)
Problem tolerance scale, i.e. ``rtol * abs(y) + atol``.
tol : float
Tolerance to which solve the system. This value is compared with
the normalized by `scale` error.
LU_real, LU_complex
LU decompositions of the system Jacobians.
solve_lu : callable
Callable which solves a linear system given a LU decomposition. The
signature is ``solve_lu(LU, b)``.
Returns
-------
converged : bool
Whether iterations converged.
n_iter : int
Number of completed iterations.
Z : ndarray, shape (3, n)
Found solution.
rate : float
The rate of convergence.
"""
n = y.shape[0]
M_real = MU_REAL / h
M_complex = MU_COMPLEX / h
W = TI.dot(Z0)
Z = Z0
F = np.empty((3, n))
ch = h * C
dW_norm_old = None
dW = np.empty_like(W)
converged = False
rate = None
for k in range(NEWTON_MAXITER):
for i in range(3):
F[i] = fun(t + ch[i], y + Z[i])
if not np.all(np.isfinite(F)):
break
f_real = F.T.dot(TI_REAL) - M_real * W[0]
f_complex = F.T.dot(TI_COMPLEX) - M_complex * (W[1] + 1j * W[2])
dW_real = solve_lu(LU_real, f_real)
dW_complex = solve_lu(LU_complex, f_complex)
dW[0] = dW_real
dW[1] = dW_complex.real
dW[2] = dW_complex.imag
dW_norm = norm(dW / scale)
if dW_norm_old is not None:
rate = dW_norm / dW_norm_old
if (rate is not None and (rate >= 1 or
rate ** (NEWTON_MAXITER - k) / (1 - rate) * dW_norm > tol)):
break
W += dW
Z = T.dot(W)
if (dW_norm == 0 or
rate is not None and rate / (1 - rate) * dW_norm < tol):
converged = True
break
dW_norm_old = dW_norm
return converged, k + 1, Z, rate
def predict_factor(h_abs, h_abs_old, error_norm, error_norm_old):
"""Predict by which factor to increase/decrease the step size.
The algorithm is described in [1]_.
Parameters
----------
h_abs, h_abs_old : float
Current and previous values of the step size, `h_abs_old` can be None
(see Notes).
error_norm, error_norm_old : float
Current and previous values of the error norm, `error_norm_old` can
be None (see Notes).
Returns
-------
factor : float
Predicted factor.
Notes
-----
If `h_abs_old` and `error_norm_old` are both not None then a two-step
algorithm is used, otherwise a one-step algorithm is used.
References
----------
.. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
Equations II: Stiff and Differential-Algebraic Problems", Sec. IV.8.
"""
if error_norm_old is None or h_abs_old is None or error_norm == 0:
multiplier = 1
else:
multiplier = h_abs / h_abs_old * (error_norm_old / error_norm) ** 0.25
with np.errstate(divide='ignore'):
factor = min(1, multiplier) * error_norm ** -0.25
return factor
class Radau(OdeSolver):
"""Implicit Runge-Kutta method of Radau IIA family of order 5.
The implementation follows [1]_. The error is controlled with a
third-order accurate embedded formula. A cubic polynomial which satisfies
the collocation conditions is used for the dense output.
Parameters
----------
fun : callable
Right-hand side of the system: the time derivative of the state ``y``
at time ``t``. The calling signature is ``fun(t, y)``, where ``t`` is a
scalar and ``y`` is an ndarray with ``len(y) = len(y0)``. ``fun`` must
return an array of the same shape as ``y``. See `vectorized` for more
information.
t0 : float
Initial time.
y0 : array_like, shape (n,)
Initial state.
t_bound : float
Boundary time - the integration won't continue beyond it. It also
determines the direction of the integration.
first_step : float or None, optional
Initial step size. Default is ``None`` which means that the algorithm
should choose.
max_step : float, optional
Maximum allowed step size. Default is np.inf, i.e., the step size is not
bounded and determined solely by the solver.
rtol, atol : float and array_like, optional
Relative and absolute tolerances. The solver keeps the local error
estimates less than ``atol + rtol * abs(y)``. HHere `rtol` controls a
relative accuracy (number of correct digits), while `atol` controls
absolute accuracy (number of correct decimal places). To achieve the
desired `rtol`, set `atol` to be smaller than the smallest value that
can be expected from ``rtol * abs(y)`` so that `rtol` dominates the
allowable error. If `atol` is larger than ``rtol * abs(y)`` the
number of correct digits is not guaranteed. Conversely, to achieve the
desired `atol` set `rtol` such that ``rtol * abs(y)`` is always smaller
than `atol`. If components of y have different scales, it might be
beneficial to set different `atol` values for different components by
passing array_like with shape (n,) for `atol`. Default values are
1e-3 for `rtol` and 1e-6 for `atol`.
jac : {None, array_like, sparse_matrix, callable}, optional
Jacobian matrix of the right-hand side of the system with respect to
y, required by this method. The Jacobian matrix has shape (n, n) and
its element (i, j) is equal to ``d f_i / d y_j``.
There are three ways to define the Jacobian:
* If array_like or sparse_matrix, the Jacobian is assumed to
be constant.
* If callable, the Jacobian is assumed to depend on both
t and y; it will be called as ``jac(t, y)`` as necessary.
For the 'Radau' and 'BDF' methods, the return value might be a
sparse matrix.
* If None (default), the Jacobian will be approximated by
finite differences.
It is generally recommended to provide the Jacobian rather than
relying on a finite-difference approximation.
jac_sparsity : {None, array_like, sparse matrix}, optional
Defines a sparsity structure of the Jacobian matrix for a
finite-difference approximation. Its shape must be (n, n). This argument
is ignored if `jac` is not `None`. If the Jacobian has only few non-zero
elements in *each* row, providing the sparsity structure will greatly
speed up the computations [2]_. A zero entry means that a corresponding
element in the Jacobian is always zero. If None (default), the Jacobian
is assumed to be dense.
vectorized : bool, optional
Whether `fun` can be called in a vectorized fashion. Default is False.
If ``vectorized`` is False, `fun` will always be called with ``y`` of
shape ``(n,)``, where ``n = len(y0)``.
If ``vectorized`` is True, `fun` may be called with ``y`` of shape
``(n, k)``, where ``k`` is an integer. In this case, `fun` must behave
such that ``fun(t, y)[:, i] == fun(t, y[:, i])`` (i.e. each column of
the returned array is the time derivative of the state corresponding
with a column of ``y``).
Setting ``vectorized=True`` allows for faster finite difference
approximation of the Jacobian by this method, but may result in slower
execution overall in some circumstances (e.g. small ``len(y0)``).
Attributes
----------
n : int
Number of equations.
status : string
Current status of the solver: 'running', 'finished' or 'failed'.
t_bound : float
Boundary time.
direction : float
Integration direction: +1 or -1.
t : float
Current time.
y : ndarray
Current state.
t_old : float
Previous time. None if no steps were made yet.
step_size : float
Size of the last successful step. None if no steps were made yet.
nfev : int
Number of evaluations of the right-hand side.
njev : int
Number of evaluations of the Jacobian.
nlu : int
Number of LU decompositions.
References
----------
.. [1] E. Hairer, G. Wanner, "Solving Ordinary Differential Equations II:
Stiff and Differential-Algebraic Problems", Sec. IV.8.
.. [2] A. Curtis, M. J. D. Powell, and J. Reid, "On the estimation of
sparse Jacobian matrices", Journal of the Institute of Mathematics
and its Applications, 13, pp. 117-120, 1974.
"""
def __init__(self, fun, t0, y0, t_bound, max_step=np.inf,
rtol=1e-3, atol=1e-6, jac=None, jac_sparsity=None,
vectorized=False, first_step=None, callback=None, **extraneous): #bz
warn_extraneous(extraneous)
super().__init__(fun, t0, y0, t_bound, vectorized)
self.y_old = None
self.max_step = validate_max_step(max_step)
self.rtol, self.atol = validate_tol(rtol, atol, self.n)
self.f = self.fun(self.t, self.y)
# Select initial step assuming the same order which is used to control
# the error.
if first_step is None:
self.h_abs = select_initial_step(
self.fun, self.t, self.y, t_bound, max_step, self.f, self.direction,
3, self.rtol, self.atol)
else:
self.h_abs = validate_first_step(first_step, t0, t_bound)
self.h_abs_old = None
self.error_norm_old = None
self.newton_tol = max(10 * EPS / rtol, min(0.03, rtol ** 0.5))
self.sol = None
self.jac_factor = None
self.jac, self.J = self._validate_jac(jac, jac_sparsity)
if issparse(self.J):
def lu(A):
self.nlu += 1
return splu(A)
def solve_lu(LU, b):
return LU.solve(b)
I = eye(self.n, format='csc')
else:
def lu(A):
self.nlu += 1
return lu_factor(A, overwrite_a=True)
def solve_lu(LU, b):
return lu_solve(LU, b, overwrite_b=True)
I = np.identity(self.n)
self.lu = lu
self.solve_lu = solve_lu
self.I = I
self.current_jac = True
self.LU_real = None
self.LU_complex = None
self.Z = None
self.callback = callback #bz
def _validate_jac(self, jac, sparsity):
t0 = self.t
y0 = self.y
if jac is None:
if sparsity is not None:
if issparse(sparsity):
sparsity = csc_matrix(sparsity)
groups = group_columns(sparsity)
sparsity = (sparsity, groups)
def jac_wrapped(t, y, f):
self.njev += 1
J, self.jac_factor = num_jac(self.fun_vectorized, t, y, f,
self.atol, self.jac_factor,
sparsity)
return J
J = jac_wrapped(t0, y0, self.f)
elif callable(jac):
J = jac(t0, y0)
self.njev = 1
if issparse(J):
J = csc_matrix(J)
def jac_wrapped(t, y, _=None):
self.njev += 1
return csc_matrix(jac(t, y), dtype=float)
else:
J = np.asarray(J, dtype=float)
def jac_wrapped(t, y, _=None):
self.njev += 1
return np.asarray(jac(t, y), dtype=float)
if J.shape != (self.n, self.n):
raise ValueError(f"`jac` is expected to have shape {(self.n, self.n)},"
f" but actually has {J.shape}.")
else:
if issparse(jac):
J = csc_matrix(jac)
else:
J = np.asarray(jac, dtype=float)
if J.shape != (self.n, self.n):
raise ValueError(f"`jac` is expected to have shape {(self.n, self.n)},"
f" but actually has {J.shape}.")
jac_wrapped = None
return jac_wrapped, J
def _step_impl(self):
t = self.t
y = self.y
f = self.f
max_step = self.max_step
atol = self.atol
rtol = self.rtol
min_step = 10 * np.abs(np.nextafter(t, self.direction * np.inf) - t)
if self.h_abs > max_step:
h_abs = max_step
h_abs_old = None
error_norm_old = None
elif self.h_abs < min_step:
h_abs = min_step
h_abs_old = None
error_norm_old = None
else:
h_abs = self.h_abs
h_abs_old = self.h_abs_old
error_norm_old = self.error_norm_old
J = self.J
LU_real = self.LU_real
LU_complex = self.LU_complex
current_jac = self.current_jac
jac = self.jac
rejected = False
step_accepted = False
message = None
while not step_accepted:
if h_abs < min_step:
return False, self.TOO_SMALL_STEP
h = h_abs * self.direction
t_new = t + h
if self.direction * (t_new - self.t_bound) > 0:
t_new = self.t_bound
h = t_new - t
h_abs = np.abs(h)
if self.sol is None:
Z0 = np.zeros((3, y.shape[0]))
else:
Z0 = self.sol(t + h * C).T - y
scale = atol + np.abs(y) * rtol
converged = False
while not converged:
if LU_real is None or LU_complex is None:
LU_real = self.lu(MU_REAL / h * self.I - J)
LU_complex = self.lu(MU_COMPLEX / h * self.I - J)
converged, n_iter, Z, rate = solve_collocation_system(
self.fun, t, y, h, Z0, scale, self.newton_tol,
LU_real, LU_complex, self.solve_lu)
if not converged:
if current_jac:
break
J = self.jac(t, y, f)
current_jac = True
LU_real = None
LU_complex = None
if not converged:
h_abs *= 0.5
LU_real = None
LU_complex = None
continue
y_new = y + Z[-1]
ZE = Z.T.dot(E) / h
error = self.solve_lu(LU_real, f + ZE)
scale = atol + np.maximum(np.abs(y), np.abs(y_new)) * rtol
error_norm = norm(error / scale)
safety = 0.9 * (2 * NEWTON_MAXITER + 1) / (2 * NEWTON_MAXITER
+ n_iter)
if rejected and error_norm > 1:
error = self.solve_lu(LU_real, self.fun(t, y + error) + ZE)
error_norm = norm(error / scale)
if error_norm > 1:
factor = predict_factor(h_abs, h_abs_old,
error_norm, error_norm_old)
h_abs *= max(MIN_FACTOR, safety * factor)
LU_real = None
LU_complex = None
rejected = True
else:
step_accepted = True
recompute_jac = jac is not None and n_iter > 2 and rate > 1e-3
factor = predict_factor(h_abs, h_abs_old, error_norm, error_norm_old)
factor = min(MAX_FACTOR, safety * factor)
if not recompute_jac and factor < 1.2:
factor = 1
else:
LU_real = None
LU_complex = None
f_new = self.fun(t_new, y_new)
if recompute_jac:
J = jac(t_new, y_new, f_new)
current_jac = True
elif jac is not None:
current_jac = False
#...............................................................................
if self.callback is not None: #bz
stopmsg = self.callback( locals() ) # the lot, t y ... #bz
if stopmsg: #bz
if not isinstance( stopmsg, str ): #bz
stopmsg = "stopped by user callback from Radau" #bz
return False, stopmsg # end solve_ivp, return sol up to now #bz
self.h_abs_old = self.h_abs
self.error_norm_old = error_norm
self.h_abs = h_abs * factor
self.y_old = y
self.t = t_new
self.y = y_new
self.f = f_new
self.Z = Z
self.LU_real = LU_real
self.LU_complex = LU_complex
self.current_jac = current_jac
self.J = J
self.t_old = t
self.sol = self._compute_dense_output()
return step_accepted, message
def _compute_dense_output(self):
Q = np.dot(self.Z.T, P)
return RadauDenseOutput(self.t_old, self.t, self.y_old, Q)
def _dense_output_impl(self):
return self.sol
class RadauDenseOutput(DenseOutput):
def __init__(self, t_old, t, y_old, Q):
super().__init__(t_old, t)
self.h = t - t_old
self.Q = Q
self.order = Q.shape[1] - 1
self.y_old = y_old
def _call_impl(self, t):
x = (t - self.t_old) / self.h
if t.ndim == 0:
p = np.tile(x, self.order + 1)
p = np.cumprod(p)
else:
p = np.tile(x, (self.order + 1, 1))
p = np.cumprod(p, axis=0)
# Here we don't multiply by h, not a mistake.
y = np.dot(self.Q, p)
if y.ndim == 2:
y += self.y_old[:, None]
else:
y += self.y_old
return y
# rk.py: class RungeKutta + callback 16jul #bz
import numpy as np
from .base import OdeSolver, DenseOutput
from .common import (validate_max_step, validate_tol, select_initial_step,
norm, warn_extraneous, validate_first_step)
from . import dop853_coefficients
# Multiply steps computed from asymptotic behaviour of errors by this.
SAFETY = 0.9
MIN_FACTOR = 0.2 # Minimum allowed decrease in a step size.
MAX_FACTOR = 10 # Maximum allowed increase in a step size.
def rk_step(fun, t, y, f, h, A, B, C, K):
"""Perform a single Runge-Kutta step.
This function computes a prediction of an explicit Runge-Kutta method and
also estimates the error of a less accurate method.
Notation for Butcher tableau is as in [1]_.
Parameters
----------
fun : callable
Right-hand side of the system.
t : float
Current time.
y : ndarray, shape (n,)
Current state.
f : ndarray, shape (n,)
Current value of the derivative, i.e., ``fun(x, y)``.
h : float
Step to use.
A : ndarray, shape (n_stages, n_stages)
Coefficients for combining previous RK stages to compute the next
stage. For explicit methods the coefficients at and above the main
diagonal are zeros.
B : ndarray, shape (n_stages,)
Coefficients for combining RK stages for computing the final
prediction.
C : ndarray, shape (n_stages,)
Coefficients for incrementing time for consecutive RK stages.
The value for the first stage is always zero.
K : ndarray, shape (n_stages + 1, n)
Storage array for putting RK stages here. Stages are stored in rows.
The last row is a linear combination of the previous rows with
coefficients
Returns
-------
y_new : ndarray, shape (n,)
Solution at t + h computed with a higher accuracy.
f_new : ndarray, shape (n,)
Derivative ``fun(t + h, y_new)``.
References
----------
.. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
Equations I: Nonstiff Problems", Sec. II.4.
"""
K[0] = f
for s, (a, c) in enumerate(zip(A[1:], C[1:]), start=1):
dy = np.dot(K[:s].T, a[:s]) * h
K[s] = fun(t + c * h, y + dy)
y_new = y + h * np.dot(K[:-1].T, B)
f_new = fun(t + h, y_new)
K[-1] = f_new
return y_new, f_new
class RungeKutta(OdeSolver):
"""Base class for explicit Runge-Kutta methods."""
C: np.ndarray = NotImplemented
A: np.ndarray = NotImplemented
B: np.ndarray = NotImplemented
E: np.ndarray = NotImplemented
P: np.ndarray = NotImplemented
order: int = NotImplemented
error_estimator_order: int = NotImplemented
n_stages: int = NotImplemented
def __init__(self, fun, t0, y0, t_bound, max_step=np.inf,
rtol=1e-3, atol=1e-6, vectorized=False,
first_step=None, callback=None, **extraneous): #bz
warn_extraneous(extraneous)
super().__init__(fun, t0, y0, t_bound, vectorized,
support_complex=True)
self.y_old = None
self.max_step = validate_max_step(max_step)
self.rtol, self.atol = validate_tol(rtol, atol, self.n)
self.f = self.fun(self.t, self.y)
if first_step is None:
self.h_abs = select_initial_step(
self.fun, self.t, self.y, t_bound, max_step, self.f, self.direction,
self.error_estimator_order, self.rtol, self.atol)
else:
self.h_abs = validate_first_step(first_step, t0, t_bound)
self.K = np.empty((self.n_stages + 1, self.n), dtype=self.y.dtype)
self.error_exponent = -1 / (self.error_estimator_order + 1)
self.h_previous = None
self.callback = callback #bz
def _estimate_error(self, K, h):
return np.dot(K.T, self.E) * h
def _estimate_error_norm(self, K, h, scale):
return norm(self._estimate_error(K, h) / scale)
def _step_impl(self):
t = self.t
y = self.y
max_step = self.max_step
rtol = self.rtol
atol = self.atol
min_step = 10 * np.abs(np.nextafter(t, self.direction * np.inf) - t)
if self.h_abs > max_step:
h_abs = max_step
elif self.h_abs < min_step:
h_abs = min_step
else:
h_abs = self.h_abs
step_accepted = False
step_rejected = False
while not step_accepted:
if h_abs < min_step:
return False, self.TOO_SMALL_STEP
h = h_abs * self.direction
t_new = t + h
if self.direction * (t_new - self.t_bound) > 0:
t_new = self.t_bound
h = t_new - t
h_abs = np.abs(h)
y_new, f_new = rk_step(self.fun, t, y, self.f, h, self.A,
self.B, self.C, self.K)
scale = atol + np.maximum(np.abs(y), np.abs(y_new)) * rtol
error_norm = self._estimate_error_norm(self.K, h, scale)
if error_norm < 1:
if error_norm == 0:
factor = MAX_FACTOR
else:
factor = min(MAX_FACTOR,
SAFETY * error_norm ** self.error_exponent)
if step_rejected:
factor = min(1, factor)
h_abs *= factor
step_accepted = True
else:
h_abs *= max(MIN_FACTOR,
SAFETY * error_norm ** self.error_exponent)
step_rejected = True
#...............................................................................
if self.callback is not None: #bz
stopmsg = self.callback( locals() ) # the lot, t y ... #bz
if stopmsg: #bz
if not isinstance( stopmsg, str ): #bz
stopmsg = "stopped by user callback from RungeKutta" #bz
return False, stopmsg # end solve_ivp, return sol up to now #bz
self.h_previous = h
self.y_old = y
self.t = t_new
self.y = y_new
self.h_abs = h_abs
self.f = f_new
return True, None
def _dense_output_impl(self):
Q = self.K.T.dot(self.P)
return RkDenseOutput(self.t_old, self.t, self.y_old, Q)
class RK23(RungeKutta):
"""Explicit Runge-Kutta method of order 3(2).
This uses the Bogacki-Shampine pair of formulas [1]_. The error is controlled
assuming accuracy of the second-order method, but steps are taken using the
third-order accurate formula (local extrapolation is done). A cubic Hermite
polynomial is used for the dense output.
Can be applied in the complex domain.
Parameters
----------
fun : callable
Right-hand side of the system: the time derivative of the state ``y``
at time ``t``. The calling signature is ``fun(t, y)``, where ``t`` is a
scalar and ``y`` is an ndarray with ``len(y) = len(y0)``. ``fun`` must
return an array of the same shape as ``y``. See `vectorized` for more
information.
t0 : float
Initial time.
y0 : array_like, shape (n,)
Initial state.
t_bound : float
Boundary time - the integration won't continue beyond it. It also
determines the direction of the integration.
first_step : float or None, optional
Initial step size. Default is ``None`` which means that the algorithm
should choose.
max_step : float, optional
Maximum allowed step size. Default is np.inf, i.e., the step size is not
bounded and determined solely by the solver.
rtol, atol : float and array_like, optional
Relative and absolute tolerances. The solver keeps the local error
estimates less than ``atol + rtol * abs(y)``. Here `rtol` controls a
relative accuracy (number of correct digits), while `atol` controls
absolute accuracy (number of correct decimal places). To achieve the
desired `rtol`, set `atol` to be smaller than the smallest value that
can be expected from ``rtol * abs(y)`` so that `rtol` dominates the
allowable error. If `atol` is larger than ``rtol * abs(y)`` the
number of correct digits is not guaranteed. Conversely, to achieve the
desired `atol` set `rtol` such that ``rtol * abs(y)`` is always smaller
than `atol`. If components of y have different scales, it might be
beneficial to set different `atol` values for different components by
passing array_like with shape (n,) for `atol`. Default values are
1e-3 for `rtol` and 1e-6 for `atol`.
vectorized : bool, optional
Whether `fun` may be called in a vectorized fashion. False (default)
is recommended for this solver.
If ``vectorized`` is False, `fun` will always be called with ``y`` of
shape ``(n,)``, where ``n = len(y0)``.
If ``vectorized`` is True, `fun` may be called with ``y`` of shape
``(n, k)``, where ``k`` is an integer. In this case, `fun` must behave
such that ``fun(t, y)[:, i] == fun(t, y[:, i])`` (i.e. each column of
the returned array is the time derivative of the state corresponding
with a column of ``y``).
Setting ``vectorized=True`` allows for faster finite difference
approximation of the Jacobian by methods 'Radau' and 'BDF', but
will result in slower execution for this solver.
Attributes
----------
n : int
Number of equations.
status : string
Current status of the solver: 'running', 'finished' or 'failed'.
t_bound : float
Boundary time.
direction : float
Integration direction: +1 or -1.
t : float
Current time.
y : ndarray
Current state.
t_old : float
Previous time. None if no steps were made yet.
step_size : float
Size of the last successful step. None if no steps were made yet.
nfev : int
Number evaluations of the system's right-hand side.
njev : int
Number of evaluations of the Jacobian.
Is always 0 for this solver as it does not use the Jacobian.
nlu : int
Number of LU decompositions. Is always 0 for this solver.
References
----------
.. [1] P. Bogacki, L.F. Shampine, "A 3(2) Pair of Runge-Kutta Formulas",
Appl. Math. Lett. Vol. 2, No. 4. pp. 321-325, 1989.
"""
order = 3
error_estimator_order = 2
n_stages = 3
C = np.array([0, 1/2, 3/4])
A = np.array([
[0, 0, 0],
[1/2, 0, 0],
[0, 3/4, 0]
])
B = np.array([2/9, 1/3, 4/9])
E = np.array([5/72, -1/12, -1/9, 1/8])
P = np.array([[1, -4 / 3, 5 / 9],
[0, 1, -2/3],
[0, 4/3, -8/9],
[0, -1, 1]])
class RK45(RungeKutta):
"""Explicit Runge-Kutta method of order 5(4).
This uses the Dormand-Prince pair of formulas [1]_. The error is controlled
assuming accuracy of the fourth-order method accuracy, but steps are taken
using the fifth-order accurate formula (local extrapolation is done).
A quartic interpolation polynomial is used for the dense output [2]_.
Can be applied in the complex domain.
Parameters
----------
fun : callable
Right-hand side of the system. The calling signature is ``fun(t, y)``.
Here ``t`` is a scalar, and there are two options for the ndarray ``y``:
It can either have shape (n,); then ``fun`` must return array_like with
shape (n,). Alternatively it can have shape (n, k); then ``fun``
must return an array_like with shape (n, k), i.e., each column
corresponds to a single column in ``y``. The choice between the two
options is determined by `vectorized` argument (see below).
t0 : float
Initial time.
y0 : array_like, shape (n,)
Initial state.
t_bound : float
Boundary time - the integration won't continue beyond it. It also
determines the direction of the integration.
first_step : float or None, optional
Initial step size. Default is ``None`` which means that the algorithm
should choose.
max_step : float, optional
Maximum allowed step size. Default is np.inf, i.e., the step size is not
bounded and determined solely by the solver.
rtol, atol : float and array_like, optional
Relative and absolute tolerances. The solver keeps the local error
estimates less than ``atol + rtol * abs(y)``. Here `rtol` controls a
relative accuracy (number of correct digits), while `atol` controls
absolute accuracy (number of correct decimal places). To achieve the
desired `rtol`, set `atol` to be smaller than the smallest value that
can be expected from ``rtol * abs(y)`` so that `rtol` dominates the
allowable error. If `atol` is larger than ``rtol * abs(y)`` the
number of correct digits is not guaranteed. Conversely, to achieve the
desired `atol` set `rtol` such that ``rtol * abs(y)`` is always smaller
than `atol`. If components of y have different scales, it might be
beneficial to set different `atol` values for different components by
passing array_like with shape (n,) for `atol`. Default values are
1e-3 for `rtol` and 1e-6 for `atol`.
vectorized : bool, optional
Whether `fun` is implemented in a vectorized fashion. Default is False.
Attributes
----------
n : int
Number of equations.
status : string
Current status of the solver: 'running', 'finished' or 'failed'.
t_bound : float
Boundary time.
direction : float
Integration direction: +1 or -1.
t : float
Current time.
y : ndarray
Current state.
t_old : float
Previous time. None if no steps were made yet.
step_size : float
Size of the last successful step. None if no steps were made yet.
nfev : int
Number evaluations of the system's right-hand side.
njev : int
Number of evaluations of the Jacobian.
Is always 0 for this solver as it does not use the Jacobian.
nlu : int
Number of LU decompositions. Is always 0 for this solver.
References
----------
.. [1] J. R. Dormand, P. J. Prince, "A family of embedded Runge-Kutta
formulae", Journal of Computational and Applied Mathematics, Vol. 6,
No. 1, pp. 19-26, 1980.
.. [2] L. W. Shampine, "Some Practical Runge-Kutta Formulas", Mathematics
of Computation,, Vol. 46, No. 173, pp. 135-150, 1986.
"""
order = 5
error_estimator_order = 4
n_stages = 6
C = np.array([0, 1/5, 3/10, 4/5, 8/9, 1])
A = np.array([
[0, 0, 0, 0, 0],
[1/5, 0, 0, 0, 0],
[3/40, 9/40, 0, 0, 0],
[44/45, -56/15, 32/9, 0, 0],
[19372/6561, -25360/2187, 64448/6561, -212/729, 0],
[9017/3168, -355/33, 46732/5247, 49/176, -5103/18656]
])
B = np.array([35/384, 0, 500/1113, 125/192, -2187/6784, 11/84])
E = np.array([-71/57600, 0, 71/16695, -71/1920, 17253/339200, -22/525,
1/40])
# Corresponds to the optimum value of c_6 from [2]_.
P = np.array([
[1, -8048581381/2820520608, 8663915743/2820520608,
-12715105075/11282082432],
[0, 0, 0, 0],
[0, 131558114200/32700410799, -68118460800/10900136933,
87487479700/32700410799],
[0, -1754552775/470086768, 14199869525/1410260304,
-10690763975/1880347072],
[0, 127303824393/49829197408, -318862633887/49829197408,
701980252875 / 199316789632],
[0, -282668133/205662961, 2019193451/616988883, -1453857185/822651844],
[0, 40617522/29380423, -110615467/29380423, 69997945/29380423]])
class DOP853(RungeKutta):
"""Explicit Runge-Kutta method of order 8.
This is a Python implementation of "DOP853" algorithm originally written
in Fortran [1]_, [2]_. Note that this is not a literal translation, but
the algorithmic core and coefficients are the same.
Can be applied in the complex domain.
Parameters
----------
fun : callable
Right-hand side of the system. The calling signature is ``fun(t, y)``.
Here, ``t`` is a scalar, and there are two options for the ndarray ``y``:
It can either have shape (n,); then ``fun`` must return array_like with
shape (n,). Alternatively it can have shape (n, k); then ``fun``
must return an array_like with shape (n, k), i.e. each column
corresponds to a single column in ``y``. The choice between the two
options is determined by `vectorized` argument (see below).
t0 : float
Initial time.
y0 : array_like, shape (n,)
Initial state.
t_bound : float
Boundary time - the integration won't continue beyond it. It also
determines the direction of the integration.
first_step : float or None, optional
Initial step size. Default is ``None`` which means that the algorithm
should choose.
max_step : float, optional
Maximum allowed step size. Default is np.inf, i.e. the step size is not
bounded and determined solely by the solver.
rtol, atol : float and array_like, optional
Relative and absolute tolerances. The solver keeps the local error
estimates less than ``atol + rtol * abs(y)``. Here `rtol` controls a
relative accuracy (number of correct digits), while `atol` controls
absolute accuracy (number of correct decimal places). To achieve the
desired `rtol`, set `atol` to be smaller than the smallest value that
can be expected from ``rtol * abs(y)`` so that `rtol` dominates the
allowable error. If `atol` is larger than ``rtol * abs(y)`` the
number of correct digits is not guaranteed. Conversely, to achieve the
desired `atol` set `rtol` such that ``rtol * abs(y)`` is always smaller
than `atol`. If components of y have different scales, it might be
beneficial to set different `atol` values for different components by
passing array_like with shape (n,) for `atol`. Default values are
1e-3 for `rtol` and 1e-6 for `atol`.
vectorized : bool, optional
Whether `fun` is implemented in a vectorized fashion. Default is False.
Attributes
----------
n : int
Number of equations.
status : string
Current status of the solver: 'running', 'finished' or 'failed'.
t_bound : float
Boundary time.
direction : float
Integration direction: +1 or -1.
t : float
Current time.
y : ndarray
Current state.
t_old : float
Previous time. None if no steps were made yet.
step_size : float
Size of the last successful step. None if no steps were made yet.
nfev : int
Number evaluations of the system's right-hand side.
njev : int
Number of evaluations of the Jacobian. Is always 0 for this solver
as it does not use the Jacobian.
nlu : int
Number of LU decompositions. Is always 0 for this solver.
References
----------
.. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
Equations I: Nonstiff Problems", Sec. II.
.. [2] `Page with original Fortran code of DOP853
<http://www.unige.ch/~hairer/software.html>`_.
"""
n_stages = dop853_coefficients.N_STAGES
order = 8
error_estimator_order = 7
A = dop853_coefficients.A[:n_stages, :n_stages]
B = dop853_coefficients.B
C = dop853_coefficients.C[:n_stages]
E3 = dop853_coefficients.E3
E5 = dop853_coefficients.E5
D = dop853_coefficients.D
A_EXTRA = dop853_coefficients.A[n_stages + 1:]
C_EXTRA = dop853_coefficients.C[n_stages + 1:]
def __init__(self, fun, t0, y0, t_bound, max_step=np.inf,
rtol=1e-3, atol=1e-6, vectorized=False,
first_step=None, **extraneous):
super().__init__(fun, t0, y0, t_bound, max_step, rtol, atol,
vectorized, first_step, **extraneous)
self.K_extended = np.empty((dop853_coefficients.N_STAGES_EXTENDED,
self.n), dtype=self.y.dtype)
self.K = self.K_extended[:self.n_stages + 1]
def _estimate_error(self, K, h): # Left for testing purposes.
err5 = np.dot(K.T, self.E5)
err3 = np.dot(K.T, self.E3)
denom = np.hypot(np.abs(err5), 0.1 * np.abs(err3))
correction_factor = np.ones_like(err5)
mask = denom > 0
correction_factor[mask] = np.abs(err5[mask]) / denom[mask]
return h * err5 * correction_factor
def _estimate_error_norm(self, K, h, scale):
err5 = np.dot(K.T, self.E5) / scale
err3 = np.dot(K.T, self.E3) / scale
err5_norm_2 = np.linalg.norm(err5)**2
err3_norm_2 = np.linalg.norm(err3)**2
if err5_norm_2 == 0 and err3_norm_2 == 0:
return 0.0
denom = err5_norm_2 + 0.01 * err3_norm_2
return np.abs(h) * err5_norm_2 / np.sqrt(denom * len(scale))
def _dense_output_impl(self):
K = self.K_extended
h = self.h_previous
for s, (a, c) in enumerate(zip(self.A_EXTRA, self.C_EXTRA),
start=self.n_stages + 1):
dy = np.dot(K[:s].T, a[:s]) * h
K[s] = self.fun(self.t_old + c * h, self.y_old + dy)
F = np.empty((dop853_coefficients.INTERPOLATOR_POWER, self.n),
dtype=self.y_old.dtype)
f_old = K[0]
delta_y = self.y - self.y_old
F[0] = delta_y
F[1] = h * f_old - delta_y
F[2] = 2 * delta_y - h * (self.f + f_old)
F[3:] = h * np.dot(self.D, K)
return Dop853DenseOutput(self.t_old, self.t, self.y_old, F)
class RkDenseOutput(DenseOutput):
def __init__(self, t_old, t, y_old, Q):
super().__init__(t_old, t)
self.h = t - t_old
self.Q = Q
self.order = Q.shape[1] - 1
self.y_old = y_old
def _call_impl(self, t):
x = (t - self.t_old) / self.h
if t.ndim == 0:
p = np.tile(x, self.order + 1)
p = np.cumprod(p)
else:
p = np.tile(x, (self.order + 1, 1))
p = np.cumprod(p, axis=0)
y = self.h * np.dot(self.Q, p)
if y.ndim == 2:
y += self.y_old[:, None]
else:
y += self.y_old
return y
class Dop853DenseOutput(DenseOutput):
def __init__(self, t_old, t, y_old, F):
super().__init__(t_old, t)
self.h = t - t_old
self.F = F
self.y_old = y_old
def _call_impl(self, t):
x = (t - self.t_old) / self.h
if t.ndim == 0:
y = np.zeros_like(self.y_old)
else:
x = x[:, None]
y = np.zeros((len(x), len(self.y_old)), dtype=self.y_old.dtype)
for i, f in enumerate(reversed(self.F)):
y += f
if i % 2 == 0:
y *= x
else:
y *= 1 - x
y += self.y_old
return y.T
#!/usr/bin/env python3
""" test-callback.py: e^(e^y), h -> 0, stop callback """ # 24aug
import numpy as np
from scipy.integrate import solve_ivp # ( func( t, y ), t_span, y0 ... )
from matplotlib import pyplot as pl
import seaborn as sns
from pathlib import Path
import sys
import time
try:
from extensisq import BS5, Pr9 # https://github.com/WRKampi/extensisq
except ImportError:
pass
from callback0 import callback0 # basic, -> "stop mag" or None
from callevery import Callevery # filter
np.set_printoptions( edgeitems=5, threshold=10, linewidth=120, formatter=dict(
float = lambda x: "%.2g" % x ))
basepy = Path( sys.argv[0] ).stem
print( 80 * "▄" )
#...............................................................................
def eey( t, y ) -> "e^e^y":
time.sleep( 0.1 ) # test keyboardinterrupt
return np.exp( np.exp( y ))
# _ivp/rk.py:108: RuntimeWarning: invalid value encountered in dot return np.dot(K.T, self.E) * h
func = eey
y0 = [1] # [1., 0]
tmax = 1
dt = 0
atol = 1e-6
rtol = 1e-3
method = "Radau" # "RK45"
sec = 0.1
every = 1
plot = 0
# to change these params, run this.py a=1 b=2,None 'c = expr' ...
for arg in sys.argv[1:]:
exec( arg )
# callbackevery = Callevery( callback0, seconds=sec, every=every )
methodname = getattr( method, "__name__", method ) # extensisq
params = (f"{basepy}.py func=e^(e^y) method={methodname} {y0=} "
f"{tmax=:g} {dt=:g} {atol=} {rtol=} ")
print( params, "\n" )
#...............................................................................
sol = solve_ivp( func, [0, tmax], y0=y0, t_eval=None, callback=callback0,
method=method, atol=atol, rtol=rtol )
T = sol.t
Y = sol.y.squeeze()
nfev, njev = sol.nfev, sol.njev
params += "\nNt %d nfev %d njev %d" % ( len(T), sol.nfev, sol.njev )
print( params, "\n" )
yp = np.gradient( Y, T, edge_order=2 )
yyp = np.vstack(( Y, yp ))
print( "y y': \n", yyp )
dT = np.diff( T )
print( "T ΔT: \n%s \n%s \n" % ( T[:-1], dT ))
if not sol.success:
print( "sol.message:", sol.message )
if plot:
fig, (yax, tax) = pl.subplots( nrows=2 )
fig.suptitle( params, multialignment="left" )
yax.set( ylim=[1, 1000] )
yax.semilogy( T, yyp.T, label="y y' ".split() )
yax.legend()
tax.set( ylim=[1e-6, dT.max()], ylabel="ΔT" )
tax.semilogy( T[:-1], dT )
sns.rugplot( ax=tax, x=T, height=0.2, color="crimson", lw=0.4 )
if plot >= 2:
png = f"{basepy}-{methodname}.png"
print( ">", png )
pl.savefig( png )
#!/usr/bin/env python3
""" Timer: print info every say 60 seconds in a loop
Usage:
timer = Timer( seconds=60 ) # initialize `t0 = time.time()`
while True: # a long-running loop
...
timer( info ... ) # prints once every 60 seconds
# sec 10: info ...
# sec 61: info
# sec 234: info
If we're in the same minute as the previous call `timer()`,
return 0, do nothing
else print sec, info ... and
return time.time() - t0 seconds
sec = timer() # no args: just return (time.time() - t0 ) or 0
if sec > timelimit:
status = "Timeout: %.1f seconds > timelimit %.1f" % (sec, timelimit)
print( status )
break # return sol or opt as usual
timer( userfunc ... )
calls userfunc( timer.totalsec, ... ) once every __ seconds:
print in color, or plot, or do logging.
Notes:
print( info ... ) prints ndarrays with the caller's `np.set_printoptions()`.
`time.time()` is wallclock time, not time.process_time() = the sum of all cores.
See also: ivmon.py, wraps `func( t, y )` for scipy solve_ivp.
"""
# keywords: python time timer monitor once-a-minute
# see also: callbacks, ivmon.py, monitor.py
import numpy as np
import time
#...............................................................................
class Timer( object ):
def __init__( self, seconds=60 ):
self.seconds = seconds
self.t0 = time.time()
self.oldminute = -1
self.fmt = "sec %d:" # msec %.3f
def __call__( self, *args, **kw ):
if self.seconds <= 0:
return 0
self.totalsec = time.time() - self.t0 # wallclock since t0
minute = int( self.totalsec / self.seconds ) # or second or hour ...
if minute == self.oldminute:
return 0
self.oldminute = minute
if args:
arg0 = args[0]
if callable( arg0 ): # timer( userfunc ): call it
return arg0( self.totalsec, *args[1:], **kw )
else:
print( self.fmt % self.totalsec, *args, **kw )
return self.totalsec
def sec_windowtitle( sec, *args, **kw ) -> "print sec __: args in window title":
title = "\033]0;sec %d:" % sec # centred, wobbles
print( title, *args, end="\007", **kw ) # with np printoptions
__version__ = "2025-04-14 apr" # denis-bz-py t-online.de
#...............................................................................
if __name__ == "__main__":
import sys
print( 80 * "▄" )
np.set_printoptions( edgeitems=5, threshold=10, linewidth=120, formatter=dict(
float = lambda x: "%.2g" % x ))
sec = 1
nrand = 10
# to change these params, run this.py a=1 b=None 'c = expr' ... in sh or ipython --
for arg in sys.argv[1:]:
exec( arg )
rng = np.random.default_rng( seed=0 )
sleep = rng.exponential( size=nrand )
print( "sleep times:", sleep )
print( sleep.cumsum().astype(int) ) # != cumsum( ints )
timer = Timer( sec )
def psec( sec, *args ):
print( "%.3g" % sec, *args )
for s in sleep:
time.sleep( s ) # flaky on old mac ?
timer( 1, 2, 3 )
# timer( sec_windowtitle, 1, 2, 3 )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment