Source code for andes.system.facade

"""
System class for power system data and methods.
"""

#  [ANDES] (C)2015-2026 Hantao Cui
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  File name: system/facade.py

import logging
import warnings
from collections import OrderedDict, defaultdict
from typing import Dict, Optional, Tuple, Union

import andes.io
from andes.core import AntiWindup, Model, ConnMan
from andes.core.service import BackRef
from andes.io.streaming import Streaming
from andes.shared import NCPUS, jac_names, np, spmatrix
from andes.system.codegen import CodegenManager
from andes.system.config_runtime import SystemConfigRuntime
from andes.system.dae_compactor import DAECompactor
from andes.system.helpers import _set_hi_name, _set_xy_name, _set_z_name
from andes.system.registry import RegistryLoader
from andes.utils.misc import elapsed
from andes.utils.tab import Tab
from andes.variables import DAE, FileMan

logger = logging.getLogger(__name__)


[docs]class ExistingModels: """ Storage class for existing models """
[docs] def __init__(self): self.pflow = OrderedDict() self.tds = OrderedDict() # if a model needs to be initialized before TDS, set `flags.tds = True` self.pflow_tds = OrderedDict() self.ybus = OrderedDict()
[docs]class System: """ System contains models and routines for modeling and simulation. System contains a several special `OrderedDict` member attributes for housekeeping. These attributes include `models`, `groups`, `routines` and `calls` for loaded models, groups, analysis routines, and generated numerical function calls, respectively. Parameters ---------- no_undill : bool, optional, default=False True to disable the call to ``System.undill()`` at the end of object creation. False by default. autogen_stale : bool, optional, default=True True to automatically generate code for stale models. Notes ----- System stores model and routine instances as attributes. Model and routine attribute names are the same as their class names. For example, `Bus` is stored at ``system.Bus``, the power flow calculation routine is at ``system.PFlow``, and the numerical DAE instance is at ``system.dae``. See attributes for the list of attributes. Attributes ---------- dae : andes.variables.dae.DAE Numerical DAE storage files : andes.variables.fileman.FileMan File path storage config : andes.core.Config System config storage models : OrderedDict model name and instance pairs groups : OrderedDict group name and instance pairs routines : OrderedDict routine name and instance pairs """
[docs] def __init__(self, case: Optional[str] = None, name: Optional[str] = None, config: Optional[Dict] = None, config_path: Optional[str] = None, default_config: Optional[bool] = False, options: Optional[Dict] = None, no_undill: Optional[bool] = False, autogen_stale: Optional[bool] = True, **kwargs ): self.name = name self.options = {} if options is not None: self.options.update(options) if kwargs: self.options.update(kwargs) self.calls = OrderedDict() # a dictionary with model names (keys) and their ``calls`` instance self.models = OrderedDict() # model names and instances self.model_aliases = OrderedDict() # alias: model instance self.groups = OrderedDict() # group names and instances self.routines = OrderedDict() # routine names and instances self.switch_times = np.array([]) # an array of ordered event switching times self.switch_dict = OrderedDict() # time: OrderedDict of associated models self.with_calls = False # if generated function calls have been loaded self.n_switches = 0 # number of elements in `self.switch_times` self.exit_code = 0 # command-line exit code, 0 - normal, others - error. # --- Config resolution (phased) --- # Precedence: defaults < andes.rc < file _config < CLI config_option self.config_runtime = SystemConfigRuntime(self) self.config_runtime.load_rc(config_path=config_path, default_config=default_config) self.files = FileMan(case=case, **self.options) self.config_runtime.merge_file_config(self.files) self.config_runtime.apply_cli_overrides() self.config_runtime.finalize(config=config) # --- Managers --- self._init_managers() # --- Models, routines, and codegen --- self._init_models(no_undill=no_undill, autogen_stale=autogen_stale)
def _init_managers(self): """ Create internal manager objects (DAE, connectivity, registry, etc.). """ self.exist = ExistingModels() self.dae = DAE(system=self) self.streaming = Streaming(self) self.conn = ConnMan(system=self) self.registry = RegistryLoader(self) self.codegen = CodegenManager(self) self.dae_compactor = DAECompactor(self) def _init_models(self, no_undill=False, autogen_stale=True): """ Import groups, models, and routines from the registry, then optionally load generated code (undill). """ self.registry.load_all() self._getters = dict(f=list(), g=list(), x=list(), y=list()) self._adders = dict(f=list(), g=list(), x=list(), y=list()) self._setters = dict(f=list(), g=list(), x=list(), y=list()) self.antiwindups = list() self.no_check_init = list() self.call_stats = defaultdict(dict) # status propagation graph (built during setup) self._status_children = {} self._status_parent_map = {} self.is_setup = False if not no_undill: self.undill(autogen_stale=autogen_stale) def _update_config_object(self): """ Change config on the fly based on command-line options. .. deprecated:: 2.0 Use ``system.config_runtime.apply_cli_overrides()`` instead. This method will be removed in v3.0. """ if hasattr(self, 'config_runtime'): return self.config_runtime.apply_cli_overrides() # Fallback for subclasses that do not call super().__init__(). warnings.warn( "System._update_config_object() is deprecated and will be removed " "in v3.0. Subclasses should call super().__init__() and use " "system.config_runtime.apply_cli_overrides().", FutureWarning, stacklevel=2, ) SystemConfigRuntime(self).apply_cli_overrides() def reload(self, case, **kwargs): """ Reload a new case in the same System object. """ self.options.update(kwargs) self.files.set(case=case, **kwargs) # TODO: clear all flags and empty data andes.io.parse(self) self.setup() def _clear_adder_setter(self): """ Clear adders and setters storage """ self._getters = dict(f=list(), g=list(), x=list(), y=list()) self._adders = dict(f=list(), g=list(), x=list(), y=list()) self._setters = dict(f=list(), g=list(), x=list(), y=list()) def prepare(self, quick=False, incremental=False, models=None, nomp=False, ncpu=NCPUS): """ Delegate to :class:`andes.system.codegen.CodegenManager`. """ return self.codegen.prepare(quick=quick, incremental=incremental, models=models, nomp=nomp, ncpu=ncpu) def _mp_prepare(self, models, quick, pycode_path, ncpu): """ Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`. """ warnings.warn( "System._mp_prepare() is deprecated and will be removed in v3.0. " "Use system.codegen._mp_prepare() instead.", DeprecationWarning, stacklevel=2, ) return self.codegen._mp_prepare(models=models, quick=quick, pycode_path=pycode_path, ncpu=ncpu) def _finalize_pycode(self, pycode_path): """ Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`. """ warnings.warn( "System._finalize_pycode() is deprecated and will be removed in v3.0. " "Use system.codegen._finalize_pycode() instead.", DeprecationWarning, stacklevel=2, ) return self.codegen._finalize_pycode(pycode_path) def _find_stale_models(self): """ Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`. """ warnings.warn( "System._find_stale_models() is deprecated and will be removed in v3.0. " "Use system.codegen._find_stale_models() instead.", DeprecationWarning, stacklevel=2, ) return self.codegen._find_stale_models() def _to_orddct(self, model_list): """ Helper function to convert a list of model names to OrderedDict with name as keys and model instances as values. """ if isinstance(model_list, OrderedDict): return model_list if isinstance(model_list, list): out = OrderedDict() for name in model_list: if name not in self.models: logger.error("Model <%s> does not exist. Check your inputs.", name) continue out[name] = self.models[name] return out else: raise TypeError("Type %s not recognized" % type(model_list)) def setup(self): """ Set up system for studies. This function is to be called after adding all device data. """ ret = True t0, _ = elapsed() if self.is_setup: logger.warning( 'System has been setup. Calling setup() twice is not allowed. ' 'To add devices, reload the case with setup=False, add devices, then call setup().' ) ret = False return ret self.collect_ref() self._build_status_graph() self._list2array() # `list2array` must come before `link_ext_param` if not self.link_ext_param(): ret = False self.find_devices() # find or add required devices self._report_param_corrections() self._check_setpoints() # === no device addition or removal after this point === self.calc_pu_coeff() # calculate parameters in system per units self.store_existing() # store models with routine flags # assign address at the end before adding devices and processing parameters self.set_address(self.exist.pflow) self.set_dae_names(self.exist.pflow) # needs perf. optimization self.store_sparse_pattern(self.exist.pflow) self.store_adder_setter(self.exist.pflow) # propagate bus/parent status to ue before first connectivity check self.propagate_init_status() self.conn.check_connectivity() if ret is True: self.is_setup = True # set `is_setup` if no error occurred else: logger.error("System setup failed. Please resolve the reported issue(s).") self.exit_code += 1 _, s = elapsed(t0) logger.info('System internal structure set up in %s.', s) return ret def store_existing(self): """ Store existing models in `System.existing`. TODO: Models with `TimerParam` will need to be stored anyway. This will allow adding switches on the fly. """ self.exist.pflow = self.find_models('pflow') self.exist.tds = self.find_models('tds') self.exist.pflow_tds = self.find_models(('tds', 'pflow')) self.exist.ybus = self.find_models('ybus') def reset(self, force=False): """ Reset to the state after reading data and setup (before power flow). Warnings -------- If TDS is initialized, reset will lead to unpredictable state. """ if self.TDS.initialized is True and not force: logger.error('Reset failed because TDS is initialized. \nPlease reload the test case to start over.') return self.dae.reset() self.call_models('a_reset', models=self.models) self.e_clear(models=self.models) self._p_restore() self.is_setup = False self.setup() def add(self, model_name, param_dict=None, **kwargs): """ Add a device instance for an existing model. This method calls the ``add`` method of the model and registers the device `idx` to its group. Parameters can be passed as a dictionary, as keyword arguments, or both. When both are provided, keyword arguments are merged into the dictionary (kwargs take precedence on conflicts). Parameters ---------- model_name : str Name of the model (e.g., ``'Fault'``, ``'Toggle'``, ``'PQ'``). param_dict : dict, optional Dictionary of parameter names to values. **kwargs Parameter names and values as keyword arguments. Returns ------- idx The assigned device index. Examples -------- Keyword arguments are the preferred style:: ss.add('Fault', bus=5, tf=1.0, tc=1.1) Models with a ``model`` parameter (e.g., ``Alter``, ``Toggle``) can now use keyword arguments directly:: ss.add('Alter', model='TGOV1', dev=1, src='paux0', t=1.0, method='=', amount=0.05) ss.add('Toggle', model='Line', dev='Line_5', t=1.0) """ if model_name not in self.models and (model_name not in self.model_aliases): logger.warning("<%s> is not an existing model.", model_name) return if self.is_setup: raise NotImplementedError( "Adding devices after setup() is not supported. " "To add devices, reload the case with setup=False, add devices, then call setup()." ) group_name = self.__dict__[model_name].group group = self.groups[group_name] if param_dict is None: param_dict = {} if kwargs is not None: param_dict.update(kwargs) # remove `uid` field param_dict.pop('uid', None) idx = param_dict.pop('idx', None) if idx is not None and (not isinstance(idx, str) and np.isnan(idx)): idx = None idx = group.get_next_idx(idx=idx, model_name=model_name) self.__dict__[model_name].add(idx=idx, **param_dict) group.add(idx=idx, model=self.__dict__[model_name]) return idx def find_devices(self): """ Add dependent devices for all model based on `DeviceFinder`. """ for mdl in self.models.values(): if len(mdl.services_fnd) == 0: continue for fnd in mdl.services_fnd.values(): fnd.find_or_add(self) def set_address(self, models): """ Set addresses for differential and algebraic variables. """ # --- Phase 1: set internal variable addresses --- for mdl in models.values(): if mdl.flags.address is True: logger.debug('%s internal address exists', mdl.class_name) continue if mdl.n == 0: continue logger.debug('Setting internal address for %s', mdl.class_name) collate = mdl.flags.collate ndevice = mdl.n # get and set internal variable addresses xaddr = self.dae.request_address('x', ndevice=ndevice, nvar=len(mdl.states), collate=mdl.flags.collate, ) yaddr = self.dae.request_address('y', ndevice=ndevice, nvar=len(mdl.algebs), collate=mdl.flags.collate, ) for idx, item in enumerate(mdl.states.values()): item.set_address(xaddr[idx], contiguous=not collate) for idx, item in enumerate(mdl.algebs.values()): item.set_address(yaddr[idx], contiguous=not collate) # observable variable addresses (in dae.b) if len(mdl.observables) > 0: baddr = self.dae.request_address('b', ndevice=ndevice, nvar=len(mdl.observables), collate=mdl.flags.collate, ) for idx, item in enumerate(mdl.observables.values()): item.set_address(baddr[idx], contiguous=not collate) # --- Phase 2: set external variable addresses --- # NOTE: # This step will retrieve the number of variables (item.n) for Phase 3. for mdl in models.values(): # handle external groups for instance in mdl.cache.vars_ext.values(): ext_name = instance.model try: ext_model = self.__dict__[ext_name] except KeyError: raise KeyError('<%s> is not a model or group name.' % ext_name) try: instance.link_external(ext_model) except (IndexError, KeyError) as e: logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s', mdl.class_name, instance.name, instance.model, instance.indexer.name, repr(e)) # --- Phase 3: set external variable RHS addresses --- for mdl in models.values(): if mdl.flags.address is True: logger.debug('%s RHS address exists', mdl.class_name) continue if mdl.n == 0: continue for item in mdl.states_ext.values(): # skip if no equation, i.e., no RHS value if item.e_str is None: continue item.set_address(np.arange(self.dae.p, self.dae.p + item.n)) self.dae.p += item.n for item in mdl.algebs_ext.values(): if item.e_str is None: continue item.set_address(np.arange(self.dae.q, self.dae.q + item.n)) self.dae.q += item.n mdl.flags.address = True # allocate memory for DAE arrays self.dae.resize_arrays() # set `v` and `e` in variables self.set_var_arrays(models=models) self.dae.alloc_or_extend_names() def set_dae_names(self, models): """ Set variable names for differential and algebraic variables, right-hand side of external equations, and discrete flags. """ for mdl in models.values(): _set_xy_name(mdl, mdl.states, (self.dae.x_name, self.dae.x_tex_name)) _set_xy_name(mdl, mdl.algebs, (self.dae.y_name, self.dae.y_tex_name)) _set_hi_name(mdl, mdl.states_ext, (self.dae.h_name, self.dae.h_tex_name)) _set_hi_name(mdl, mdl.algebs_ext, (self.dae.i_name, self.dae.i_tex_name)) _set_xy_name(mdl, mdl.observables, (self.dae.b_name, self.dae.b_tex_name)) # build reverse map: addr → (model, var) for var in mdl.states.values(): for addr in var.a: self.dae.x_map[int(addr)] = (mdl, var) for var in mdl.algebs.values(): for addr in var.a: self.dae.y_map[int(addr)] = (mdl, var) # add discrete flag names if self.TDS.config.store_z == 1: _set_z_name(mdl, self.dae, (self.dae.z_name, self.dae.z_tex_name)) def set_var_arrays(self, models, inplace=True, alloc=True): """ Set arrays (`v` and `e`) for internal variables to access dae arrays in place. This function needs to be called after de-serializing a System object, where the internal variables are incorrectly assigned new memory. Parameters ---------- models : OrderedDict, list, Model, optional Models to execute. inplace : bool True to retrieve arrays that share memory with dae alloc : bool True to allocate for arrays internally """ for mdl in models.values(): if mdl.n == 0: continue for var in mdl.cache.vars_int.values(): var.set_arrays(self.dae, inplace=inplace, alloc=alloc) for var in mdl.cache.vars_ext.values(): var.set_arrays(self.dae, inplace=inplace, alloc=alloc) for var in mdl.observables.values(): var.set_arrays(self.dae, inplace=inplace, alloc=alloc) def _init_numba(self, models: OrderedDict): """ Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`. """ warnings.warn( "System._init_numba() is deprecated and will be removed in v3.0. " "Use system.codegen._init_numba() instead.", DeprecationWarning, stacklevel=2, ) return self.codegen._init_numba(models) def precompile(self, models: Union[OrderedDict, None] = None, nomp: bool = False, ncpu: int = NCPUS): """ Delegate to :class:`andes.system.codegen.CodegenManager`. """ return self.codegen.precompile(models=models, nomp=nomp, ncpu=ncpu) def init(self, models: OrderedDict, routine: str): """ Initialize the variables for each of the specified models. For each model, the initialization procedure is: - Get values for all `ExtService`. - Call the model `init()` method, which initializes internal variables. - Copy variables to DAE and then back to the model. """ self.codegen._init_numba(models) for mdl in models.values(): # link externals services first for instance in mdl.services_ext.values(): ext_name = instance.model try: ext_model = self.__dict__[ext_name] except KeyError: raise KeyError('<%s> is not a model or group name.' % ext_name) try: instance.link_external(ext_model) except (IndexError, KeyError) as e: logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s', mdl.class_name, instance.name, instance.model, instance.indexer.name, repr(e)) # initialize variables second mdl.init(routine=routine) self.vars_to_dae(mdl) self.vars_to_models() self.s_update_post(models) # store time constants associated with differential equations self._store_tf(models) def store_getters(self, models): """ Build ``_getters`` and ``_setters`` so that ``vars_to_models`` and ``vars_to_dae`` work during model initialization. This is the lightweight subset of ``store_adder_setter`` needed before ``system.init()``. The full ``store_adder_setter`` is called after DAE compaction to build ``_adders`` and ``antiwindups`` as well. """ self._getters = dict(f=list(), g=list(), x=list(), y=list()) self._setters = dict(f=list(), g=list(), x=list(), y=list()) for mdl in models.values(): if not mdl.n: continue mdl.cache.refresh() for var in mdl.cache.v_getters.values(): self._getters[var.v_code].append(var) for var in mdl.cache.v_setters.values(): self._setters[var.v_code].append(var) for var in mdl.cache.e_setters.values(): self._setters[var.e_code].append(var) def store_adder_setter(self, models): """ Store non-inplace adders and setters for variables and equations. """ self._clear_adder_setter() for mdl in models.values(): # Note: # We assume that a Model with no device is not addressed and, therefore, # contains no value in each variable. # It is always true for the current architecture. if not mdl.n: continue # skip models where all devices have been replaced by dynamic models if mdl._all_replaced: continue # Fixes an issue if the cache was manually built but stale # after assigning addresses for simulation # Assigning memory will affect the cache of `v_adders` and `e_adders`. mdl.cache.refresh() # ``getters` that retrieve variable values from DAE for var in mdl.cache.v_getters.values(): self._getters[var.v_code].append(var) # ``adders`` that add variable values to the DAE array for var in mdl.cache.v_adders.values(): self._adders[var.v_code].append(var) for var in mdl.cache.e_adders.values(): self._adders[var.e_code].append(var) # ``setters`` that force set variable values in the DAE array for var in mdl.cache.v_setters.values(): self._setters[var.v_code].append(var) for var in mdl.cache.e_setters.values(): self._setters[var.e_code].append(var) # ``antiwindups`` stores all AntiWindup instances for item in mdl.discrete.values(): if isinstance(item, AntiWindup): self.antiwindups.append(item) return def store_no_check_init(self, models): """ Store differential variables with ``check_init == False``. """ self.no_check_init = list() for mdl in models.values(): if mdl.n == 0: continue for var in mdl.states.values(): if var.check_init is False: self.no_check_init.extend(var.a) def link_ext_param(self, model=None): """ Retrieve values for ``ExtParam`` for the given models. """ if model is None: models = self.models else: models = self._get_models(model) ret = True for model in models.values(): # get external parameters with `link_external` and then calculate the pu coeff for instance in model.params_ext.values(): ext_name = instance.model # '__self__' is a sentinel for self-referencing ExtParam # (e.g. ``ue`` which copies the model's own ``u``) if ext_name == '__self__': ext_model = model else: ext_model = self.__dict__[ext_name] try: instance.link_external(ext_model) except (IndexError, KeyError) as e: logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s', model.class_name, instance.name, instance.model, instance.indexer.name if instance.indexer else 'None', repr(e)) ret = False return ret def calc_pu_coeff(self): """ Perform per unit value conversion. This function calculates the per unit conversion factors, stores input parameters to `vin`, and perform the conversion. """ # `Sb`, `Vb` and `Zb` are the system base, bus base values # `Sn`, `Vn` and `Zn` are the device bases Sb = self.config.mva for mdl in self.models.values(): # default Sn to Sb if not provided. Some controllers might not have Sn or Vn. if 'Sn' in mdl.__dict__: Sn = mdl.Sn.v else: Sn = Sb # If both Vn and Vn1 are not provided, default to Vn = Vb = 1 # test if is shunt-connected or series-connected to bus, or unconnected to bus Vb, Vn = 1, 1 if 'bus' in mdl.__dict__: Vb = self.Bus.get(src='Vn', idx=mdl.bus.v, attr='v') Vn = mdl.Vn.v if 'Vn' in mdl.__dict__ else Vb elif 'bus1' in mdl.__dict__: Vb = self.Bus.get(src='Vn', idx=mdl.bus1.v, attr='v') Vn = mdl.Vn1.v if 'Vn1' in mdl.__dict__ else Vb Zn = Vn ** 2 / Sn Zb = Vb ** 2 / Sb # process dc parameter pu conversion Vdcb, Vdcn, Idcn = 1, 1, 1 if 'node' in mdl.__dict__: Vdcb = self.Node.get(src='Vdcn', idx=mdl.node.v, attr='v') Vdcn = mdl.Vdcn.v if 'Vdcn' in mdl.__dict__ else Vdcb Idcn = mdl.Idcn.v if 'Idcn' in mdl.__dict__ else (Sb / Vdcb) elif 'node1' in mdl.__dict__: Vdcb = self.Node.get(src='Vdcn', idx=mdl.node1.v, attr='v') Vdcn = mdl.Vdcn1.v if 'Vdcn1' in mdl.__dict__ else Vdcb Idcn = mdl.Idcn.v if 'Idcn' in mdl.__dict__ else (Sb / Vdcb) Idcb = Sb / Vdcb Rb = Vdcb / Idcb Rn = Vdcn / Idcn coeffs = {'voltage': Vn / Vb, 'power': Sn / Sb, 'ipower': Sb / Sn, 'current': (Sn / Vn) / (Sb / Vb), 'z': Zn / Zb, 'y': Zb / Zn, 'dc_voltage': Vdcn / Vdcb, 'dc_current': Idcn / Idcb, 'r': Rn / Rb, 'g': Rb / Rn, } for prop, coeff in coeffs.items(): for p in mdl.find_param(prop).values(): p.set_pu_coeff(coeff) # store coeffs and bases back in models. mdl.coeffs = coeffs mdl.bases = {'Sn': Sn, 'Sb': Sb, 'Vn': Vn, 'Vb': Vb, 'Zn': Zn, 'Zb': Zb} def l_update_var(self, models: OrderedDict, niter=0, err=None): """ Update variable-based limiter discrete states by calling ``l_update_var`` of models. This function is must be called before any equation evaluation. """ self.call_models('l_update_var', models, dae_t=self.dae.t, niter=niter, err=err) def l_update_eq(self, models: OrderedDict, init=False, niter=0): """ Update equation-dependent limiter discrete components by calling ``l_check_eq`` of models. Force set equations after evaluating equations. This function is must be called after differential equation updates. """ self.call_models('l_check_eq', models, init=init, niter=niter) def s_update_var(self, models: OrderedDict): """ Update variable services by calling ``s_update_var`` of models. This function is must be called before any equation evaluation after limiter update function `l_update_var`. """ self.call_models('s_update_var', models) def s_update_post(self, models: OrderedDict): """ Update variable services by calling ``s_update_post`` of models. This function is called at the end of `System.init()`. """ self.call_models('s_update_post', models) def fg_to_dae(self): """ Collect equation values into the DAE arrays. Additionally, the function resets the differential equations associated with variables pegged by anti-windup limiters. """ self._e_to_dae(('f', 'g')) # reset mismatches for islanded buses self.g_islands() # update variable values set by anti-windup limiters for item in self.antiwindups: if len(item.x_set) > 0: for key, val, _ in item.x_set: np.put(self.dae.x, key, val) def f_update(self, models: OrderedDict): """ Call the differential equation update method for models in sequence. Notes ----- Updated equation values remain in models and have not been collected into DAE at the end of this step. """ try: self.call_models('f_update', models) except TypeError as e: logger.error("f_update failed. Have you run `andes prepare -i` after updating?") raise e def g_update(self, models: OrderedDict): """ Call the algebraic equation update method for models in sequence. Notes ----- Like `f_update`, updated values have not collected into DAE at the end of the step. """ try: self.call_models('g_update', models) except TypeError as e: logger.error("g_update failed. Have you run `andes prepare -i` after updating?") raise e def b_update(self, models: OrderedDict): """ Call the observable variable update method for models in sequence. This evaluates all Observable variables post-solve and stores values in ``dae.b``. """ try: self.call_models('b_update', models) except TypeError as e: logger.error("b_update failed. Have you run `andes prepare -i` after updating?") raise e def g_islands(self): """ Reset algebraic mismatches for islanded buses. """ if self.Bus.n_islanded_buses == 0: return self.dae.g[self.Bus.islanded_a] = 0.0 self.dae.g[self.Bus.islanded_v] = 0.0 def j_update(self, models: OrderedDict, info=None): """ Call the Jacobian update method for models in sequence. The procedure is - Restore the sparsity pattern with :py:func:`andes.variables.dae.DAE.restore_sparse` - For each sparse matrix in (fx, fy, gx, gy), evaluate the Jacobian function calls and add values. Notes ----- Updated Jacobians are immediately reflected in the DAE sparse matrices (fx, fy, gx, gy). """ self.call_models('j_update', models) self.dae.restore_sparse() # collect sparse values into sparse structures for j_name in jac_names: j_size = self.dae.get_size(j_name) for mdl in models.values(): for rows, cols, vals in mdl.triplets.zip_ijv(j_name): try: if self.runtime.ipadd: self.dae.__dict__[j_name].ipadd(vals, rows, cols) else: self.dae.__dict__[j_name] += spmatrix(vals, rows, cols, j_size, 'd') except TypeError as e: logger.error("Error adding Jacobian triplets to existing sparsity pattern.") logger.error(f'{mdl.class_name}: j_name {j_name}, row={rows}, col={cols}, val={vals}, ' f'j_size={j_size}') raise e self.j_islands() if info: logger.debug("Jacobian updated at t=%.6f: %s.", self.dae.t, info) else: logger.debug("Jacobian updated at t=%.6f.", self.dae.t) def j_islands(self): """ Set gy diagonals to eps for `a` and `v` variables of islanded buses. """ if self.Bus.n_islanded_buses == 0: return aidx = self.Bus.islanded_a vidx = self.Bus.islanded_v if self.runtime.ipadd: self.dae.gy.ipset(self.config.diag_eps, aidx, aidx) self.dae.gy.ipset(0.0, aidx, vidx) self.dae.gy.ipset(self.config.diag_eps, vidx, vidx) self.dae.gy.ipset(0.0, vidx, aidx) else: avals = [-self.dae.gy[int(idx), int(idx)] + self.config.diag_eps for idx in aidx] vvals = [-self.dae.gy[int(idx), int(idx)] + self.config.diag_eps for idx in vidx] self.dae.gy += spmatrix(avals, aidx, aidx, self.dae.gy.size, 'd') self.dae.gy += spmatrix(vvals, vidx, vidx, self.dae.gy.size, 'd') def store_sparse_pattern(self, models: OrderedDict): """ Collect and store the sparsity pattern of Jacobian matrices. This is a runtime function specific to cases. Notes ----- For `gy` matrix, always make sure the diagonal is reserved. It is a safeguard if the modeling user omitted the diagonal term in the equations. """ self.call_models('store_sparse_pattern', models) # add variable jacobian values for jname in jac_names: ii, jj, vv = list(), list(), list() # for `gy`, reserve memory for the main diagonal if jname == 'gy': ii.extend(np.arange(self.dae.m)) jj.extend(np.arange(self.dae.m)) vv.extend(np.zeros(self.dae.m)) for mdl in models.values(): for row, col, val in mdl.triplets.zip_ijv(jname): ii.extend(row) jj.extend(col) vv.extend(np.zeros_like(row)) for row, col, val in mdl.triplets.zip_ijv(jname + 'c'): # process constant Jacobians separately ii.extend(row) jj.extend(col) vv.extend(val * np.ones_like(row)) if len(ii) > 0: ii = np.array(ii, dtype=int) jj = np.array(jj, dtype=int) vv = np.array(vv, dtype=float) self.dae.store_sparse_ijv(jname, ii, jj, vv) self.dae.build_pattern(jname) # set sink diagonal to 1.0 for compacted replaced devices si = getattr(self, '_y_sink_idx', None) if si is not None: self.dae.tpl['gy'][si, si] = 1.0 self.dae.gy[si, si] = 1.0 def vars_to_dae(self, model): """ Copy variables values from models to `System.dae`. This function clears `DAE.x` and `DAE.y` and collects values from models. """ self._v_to_dae('x', model) self._v_to_dae('y', model) def vars_to_models(self): """ Copy variable values from `System.dae` to models. """ for var in self._getters['y']: if var.n > 0: var.v[:] = self.dae.y[var.a] for var in self._getters['x']: if var.n > 0: var.v[:] = self.dae.x[var.a] def build_ybus(self): """ Build bus admittance matrix by aggregating contributions from all models with ``flags.ybus == True``. Each contributing model must implement a ``build_ybus()`` method returning a kvxopt ``spmatrix`` of size ``(nb, nb)`` with typecode ``'z'``. Returns ------- spmatrix Bus admittance matrix (sparse, complex). Notes ----- Uses ``exist.ybus``, populated once during ``setup()``. Device status changes after setup are reflected via ``u.v`` at call time, but the set of contributing models is fixed. """ nb = self.Bus.n Y = spmatrix([], [], [], (nb, nb), 'z') for mdl in self.exist.ybus.values(): Y += mdl.build_ybus() return Y def connectivity(self, info=True): """ Delegate to :meth:`ConnMan.check_connectivity`. """ return self.conn.check_connectivity(info=info) def to_ipysheet(self, model: str, vin: bool = False): """ Return an ipysheet object for editing in Jupyter Notebook. """ from ipysheet import from_dataframe return from_dataframe(self.models[model].as_df(vin=vin)) def from_ipysheet(self, model: str, sheet, vin: bool = False): """ Set an ipysheet object back to model. """ from ipysheet import to_dataframe df = to_dataframe(sheet) self.models[model].update_from_df(df, vin=vin) def summary(self): """ Delegate to :meth:`ConnMan.summary`. """ return self.conn.summary() def _v_to_dae(self, v_code, model): """ Helper function for collecting variable values into ``dae`` structures `x` and `y`. This function must be called with ``dae.x`` and ``dae.y`` both being zeros. Otherwise, adders will be summed again, causing an error. Parameters ---------- v_code : 'x' or 'y' Variable type name """ if model.n == 0: return if model.flags.initialized is False: return for var in model.cache.v_adders.values(): if var.v_code != v_code: continue np.add.at(self.dae.__dict__[v_code], var.a, var.v) for var in self._setters[v_code]: if var.owner.flags.initialized is False: continue if var.n > 0: np.put(self.dae.__dict__[v_code], var.a, var.v) def _e_to_dae(self, eq_name: Union[str, Tuple] = ('f', 'g')): """ Helper function for collecting equation values into `System.dae.f` and `System.dae.g`. Parameters ---------- eq_name : 'x' or 'y' or tuple Equation type name """ if isinstance(eq_name, str): eq_name = [eq_name] for name in eq_name: for var in self._adders[name]: np.add.at(self.dae.__dict__[name], var.a, var.e) for var in self._setters[name]: np.put(self.dae.__dict__[name], var.a, var.e) def get_z(self, models: OrderedDict): """ Get all discrete status flags in a numpy array. Values are written to ``dae.z`` in place. Returns ------- numpy.array """ if self.TDS.config.store_z != 1: return None if len(self.dae.z) != self.dae.o: self.dae.z = np.zeros(self.dae.o, dtype=float) ii = 0 for mdl in models.values(): if mdl.n == 0 or len(mdl._input_z) == 0: continue for zz in mdl._input_z.values(): self.dae.z[ii:ii + mdl.n] = zz ii += mdl.n return self.dae.z def find_models(self, flag: Optional[Union[str, Tuple]], skip_zero: bool = True): """ Find models with at least one of the flags as True. Warnings -------- Checking the number of devices has been centralized into this function. ``models`` passed to most System calls must be retrieved from here. Parameters ---------- flag : list, str Flags to find skip_zero : bool Skip models with zero devices Returns ------- OrderedDict model name : model instance """ if isinstance(flag, str): flag = [flag] out = OrderedDict() for name, mdl in self.models.items(): if skip_zero is True: if (mdl.n == 0) or (mdl.in_use is False): continue for f in flag: if mdl.flags.__dict__[f] is True: out[name] = mdl break return out def undill(self, autogen_stale=True): """ Delegate to :class:`andes.system.codegen.CodegenManager`. """ return self.codegen.undill(autogen_stale=autogen_stale) def _load_calls(self): """ Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`. """ warnings.warn( "System._load_calls() is deprecated and will be removed in v3.0. " "Use system.codegen._load_calls() instead.", DeprecationWarning, stacklevel=2, ) return self.codegen._load_calls() def _expand_pycode(self, pycode_module): """ Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`. """ warnings.warn( "System._expand_pycode() is deprecated and will be removed in v3.0. " "Use system.codegen._expand_pycode() instead.", DeprecationWarning, stacklevel=2, ) return self.codegen._expand_pycode(pycode_module) def _get_models(self, models): """ Helper function for sanitizing the ``models`` input. The output is an OrderedDict of model names and instances. """ out = OrderedDict() if isinstance(models, OrderedDict): out.update(models) elif models is None: out.update(self.exist.pflow) elif isinstance(models, str): out[models] = self.__dict__[models] elif isinstance(models, Model): out[models.class_name] = models elif isinstance(models, list): for item in models: if isinstance(item, Model): out[item.class_name] = item elif isinstance(item, str): out[item] = self.__dict__[item] else: raise TypeError(f'Unknown type {type(item)}') return out def _store_tf(self, models): """ Store the inverse time constant associated with equations. """ for mdl in models.values(): for var in mdl.cache.states_and_ext.values(): if var.t_const is not None: np.put(self.dae.Tf, var.a, var.t_const.v) def call_models(self, method: str, models: OrderedDict, *args, **kwargs): """ Call methods on the given models. Parameters ---------- method : str Name of the model method to be called models : OrderedDict, list, str Models on which the method will be called args Positional arguments to be passed to the model method kwargs Keyword arguments to be passed to the model method Returns ------- The return value of the models in an OrderedDict """ ret = OrderedDict() for name, mdl in models.items(): ret[name] = getattr(mdl, method)(*args, **kwargs) if self.runtime.save_stats: if method not in self.call_stats[name]: self.call_stats[name][method] = 1 else: self.call_stats[name][method] += 1 return ret def check_group_common(self): """ Delegate to :class:`andes.system.registry.RegistryLoader`. """ return self.registry.check_group_common() def _check_group_common(self): """ Check if all group common variables and parameters are met. """ warnings.warn( "System._check_group_common() is deprecated and will be removed in v3.0. " "Use system.registry.check_group_common() instead.", DeprecationWarning, stacklevel=2, ) return self.registry.check_group_common() def collect_ref(self): """ Collect indices into `BackRef` for all models. """ models_and_groups = list(self.models.values()) + list(self.groups.values()) # Auto-create BackRef on destination groups/models for status_parent IdxParams. # This allows the framework to propagate status without requiring manual # BackRef declarations on every parent (e.g., Bus/ACNode doesn't need to # pre-declare BackRef for every child group). for model in models_and_groups: if not hasattr(model, 'idx_params') or model.n == 0: continue for idxp in model.idx_params.values(): if not getattr(idxp, 'status_parent', False) or idxp.model is None: continue dest = self.__dict__.get(idxp.model) if dest is None or dest.n == 0: continue for ref_name in (model.group, model.class_name): if ref_name not in dest.services_ref: br = BackRef(info=f'auto for status from {model.class_name}') setattr(dest, ref_name, br) # create an empty list of lists for all `BackRef` instances for model in models_and_groups: for ref in model.services_ref.values(): ref.v = [list() for _ in range(model.n)] # `model` is the model who stores `IdxParam`s to other models # `BackRef` is declared at other models specified by the `model` parameter # of `IdxParam`s. for model in models_and_groups: if model.n == 0: continue # skip: a group is not allowed to link to other groups if not hasattr(model, "idx_params"): continue for idxp in model.idx_params.values(): if (idxp.model not in self.models) and (idxp.model not in self.groups): continue dest = self.__dict__[idxp.model] if dest.n == 0: continue for name in (model.class_name, model.group): # `BackRef` not requested by the linked models or groups if name not in dest.services_ref: continue for model_idx, dest_idx in zip(model.idx.v, idxp.v): if dest_idx not in dest.uid: continue dest.set_backref(name, from_idx=model_idx, to_idx=dest_idx) # set model ``in_use`` flag if isinstance(model, Model): model.set_in_use() def _collect_ref_index(self, status_parent_only=False): """ Build a reverse index from ``IdxParam`` declarations. Returns a dict mapping each target group/model name to a list of ``(source_model_name, param_attr_name)`` tuples. Parameters ---------- status_parent_only : bool, optional If ``True``, include only ``IdxParam`` instances with ``status_parent=True``. Default is ``False`` (all references). Returns ------- dict ``{target_group_or_model: [(source_model, param_name), ...]}`` Examples -------- With ``status_parent_only=False`` (all references):: { 'ACNode': [('PQ', 'bus'), ('PV', 'bus'), ('Line', 'bus1'), ('Line', 'bus2'), ('GENROU', 'bus'), ...], 'SynGen': [('TGOV1', 'syn'), ('ESST3A', 'syn'), ...], 'Exciter': [('IEEEST', 'avr'), ...], 'Bus': [('Fault', 'bus'), ('PV', 'busr'), ...], ... } With ``status_parent_only=True``, only entries where the ``IdxParam`` has ``status_parent=True`` are included (a subset of the above). """ ref_index = {} for mdl_name, mdl in self.models.items(): if mdl.n == 0: continue if not hasattr(mdl, 'idx_params'): continue for p_name, p_instance in mdl.idx_params.items(): if status_parent_only and not getattr(p_instance, 'status_parent', False): continue target = p_instance.model if target is None: continue if target not in ref_index: ref_index[target] = [] ref_index[target].append((mdl_name, p_name)) return ref_index def _build_status_graph(self): """ Build the status propagation graph from ``IdxParam`` declarations with ``status_parent=True``. Populates ``self._status_children`` and ``self._status_parent_map`` used by :meth:`set_status` for recursive propagation. After this method runs, the two dicts look like:: _status_parent_map = { 'GENROU': [('bus', 'ACNode')], 'ESST3A': [('syn', 'SynGen')], 'TGOV1': [('syn', 'SynGen')], 'Line': [('bus1', 'ACNode'), ('bus2', 'ACNode')], ... } _status_children = { 'ACNode': {'PQ', 'PV', 'Slack', 'StaticGen', 'SynGen', 'GENROU', 'Line', 'ACLine', 'Shunt', ...}, 'SynGen': {'ESST3A', 'TGOV1', 'Exciter', 'TurbineGov', ...}, ... } Values in ``_status_children`` are BackRef names present on the parent group, which may be group names (e.g., ``'Exciter'``) or model names (e.g., ``'ESST3A'``). """ self._status_children = {} self._status_parent_map = {} for target, refs in self._collect_ref_index(status_parent_only=True).items(): for mdl_name, p_name in refs: # Record child -> parent mapping (list of tuples for multi-parent) if mdl_name not in self._status_parent_map: self._status_parent_map[mdl_name] = [] self._status_parent_map[mdl_name].append((p_name, target)) # Record parent -> child BackRef names to follow if target not in self._status_children: self._status_children[target] = set() parent_obj = self.__dict__.get(target) if parent_obj is None: continue mdl = self.models[mdl_name] child_group_name = mdl.group child_model_name = mdl.class_name if child_group_name in parent_obj.services_ref: self._status_children[target].add(child_group_name) if child_model_name in parent_obj.services_ref: self._status_children[target].add(child_model_name) def propagate_init_status(self): """ Propagate effective status from parent to child models at init time. For each model with a ``status_parent``, recompute ``ue.v = u.v * parent_ue`` for every device, then recurse into children. This replaces the previous ``ug``/``uee`` ExtParam mechanism that fetched the parent's ``u`` at init. Should be called once after ``system.init()`` for TDS models. """ if not self._status_children: return # Find top-level parent groups (groups that have children but # are not themselves children of another group) child_groups = {self.models[m].group for m in self._status_parent_map if m in self.models} parent_groups = set(self._status_children.keys()) - child_groups # For each top-level parent group, propagate from every device for grp_name in parent_groups: grp = self.groups.get(grp_name) if grp is None: continue for mdl_name in grp.models: mdl = self.models.get(mdl_name) if mdl is None or mdl.n == 0: continue for uid in range(mdl.n): idx = mdl.idx.v[uid] ue_val = self._get_effective_status(mdl, uid) self._propagate_status(mdl, idx, uid, ue_val) def _get_effective_status(self, mdl, uid): """ Return the effective status value for a device. """ return mdl.ue.v[uid] def _get_parent_ue(self, mdl, uid): """ Get the combined parent effective status for a device. For models with multiple parents (e.g., Line with bus1 and bus2), returns the product of all parents' effective statuses. Returns 1.0 if the model has no status parent. """ mdl_name = mdl.class_name if mdl_name not in self._status_parent_map: return 1.0 result = 1.0 for p_name, parent_group_name in self._status_parent_map[mdl_name]: parent_idx = mdl.__dict__[p_name].v[uid] if parent_idx is None: continue parent_obj = self.__dict__.get(parent_group_name) if parent_obj is None: continue # Resolve to concrete model via group if hasattr(parent_obj, 'idx2model'): parent_mdl = parent_obj.idx2model(parent_idx) else: parent_mdl = parent_obj parent_uid = parent_mdl.idx2uid(parent_idx) result *= self._get_effective_status(parent_mdl, parent_uid) return result def _propagate_status(self, mdl, idx, uid, ue_val): """ Recursively propagate effective status to children via BackRef. """ group_name = mdl.group if group_name not in self._status_children: return group = self.groups.get(group_name) if group is None: return # Get group-level uid for this device if idx not in group.uid: return group_uid = group.idx2uid(idx) for backref_name in self._status_children[group_name]: if backref_name not in group.services_ref: continue child_idx_list = group.services_ref[backref_name].v[group_uid] for child_idx in child_idx_list: # Resolve child idx to concrete model if backref_name in self.groups: child_group = self.groups[backref_name] try: child_mdl = child_group.idx2model(child_idx) except KeyError: continue elif backref_name in self.models: child_mdl = self.models[backref_name] else: continue child_uid = child_mdl.idx2uid(child_idx) parent_ue = self._get_parent_ue(child_mdl, child_uid) child_ue = child_mdl.u.v[child_uid] * parent_ue old_ue = child_mdl.ue.v[child_uid] child_mdl.ue.v[child_uid] = child_ue if child_mdl.flags.topo and child_ue != old_ue: self.conn.invalidate() # Recurse to children of this child self._propagate_status(child_mdl, child_idx, child_uid, child_ue) def set_status(self, model_or_group, idx, value): """ Set the online status of a device and propagate to dependents. Sets ``u.v`` (the device's own status) and recomputes ``ue.v`` (effective status) for the device and all downstream dependents recursively via BackRef. Parameters ---------- model_or_group : str Model name or group name containing the device. idx : str, int, float Device idx. value : int or float New status value (0 or 1). """ if model_or_group in self.groups: grp = self.groups[model_or_group] mdl = grp.idx2model(idx) elif model_or_group in self.models: mdl = self.models[model_or_group] else: raise KeyError(f"'{model_or_group}' is not a model or group name.") uid = mdl.idx2uid(idx) old_ue = mdl.ue.v[uid] # Set u.v (device's own status) mdl.u.v[uid] = value # Recompute ue.v for this device parent_ue = self._get_parent_ue(mdl, uid) ue_val = value * parent_ue mdl.ue.v[uid] = ue_val if mdl.flags.topo and ue_val != old_ue: self.conn.invalidate() elif mdl.flags.ybus and ue_val != old_ue: self.conn.invalidate_ybus() # Propagate to children self._propagate_status(mdl, idx, uid, ue_val) def get_status(self, model_or_group, idx): """ Get the effective status of a device. Returns ``ue.v`` (effective online status) for the device. Parameters ---------- model_or_group : str Model name or group name. idx : str, int, float Device idx. Returns ------- float Effective status value (0 or 1). """ if model_or_group in self.groups: grp = self.groups[model_or_group] mdl = grp.idx2model(idx) elif model_or_group in self.models: mdl = self.models[model_or_group] else: raise KeyError(f"'{model_or_group}' is not a model or group name.") uid = mdl.idx2uid(idx) return self._get_effective_status(mdl, uid) def find_connected(self, model_or_group, idx): """ Find all devices connected to a given device. Scans ``IdxParam`` references across all models to find devices that point to the specified target device. Parameters ---------- model_or_group : str Model name (e.g., ``'Bus'``) or group name (e.g., ``'ACNode'``). idx : str, int, float Device idx to query. Returns ------- OrderedDict ``{model_name: [idx, ...]}`` for each model with at least one device referencing the target. Empty models are omitted. Examples -------- Find all devices connected to Bus 1:: ss.find_connected('Bus', 1) # OrderedDict([('PQ', [1]), ('Line', [1, 4]), ('GENROU', [1])]) """ # Build and cache the reverse index on first call if not hasattr(self, '_ref_index'): self._ref_index = self._collect_ref_index(status_parent_only=False) # Resolve model name to group name and collect both keys targets = set() if model_or_group in self.groups: targets.add(model_or_group) elif model_or_group in self.models: targets.add(model_or_group) targets.add(self.models[model_or_group].group) else: raise KeyError(f"'{model_or_group}' is not a model or group name.") result = OrderedDict() for target in targets: if target not in self._ref_index: continue for src_model, param_name in self._ref_index[target]: mdl = self.models.get(src_model) if mdl is None or mdl.n == 0: continue idxp = mdl.idx_params.get(param_name) if idxp is None: continue matched = [mdl.idx.v[uid] for uid, ref_idx in enumerate(idxp.v) if ref_idx == idx] if matched: if src_model in result: result[src_model].extend(matched) else: result[src_model] = matched return result def import_groups(self): """ Delegate to :class:`andes.system.registry.RegistryLoader`. """ warnings.warn( "System.import_groups() is deprecated and will be removed in v3.0. " "Use system.registry.import_groups() instead.", DeprecationWarning, stacklevel=2, ) return self.registry.import_groups() def import_models(self): """ Delegate to :class:`andes.system.registry.RegistryLoader`. """ warnings.warn( "System.import_models() is deprecated and will be removed in v3.0. " "Use system.registry.import_models() instead.", DeprecationWarning, stacklevel=2, ) return self.registry.import_models() def import_routines(self): """ Delegate to :class:`andes.system.registry.RegistryLoader`. """ warnings.warn( "System.import_routines() is deprecated and will be removed in v3.0. " "Use system.registry.import_routines() instead.", DeprecationWarning, stacklevel=2, ) return self.registry.import_routines() def store_switch_times(self, models, eps=1e-4): """ Store event switching time in a sorted Numpy array in ``System.switch_times`` and an OrderedDict ``System.switch_dict``. ``System.switch_dict`` has keys as event times and values as the OrderedDict of model names and instances associated with the event. Parameters ---------- models : OrderedDict model name : model instance eps : float The small time step size to use immediately before and after the event Returns ------- array-like self.switch_times """ out = np.array([], dtype=float) if self.options.get('flat') is True: return out names = [] for instance in models.values(): times = np.array(instance.get_times()).ravel() out = np.append(out, times) out = np.append(out, times - eps) out = np.append(out, times + eps) names.extend([instance.class_name] * (3 * len(times))) # sort sort_idx = np.argsort(out).astype(int) out = out[sort_idx] names = [names[i] for i in sort_idx] # select t > current time ltzero_idx = np.where(out >= self.dae.t)[0] out = out[ltzero_idx] names = [names[i] for i in ltzero_idx] # make into an OrderedDict with unique keys and model names combined for i, j in zip(out, names): if i not in self.switch_dict: self.switch_dict[i] = {j: self.models[j]} else: self.switch_dict[i].update({j: self.models[j]}) self.switch_times = np.array(list(self.switch_dict.keys())) # self.switch_times = out self.n_switches = len(self.switch_times) return self.switch_times def switch_action(self, models: OrderedDict): """ Invoke the actions associated with switch times. This function will not be called if ``flat=True`` is passed to system. """ for instance in models.values(): instance.switch_action(self.dae.t) # TODO: generalize below for any models with timeseries data. self.TimeSeries.apply_exact(self.dae.t) def _p_restore(self): """ Restore parameters stored in `vin`. """ for model in self.models.values(): for param in model.num_params.values(): param.restore() def e_clear(self, models: OrderedDict): """ Clear equation arrays in DAE and model variables. This step must be called before calling `f_update` or `g_update` to flush existing values. """ self.dae.clear_fg() self.call_models('e_clear', models) def remove_pycapsule(self): """ Remove PyCapsule objects in solvers. """ for r in self.routines.values(): r.solver.clear() def _store_calls(self, models: OrderedDict): """ Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`. """ warnings.warn( "System._store_calls() is deprecated and will be removed in v3.0. " "Use system.codegen._store_calls() instead.", DeprecationWarning, stacklevel=2, ) return self.codegen._store_calls(models) def _list2array(self): """ Helper function to call models' ``list2array`` method, which usually performs memory preallocation. """ self.call_models('list2array', self.models) def _report_param_corrections(self): """ Report parameter corrections for all models. Emits grouped warnings for parameter values that were corrected during device addition (e.g., non-zero, non-positive, non-negative violations). """ for mdl in self.models.values(): mdl.report_corrections() def _check_setpoints(self): """ Validate ``_setpoints`` declarations on all models. For every model that declares a ``_setpoints`` dict, verify that each target attribute exists on the model and has a ``.v`` member. Called once during :meth:`setup`. """ for mdl_name, mdl in self.models.items(): sp_map = getattr(mdl, '_setpoints', None) if not sp_map: continue for sp_name, attr_name in sp_map.items(): attr = mdl.__dict__.get(attr_name) if attr is None: raise AttributeError( f"Model <{mdl.class_name}> declares _setpoints" f"['{sp_name}'] = '{attr_name}', but attribute" f" '{attr_name}' does not exist." ) if not hasattr(attr, 'v'): raise AttributeError( f"Model <{mdl.class_name}> declares _setpoints" f"['{sp_name}'] = '{attr_name}', but '{attr_name}'" f" has no '.v' array." ) def set_config(self, config=None): """ Delegate to :class:`andes.system.config_runtime.SystemConfigRuntime`. """ return self.config_runtime.set_config(config=config) def collect_config(self): """ Delegate to :class:`andes.system.config_runtime.SystemConfigRuntime`. """ return self.config_runtime.collect_config() def save_config(self, file_path=None, overwrite=False): """ Delegate to :class:`andes.system.config_runtime.SystemConfigRuntime`. """ return self.config_runtime.save_config(file_path=file_path, overwrite=overwrite) def supported_models(self, export='plain'): """ Return the support group names and model names in a table. Returns ------- str A table-formatted string for the groups and models """ def rst_ref(name, export): """ Refer to the model in restructuredText mode so that it renders as a hyperlink. """ if export == 'rest': return ":ref:`" + name + '`' else: return name seen = set() pairs = list() for g in self.groups: group = self.groups[g] if id(group) in seen: continue seen.add(id(group)) models = list() for m in group.models: models.append(rst_ref(m, export)) if len(models) > 0: pairs.append((rst_ref(g, export), ', '.join(models))) tab = Tab(title='Supported Groups and Models', header=['Group', 'Models'], data=pairs, export=export, ) return tab.draw() def as_dict(self, vin=False, skip_empty=True): """ Return system data as a dict where the keys are model names and values are dicts. Each dict has parameter names as keys and corresponding data in an array as values. Returns ------- OrderedDict """ out = OrderedDict() for name, instance in self.models.items(): if skip_empty and instance.n == 0: continue out[name] = instance.as_dict(vin=vin) return out def set_output_subidx(self, models): """ Process :py:class:`andes.models.misc.Output` data and store the sub-indices into ``dae.xy``. Parameters ---------- models : OrderedDict Models currently in use for the routine """ export_vars = dict(x=list(), y=list()) # indices of export x and y for model, var, dev in zip(self.Output.model.v, self.Output.varname.v, self.Output.dev.v): # check validity of model name if model not in models: logger.info("Output model <%s> invalid or contains no device. Skipped.", model) continue mdl_instance = models[model] mdl_all_vars = mdl_instance.cache.all_vars # check validity of var name if var is not None and (var not in mdl_all_vars): logger.info("Output model <%s> contains no variable <%s>. Skipped.", model, var) continue # check validity of dev idx if (dev is not None) and (dev not in mdl_instance.idx.v): logger.info("Output model <%s> contains no device <%s>. Skipped.", model, dev) continue # TODO: dev-based indexing is not fully supported # for multi-index variables, such as those in COI. if var is None: for item in mdl_all_vars.values(): if dev is None: export_vars[item.v_code].extend(item.a) else: uid = mdl_instance.idx2uid(dev) export_vars[item.v_code].append(item.a[uid]) else: # with variable name item = mdl_all_vars[var] if dev is None: export_vars[item.v_code].extend(item.a) else: # with exact index uid = mdl_instance.idx2uid(dev) export_vars[item.v_code].append(item.a[uid]) self.Output.xidx = sorted(np.unique(export_vars['x'])) self.Output.yidx = sorted(np.unique(export_vars['y']))