Source code for andes.routines.tds

"""
ANDES module for time-domain simulation.
"""

import importlib
import logging
import os
import sys
import time
from collections import OrderedDict

from andes.routines.base import BaseRoutine
from andes.routines.daeint import Trapezoid, method_map
from andes.routines.qndf import QNDFCache
from andes.routines.criteria import deltadelta
from andes.shared import get_tqdm, matrix, np, pd, spdiag, tqdm_nb
from andes.utils.misc import elapsed, is_interactive, is_notebook
from andes.utils.tab import Tab

logger = logging.getLogger(__name__)


[docs]class TDS(BaseRoutine): """ Time-domain simulation routine. Some cases may be sensitive to large convergence tolerance ``config.tol``. If numerical oscillation happens, try reducing ``config.tol`` to ``1e-6``. """
[docs] def __init__(self, system=None, config=None): super().__init__(system, config) self.config.add(OrderedDict((('method', 'trapezoid'), ('tol', 1e-4), ('t0', 0.0), ('tf', 20.0), ('fixt', 1), ('shrinkt', 1), ('honest', 0), ('tstep', 1/30), ('max_iter', 15), ('refresh_event', 0), ('test_init', 1), ('check_conn', 1), ('check_conn', 1), ('criteria', 1), ('ddelta_limit', 180), ('g_scale', 1), ('reset_tiny', 1), ('qrt', 0), ('kqrt', 1.0), ('store_z', 0), ('store_f', 0), ('store_h', 0), ('store_i', 0), ('limit_store', 0), ('max_store', 900), ('save_every', 1), ('save_mode', 'auto'), ('no_tqdm', 0), ('chatter_iter', 4), ('linesearch', 1), ('dtmax', 0), ('dtmin_adapt', 0), ('abstol', 1e-6), ('reltol', 1e-3), ))) self.config.add_extra("_help", method='DAE solution method: "trapezoid" and "backeuler" use ' 'niter-based step heuristic; "trap_adapt" and "qndf" use ' 'internal LTE error control (requires fixt=0)', tol="convergence tolerance", t0="simulation starting time", tf="simulation ending time", fixt='use fixed step size (1) or variable (0); ' 'adaptive methods (trap_adapt, qndf) override to 0', shrinkt='allow step shrinking on non-convergence when fixt=1', honest='honest Newton method that updates Jac at each step', tstep='integration step size', max_iter='maximum number of iterations', refresh_event='refresh events at each step', test_init='test if initialization passes', check_conn='re-check connectivity after event', criteria='use criteria to stop simulation if unstable', ddelta_limit='delta diff. limit to be considered unstable, in degree', g_scale='scale algebraic residuals with time step size', reset_tiny='reset tiny residuals to zero to avoid chattering', qrt='quasi-real-time stepping', kqrt='quasi-real-time scaling factor; kqrt > 1 means slowing down', store_z='store limiter status in TDS output', store_f='store RHS of diff. equations', store_h='store RHS of external diff. equations', store_i='store RHS of external algeb. equations', limit_store='limit in-memory timeseries storage', max_store='maximum steps of data stored in memory before offloading', save_every='save one step to memory every N simulation steps', save_mode='automatically or manually save output data when done', no_tqdm='disable tqdm progressbar', chatter_iter='minimum iterations to detect chattering', linesearch='nonmonotone backtracking line search', dtmax='maximum step size (0 = auto from freq and tspan)', dtmin_adapt='minimum step size for adaptive methods (0 = auto)', abstol='absolute tolerance for adaptive LTE estimation', reltol='relative tolerance for adaptive LTE estimation', ) self.config.add_extra("_alt", method=tuple(method_map.keys()), tol="float", t0=">=0", tf=">t0", fixt=(0, 1), shrinkt=(0, 1), honest=(0, 1), tstep='float', max_iter='>=10', refresh_event=(0, 1), test_init=(0, 1), check_conn=(0, 1), criteria=(0, 1), g_scale='positive', reset_tiny=(0, 1), qrt=(0, 1), kqrt='positive', store_z=(0, 1), store_f=(0, 1), store_h=(0, 1), store_i=(0, 1), limit_store=(0, 1), max_store='positive integer', save_every='integer', save_mode=('auto', 'manual'), no_tqdm=(0, 1), chatter_iter='int>=4', linesearch=(0, 1), dtmax='>=0', dtmin_adapt='>=0', abstol='float', reltol='float', ) # overwrite `tf` from command line if system.options.get('tf') is not None: self.config.tf = system.options.get('tf') if system.options.get('qrt') is True: self.config.qrt = system.options.get('qrt') if system.options.get('kqrt') is not None: self.config.kqrt = system.options.get('kqrt') # if data is from a CSV file instead of simulation self.from_csv = None self.data_csv = None self.k_csv = 0 # row number # to be computed self.deltat = 0 self.deltatmin = 0 self.deltatmin_adapt = 0 self.deltatmax = 0 self.h = 0 self.last_pc = 0.0 self.Teye = None self.qg = np.array([]) self.tol_zero = self.config.tol / 1e6 # internal status self.converged = False self.chatter = False self.last_converged = False # True if the previous step converged self.busted = False # True if in a non-recoverable error state self.err_msg = '' self.niter = 0 self._switch_idx = 0 # index into `System.switch_times` self._last_switch_t = -999 # the last critical time self.custom_event = False self._adaptive_nolte_steps = 0 self.mis = [1, 1] self.pbar = None self.callpert = None self.plotter = None self.plt = None self.initialized = False self.test_ok = None self.qrt_start = None self.headroom = 0.0 self.call_stats = list() # internal storage for iterations self.x0 = None self.y0 = None self.f0 = None self.xs = None self.ys = None self.Ac = None self.inc = None self.qndf_cache = None # set DAE solver self.method = Trapezoid() self.set_method(self.config.method)
def init(self): """ Initialize the status, storage and values for TDS. Returns ------- array-like The initial values of xy. """ t0, _ = elapsed() system = self.system if self.initialized: return system.dae.xy self.reset() self.set_method(self.config.method) self._load_pert() # restore power flow solutions system.dae.x[:len(system.PFlow.x_sol)] = system.PFlow.x_sol system.dae.y[:len(system.PFlow.y_sol)] = system.PFlow.y_sol system.dae.t -= system.dae.t # set `dae.t` to zero # Note: # calling `set_address` on `system.exist.pflow_tds` will point all variables # to the new array after extending `dae.y`. system.set_address(models=system.exist.pflow_tds) system.set_dae_names(models=system.exist.tds) system.set_output_subidx(models=system.exist.pflow_tds) system.store_no_check_init(models=system.exist.pflow_tds) # lightweight setup for init(): only _getters/_setters for vars_to_models system.store_getters(models=system.exist.pflow_tds) system.vars_to_models() system.init(system.exist.tds, routine='tds') system.dae_compactor.compact_dae() # propagate after compact_dae so that ue reflects replacement u=0 system.propagate_init_status() # full setup after compaction (sparse patterns, adders, antiwindups) system.dae.clear_ts() system.store_sparse_pattern(models=system.exist.pflow_tds) system.store_adder_setter(models=system.exist.pflow_tds) system.vars_to_models() self.fg_update(system.exist.tds, init=True) system.b_update(system.exist.pflow_tds) # reset diff. equation RHS for binding antiwindups for item in system.antiwindups: for key, _, eqval in item.x_set: np.put(system.dae.f, key, eqval) # only store switch times when not replaying CSV data if self.data_csv is None: system.store_switch_times(system.exist.tds) # Build mass matrix into `self.Teye` self.Teye = spdiag(system.dae.Tf.tolist()) self.qg = np.zeros(system.dae.n + system.dae.m) if self.qndf_cache is None and self.config.method == 'qndf': if system.dae.n == 0: raise RuntimeError( "QNDF requires at least one differential equation (dae.n > 0). " "Use 'trapezoid' or 'backeuler' for algebraic-only systems.") self.qndf_cache = QNDFCache( n=system.dae.n, abstol=self.config.abstol, reltol=self.config.reltol) self.initialized = True # populate delta addresses for stability criteria check if self.config.criteria and system.SynGen.n > 0: system.conn.check_connectivity(info=False) # test if residuals are close enough to zero if self.config.test_init: self.test_ok = self.test_init() # discard initialized values and use that from CSV if provided if self.data_csv is not None: self._csv_data_to_dae() # connect to data streaming server if system.streaming.dimec is None: system.streaming.connect() if system.runtime.dime_enabled: # send out system data using DiME self.streaming_init() self.streaming_step() # if `dae.n == 1`, `calc_h_first` depends on new `dae.gy` self.calc_h() # allocate for internal variables self.x0 = np.zeros_like(system.dae.x) self.y0 = np.zeros_like(system.dae.y) self.f0 = np.zeros_like(system.dae.f) self.xs = np.zeros_like(system.dae.x) self.ys = np.zeros_like(system.dae.y) # save post-init DAE state for reinit() self._x_t0 = system.dae.x.copy() self._y_t0 = system.dae.y.copy() # snapshot post-init model state for reinit() # Covers device status (u, ue), ConstServices (Fault.uf, vref0, paux0), # and NumParams that Alter +/-/*/÷ may modify during simulation for mdl in system.exist.pflow_tds.values(): if mdl.n > 0: mdl.snapshot_init() _, s1 = elapsed(t0) logger.info("Initialization for dynamics completed in %s.", s1) if self.test_ok is True: logger.info("Initialization was successful.") elif self.test_ok is False: logger.error("** Initialization FAILED **") logger.error("For detailed debugging:") logger.error(" andes -v 10 run -r tds --init %s", self.system.files.case) else: logger.warning("Initialization results were not verified.") if system.dae.n == 0: logger.info('No differential equation detected.') return system.dae.xy def summary(self): """ Print out a summary of TDS options to logger.info. Returns ------- None """ out = list() out.append('') out.append('-> Time Domain Simulation Summary:') if self.data_csv is not None: out.append(f'Loaded data from CSV file "{self.from_csv}".') out.append('Replaying from CSV data.') out.append(f'Replay time: {self.system.dae.t}-{self.config.tf} s.') else: out.append(f'Sparse Solver: {self.solver.sparselib.upper()}') out.append(f'Simulation time: {self.system.dae.t}-{self.config.tf} s.') if self.config.fixt == 1: msg = f'Fixed step size: h={1000 * self.config.tstep:.4g} ms.' msg += ' Shrink if not converged.' if self.config.shrinkt == 1 else '' out.append(msg) else: out.append(f'Variable step size: h0={1000 * self.config.tstep:.4g} ms.') out_str = '\n'.join(out) logger.info(out_str) if self.config.honest == 1: logger.warning("The honest Newton method is being used. It will slow down the simulation.") logger.warning("For speed up, set `honest=0` in TDS.config.") def init_resume(self): """ Initialize a resumed simulation. """ system = self.system dae = system.dae self.calc_h(resume=True) dae.t += self.h logger.debug("Resuming simulation: initial step size is h=%.4fs.", self.h) logger.debug("Resuming from t=%.4fs.", system.dae.t) def run(self, no_summary=False, from_csv=None, **kwargs): """ Run time-domain simulation using numerical integration. The default method is the Implicit Trapezoidal Method (ITM). """ system = self.system dae = self.system.dae config = self.config succeed = False if system.PFlow.converged is False: logger.warning('Power flow not solved. Simulation will not continue.') system.exit_code += 1 return succeed if no_summary is False and (system.dae.t == 0): self.summary() # load from csv if provided if from_csv is not None: self.from_csv = from_csv else: self.from_csv = system.options.get("from_csv") if self.from_csv is not None: self.data_csv = self._load_csv(self.from_csv) # only initializing at t<0 allows to continue when `run` is called again. if system.dae.t < 0: self.init() else: # resume simulation self.init_resume() if system.options.get("init") is True: logger.debug("Initialization only is requested and done") return self.initialized # Get appropriate tqdm based on environment (terminal, notebook, or batch) tqdm_cls, disable = get_tqdm(self.config.no_tqdm) pbar_kwargs = dict(total=100, unit='%', disable=disable) if tqdm_cls is not tqdm_nb: pbar_kwargs.update(ncols=80, ascii=True, file=sys.stdout) self.pbar = tqdm_cls(**pbar_kwargs) # set initial pbar percentage; also works for resumed simulation perc = round((dae.t - config.t0) / (config.tf - config.t0) * 100, 2) self.last_pc = perc self.pbar.update(perc) self.qrt_start = time.time() self.headroom = 0.0 # write variable list file at the beginning if not system.files.no_output: system.dae.write_lst(self.system.files.lst) t0, _ = elapsed() while (system.dae.t - self.h < self.config.tf) and (not self.busted): logger.debug("Start to integrate time step t=%g", system.dae.t) # call perturbation file if specified if self.callpert is not None: self.callpert(dae.t, system) step_status = False # call the stepping method of the integration method (or data replay) if self.data_csv is None: step_status = self.itm_step() # compute for the current step else: step_status = self._csv_step() # record number of iterations and success flag if system.runtime.save_stats: self.call_stats.append((system.dae.t.tolist(), self.niter, step_status)) if step_status: # evaluate observable variables after convergence system.b_update(system.exist.pflow_tds) if config.save_every != 0: if config.save_every == 1: dae.store() else: if dae.kcount % config.save_every == 0: dae.store() # offload if exceeds `max_store` if self.config.limit_store and len(dae.ts._ys) >= self.config.max_store: # write to file if enabled if not system.files.no_output: self.save_output() logger.info("Offload data from memory to file for t=%.2f - %.2f sec", dae.ts.t[0], dae.ts.t[-1]) # clear storage in memory anyway dae.ts.reset() self.streaming_step() if self.check_criteria() is False: self.err_msg = 'Violated stability criteria. To turn off, set [TDS].criteria = 0.' self.busted = True # check if the next step is critical time self.do_switch() self.calc_h() dae.t += self.h dae.kcount += 1 logger.debug("Next time step advanced to t=%g", dae.t) # show progress in percentage perc = max(min((dae.t - config.t0) / (config.tf - config.t0) * 100, 100), 0) perc = round(perc, 2) perc_diff = perc - self.last_pc if perc_diff >= 1: self.pbar.update(perc_diff) self.last_pc = self.last_pc + perc_diff # quasi-real-time check and wait (except for the last step) if config.qrt and self.h > 0: rt_end = self.qrt_start + self.h * config.kqrt # if the ending time has passed t_overrun = time.time() - rt_end if t_overrun > 0: logger.debug('Simulation over-run for %4.4g msec at t=%4.4g s.', 1000 * t_overrun, dae.t) else: self.headroom += (rt_end - time.time()) while time.time() - rt_end < 0: time.sleep(1e-4) self.qrt_start = time.time() else: logger.debug("Anticipated time step t=%g did not converge", system.dae.t) dae.t -= self.h self.calc_h() logger.debug("From t=%g, new step size h=%g ", system.dae.t, self.h) if self.h == 0: self.err_msg = "Time step reduced to zero. Convergence is not likely." self.busted = True break dae.t += self.h if self.busted: logger.error(self.err_msg) logger.error("Simulation terminated at t=%.4f s.", system.dae.t) system.exit_code += 1 elif system.dae.t == self.config.tf: succeed = True # success flag system.exit_code += 0 self.pbar.update(100 - self.last_pc) else: system.exit_code += 1 # removed `pbar` so that System object can be serialized self.pbar.close() self.pbar = None t1, s1 = elapsed(t0) self.exec_time = t1 - t0 logger.info('Simulation to t=%.2f sec completed in %s.', config.tf, s1) if config.qrt: logger.debug('QRT headroom time: %.4g s.', self.headroom) # in case of resumed simulations, # manually unpack data to update arrays in `dae.ts` # disable warning in case data has just been dumped system.dae.ts.unpack(warn_empty=False) if (not system.files.no_output) and (config.save_mode == 'auto'): t0, _ = elapsed() self.save_output() _, s1 = elapsed(t0) np_file = self.system.files.npz logger.info('Outputs to "%s" and "%s".', self.system.files.lst, np_file) logger.info('Outputs written in %s.', s1) # end data streaming if system.runtime.dime_enabled: system.streaming.finalize() # load data into `TDS.plotter` in a notebook or in an interactive mode if is_notebook() or is_interactive(): self.load_plotter() return succeed def itm_step(self): """ Integrate one step of size ``self.h`` using the configured DAE method. Delegates to ``self.method.step(self)`` where ``self.method`` is set by ``set_method()``. Returns ------- bool Convergence status in ``self.converged``. """ return self.method.step(self) def _csv_step(self): """ Fetch data for the next step from ``data_csv``. When `Output` exists, the target variables `x` and `y` are filled with the data, while the remaining variables are set to zero. """ self._csv_data_to_dae() self.converged = True return self.converged def calc_h(self, resume=False): """ Calculate the time step size during the TDS. Parameters ---------- resume : bool If True, calculate the initial step size. Notes ----- Step-size control is delegated to ``self.method.calc_h()``. The default (``ImplicitIter.calc_h``) applies a niter-based heuristic. Adaptive methods (``TrapezoidAdaptive``, ``QNDF``) set ``deltat`` inside their ``step()`` and only check the bust condition here. Returns ------- float computed time step size stored in ``self.h`` """ system = self.system config = self.config # t=0, first iteration (not previously failed), or resumed simulation if (system.dae.t == 0 and self.niter == 0) or resume: self.deltat = self._calc_h_first() else: self.method.calc_h(self) self.h = self.deltat # do not skip over the end time self.h = max(min(self.h, config.tf - system.dae.t), 0) # skip the first switch at the exact first time step to avoid h == 0 if self._switch_idx < system.n_switches: if (not resume) and (system.dae.t == system.switch_times[self._switch_idx]): self._switch_idx += 1 # do not skip over event switch_times if self._switch_idx < system.n_switches: if (system.dae.t + self.h) > system.switch_times[self._switch_idx]: self.h = system.switch_times[self._switch_idx] - system.dae.t if self.data_csv is not None: if self.k_csv + 1 < self.data_csv.shape[0]: self.k_csv += 1 self.h = self.data_csv[self.k_csv, 0] - system.dae.t else: self.h = 0 logger.debug("Calculated TDS.h = %g", self.h) return self.h def _calc_h_first(self): """ Compute the first time step and save to ``self.deltat`` and return it. """ system = self.system config = self.config if not system.dae.n: freq = 1.0 elif system.dae.n == 1: B = matrix(system.dae.gx) self.solver.linsolve(system.dae.gy, B) As = system.dae.fx - system.dae.fy * B freq = max(abs(As[0, 0]), 1) else: freq = 30.0 if freq > system.config.freq: freq = float(system.config.freq) tspan = abs(config.tf - config.t0) tcycle = 1 / freq self.deltatmax = min(tcycle, tspan / 100.0) self.deltat = min(tcycle, tspan / 100.0) self.deltatmin = min(tcycle / 500, self.deltatmax / 20) if config.tstep <= 0: logger.warning('Fixed time step must be positive, current value is %g', config.tstep) logger.warning('Switching to automatic time steping') config.fixt = False if config.fixt: self.deltat = config.tstep if config.tstep < self.deltatmin: logger.warning('Fixed time step is smaller than the estimated minimum.') if config.tstep > self.deltatmax: logger.debug('Increased deltatmax to tstep=%g.', config.tstep) self.deltatmax = config.tstep # Explicit dtmax overrides the auto-computed deltatmax if config.dtmax > 0: self.deltatmax = config.dtmax self.deltat = min(self.deltat, self.deltatmax) self.deltatmin = min(self.deltatmin, self.deltatmax / 20) # Adaptive-only minimum floor. Default is looser than `deltatmin` # so adaptive controllers can survive sharp event transients. if config.dtmin_adapt > 0: self.deltatmin_adapt = min(config.dtmin_adapt, self.deltatmax) else: self.deltatmin_adapt = min(max(self.deltatmin * 1e-6, 1e-12), self.deltatmax) # if from CSV, determine `deltat` from data if self.data_csv is not None: if self.data_csv.shape[0] > 1: self.deltat = self.data_csv[1, 0] - self.data_csv[0, 0] else: logger.warning("CSV file only contains data for one time step.") self.deltat = 0 return self.deltat def load_plotter(self): """ Manually load a plotter into ``TDS.plotter``. """ from andes.plot import TDSData # NOQA self.plotter = TDSData(mode='memory', dae=self.system.dae) self.plt = self.plotter def plot(self, *args, **kwargs): """ Shorthand for ``TDS.plt.plot(...)``. Loads the plotter on first use if not already loaded. All arguments are passed through to :py:meth:`TDSData.plot`. """ if self.plt is None: self.load_plotter() return self.plt.plot(*args, **kwargs) def test_init(self): """ Test if the TDS initialization is successful. This function update ``dae.f`` and ``dae.g`` and checks if the residuals are zeros. """ system = self.system # fg_update is called in TDS.init() system.j_update(models=system.exist.pflow_tds) # reset diff. RHS where `check_init == False` system.dae.f[system.no_check_init] = 0.0 if np.max(np.abs(system.dae.fg)) < self.config.tol: logger.debug('Initialization tests passed.') return True # otherwise, show initialization error diagnostics # --- Cause: variables clamped at limits --- limit_rows = [] for model in system.exist.pflow_tds.values(): for item in model.discrete.values(): limit_rows.extend(item.get_limit_report()) if limit_rows: lim_tab = Tab( title='Variables clamped at limits during initialization', header=['Model', 'Idx', 'Variable', 'Limit (param)', 'Limit Value', 'Unconstr. Value'], data=[[r['model'], r['idx'], r['var'], r['limit_name'], f"{r['limit_val']:.6g}", f"{r['unconstr']:.6g}"] for r in limit_rows], ) logger.error(lim_tab.draw()) logger.error('Clamped variables may cause the equation mismatches below.') # --- Effect: equations with nonzero residuals --- fail_idx = np.ravel(np.where(abs(system.dae.fg) >= self.config.tol)) nan_idx = np.ravel(np.where(np.isnan(system.dae.fg))) bad_idx = np.hstack([fail_idx, nan_idx]) fail_names = [system.dae.xy_name[int(i)] for i in fail_idx] nan_names = [system.dae.xy_name[int(i)] for i in nan_idx] bad_names = fail_names + nan_names bad_estr = [] n = system.dae.n for i in bad_idx: ii = int(i) if ii < n: entry = system.dae.x_map.get(ii) else: entry = system.dae.y_map.get(ii - n) bad_estr.append(entry[1].e_str if entry and entry[1].e_str else '') err_tab = Tab( title='** Equations with nonzero residuals **', header=['Name', 'Eqn. Mismatch', 'Equation (0=)'], data=list(map(list, zip(bad_names, system.dae.fg[bad_idx], bad_estr))), ) logger.error(err_tab.draw()) if system.options.get('verbose') == 1: breakpoint() system.exit_code += 1 return False def save_output(self, npz=True): """ Save the simulation data into two files: a `.lst` file and a `.npz` file. This function saves the output regardless of the `files.no_output` flag. Parameters ---------- npz : bool True to save in npz format; False to save in npy format. Returns ------- bool True if files are written. False otherwise. """ if npz is True: self.system.dae.write_npz(self.system.files.npz) else: self.system.dae.write_npy(self.system.files.npy) self.system.dae.ts.idx_ptr = len(self.system.dae.ts.t) return True def do_switch(self): """ Checks if is an event time and perform switch if true. """ ret = False system = self.system # refresh switch times if enabled if self.config.refresh_event: system.store_switch_times(system.exist.pflow_tds) # if not all events have been processed if self._switch_idx < system.n_switches: # if the current time is close enough to the next event time if np.equal(system.dae.t, system.switch_times[self._switch_idx]): # `_last_switch_t` is used by the Jacobian updater self._last_switch_t = system.switch_times[self._switch_idx] # only call `switch_action` on the models that defined the time system.switch_action(system.switch_dict[self._last_switch_t]) # progress `_switch_idx` to avoid calling the same event if time gets stuck self._switch_idx += 1 system.vars_to_models() ret = True # if a `custom_event` flag is set (without a specific callback) if self.custom_event is True: system.switch_action(system.exist.pflow_tds) self._last_switch_t = system.dae.t.tolist() system.vars_to_models() self.custom_event = False ret = True # check system connectivity if topology changed during switching if self.config.check_conn == 1 and system.conn._dirty: system.conn.check_connectivity(info=False) if ret and self.qndf_cache is not None: self.qndf_cache.reset_for_event(system.dae.x[:system.dae.n]) # Method-specific no-LTE restart window after discontinuities. if ret and self.method.nolte_event_steps > 0: self._adaptive_nolte_steps = max(self._adaptive_nolte_steps, self.method.nolte_event_steps) return ret def fg_update(self, models, init=False): """ Perform one round of evaluation for one iteration step. The following operations are performed in order: - variable service updating through ``s_update_var`` - discrete flags updating through ``l_update_var`` - evaluation of the right-hand-side of ``f`` - equation-dependent discrete flags updating through ``l_update_eq`` - evaluation of the right-hand-side of ``g`` - collection of residuals into dae through ``fg_to_dae``. """ system = self.system system.dae.clear_fg() system.s_update_var(models=models) # update VarService system.l_update_var(models=models, niter=self.niter, err=self.mis[-1], ) # evalute the RHS of `f` and check the limiters (anti-windup) # 12/08/2020: Moved `l_update_eq` to before `g_update` # because some algebraic variables depend on pegged states. system.f_update(models=models) system.l_update_eq(models=models, init=init, niter=self.niter) system.g_update(models=models) system.fg_to_dae() def _fg_wrapper(self, xy): """ Wrapper function for equations. Callable by general-purpose DAE solvers. Parameters ---------- xy : np.ndarray Input values for evaluating equations. Returns ------- np.ndarray RHS of diff. and algeb. equations. """ system = self.system system.dae.x[:] = xy[:system.dae.n] system.dae.y[:] = xy[system.dae.n:] system.vars_to_models() self.fg_update(system.exist.pflow_tds) return system.dae.fg def _load_pert(self): """ Load perturbation files to ``self.callpert``. """ system = self.system if not system.files.pert: return False if not os.path.isfile(system.files.pert): logger.warning('Pert file not found at "%s".', system.files.pert) return False pert_path, full_name = os.path.split(system.files.pert) logger.debug('Pert file "%s" located at path %s', full_name, pert_path) sys.path.append(pert_path) name, _ = os.path.splitext(full_name) module = importlib.import_module(name) self.callpert = getattr(module, 'pert') logger.info('Perturbation file "%s" loaded.', system.files.pert) return True def _load_csv(self, csv_file): """ Load simulation data from CSV file and return a numpy array. Parameters ---------- csv_file : str Path to the CSV file. """ if csv_file is None: return None df = pd.read_csv(csv_file) if df.isnull().values.any(): raise ValueError("CSV file contains missing values. Please check data consistency.") data = df.to_numpy() if data.ndim != 2: raise ValueError("Data from CSV is not 2-dimensional (time versus variable)") if data.shape[0] < 2: logger.warning("CSV data does not contain more than one time step.") system = self.system if data.shape[1] < (system.dae.m + system.dae.n): if system.Output.n < 1: logger.warning("CSV data contains fewer variables than required.") logger.warning("Check if the CSV data file is generated from the test case.") else: logger.info("Output selection detected.") # NOTE: data has first column as time, so the rest should be `n+m` from `Output` if data.shape[1] - 1 < (len(system.Output.xidx + system.Output.yidx)): logger.warning("CSV data contains fewer variables than required.") logger.warning("Check if the CSV data file is generated with selected output.") # set start and end times from data self.config.t0 = data[0, 0] self.config.tf = data[-1, 0] return data def _debug_g(self, y_idx): """ Print out the associated variables with the given algebraic equation index. Parameters ---------- y_idx Index of the equation into the `g` array. Diff. eqns. are not counted in. """ y_idx = y_idx.tolist() logger.debug('--> Iteration Number: niter = %d', self.niter) logger.debug('Max. algebraic equation mismatch:') logger.debug(' <%s> [y_idx=%d]', self.system.dae.y_name[y_idx], y_idx) logger.debug(' Variable value = %.8f', self.system.dae.y[y_idx]) logger.debug(' Mismatch value = %.8f', self.system.dae.g[y_idx]) assoc_vars = self.system.dae.gy[y_idx, :] vars_idx = np.where(np.ravel(matrix(assoc_vars)))[0] logger.debug('Related variable values:') logger.debug(f'{"y_index":<10} {"Variable":<20} {"Derivative":<20}') for v in vars_idx: v = v.tolist() logger.debug('%10d %20s %20g', v, self.system.dae.y_name[v], assoc_vars[v]) def _debug_ac(self, xy_idx): """ Debug Ac matrix by printing out equations and derivatives associated with the max. mismatch variable. Parameters ---------- xy_idx Index of the maximum mismatch into the `xy` array. """ xy_idx = xy_idx.tolist() assoc_eqns = self.Ac[:, xy_idx] assoc_vars = self.Ac[xy_idx, :] eqns_idx = np.where(np.ravel(matrix(assoc_eqns)))[0] vars_idx = np.where(np.ravel(matrix(assoc_vars)))[0] logger.debug('Max. correction=%.4f for variable %s [%d]', self.inc[xy_idx], self.system.dae.xy_name[xy_idx], xy_idx) logger.debug('Associated equation RHS is %20g', self.system.dae.fg[xy_idx]) logger.debug('') logger.debug('Related Jacobian elements:') logger.debug(f'{"y_index":<10} {"Variable":<20} {"Derivative":<20}') logger.debug(f'{"xy_index":<10} {"Equation (row)":<20} {"Derivative":<20} {"Eq. Mismatch":<20}') for eq in eqns_idx: eq = eq.tolist() logger.debug(f'{eq:<10} {self.system.dae.xy_name[eq]:<20} {assoc_eqns[eq]:<20g} ' f'{self.system.dae.fg[eq]:<20g}') logger.debug('') logger.debug(f'{"xy_index":<10} {"Variable (col)":<20} {"Derivative":<20} {"Eq. Mismatch":<20}') for v in vars_idx: v = v.tolist() logger.debug(f'{v:<10} {self.system.dae.xy_name[v]:<20} {assoc_vars[v]:<20g} ' f'{self.system.dae.fg[v]:<20g}') def _reset_integration(self): """ Reset integration state that changes during ``TDS.run()``. Preserves infrastructure that is invariant between episodes (``Teye``, ``qg``, sparse patterns, addresses). Called by both :meth:`reset` and :meth:`reinit`. """ # step-size control self.deltat = 0 self.deltatmin = 0 self.deltatmin_adapt = 0 self.deltatmax = 0 self.h = 0 self.last_pc = 0.0 # convergence / error tracking self.converged = False self.last_converged = False self.busted = False self.chatter = False self.err_msg = '' self.niter = 0 self.mis = [1, 1] # event scheduling self._switch_idx = 0 # index into `System.switch_times` self._last_switch_t = -999 # the last event time self.custom_event = False self._adaptive_nolte_steps = 0 # statistics self.call_stats = list() # predictor / scratch arrays (None before first init) if self.x0 is not None: self.x0[:] = 0 if self.y0 is not None: self.y0[:] = 0 if self.f0 is not None: self.f0[:] = 0 if self.xs is not None: self.xs[:] = 0 if self.ys is not None: self.ys[:] = 0 # QNDF integration history (uses existing reset_for_event API) if self.qndf_cache is not None: self.qndf_cache.reset_for_event(self.system.dae.x) def reset(self): """ Reset internal states to pre-init condition. """ self._reset_integration() # infrastructure teardown (not needed for reinit) self.Teye = None self.qg = np.array([]) self.system.dae.t = np.array(0.0) self.pbar = None self.plotter = None self.plt = None # short name for `plotter` self.qndf_cache = None self.initialized = False def reinit(self): """ Reset TDS to post-init state for a new simulation episode. Re-derives all dependent state (discrete flags, residuals, antiwindup bindings) from saved initial ``(x, y)`` via idempotent evaluation. Invariants (addresses, sparse patterns, ``Teye``) are preserved. This is designed for RL training loops where ``TDS.run()`` is called repeatedly with different disturbances or control actions:: ss = andes.load('case.raw') ss.PFlow.run() ss.TDS.init() for episode in range(100_000): ss.TDS.reinit() ss.Alter.amount.v[0] = random() ss.TDS.config.tf = 20.0 ss.TDS.run(no_summary=True) Restores: DAE state ``(x, y)``, device status ``(u, ue)``, ConstService values (setpoints, Fault flags), NumParam values (including those modified by ``Alter``), and observable variables. """ if not self.initialized: raise RuntimeError("Call TDS.init() before reinit()") system = self.system dae = system.dae # 1. Restore the 2 essential arrays dae.x[:] = self._x_t0 dae.y[:] = self._y_t0 dae.set_t(0.0) dae.kcount = 0 dae.clear_fg() # 2. Clear time series dae.ts.reset() # --- Restore model state (device status, services, params) --- for mdl in system.exist.pflow_tds.values(): if mdl.n > 0: mdl.restore_init() # --- Re-derive all dependent state --- system.vars_to_models() self._reset_integration() self.fg_update(system.exist.tds, init=True) for item in system.antiwindups: for key, _, eqval in item.x_set: np.put(dae.f, key, eqval) system.b_update(system.exist.pflow_tds) self.calc_h() # --- Episode bookkeeping --- system.exit_code = 0 def rewind(self, t): """ TODO: rewind to a past time. """ raise NotImplementedError("TDS.rewind() not implemented") def streaming_init(self): """ Send out initialization variables and process init from modules. Returns ------- None """ system = self.system if system.runtime.dime_enabled: system.streaming.send_init(recepient='all') logger.info('Broadcast system data. Waiting to receive modules init info...') time.sleep(0.5) system.streaming.sync_and_handle() def streaming_step(self): """ Sync, handle and streaming for each integration step. Returns ------- None """ system = self.system if system.runtime.dime_enabled: system.streaming.sync_and_handle() system.streaming.vars_to_modules() system.streaming.vars_to_pmu() def set_method(self, name: str = 'trapezoid'): """ Set DAE solution method. Parameters ---------- name : str, optional, default: 'trapezoid' DAE solver name """ if name not in method_map: logger.error('"%s" is not a registered dae method name. ' 'Falling back to trapezoid.', name) name = 'trapezoid' self.method = method_map[name]() self.config.method = name if self.method.requires_variable_step and self.config.fixt: logger.info('"%s" requires variable step size; setting fixt=0.', name) self.config.fixt = 0 if name != 'qndf': self.qndf_cache = None def check_criteria(self): """ Check stability criteria. """ res = deltadelta(self.system.dae.x[self.system.SynGen.delta_addr], self.config.ddelta_limit) return res def get_timeseries(self, var): """ Return time-series data for a variable as a DataFrame. Dispatches automatically based on the variable type (State/ExtState use ``dae.ts.x``; Algeb/ExtAlgeb use ``dae.ts.y``). Parameters ---------- var : BaseVar A variable instance, e.g., ``ss.GENROU.omega`` or ``ss.Bus.v``. Returns ------- pandas.DataFrame DataFrame with time as index and device idx values as columns. """ from andes.core.var import BaseVar if not isinstance(var, BaseVar): raise TypeError( f"Expected a variable (e.g., ss.GENROU.omega), got {type(var).__name__}." ) ts = self.system.dae.ts if ts.t is None or len(ts.t) == 0: raise ValueError("No time-series data. Run TDS first.") data_matrix = getattr(ts, var.v_code) data = data_matrix[:, var.a] if len(var.a) == len(var.owner.idx.v): columns = list(var.owner.idx.v) else: columns = list(range(len(var.a))) return pd.DataFrame(data, index=ts.t, columns=columns) def _csv_data_to_dae(self): """ Helper function to fetch data when replaying from CSV file. When loading from a partial CSV file, the recorded data is loaded while the rest of the variables remain to be initial values. """ system = self.system if system.Output.n < 1: system.dae.x[:] = self.data_csv[self.k_csv, 1:system.dae.n + 1] system.dae.y[:] = self.data_csv[self.k_csv, system.dae.n + 1:system.dae.n + system.dae.m + 1] else: xidx = system.Output.xidx system.dae.x[xidx] = self.data_csv[self.k_csv, 1:len(xidx) + 1] yidx = system.Output.yidx system.dae.y[yidx] = self.data_csv[self.k_csv, len(xidx) + 1:len(xidx) + len(yidx) + 1] system.vars_to_models()