Source code for andes.routines.daeint

"""
Integration methods for DAE.
"""

import logging
import numpy as np

from andes.routines.adaptive import accept_reject, check_adaptive_bust, weighted_rms_error
from andes.shared import sparse, matrix
from andes.routines.qndf import QNDF


logger = logging.getLogger(__name__)


[docs]class ImplicitIter: """ Base class for implicit iterative methods. """ nolte_event_steps = 0 nolte_event_window = 0.0 requires_variable_step = False @staticmethod def niter_next_h(niter, h, h_min, h_max): """ Niter-based step-size heuristic shared by all methods. Returns a clamped next step size based on how many Newton iterations were needed for convergence. """ if niter >= 15: h_new = h * 0.5 elif niter <= 6: h_new = h * 1.1 else: h_new = h * 0.95 return min(max(h_new, h_min), h_max) @staticmethod def calc_h(tds): """ Default step-size control: niter heuristic with fixt/shrinkt handling. """ config = tds.config if tds.converged: tds.deltat = ImplicitIter.niter_next_h( tds.niter, tds.deltat, tds.deltatmin, tds.deltatmax) if config.fixt: tds.deltat = min(config.tstep, tds.deltat) if tds.chatter is True: tds.chatter = False else: if config.fixt and not config.shrinkt: tds.deltat = 0 tds.busted = True tds.err_msg = ( f"Simulation did not converge with step size h={config.tstep:.4f}.\n" "Reduce the step size `tstep`, or set `shrinkt = 1` to let it shrink." ) else: tds.deltat *= 0.9 if tds.deltat < tds.deltatmin: tds.deltat = 0 tds.err_msg = "Time step reduced to zero. Convergence not likely." tds.busted = True @staticmethod def calc_jac(tds, gxs, gys): pass @staticmethod def calc_q(x, f, Tf, h, x0, f0): pass @staticmethod def checkpoint_state(tds): """ Snapshot DAE state for rollback. """ dae = tds.system.dae return dae.x.copy(), dae.y.copy(), dae.f.copy() @staticmethod def restore_state(tds, state): """ Restore DAE state from a checkpoint. """ dae = tds.system.dae x_state, y_state, f_state = state dae.x[:] = x_state dae.y[:] = y_state dae.f[:] = f_state tds.system.vars_to_models() @staticmethod def solve_once(tds, h, method): """ Run one implicit step with the given method and step size. """ original_method = tds.method original_h = tds.h tds.method = method tds.h = h try: return ImplicitIter.step(tds) finally: tds.method = original_method tds.h = original_h @staticmethod def step(tds): """ Integrate with Implicit Trapezoidal Method (ITM) to the current time. This function has an internal Newton-Raphson loop for algebraized semi-explicit DAE. The function returns the convergence status when done but does NOT progress simulation time. When ``tds.config.linesearch`` is enabled, applies nonmonotone backtracking line search using ``‖qg‖₂²`` as the merit function. The ``g_scale * h`` scaling ensures differential and algebraic residuals are comparable. The trial-point ``fg_update`` at the end of iteration *k* doubles as the initial evaluation for iteration *k+1*, making it zero-cost when the full step is accepted. Returns ------- bool Convergence status in ``tds.converged``. """ system = tds.system dae = tds.system.dae if tds.h == 0: logger.error("Current step size is zero. Integration is not permitted.") return False tds.mis = [1] tds.mis_inc = [1] tds.niter = 0 tds.converged = False tds.x0[:] = dae.x tds.y0[:] = dae.y tds.f0[:] = dae.f use_ls = tds.config.linesearch if use_ls: merit_history = [] # initial residual evaluation (reused by first iteration) tds.fg_update(models=system.exist.pflow_tds) while True: # lazy Jacobian update reason = '' if dae.t == 0: reason = 't=0' elif tds.config.honest: reason = 'using honest method' elif tds.custom_event: reason = 'custom event set' elif not tds.last_converged: reason = 'non-convergence in the last step' elif tds.niter > 4 and (tds.niter + 1) % 3 == 0: reason = 'every 3 iterations beyond 4 iterations' elif dae.t - tds._last_switch_t < 0.1: reason = 'within 0.1s of event' if reason: system.j_update(models=system.exist.pflow_tds, info=reason) # set flag in `solver.worker.factorize`, not `solver.factorize`. tds.solver.worker.factorize = True # `Tf` should remain constant throughout the simulation, even if the corresponding diff. var. # is pegged by the anti-windup limiters. # solve implicit trapezoidal method (ITM) integration if tds.config.g_scale > 0: gxs = tds.config.g_scale * tds.h * dae.gx gys = tds.config.g_scale * tds.h * dae.gy else: gxs = dae.gx gys = dae.gy # calculate complete Jacobian matrix ``Ac``` tds.Ac = tds.method.calc_jac(tds, gxs, gys) # equation `tds.qg[:dae.n] = 0` is the implicit form of differential equations using ITM tds.qg[:dae.n] = tds.method.calc_q(dae.x, dae.f, dae.Tf, tds.h, tds.x0, tds.f0) # reset the corresponding q elements for pegged anti-windup limiter for item in system.antiwindups: for key, _, eqval in item.x_set: np.put(tds.qg, key, eqval) # set or scale the algebraic residuals if tds.config.g_scale > 0: tds.qg[dae.n:] = tds.config.g_scale * tds.h * dae.g else: tds.qg[dae.n:] = dae.g # calculate variable corrections if not tds.config.linsolve: inc = tds.solver.solve(tds.Ac, matrix(tds.qg)) else: inc = tds.solver.linsolve(tds.Ac, matrix(tds.qg)) # check for np.nan first if np.isnan(inc).any(): tds.err_msg = 'NaN found in solution. Convergence is not likely' tds.niter = tds.config.max_iter + 1 tds.busted = True break # reset tiny values to reduce chattering if tds.config.reset_tiny: inc[np.where(np.abs(inc) < tds.tol_zero)] = 0 # store `inc` to tds for debugging tds.inc = inc # retrieve maximum abs. residual and maximum var. correction mis_arg = np.argmax(np.abs(inc)) mis_inc = inc[mis_arg] mis_qg_arg = np.argmax(np.abs(tds.qg)) mis_qg = tds.qg[mis_qg_arg] # store initial maximum mismatch if tds.niter == 0: tds.mis[0] = abs(mis_qg) tds.mis_inc[0] = abs(mis_inc) else: tds.mis.append(mis_qg) tds.mis_inc.append(mis_inc) mis = abs(mis_inc) # chattering detection if tds.niter > tds.config.chatter_iter: if (abs(sum(tds.mis_inc[-2:])) < 1e-6) and abs(tds.mis_inc[-1]) > 1e-4: tds.chatter = True logger.debug("Chattering detected at t=%s s", dae.t) logger.debug("Chattering variable: %s", dae.xy_name[mis_arg]) # --- apply step --- inc_x = inc[:dae.n].ravel() inc_y = inc[dae.n: dae.n + dae.m].ravel() if use_ls: merit_old = np.dot(tds.qg, tds.qg) merit_history.append(merit_old) merit_ref = max(merit_history[-3:]) tds.xs[:] = dae.x tds.ys[:] = dae.y alpha = 1.0 for _ in range(4): dae.x[:] = tds.xs - alpha * inc_x dae.y[:] = tds.ys - alpha * inc_y system.vars_to_models() tds.fg_update(models=system.exist.pflow_tds) # recompute qg at trial point tds.qg[:dae.n] = tds.method.calc_q( dae.x, dae.f, dae.Tf, tds.h, tds.x0, tds.f0) for item in system.antiwindups: for key, _, eqval in item.x_set: np.put(tds.qg, key, eqval) if tds.config.g_scale > 0: tds.qg[dae.n:] = tds.config.g_scale * tds.h * dae.g else: tds.qg[dae.n:] = dae.g merit_new = np.dot(tds.qg, tds.qg) if merit_new < merit_ref: break alpha *= 0.5 logger.debug("TDS line search backtrack: alpha=%.4g at t=%.6f", alpha, dae.t) else: dae.x -= inc_x dae.y -= inc_y system.vars_to_models() tds.fg_update(models=system.exist.pflow_tds) tds.niter += 1 # converged if abs(mis) <= tds.config.tol: tds.converged = True break if tds.chatter: tds.converged = True break # non-convergence cases if tds.niter > tds.config.max_iter: break if (abs(mis) > 1e6) and (abs(mis) > 1e6 * tds.mis[0]): tds.err_msg = 'Error increased too quickly.' break if not tds.converged: # restore variables and f dae.x[:] = np.array(tds.x0) dae.y[:] = np.array(tds.y0) dae.f[:] = np.array(tds.f0) system.vars_to_models() # debug outputs if tds.busted: logger.debug('* NaN in solution at t=%.6f, h=%.6f', dae.t, tds.h) else: logger.debug('* Max. iter. %d reached for t=%.6f, h=%.6f, max inc=%.4g', tds.config.max_iter, dae.t, tds.h, mis) if logger.isEnabledFor(logging.DEBUG): g_max = np.argmax(abs(dae.g)) inc_max = np.argmax(abs(inc)) tds._debug_g(g_max) tds._debug_ac(inc_max) else: logger.debug('Converged in %d steps for t=%.6f, h=%.6f, max inc=%.4g', tds.niter, dae.t, tds.h, mis) tds.last_converged = tds.converged return tds.converged
[docs]class BackEuler(ImplicitIter): """ Backward Euler's integration method. """ @staticmethod def calc_jac(tds, gxs, gys): """ Build full Jacobian matrix ``Ac`` for Trapezoid method. """ dae = tds.system.dae return sparse([[tds.Teye - tds.h * dae.fx, gxs], [-tds.h * dae.fy, gys]], 'd') @staticmethod def calc_q(x, f, Tf, h, x0, f0): """ Calculate the residual of algebraized differential equations. Notes ----- Numba jit somehow slows down this function for the 14-bus and the 2k-bus systems. """ return Tf * (x - x0) - h * f
[docs]class Trapezoid(ImplicitIter): """ Trapezoidal methods. """ @staticmethod def calc_jac(tds, gxs, gys): """ Build full Jacobian matrix ``Ac`` for Trapezoid method. """ dae = tds.system.dae return sparse([[tds.Teye - tds.h * 0.5 * dae.fx, gxs], [-tds.h * 0.5 * dae.fy, gys]], 'd') @staticmethod def calc_q(x, f, Tf, h, x0, f0): """ Calculate the residual of algebraized differential equations. Notes ----- Numba jit somehow slows down this function for the 14-bus and the 2k-bus systems. """ return Tf * (x - x0) - h * 0.5 * (f + f0)
[docs]class TrapezoidAdaptive(Trapezoid): """ Adaptive trapezoid with step-doubling LTE estimation. The LTE estimate is based on the difference between one full step of size ``h`` and two half-steps of size ``h/2``. """ nolte_event_steps = 4 nolte_event_window = 0.1 requires_variable_step = True _trap_solver = Trapezoid() @staticmethod def calc_h(tds): """ Step size is set by ``step()``. Only check bust on failure. """ check_adaptive_bust(tds) @staticmethod def _reject(tds, h_next, state=None): """ Reject current candidate, optionally restoring the previous state. """ if state is not None: ImplicitIter.restore_state(tds, state) # Keep predictor snapshots consistent with restored DAE state. dae = tds.system.dae if tds.x0 is not None: tds.x0[:] = dae.x if tds.y0 is not None: tds.y0[:] = dae.y if tds.f0 is not None: tds.f0[:] = dae.f tds.deltat = min(h_next, tds.deltatmax) tds.converged = False tds.last_converged = False return False @staticmethod def _nolte_next_h(tds, h): """ Heuristic next step size when LTE control is disabled. """ return ImplicitIter.niter_next_h(tds.niter, h, tds.deltatmin_adapt, tds.deltatmax) @staticmethod def step(tds): """ One adaptive trapezoid step. Reads ``tds.h`` and writes ``tds.deltat``. Returns True when accepted, False when rejected. """ dae = tds.system.dae h = tds.h if h == 0: logger.error("Current step size is zero. Integration is not permitted.") return False n = dae.n trap = TrapezoidAdaptive._trap_solver # Restart mode near events: use converged trapezoid steps without LTE # for a few steps and/or for a short event-time window. use_nolte = tds._adaptive_nolte_steps > 0 if (not use_nolte) and (tds.method.nolte_event_window > 0.0): use_nolte = (dae.t - tds._last_switch_t) < tds.method.nolte_event_window if use_nolte: accepted = ImplicitIter.solve_once(tds, h, trap) if accepted: if tds._adaptive_nolte_steps > 0: tds._adaptive_nolte_steps -= 1 tds.deltat = TrapezoidAdaptive._nolte_next_h(tds, h) tds.converged = True tds.last_converged = True return True tds.deltat = min(max(h * 0.5, tds.deltatmin_adapt), tds.deltatmax) tds.converged = False tds.last_converged = False return False # algebraic-only systems: fallback to a single trapezoid solve if n == 0: accepted = ImplicitIter.solve_once(tds, h, trap) if accepted: tds.deltat = min(h, tds.deltatmax) else: tds.deltat = min(h * 0.5, tds.deltatmax) tds.converged = accepted tds.last_converged = accepted return accepted state0 = ImplicitIter.checkpoint_state(tds) x_prev = dae.x[:n].copy() # one full step with h ok_full = ImplicitIter.solve_once(tds, h, trap) if not ok_full: # Base Newton path already rolled state back. return TrapezoidAdaptive._reject(tds, h * 0.5) x_full = dae.x[:n].copy() # restore and run two half-steps ImplicitIter.restore_state(tds, state0) if not ImplicitIter.solve_once(tds, 0.5 * h, trap): return TrapezoidAdaptive._reject(tds, h * 0.5, state0) if not ImplicitIter.solve_once(tds, 0.5 * h, trap): return TrapezoidAdaptive._reject(tds, h * 0.5, state0) # second half-step result is already in dae.x / dae.y x_half = dae.x[:n] err_wt = np.empty_like(x_half) err_vec = (x_half - x_full) / 3.0 err_est = weighted_rms_error(err_vec, x_prev, x_half, tds.config.abstol, tds.config.reltol, err_wt) accepted, h_next, _ = accept_reject( err_est=err_est, h=h, deltatmax=tds.deltatmax, order=2, accept_safety=0.9, accept_min_factor=0.2, accept_max_factor=2.0, reject_safety=0.9, reject_min_factor=0.2, reject_max_factor=0.9, repeat_reject_after=999, # no failure counter for trapezoid-adaptive repeat_reject_factor=1.0, ) if accepted: tds.deltat = h_next tds.converged = True tds.last_converged = True return True # Reject — skip LTE for enough steps that the 1.1x/step growth # can recover from the worst-case 0.2x shrink (1.1^20 ≈ 6.7 > 5). tds._adaptive_nolte_steps = max(tds._adaptive_nolte_steps, 20) return TrapezoidAdaptive._reject(tds, h_next, state0)
# --- solution method name-to-class mapping --- # !!! add new solvers to below method_map = {"trapezoid": Trapezoid, "trap_adapt": TrapezoidAdaptive, "backeuler": BackEuler, 'qndf': QNDF, }