"""QNDF: Variable-order quasi-constant-step NDF method (Shampine & Reichelt 1997)."""
import logging
import numpy as np
from andes.routines.adaptive import accept_reject, candidate_h, check_adaptive_bust, weighted_rms_error
from andes.shared import sparse, matrix
logger = logging.getLogger(__name__)
MAX_ORDER = 5
KAPPA = np.array([-37/200, -1/9, -823/10000, -83/2000, 0], dtype=float)
GAMMA = np.zeros(MAX_ORDER + 1)
for _i in range(1, MAX_ORDER + 1):
GAMMA[_i] = GAMMA[_i - 1] + 1.0 / _i
[docs]class QNDFCache:
"""Pre-allocated workspace. Created once at TDS.init()."""
[docs] def __init__(self, n, abstol, reltol, max_order=MAX_ORDER):
self.n = n
self.max_order = max_order
self.abstol = abstol
self.reltol = reltol
ncols = max_order + 3
self.D = np.zeros((n, ncols))
self.D_prev = np.zeros((n, ncols))
self.D_scratch = np.zeros((n, ncols))
self.x_pred = np.zeros(n)
self.phi = np.zeros(n)
self.dx_corr = np.zeros(n)
self.x_prev = np.zeros(n)
self.err_wt = np.zeros(n)
self.R = np.zeros((max_order, max_order))
self.U = np.zeros((max_order, max_order))
self.RU = np.zeros((max_order, max_order))
self.order = 1
self.order_prev = 1
self.h_prev = 0.0
self.consec_steps = 0
self.consec_fail_count = 0
self.err_est = 1.0
self.err_est_km1 = float('inf')
self.err_est_kp1 = float('inf')
self.first_step = True
def reset_for_event(self, x_current):
self.D[:] = 0.0
self.D[:, 0] = x_current
self.D_prev[:] = 0.0
self.order = 1
self.order_prev = 1
self.h_prev = 0.0
self.consec_steps = 0
self.consec_fail_count = 0
self.err_est = 1.0
self.err_est_km1 = float('inf')
self.err_est_kp1 = float('inf')
self.first_step = True
# --- Stateless helpers (zero-alloc in hot path) ---
[docs]def compute_beta0(k):
return 1.0 / ((1.0 - KAPPA[k - 1]) * GAMMA[k])
[docs]def error_constant(k):
return KAPPA[k - 1] * GAMMA[k] + 1.0 / (k + 1)
[docs]def compute_U(k, U):
for r in range(k):
U[0, r] = -(r + 1)
for j in range(1, k):
U[j, r] = U[j - 1, r] * (j - (r + 1)) / (j + 1)
[docs]def compute_R(k, rho, R):
for r in range(k):
R[0, r] = -(r + 1) * rho
for j in range(1, k):
R[j, r] = R[j - 1, r] * (j - (r + 1) * rho) / (j + 1)
[docs]def rescale_D(cache, rho, k):
if k < 1:
return
compute_R(k, rho, cache.R[:k, :k])
compute_U(k, cache.U[:k, :k])
RU = cache.R[:k, :k] @ cache.U[:k, :k]
cache.D[:, 1:k+1] = cache.D[:, 1:k+1] @ RU
[docs]def compute_predictor(cache, k):
cache.x_pred[:] = cache.D[:, k]
for j in range(k - 1, 0, -1):
cache.x_pred += cache.D[:, j]
cache.x_pred += cache.D[:, 0]
cache.phi[:] = 0.0
for j in range(1, k + 1):
cache.phi += GAMMA[j] * cache.D[:, j]
[docs]def update_D(D, dx_corr, k):
D[:, k + 2] = dx_corr - D[:, k + 1]
D[:, k + 1] = dx_corr
for j in range(k, 0, -1):
D[:, j] += D[:, j + 1]
[docs]def select_order_and_step(cache, h, order, err_est):
"""Pick optimal (order, step_size) from order-1, order, order+1."""
h_cur_order = candidate_h(err_est, h, order,
safety=(1.0 / 1.2),
min_factor=0.0,
max_factor=10.0)
order_new, h_new = order, h_cur_order
if order > 1 and cache.consec_steps >= order:
err_est_km1 = cache.err_est_km1
h_lower = candidate_h(err_est_km1, h, order - 1,
safety=(1.0 / 1.3),
min_factor=0.0,
max_factor=10.0)
if h_lower > h_new:
order_new, h_new = order - 1, h_lower
if order < cache.max_order and cache.consec_steps >= order + 2:
err_est_kp1 = cache.err_est_kp1
h_higher = candidate_h(err_est_kp1, h, order + 1,
safety=(1.0 / 1.4),
min_factor=0.0,
max_factor=10.0)
if h_higher > h_new:
order_new, h_new = order + 1, h_higher
# Standard NDF step dampening: avoid small changes during warm-up
# to reduce unnecessary Jacobian refactorizations.
prefer_const = (cache.consec_steps < order + 2)
if prefer_const:
q = h / h_new if h_new > 0 else 1.0
if 0.6 < q < 1.2:
h_new = h
return order_new, h_new
[docs]class QNDF:
"""Variable-order (1-5) quasi-constant-step NDF method."""
nolte_event_steps = 0
nolte_event_window = 0.0
requires_variable_step = True
@staticmethod
def calc_h(tds):
"""
Step size is set by ``step()``. Only check bust on failure.
"""
check_adaptive_bust(tds)
@staticmethod
def step(tds):
"""One QNDF step. Returns True (accepted) or False (rejected).
NEVER writes tds.h. Reads tds.h, writes tds.deltat.
On return (True or False), tds.deltat holds the recommended next h.
"""
system = tds.system
dae = system.dae
cache = tds.qndf_cache
if cache is None:
raise RuntimeError(
"QNDF cache not initialized. Call TDS.init() after setting method='qndf'.")
n = dae.n
if n == 0:
raise RuntimeError(
"QNDF requires differential equations (dae.n > 0). "
"Use 'trapezoid' or 'backeuler' for algebraic-only systems.")
h = tds.h # read once, never mutated
if h == 0:
return False
k = cache.order
# --- Save state for rollback ---
tds.x0[:] = dae.x
tds.y0[:] = dae.y
tds.f0[:] = dae.f
cache.x_prev[:] = dae.x[:n]
# --- Prepare D table ---
if cache.first_step:
cache.D[:] = 0.0
cache.D[:, 0] = dae.x[:n]
cache.first_step = False
elif cache.h_prev > 0 and abs(h - cache.h_prev) > 1e-14 * max(abs(h), 1e-30):
rho = h / cache.h_prev
rescale_D(cache, rho, k)
# Save D AFTER init/rescale so rollback restores valid state.
# On first-step Newton failure, D[:] = D_prev restores D[:,0] = dae.x[:n]
# so the predictor on retry produces x_pred = x0, not x_pred = 0.
cache.D_prev[:] = cache.D
beta0 = compute_beta0(k)
compute_predictor(cache, k)
# --- Newton initial guess from predictor ---
dae.x[:n] = cache.x_pred
system.vars_to_models()
# --- Newton-Raphson loop (mirrors ImplicitIter.step) ---
tds.mis = [1]
tds.mis_inc = [1]
tds.niter = 0
tds.converged = False
tds.chatter = False
use_ls = tds.config.linesearch
if use_ls:
merit_history = []
tds.fg_update(models=system.exist.pflow_tds)
while True:
reason = ''
if dae.t == 0:
reason = 't=0'
elif tds.config.honest:
reason = 'honest'
elif tds.custom_event:
reason = 'custom event'
elif not tds.last_converged:
reason = 'prev non-convergence'
elif tds.niter > 4 and (tds.niter + 1) % 3 == 0:
reason = 'periodic'
elif dae.t - tds._last_switch_t < 0.1:
reason = 'near event'
if reason:
system.j_update(models=system.exist.pflow_tds, info=reason)
tds.solver.worker.factorize = True
if tds.config.g_scale > 0:
gxs = tds.config.g_scale * h * dae.gx
gys = tds.config.g_scale * h * dae.gy
else:
gxs = dae.gx
gys = dae.gy
tds.Ac = sparse([
[tds.Teye - h * beta0 * dae.fx, gxs],
[-h * beta0 * dae.fy, gys]
], 'd')
tds.qg[:n] = (dae.Tf * (dae.x[:n] - cache.x_pred + beta0 * cache.phi)
- h * beta0 * dae.f)
for item in system.antiwindups:
for key, _, eqval in item.x_set:
np.put(tds.qg, key, eqval)
if tds.config.g_scale > 0:
tds.qg[n:] = tds.config.g_scale * h * dae.g
else:
tds.qg[n:] = dae.g
if not tds.config.linsolve:
inc = tds.solver.solve(tds.Ac, matrix(tds.qg))
else:
inc = tds.solver.linsolve(tds.Ac, matrix(tds.qg))
if np.isnan(inc).any():
logger.debug("QNDF: NaN in linear solve at t=%.6f, h=%.4g — treating as Newton failure",
dae.t, h)
tds.err_msg = 'NaN in linear solve'
break
if tds.config.reset_tiny:
inc[np.where(np.abs(inc) < tds.tol_zero)] = 0
tds.inc = inc
mis_arg = np.argmax(np.abs(inc))
mis_inc = inc[mis_arg]
mis_qg_arg = np.argmax(np.abs(tds.qg))
mis_qg = tds.qg[mis_qg_arg]
if tds.niter == 0:
tds.mis[0] = abs(mis_qg)
tds.mis_inc[0] = abs(mis_inc)
else:
tds.mis.append(mis_qg)
tds.mis_inc.append(mis_inc)
mis = abs(mis_inc)
if tds.niter > tds.config.chatter_iter:
if abs(sum(tds.mis_inc[-2:])) < 1e-6 and abs(tds.mis_inc[-1]) > 1e-4:
tds.chatter = True
inc_x = inc[:n].ravel()
inc_y = inc[n:n + dae.m].ravel()
if use_ls:
merit_old = np.dot(tds.qg, tds.qg)
merit_history.append(merit_old)
merit_ref = max(merit_history[-3:])
tds.xs[:] = dae.x
tds.ys[:] = dae.y
alpha = 1.0
for _ in range(4):
dae.x[:] = tds.xs - alpha * inc_x
dae.y[:] = tds.ys - alpha * inc_y
system.vars_to_models()
tds.fg_update(models=system.exist.pflow_tds)
tds.qg[:n] = (dae.Tf * (dae.x[:n] - cache.x_pred + beta0 * cache.phi)
- h * beta0 * dae.f)
for item in system.antiwindups:
for key, _, eqval in item.x_set:
np.put(tds.qg, key, eqval)
if tds.config.g_scale > 0:
tds.qg[n:] = tds.config.g_scale * h * dae.g
else:
tds.qg[n:] = dae.g
merit_new = np.dot(tds.qg, tds.qg)
if merit_new < merit_ref:
break
alpha *= 0.5
logger.debug("QNDF line search backtrack: alpha=%.4g at t=%.6f",
alpha, dae.t)
else:
dae.x -= inc_x
dae.y -= inc_y
system.vars_to_models()
tds.fg_update(models=system.exist.pflow_tds)
tds.niter += 1
if mis <= tds.config.tol:
tds.converged = True
break
if tds.chatter:
tds.converged = True
break
if tds.niter > tds.config.max_iter:
break
if abs(mis) > 1e6 and abs(mis) > 1e6 * tds.mis[0]:
tds.err_msg = 'Error diverged'
break
# === End Newton loop ===
if not tds.converged:
# NEWTON FAILURE — restore, set shrunk deltat, return False
cache.D[:] = cache.D_prev
dae.x[:] = tds.x0
dae.y[:] = tds.y0
dae.f[:] = tds.f0
system.vars_to_models()
cache.consec_fail_count += 1
tds.deltat = min(h * 0.5, tds.deltatmax)
tds.last_converged = False
logger.debug('* QNDF: max iter %d reached for t=%.6f, h=%.6f, max inc=%.4g',
tds.config.max_iter, dae.t, h, mis)
if logger.isEnabledFor(logging.DEBUG):
g_max = np.argmax(abs(dae.g))
inc_max = np.argmax(abs(inc))
tds._debug_g(g_max)
tds._debug_ac(inc_max)
return False
# === Error estimation ===
x_new = dae.x[:n]
cache.dx_corr[:] = x_new - cache.x_pred
# Warm-up: accept unconditionally until the D-table has enough
# history for meaningful error estimates (order+1 accepted steps).
# With fewer points, D[:,k+1] differences are raw corrections
# rather than smooth higher-order errors, producing wildly inflated err_est.
if cache.consec_steps < k + 1:
update_D(cache.D, cache.dx_corr, k)
cache.D[:, 0] = x_new
cache.h_prev = h
cache.consec_steps += 1
cache.consec_fail_count = 0
cache.err_est = 0.0
tds.deltat = h
tds.last_converged = True
return True
update_D(cache.D, cache.dx_corr, k)
# err_est at current order k
err_k = error_constant(k) * cache.D[:, k + 1]
err_est = weighted_rms_error(err_k, cache.x_prev, x_new,
cache.abstol, cache.reltol, cache.err_wt)
cache.err_est = err_est
# err_est at order k-1
if k > 1 and cache.consec_steps >= k:
err_km1 = error_constant(k - 1) * cache.D[:, k]
cache.err_est_km1 = weighted_rms_error(err_km1, cache.x_prev, x_new,
cache.abstol, cache.reltol, cache.err_wt)
else:
cache.err_est_km1 = float('inf')
# err_est at order k+1
if k < cache.max_order and cache.consec_steps >= k + 2:
err_kp1 = error_constant(k + 1) * cache.D[:, k + 2]
cache.err_est_kp1 = weighted_rms_error(err_kp1, cache.x_prev, x_new,
cache.abstol, cache.reltol, cache.err_wt)
else:
cache.err_est_kp1 = float('inf')
if err_est <= 1.0:
# ACCEPT
cache.D[:, 0] = x_new
order_new, h_new = select_order_and_step(cache, h, k, err_est)
if order_new == k:
cache.consec_steps += 1
else:
cache.consec_steps = 0
cache.order_prev = k
cache.order = order_new
cache.h_prev = h
cache.consec_fail_count = 0
tds.deltat = min(h_new, tds.deltatmax)
tds.last_converged = True
return True
else:
# LTE REJECT — restore state, set shrunk deltat, return False
cache.D[:] = cache.D_prev
dae.x[:] = tds.x0
dae.y[:] = tds.y0
dae.f[:] = tds.f0
system.vars_to_models()
_, h_next, fail_count_next = accept_reject(
err_est=err_est,
h=h,
deltatmax=tds.deltatmax,
order=k,
fail_count=cache.consec_fail_count,
accept_threshold=1.0,
# acceptance not used in this branch, keep same policy shape.
accept_safety=(1.0 / 1.2),
accept_min_factor=0.0,
accept_max_factor=10.0,
reject_safety=(1.0 / 1.2),
reject_min_factor=0.2,
reject_max_factor=1.0,
repeat_reject_after=1,
repeat_reject_factor=0.5,
)
tds.deltat = h_next
cache.consec_fail_count = fail_count_next
if cache.consec_fail_count > 2 and k > 1:
cache.order = k - 1
# After repeated LTE rejections, the D-table checkpoint becomes
# unreliable (rescaling stale data produces poor predictors and
# inflated error estimates). Reset to break the pathological cycle.
if cache.consec_fail_count >= 3:
cache.D[:] = 0.0
cache.D[:, 0] = tds.x0[:n]
cache.consec_steps = 0
cache.order = 1
cache.h_prev = 0.0
logger.debug("QNDF LTE reject at t=%.6f: err_est=%.3g, h=%.4g, next deltat=%.4g",
dae.t, err_est, h, tds.deltat)
tds.converged = False
tds.last_converged = False
return False