"""
System class for power system data and methods.
"""
# [ANDES] (C)2015-2026 Hantao Cui
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# File name: system/facade.py
import logging
import warnings
from collections import OrderedDict, defaultdict
from typing import Dict, Optional, Tuple, Union
import andes.io
from andes.core import AntiWindup, Model, ConnMan
from andes.core.service import BackRef
from andes.io.streaming import Streaming
from andes.shared import NCPUS, jac_names, np, spmatrix
from andes.system.codegen import CodegenManager
from andes.system.config_runtime import SystemConfigRuntime
from andes.system.dae_compactor import DAECompactor
from andes.system.helpers import _set_hi_name, _set_xy_name, _set_z_name
from andes.system.registry import RegistryLoader
from andes.utils.misc import elapsed
from andes.utils.tab import Tab
from andes.variables import DAE, FileMan
logger = logging.getLogger(__name__)
class ExistingModels:
    """
    Storage class for the existing models, grouped by routine flag.

    Attributes
    ----------
    pflow : OrderedDict
        Models stored for power flow.
    tds : OrderedDict
        Models stored for time-domain simulation.
    pflow_tds : OrderedDict
        Models stored for either power flow or time-domain simulation.
    ybus : OrderedDict
        Models stored for building the bus admittance matrix.

    Notes
    -----
    All containers start empty. ``System.store_existing()`` fills them
    with the results of ``System.find_models()``.
    """

    def __init__(self):
        self.pflow = OrderedDict()
        # if a model needs to be initialized before TDS, set `flags.tds = True`
        self.tds = OrderedDict()
        self.pflow_tds = OrderedDict()
        self.ybus = OrderedDict()
[docs]class System:
"""
System contains models and routines for modeling and simulation.
System contains several special `OrderedDict` member attributes for housekeeping.
These attributes include `models`, `groups`, `routines` and `calls` for loaded models, groups,
analysis routines, and generated numerical function calls, respectively.
Parameters
----------
no_undill : bool, optional, default=False
True to disable the call to ``System.undill()`` at the end of object creation.
False by default.
autogen_stale : bool, optional, default=True
True to automatically generate code for stale models.
Notes
-----
System stores model and routine instances as attributes.
Model and routine attribute names are the same as their class names.
For example, `Bus` is stored at ``system.Bus``, the power flow calculation routine is at
``system.PFlow``, and the numerical DAE instance is at ``system.dae``. See attributes for the list of
attributes.
Attributes
----------
dae : andes.variables.dae.DAE
Numerical DAE storage
files : andes.variables.fileman.FileMan
File path storage
config : andes.core.Config
System config storage
models : OrderedDict
model name and instance pairs
groups : OrderedDict
group name and instance pairs
routines : OrderedDict
routine name and instance pairs
"""
def __init__(self,
             case: Optional[str] = None,
             name: Optional[str] = None,
             config: Optional[Dict] = None,
             config_path: Optional[str] = None,
             default_config: Optional[bool] = False,
             options: Optional[Dict] = None,
             no_undill: Optional[bool] = False,
             autogen_stale: Optional[bool] = True,
             **kwargs
             ):
    """
    Create the system: resolve the config, create managers, load models.

    Parameters
    ----------
    case : str, optional
        Case file, forwarded to ``FileMan``.
    name : str, optional
        Name stored in ``self.name``.
    config : dict, optional
        Forwarded to ``config_runtime.finalize()``.
    config_path : str, optional
        Forwarded to ``config_runtime.load_rc()``.
    default_config : bool, optional
        Forwarded to ``config_runtime.load_rc()``.
    options : dict, optional
        Entries copied into ``self.options``.
    no_undill : bool, optional
        True to skip the call to ``undill()`` at the end of model loading.
    autogen_stale : bool, optional
        Forwarded to ``undill()``.
    **kwargs
        Merged into ``self.options`` after ``options``, so keyword
        arguments win on conflicting keys.
    """
    self.name = name

    self.options = {}
    if options is not None:
        self.options.update(options)
    if kwargs:
        self.options.update(kwargs)

    self.calls = OrderedDict()          # a dictionary with model names (keys) and their ``calls`` instance
    self.models = OrderedDict()         # model names and instances
    self.model_aliases = OrderedDict()  # alias: model instance
    self.groups = OrderedDict()         # group names and instances
    self.routines = OrderedDict()       # routine names and instances
    self.switch_times = np.array([])    # an array of ordered event switching times
    self.switch_dict = OrderedDict()    # time: OrderedDict of associated models
    self.with_calls = False             # if generated function calls have been loaded
    self.n_switches = 0                 # number of elements in `self.switch_times`
    self.exit_code = 0                  # command-line exit code, 0 - normal, others - error.

    # --- Config resolution (phased) ---
    # Precedence: defaults < andes.rc < file _config < CLI config_option
    self.config_runtime = SystemConfigRuntime(self)
    self.config_runtime.load_rc(config_path=config_path,
                                default_config=default_config)
    # ``FileMan`` is created between the rc and the file-config phases
    # because ``merge_file_config`` takes it as input.
    self.files = FileMan(case=case, **self.options)
    self.config_runtime.merge_file_config(self.files)
    self.config_runtime.apply_cli_overrides()
    self.config_runtime.finalize(config=config)

    # --- Managers ---
    self._init_managers()

    # --- Models, routines, and codegen ---
    self._init_models(no_undill=no_undill, autogen_stale=autogen_stale)
def _init_managers(self):
    """
    Instantiate the helper objects owned by the system.

    The objects are created in a fixed order. Every one except
    ``ExistingModels`` receives this system as its argument.
    """
    # containers for the models found per routine flag
    self.exist = ExistingModels()

    # numerical storage and output streaming
    self.dae = DAE(system=self)
    self.streaming = Streaming(self)

    # connectivity manager
    self.conn = ConnMan(system=self)

    # registry loader, code generation manager, and DAE compactor
    self.registry = RegistryLoader(self)
    self.codegen = CodegenManager(self)
    self.dae_compactor = DAECompactor(self)
def _init_models(self, no_undill=False, autogen_stale=True):
"""
Import groups, models, and routines from the registry, then
optionally load generated code (undill).
"""
self.registry.load_all()
self._getters = dict(f=list(), g=list(), x=list(), y=list())
self._adders = dict(f=list(), g=list(), x=list(), y=list())
self._setters = dict(f=list(), g=list(), x=list(), y=list())
self.antiwindups = list()
self.no_check_init = list()
self.call_stats = defaultdict(dict)
# status propagation graph (built during setup)
self._status_children = {}
self._status_parent_map = {}
self.is_setup = False
if not no_undill:
self.undill(autogen_stale=autogen_stale)
def _update_config_object(self):
"""
Change config on the fly based on command-line options.
.. deprecated:: 2.0
Use ``system.config_runtime.apply_cli_overrides()`` instead.
This method will be removed in v3.0.
"""
if hasattr(self, 'config_runtime'):
return self.config_runtime.apply_cli_overrides()
# Fallback for subclasses that do not call super().__init__().
warnings.warn(
"System._update_config_object() is deprecated and will be removed "
"in v3.0. Subclasses should call super().__init__() and use "
"system.config_runtime.apply_cli_overrides().",
FutureWarning,
stacklevel=2,
)
SystemConfigRuntime(self).apply_cli_overrides()
def reload(self, case, **kwargs):
    """
    Load another case into this System object and set it up.

    Parameters
    ----------
    case : str
        Case to load; forwarded to ``self.files.set``.
    **kwargs
        Merged into ``self.options`` and also forwarded to
        ``self.files.set``.
    """
    self.options.update(kwargs)
    self.files.set(case=case, **kwargs)

    # TODO: clear all flags and empty data

    # parse the new case into this system, then build the structure
    andes.io.parse(self)
    self.setup()
def _clear_adder_setter(self):
"""
Clear adders and setters storage
"""
self._getters = dict(f=list(), g=list(), x=list(), y=list())
self._adders = dict(f=list(), g=list(), x=list(), y=list())
self._setters = dict(f=list(), g=list(), x=list(), y=list())
def prepare(self, quick=False, incremental=False, models=None, nomp=False, ncpu=NCPUS):
    """
    Forward to ``CodegenManager.prepare`` and return its result.

    All arguments are passed through unchanged as keyword arguments.
    See :class:`andes.system.codegen.CodegenManager`.
    """
    options = dict(quick=quick,
                   incremental=incremental,
                   models=models,
                   nomp=nomp,
                   ncpu=ncpu)
    return self.codegen.prepare(**options)
def _mp_prepare(self, models, quick, pycode_path, ncpu):
"""
Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`.
"""
warnings.warn(
"System._mp_prepare() is deprecated and will be removed in v3.0. "
"Use system.codegen._mp_prepare() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.codegen._mp_prepare(models=models,
quick=quick,
pycode_path=pycode_path,
ncpu=ncpu)
def _finalize_pycode(self, pycode_path):
"""
Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`.
"""
warnings.warn(
"System._finalize_pycode() is deprecated and will be removed in v3.0. "
"Use system.codegen._finalize_pycode() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.codegen._finalize_pycode(pycode_path)
def _find_stale_models(self):
"""
Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`.
"""
warnings.warn(
"System._find_stale_models() is deprecated and will be removed in v3.0. "
"Use system.codegen._find_stale_models() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.codegen._find_stale_models()
def _to_orddct(self, model_list):
"""
Helper function to convert a list of model names to OrderedDict with
name as keys and model instances as values.
"""
if isinstance(model_list, OrderedDict):
return model_list
if isinstance(model_list, list):
out = OrderedDict()
for name in model_list:
if name not in self.models:
logger.error("Model <%s> does not exist. Check your inputs.", name)
continue
out[name] = self.models[name]
return out
else:
raise TypeError("Type %s not recognized" % type(model_list))
def setup(self):
    """
    Build the internal structure of the system for studies.

    Call this once, after all device data have been added.

    Returns
    -------
    bool
        True if setup finished without error. False if the system was
        already set up or if linking external parameters failed.
    """
    t0, _ = elapsed()

    # guard: a second call is rejected without touching anything
    if self.is_setup:
        logger.warning(
            'System has been setup. Calling setup() twice is not allowed. '
            'To add devices, reload the case with setup=False, add devices, then call setup().'
        )
        return False

    self.collect_ref()
    self._build_status_graph()
    self._list2array()  # `list2array` must come before `link_ext_param`

    # a linking failure is remembered but the remaining steps still run
    ok = bool(self.link_ext_param())

    self.find_devices()  # find or add required devices
    self._report_param_corrections()
    self._check_setpoints()

    # === no device addition or removal after this point ===
    self.calc_pu_coeff()   # calculate parameters in system per units
    self.store_existing()  # store models with routine flags

    # assign address at the end before adding devices and processing parameters
    pflow_models = self.exist.pflow
    self.set_address(pflow_models)
    self.set_dae_names(pflow_models)  # needs perf. optimization
    self.store_sparse_pattern(pflow_models)
    self.store_adder_setter(pflow_models)

    # propagate bus/parent status to ue before first connectivity check
    self.propagate_init_status()
    self.conn.check_connectivity()

    if ok:
        self.is_setup = True  # set `is_setup` if no error occurred
    else:
        logger.error("System setup failed. Please resolve the reported issue(s).")
        self.exit_code += 1

    _, s = elapsed(t0)
    logger.info('System internal structure set up in %s.', s)

    return ok
def store_existing(self):
    """
    Store the models found for each routine flag in ``System.exist``.

    TODO: Models with `TimerParam` will need to be stored anyway.
    This will allow adding switches on the fly.
    """
    # attribute on ``self.exist`` -> flag(s) passed to ``find_models``
    lookups = (
        ('pflow', 'pflow'),
        ('tds', 'tds'),
        ('pflow_tds', ('tds', 'pflow')),
        ('ybus', 'ybus'),
    )
    for attr, flag in lookups:
        setattr(self.exist, attr, self.find_models(flag))
def reset(self, force=False):
    """
    Reset the system and run ``setup()`` again.

    Parameters
    ----------
    force : bool
        True to reset even if TDS has been initialized.

    Warnings
    --------
    If TDS is initialized, reset will lead to unpredictable state.
    """
    tds_initialized = self.TDS.initialized is True
    if tds_initialized and not force:
        logger.error('Reset failed because TDS is initialized. \nPlease reload the test case to start over.')
        return

    all_models = self.models

    self.dae.reset()
    self.call_models('a_reset', models=all_models)
    self.e_clear(models=all_models)
    self._p_restore()

    # clear the flag so that ``setup()`` does not reject the call
    self.is_setup = False
    self.setup()
def add(self, model_name, param_dict=None, **kwargs):
    """
    Add a device instance for an existing model.

    This method calls the ``add`` method of the model and registers the
    device `idx` to its group.

    Parameters can be passed as a dictionary, as keyword arguments, or both.
    When both are provided, keyword arguments take precedence on conflicts.
    The dictionary passed as ``param_dict`` is never modified.

    Parameters
    ----------
    model_name : str
        Name of the model (e.g., ``'Fault'``, ``'Toggle'``, ``'PQ'``).
    param_dict : dict, optional
        Dictionary of parameter names to values.
    **kwargs
        Parameter names and values as keyword arguments.

    Returns
    -------
    idx
        The assigned device index, or ``None`` if ``model_name`` is
        neither a model name nor a model alias.

    Raises
    ------
    NotImplementedError
        If the system has already been set up.

    Examples
    --------
    Keyword arguments are the preferred style::

        ss.add('Fault', bus=5, tf=1.0, tc=1.1)

    Models with a ``model`` parameter (e.g., ``Alter``, ``Toggle``)
    can now use keyword arguments directly::

        ss.add('Alter', model='TGOV1', dev=1, src='paux0',
               t=1.0, method='=', amount=0.05)
        ss.add('Toggle', model='Line', dev='Line_5', t=1.0)
    """
    if model_name not in self.models and (model_name not in self.model_aliases):
        logger.warning("<%s> is not an existing model.", model_name)
        return None

    if self.is_setup:
        raise NotImplementedError(
            "Adding devices after setup() is not supported. "
            "To add devices, reload the case with setup=False, add devices, then call setup()."
        )

    mdl = self.__dict__[model_name]
    group = self.groups[mdl.group]

    # Work on a private copy: merging kwargs and popping `uid`/`idx` must
    # not leak into the caller's dictionary, which may be reused.
    params = {} if param_dict is None else dict(param_dict)
    params.update(kwargs)

    # remove `uid` field
    params.pop('uid', None)

    # a non-string NaN `idx` is treated as "not given"
    idx = params.pop('idx', None)
    if idx is not None and (not isinstance(idx, str) and np.isnan(idx)):
        idx = None

    idx = group.get_next_idx(idx=idx, model_name=model_name)
    mdl.add(idx=idx, **params)
    group.add(idx=idx, model=mdl)

    return idx
def find_devices(self):
    """
    Run every device finder of every model.

    For each entry in a model's ``services_fnd``, ``find_or_add`` is
    called with this system as the argument.
    """
    for mdl in self.models.values():
        finders = mdl.services_fnd
        if len(finders) == 0:
            continue
        for finder in finders.values():
            finder.find_or_add(self)
def set_address(self, models):
    """
    Assign DAE addresses to the variables of the given models.

    The work is done in three passes over ``models``, followed by
    resizing the DAE arrays, binding the variable arrays, and extending
    the DAE name storage.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance.

    Raises
    ------
    KeyError
        If an external variable refers to a name that is not an
        attribute of the system.
    """
    dae = self.dae

    # --- Phase 1: set internal variable addresses ---
    for mdl in models.values():
        if mdl.flags.address is True:
            logger.debug('%s internal address exists', mdl.class_name)
            continue
        if mdl.n == 0:
            continue

        logger.debug('Setting internal address for %s', mdl.class_name)

        collate = mdl.flags.collate
        contiguous = not collate
        ndevice = mdl.n

        # request both blocks first, then hand the addresses out
        xaddr = dae.request_address('x', ndevice=ndevice,
                                    nvar=len(mdl.states),
                                    collate=collate)
        yaddr = dae.request_address('y', ndevice=ndevice,
                                    nvar=len(mdl.algebs),
                                    collate=collate)

        for pos, state in enumerate(mdl.states.values()):
            state.set_address(xaddr[pos], contiguous=contiguous)
        for pos, algeb in enumerate(mdl.algebs.values()):
            algeb.set_address(yaddr[pos], contiguous=contiguous)

        # observable variable addresses (in dae.b)
        if len(mdl.observables) > 0:
            baddr = dae.request_address('b', ndevice=ndevice,
                                        nvar=len(mdl.observables),
                                        collate=collate)
            for pos, obs in enumerate(mdl.observables.values()):
                obs.set_address(baddr[pos], contiguous=contiguous)

    # --- Phase 2: set external variable addresses ---
    # NOTE:
    #   This step will retrieve the number of variables (item.n) for Phase 3.
    for mdl in models.values():
        # handle external groups
        for ext_var in mdl.cache.vars_ext.values():
            ext_name = ext_var.model

            try:
                ext_model = self.__dict__[ext_name]
            except KeyError:
                raise KeyError('<%s> is not a model or group name.' % ext_name)

            # a failed link is logged and the loop goes on
            try:
                ext_var.link_external(ext_model)
            except (IndexError, KeyError) as e:
                logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s',
                             mdl.class_name, ext_var.name, ext_var.model,
                             ext_var.indexer.name, repr(e))

    # --- Phase 3: set external variable RHS addresses ---
    for mdl in models.values():
        if mdl.flags.address is True:
            logger.debug('%s RHS address exists', mdl.class_name)
            continue
        if mdl.n == 0:
            continue

        for ext_state in mdl.states_ext.values():
            # skip if no equation, i.e., no RHS value
            if ext_state.e_str is None:
                continue
            ext_state.set_address(np.arange(dae.p, dae.p + ext_state.n))
            dae.p += ext_state.n

        for ext_algeb in mdl.algebs_ext.values():
            if ext_algeb.e_str is None:
                continue
            ext_algeb.set_address(np.arange(dae.q, dae.q + ext_algeb.n))
            dae.q += ext_algeb.n

        mdl.flags.address = True

    # allocate memory for DAE arrays
    dae.resize_arrays()

    # set `v` and `e` in variables
    self.set_var_arrays(models=models)

    dae.alloc_or_extend_names()
def set_dae_names(self, models):
    """
    Register variable names and address maps in the DAE for the models.

    For each model this sets the names of states, algebraic variables,
    right-hand sides of external equations, and observables. It also
    fills ``dae.x_map`` and ``dae.y_map`` with ``address -> (model,
    variable)`` entries. Discrete flag names are added only when
    ``TDS.config.store_z`` equals 1.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance.
    """
    dae = self.dae

    for mdl in models.values():
        _set_xy_name(mdl, mdl.states, (dae.x_name, dae.x_tex_name))
        _set_xy_name(mdl, mdl.algebs, (dae.y_name, dae.y_tex_name))

        _set_hi_name(mdl, mdl.states_ext, (dae.h_name, dae.h_tex_name))
        _set_hi_name(mdl, mdl.algebs_ext, (dae.i_name, dae.i_tex_name))

        _set_xy_name(mdl, mdl.observables, (dae.b_name, dae.b_tex_name))

        # build reverse map: addr → (model, var)
        for state in mdl.states.values():
            for addr in state.a:
                dae.x_map[int(addr)] = (mdl, state)
        for algeb in mdl.algebs.values():
            for addr in algeb.a:
                dae.y_map[int(addr)] = (mdl, algeb)

        # add discrete flag names
        if self.TDS.config.store_z == 1:
            _set_z_name(mdl, dae, (dae.z_name, dae.z_tex_name))
def set_var_arrays(self, models, inplace=True, alloc=True):
    """
    Bind the arrays of model variables to the DAE.

    For every model with at least one device, ``set_arrays`` is called
    on its internal variables, then its external variables, then its
    observables.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance.
    inplace : bool
        Forwarded to ``set_arrays``; True to retrieve arrays that share
        memory with dae.
    alloc : bool
        Forwarded to ``set_arrays``; True to allocate for arrays
        internally.
    """
    for mdl in models.values():
        if mdl.n == 0:
            continue

        collections = (mdl.cache.vars_int, mdl.cache.vars_ext, mdl.observables)
        for collection in collections:
            for var in collection.values():
                var.set_arrays(self.dae, inplace=inplace, alloc=alloc)
def _init_numba(self, models: OrderedDict):
"""
Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`.
"""
warnings.warn(
"System._init_numba() is deprecated and will be removed in v3.0. "
"Use system.codegen._init_numba() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.codegen._init_numba(models)
def precompile(self,
               models: Union[OrderedDict, None] = None,
               nomp: bool = False,
               ncpu: int = NCPUS):
    """
    Forward to ``CodegenManager.precompile`` and return its result.

    All arguments are passed through unchanged as keyword arguments.
    See :class:`andes.system.codegen.CodegenManager`.
    """
    options = dict(models=models, nomp=nomp, ncpu=ncpu)
    return self.codegen.precompile(**options)
def init(self, models: OrderedDict, routine: str):
    """
    Initialize the variables of the given models, one model at a time.

    Per model, the steps are:

    - Link every external service (``services_ext``).
    - Call the model's ``init()`` with ``routine``.
    - Copy the model's variables to the DAE and copy DAE values back
      to models.

    After the loop, ``s_update_post`` runs and the time constants
    associated with differential equations are stored.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance.
    routine : str
        Routine name forwarded to each model's ``init``.

    Raises
    ------
    KeyError
        If an external service refers to a name that is not an
        attribute of the system.
    """
    self.codegen._init_numba(models)

    for mdl in models.values():
        # link externals services first
        for service in mdl.services_ext.values():
            ext_name = service.model

            try:
                ext_model = self.__dict__[ext_name]
            except KeyError:
                raise KeyError('<%s> is not a model or group name.' % ext_name)

            # a failed link is logged and the loop goes on
            try:
                service.link_external(ext_model)
            except (IndexError, KeyError) as e:
                logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s',
                             mdl.class_name, service.name, service.model,
                             service.indexer.name, repr(e))

        # initialize variables second
        mdl.init(routine=routine)

        self.vars_to_dae(mdl)
        self.vars_to_models()

    self.s_update_post(models)

    # store time constants associated with differential equations
    self._store_tf(models)
def store_getters(self, models):
    """
    Rebuild ``_getters`` and ``_setters`` from the given models.

    This is the lightweight subset of ``store_adder_setter`` that
    ``vars_to_models`` and ``vars_to_dae`` rely on during model
    initialization. ``_adders`` and ``antiwindups`` are not touched.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance. Models without devices are skipped.
    """
    codes = ('f', 'g', 'x', 'y')
    self._getters = {code: [] for code in codes}
    self._setters = {code: [] for code in codes}

    for mdl in models.values():
        if not mdl.n:
            continue

        cache = mdl.cache
        cache.refresh()

        for getter in cache.v_getters.values():
            self._getters[getter.v_code].append(getter)

        # variable setters are keyed by `v_code`, equation setters by `e_code`
        for setter in cache.v_setters.values():
            self._setters[setter.v_code].append(setter)
        for setter in cache.e_setters.values():
            self._setters[setter.e_code].append(setter)
def store_adder_setter(self, models):
    """
    Store non-inplace adders and setters for variables and equations.

    All stores filled here are rebuilt from scratch on every call:
    ``_getters``, ``_adders``, ``_setters`` and ``antiwindups``.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance. Models with no device, and models
        whose ``_all_replaced`` is true, are skipped.
    """
    self._clear_adder_setter()

    # `antiwindups` is not covered by `_clear_adder_setter`. Reset it
    # here as well; otherwise each repeated call appends the same
    # instances again and keeps entries from models that are now skipped.
    self.antiwindups = list()

    for mdl in models.values():
        # Note:
        #   We assume that a Model with no device is not addressed and, therefore,
        #   contains no value in each variable.
        #   It is always true for the current architecture.
        if not mdl.n:
            continue

        # skip models where all devices have been replaced by dynamic models
        if mdl._all_replaced:
            continue

        # Fixes an issue if the cache was manually built but stale
        # after assigning addresses for simulation
        # Assigning memory will affect the cache of `v_adders` and `e_adders`.
        mdl.cache.refresh()

        # ``getters`` that retrieve variable values from DAE
        for var in mdl.cache.v_getters.values():
            self._getters[var.v_code].append(var)

        # ``adders`` that add variable values to the DAE array
        for var in mdl.cache.v_adders.values():
            self._adders[var.v_code].append(var)
        for var in mdl.cache.e_adders.values():
            self._adders[var.e_code].append(var)

        # ``setters`` that force set variable values in the DAE array
        for var in mdl.cache.v_setters.values():
            self._setters[var.v_code].append(var)
        for var in mdl.cache.e_setters.values():
            self._setters[var.e_code].append(var)

        # ``antiwindups`` stores all AntiWindup instances
        self.antiwindups.extend(
            item for item in mdl.discrete.values() if isinstance(item, AntiWindup)
        )
def store_no_check_init(self, models):
    """
    Collect the addresses of states whose ``check_init`` is False.

    The result replaces ``self.no_check_init``. Models without devices
    are skipped.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance.
    """
    addresses = []
    self.no_check_init = addresses

    for mdl in models.values():
        if mdl.n == 0:
            continue
        for state in mdl.states.values():
            if state.check_init is False:
                addresses.extend(state.a)
def link_ext_param(self, model=None):
    """
    Retrieve values for ``ExtParam`` for the given models.

    Parameters
    ----------
    model : optional
        Model selection passed to ``self._get_models``. When None, all
        models in ``self.models`` are processed.

    Returns
    -------
    bool
        True if every external parameter was linked; False if at least
        one link raised ``IndexError`` or ``KeyError`` (each failure is
        logged and processing continues).

    Raises
    ------
    KeyError
        If an external parameter refers to a name that is not an
        attribute of the system.
    """
    models = self.models if model is None else self._get_models(model)

    ret = True
    for mdl in models.values():
        # get external parameters with `link_external` and then calculate the pu coeff
        for instance in mdl.params_ext.values():
            ext_name = instance.model

            # '__self__' is a sentinel for self-referencing ExtParam
            # (e.g. ``ue`` which copies the model's own ``u``)
            if ext_name == '__self__':
                ext_model = mdl
            else:
                # same descriptive error as `set_address` and `init`
                try:
                    ext_model = self.__dict__[ext_name]
                except KeyError:
                    raise KeyError('<%s> is not a model or group name.' % ext_name) from None

            try:
                instance.link_external(ext_model)
            except (IndexError, KeyError) as e:
                logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s',
                             mdl.class_name, instance.name, instance.model,
                             instance.indexer.name if instance.indexer else 'None',
                             repr(e))
                ret = False

    return ret
def calc_pu_coeff(self):
    """
    Compute the per-unit conversion coefficients and apply them.

    For every model, the coefficients between device bases and system
    bases are calculated and passed to each matching parameter through
    ``set_pu_coeff``. The coefficients and AC bases are then stored on
    the model as ``coeffs`` and ``bases``.
    """
    # `Sb`, `Vb` and `Zb` are the system base, bus base values
    # `Sn`, `Vn` and `Zn` are the device bases
    Sb = self.config.mva

    for mdl in self.models.values():
        attrs = mdl.__dict__

        # default Sn to Sb if not provided. Some controllers might not have Sn or Vn.
        Sn = mdl.Sn.v if 'Sn' in attrs else Sb

        # If both Vn and Vn1 are not provided, default to Vn = Vb = 1
        # test if is shunt-connected or series-connected to bus, or unconnected to bus
        Vb, Vn = 1, 1
        for bus_attr, vn_attr in (('bus', 'Vn'), ('bus1', 'Vn1')):
            if bus_attr not in attrs:
                continue
            Vb = self.Bus.get(src='Vn', idx=getattr(mdl, bus_attr).v, attr='v')
            Vn = getattr(mdl, vn_attr).v if vn_attr in attrs else Vb
            break

        Zn = Vn ** 2 / Sn
        Zb = Vb ** 2 / Sb

        # process dc parameter pu conversion
        Vdcb, Vdcn, Idcn = 1, 1, 1
        for node_attr, vdcn_attr in (('node', 'Vdcn'), ('node1', 'Vdcn1')):
            if node_attr not in attrs:
                continue
            Vdcb = self.Node.get(src='Vdcn', idx=getattr(mdl, node_attr).v, attr='v')
            Vdcn = getattr(mdl, vdcn_attr).v if vdcn_attr in attrs else Vdcb
            Idcn = mdl.Idcn.v if 'Idcn' in attrs else (Sb / Vdcb)
            break

        Idcb = Sb / Vdcb
        Rb = Vdcb / Idcb
        Rn = Vdcn / Idcn

        coeffs = {
            'voltage': Vn / Vb,
            'power': Sn / Sb,
            'ipower': Sb / Sn,
            'current': (Sn / Vn) / (Sb / Vb),
            'z': Zn / Zb,
            'y': Zb / Zn,
            'dc_voltage': Vdcn / Vdcb,
            'dc_current': Idcn / Idcb,
            'r': Rn / Rb,
            'g': Rb / Rn,
        }

        for prop, coeff in coeffs.items():
            for param in mdl.find_param(prop).values():
                param.set_pu_coeff(coeff)

        # store coeffs and bases back in models.
        mdl.coeffs = coeffs
        mdl.bases = dict(Sn=Sn, Sb=Sb, Vn=Vn, Vb=Vb, Zn=Zn, Zb=Zb)
def l_update_var(self, models: OrderedDict, niter=0, err=None):
    """
    Update variable-based limiter discrete states.

    Calls ``l_update_var`` of the models, passing the current DAE time
    together with ``niter`` and ``err``.

    This function must be called before any equation evaluation.
    """
    extra = dict(dae_t=self.dae.t, niter=niter, err=err)
    self.call_models('l_update_var', models, **extra)
def l_update_eq(self, models: OrderedDict, init=False, niter=0):
    """
    Update equation-dependent limiter discrete components.

    Calls ``l_check_eq`` of the models with ``init`` and ``niter``.

    This function must be called after differential equation updates.
    """
    extra = dict(init=init, niter=niter)
    self.call_models('l_check_eq', models, **extra)
def s_update_var(self, models: OrderedDict):
    """
    Update variable services by calling ``s_update_var`` of the models.

    This function must be called before any equation evaluation and
    after the limiter update function `l_update_var`.
    """
    method = 's_update_var'
    self.call_models(method, models)
def s_update_post(self, models: OrderedDict):
    """
    Update services by calling ``s_update_post`` of the models.

    This function is called at the end of `System.init()`.
    """
    method = 's_update_post'
    self.call_models(method, models)
def fg_to_dae(self):
    """
    Collect the ``f`` and ``g`` equation values into the DAE arrays.

    Afterwards, mismatches of islanded buses are reset, and the state
    values recorded in ``x_set`` of each anti-windup limiter are written
    into ``dae.x``.
    """
    self._e_to_dae(('f', 'g'))

    # reset mismatches for islanded buses
    self.g_islands()

    # update variable values set by anti-windup limiters
    for limiter in self.antiwindups:
        pending = limiter.x_set
        if len(pending) == 0:
            continue
        for addr, value, _ in pending:
            np.put(self.dae.x, addr, value)
def f_update(self, models: OrderedDict):
    """
    Run the differential equation update of each model in sequence.

    Notes
    -----
    Updated equation values remain in models and have not been collected
    into DAE at the end of this step.

    Raises
    ------
    TypeError
        Re-raised from the model calls after logging a hint.
    """
    try:
        self.call_models('f_update', models)
    except TypeError:
        logger.error("f_update failed. Have you run `andes prepare -i` after updating?")
        raise
def g_update(self, models: OrderedDict):
    """
    Run the algebraic equation update of each model in sequence.

    Notes
    -----
    Like `f_update`, updated values have not been collected into DAE at
    the end of the step.

    Raises
    ------
    TypeError
        Re-raised from the model calls after logging a hint.
    """
    try:
        self.call_models('g_update', models)
    except TypeError:
        logger.error("g_update failed. Have you run `andes prepare -i` after updating?")
        raise
def b_update(self, models: OrderedDict):
    """
    Run the observable variable update of each model in sequence.

    This evaluates all Observable variables post-solve and stores
    values in ``dae.b``.

    Raises
    ------
    TypeError
        Re-raised from the model calls after logging a hint.
    """
    try:
        self.call_models('b_update', models)
    except TypeError:
        logger.error("b_update failed. Have you run `andes prepare -i` after updating?")
        raise
def g_islands(self):
    """
    Zero the algebraic mismatches at the addresses of islanded buses.

    Does nothing when there is no islanded bus.
    """
    bus = self.Bus
    if bus.n_islanded_buses == 0:
        return

    for addresses in (bus.islanded_a, bus.islanded_v):
        self.dae.g[addresses] = 0.0
def j_update(self, models: OrderedDict, info=None):
    """
    Run the Jacobian update of each model and collect the values.

    The procedure is

    - Call ``j_update`` of the models.
    - Restore the sparsity pattern with
      :py:func:`andes.variables.dae.DAE.restore_sparse`
    - For each Jacobian name in ``jac_names``, add the triplets of every
      model to the matching DAE sparse matrix.
    - Process the entries of islanded buses with ``j_islands``.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance.
    info : optional
        Extra text appended to the debug log message when truthy.

    Raises
    ------
    TypeError
        Re-raised after logging if adding triplets to a matrix fails.

    Notes
    -----
    Updated Jacobians are immediately reflected in the DAE sparse matrices.
    """
    self.call_models('j_update', models)

    self.dae.restore_sparse()
    matrices = self.dae.__dict__

    # collect sparse values into sparse structures
    for j_name in jac_names:
        j_size = self.dae.get_size(j_name)

        for mdl in models.values():
            for rows, cols, vals in mdl.triplets.zip_ijv(j_name):
                try:
                    if self.runtime.ipadd:
                        matrices[j_name].ipadd(vals, rows, cols)
                    else:
                        matrices[j_name] += spmatrix(vals, rows, cols, j_size, 'd')
                except TypeError:
                    logger.error("Error adding Jacobian triplets to existing sparsity pattern.")
                    logger.error(f'{mdl.class_name}: j_name {j_name}, row={rows}, col={cols}, val={vals}, '
                                 f'j_size={j_size}')
                    raise

    self.j_islands()

    if info:
        logger.debug("Jacobian updated at t=%.6f: %s.", self.dae.t, info)
    else:
        logger.debug("Jacobian updated at t=%.6f.", self.dae.t)
def j_islands(self):
    """
    Set gy diagonals to eps for `a` and `v` variables of islanded buses.

    With ``runtime.ipadd`` enabled, the diagonal entries are set in
    place and the ``a``/``v`` cross entries are set to zero. Otherwise
    a correction is added so that each diagonal entry becomes
    ``config.diag_eps``.
    """
    bus = self.Bus
    if bus.n_islanded_buses == 0:
        return

    aidx = bus.islanded_a
    vidx = bus.islanded_v
    gy = self.dae.gy

    if self.runtime.ipadd:
        gy.ipset(self.config.diag_eps, aidx, aidx)
        gy.ipset(0.0, aidx, vidx)
        gy.ipset(self.config.diag_eps, vidx, vidx)
        gy.ipset(0.0, vidx, aidx)
        return

    # both correction lists are computed before either one is added
    avals = [self.config.diag_eps - gy[int(k), int(k)] for k in aidx]
    vvals = [self.config.diag_eps - gy[int(k), int(k)] for k in vidx]

    self.dae.gy += spmatrix(avals, aidx, aidx, self.dae.gy.size, 'd')
    self.dae.gy += spmatrix(vvals, vidx, vidx, self.dae.gy.size, 'd')
def store_sparse_pattern(self, models: OrderedDict):
    """
    Collect and store the sparsity pattern of the Jacobian matrices.

    This is a runtime function specific to cases.

    Parameters
    ----------
    models : OrderedDict
        Model name : model instance.

    Notes
    -----
    For `gy` matrix, always make sure the diagonal is reserved.
    It is a safeguard if the modeling user omitted the diagonal
    term in the equations.
    """
    self.call_models('store_sparse_pattern', models)

    # add variable jacobian values
    for jname in jac_names:
        rows, cols, vals = [], [], []

        # for `gy`, reserve memory for the main diagonal
        if jname == 'gy':
            rows.extend(np.arange(self.dae.m))
            cols.extend(np.arange(self.dae.m))
            vals.extend(np.zeros(self.dae.m))

        const_name = jname + 'c'
        for mdl in models.values():
            # variable Jacobians only reserve positions; values are zero
            for row, col, _ in mdl.triplets.zip_ijv(jname):
                rows.extend(row)
                cols.extend(col)
                vals.extend(np.zeros_like(row))

            # process constant Jacobians separately
            for row, col, val in mdl.triplets.zip_ijv(const_name):
                rows.extend(row)
                cols.extend(col)
                vals.extend(val * np.ones_like(row))

        if len(rows) > 0:
            rows = np.array(rows, dtype=int)
            cols = np.array(cols, dtype=int)
            vals = np.array(vals, dtype=float)

        self.dae.store_sparse_ijv(jname, rows, cols, vals)
        self.dae.build_pattern(jname)

    # set sink diagonal to 1.0 for compacted replaced devices
    sink = getattr(self, '_y_sink_idx', None)
    if sink is None:
        return
    self.dae.tpl['gy'][sink, sink] = 1.0
    self.dae.gy[sink, sink] = 1.0
def vars_to_dae(self, model):
    """
    Copy the variable values of one model into `System.dae`.

    Differential (``x``) values are collected first, then algebraic
    (``y``) values.

    Parameters
    ----------
    model
        Model instance to collect from.
    """
    for v_code in ('x', 'y'):
        self._v_to_dae(v_code, model)
def vars_to_models(self):
    """
    Copy variable values from `System.dae` back into the model variables.

    Algebraic (``y``) getters are processed first, then differential
    (``x``) getters. Variables with ``n == 0`` are skipped.
    """
    for v_code in ('y', 'x'):
        for var in self._getters[v_code]:
            if var.n > 0:
                var.v[:] = getattr(self.dae, v_code)[var.a]
def build_ybus(self):
    """
    Build the bus admittance matrix from the models in ``exist.ybus``.

    Each contributing model must implement a ``build_ybus()`` method
    returning a kvxopt ``spmatrix`` of size ``(nb, nb)`` with
    typecode ``'z'``.

    Returns
    -------
    spmatrix
        Bus admittance matrix (sparse, complex).

    Notes
    -----
    Uses ``exist.ybus``, populated once during ``setup()``.
    Device status changes after setup are reflected via ``u.v``
    at call time, but the set of contributing models is fixed.
    """
    n_bus = self.Bus.n
    shape = (n_bus, n_bus)

    # start from an empty complex matrix and accumulate contributions
    ybus = spmatrix([], [], [], shape, 'z')
    for contributor in self.exist.ybus.values():
        ybus += contributor.build_ybus()

    return ybus
def connectivity(self, info=True):
    """
    Forward to :meth:`ConnMan.check_connectivity` and return its result.

    Parameters
    ----------
    info : bool
        Passed through unchanged.
    """
    result = self.conn.check_connectivity(info=info)
    return result
def to_ipysheet(self, model: str, vin: bool = False):
    """
    Return an ipysheet object for editing in Jupyter Notebook.

    Parameters
    ----------
    model : str
        Name of the model in ``self.models``.
    vin : bool
        Forwarded to the model's ``as_df``.
    """
    from ipysheet import from_dataframe

    frame = self.models[model].as_df(vin=vin)
    return from_dataframe(frame)
def from_ipysheet(self, model: str, sheet, vin: bool = False):
    """
    Set an ipysheet object back to model.

    Parameters
    ----------
    model : str
        Name of the model in ``self.models``.
    sheet
        The ipysheet object to read from.
    vin : bool
        Forwarded to the model's ``update_from_df``.
    """
    from ipysheet import to_dataframe

    frame = to_dataframe(sheet)
    target = self.models[model]
    target.update_from_df(frame, vin=vin)
def summary(self):
    """
    Forward to :meth:`ConnMan.summary` and return its result.
    """
    result = self.conn.summary()
    return result
def _v_to_dae(self, v_code, model):
"""
Helper function for collecting variable values into ``dae``
structures `x` and `y`.
This function must be called with ``dae.x`` and ``dae.y``
both being zeros.
Otherwise, adders will be summed again, causing an error.
Parameters
----------
v_code : 'x' or 'y'
Variable type name
"""
if model.n == 0:
return
if model.flags.initialized is False:
return
for var in model.cache.v_adders.values():
if var.v_code != v_code:
continue
np.add.at(self.dae.__dict__[v_code], var.a, var.v)
for var in self._setters[v_code]:
if var.owner.flags.initialized is False:
continue
if var.n > 0:
np.put(self.dae.__dict__[v_code], var.a, var.v)
def _e_to_dae(self, eq_name: Union[str, Tuple] = ('f', 'g')):
    """
    Collect equation values into ``System.dae.f`` and ``System.dae.g``.

    For each requested equation type, the ``e`` values of the adder
    variables are accumulated into the ``dae`` array of that name, then
    the ``e`` values of the setter variables overwrite their addresses.

    Parameters
    ----------
    eq_name : 'f' or 'g' or tuple
        Equation type name, or several of them
    """
    # a single name is treated as a one-element collection
    names = [eq_name] if isinstance(eq_name, str) else eq_name

    for name in names:
        for adder in self._adders[name]:
            np.add.at(self.dae.__dict__[name], adder.a, adder.e)

        for setter in self._setters[name]:
            np.put(self.dae.__dict__[name], setter.a, setter.e)
def get_z(self, models: OrderedDict):
    """
    Gather all discrete status flags into ``dae.z``.

    Values are written to ``dae.z`` in place. The array is re-allocated
    with ``dae.o`` zeros first if its length does not match ``dae.o``.

    Parameters
    ----------
    models : OrderedDict
        model name : model instance

    Returns
    -------
    numpy.array or None
        ``dae.z``, or ``None`` when ``TDS.config.store_z`` is not 1.
    """
    if self.TDS.config.store_z != 1:
        return None

    if len(self.dae.z) != self.dae.o:
        self.dae.z = np.zeros(self.dae.o, dtype=float)

    # each flag array of each model takes the next ``n`` slots
    offset = 0
    for mdl in models.values():
        n_dev = mdl.n
        if n_dev == 0 or len(mdl._input_z) == 0:
            continue

        for flag_values in mdl._input_z.values():
            stop = offset + n_dev
            self.dae.z[offset:stop] = flag_values
            offset = stop

    return self.dae.z
def find_models(self, flag: Optional[Union[str, Tuple]], skip_zero: bool = True):
    """
    Find models with at least one of the flags as True.

    Warnings
    --------
    Checking the number of devices has been centralized into this function.
    ``models`` passed to most System calls must be retrieved from here.

    Parameters
    ----------
    flag : list, tuple, str or None
        Flags to find. A single string is treated as one flag.
        ``None`` selects nothing and yields an empty result.
    skip_zero : bool
        Skip models with zero devices or with ``in_use`` being False

    Returns
    -------
    OrderedDict
        model name : model instance
    """
    out = OrderedDict()

    # ``flag`` is annotated as Optional: None means "no flag requested"
    # instead of failing when the first model is inspected.
    if flag is None:
        return out

    flags = (flag,) if isinstance(flag, str) else flag

    for name, mdl in self.models.items():
        if skip_zero is True and (mdl.n == 0 or mdl.in_use is False):
            continue

        # only an exact ``True`` counts; ``any`` stops at the first hit
        if any(mdl.flags.__dict__[f] is True for f in flags):
            out[name] = mdl

    return out
def undill(self, autogen_stale=True):
    """
    Forward to ``undill`` of :class:`andes.system.codegen.CodegenManager`.

    ``autogen_stale`` is passed through unchanged and the manager's
    return value is handed back.
    """
    outcome = self.codegen.undill(autogen_stale=autogen_stale)
    return outcome
def _load_calls(self):
    """
    Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`.

    Emits a ``DeprecationWarning`` attributed to the caller and then
    forwards to ``self.codegen._load_calls()``.
    """
    notice = (
        "System._load_calls() is deprecated and will be removed in v3.0. "
        "Use system.codegen._load_calls() instead."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)

    return self.codegen._load_calls()
def _expand_pycode(self, pycode_module):
    """
    Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`.

    Emits a ``DeprecationWarning`` attributed to the caller and then
    forwards ``pycode_module`` to ``self.codegen._expand_pycode()``.
    """
    notice = (
        "System._expand_pycode() is deprecated and will be removed in v3.0. "
        "Use system.codegen._expand_pycode() instead."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)

    return self.codegen._expand_pycode(pycode_module)
def _get_models(self, models):
    """
    Helper function for sanitizing the ``models`` input.

    Parameters
    ----------
    models : None, dict, str, Model, list or tuple
        - ``None``: the models in ``self.exist.pflow`` are used.
        - any ``dict`` (including ``OrderedDict``): copied as is.
        - ``str``: looked up by name as an attribute of the system.
        - ``Model``: stored under its ``class_name``.
        - ``list`` / ``tuple``: each item is a name or a ``Model``.

    Returns
    -------
    OrderedDict
        model name : model instance. Any other input type yields an
        empty OrderedDict.

    Raises
    ------
    TypeError
        If an item of a list/tuple is neither a ``str`` nor a ``Model``.
    KeyError
        If a model name is not an attribute of the system.
    """
    out = OrderedDict()

    if models is None:
        out.update(self.exist.pflow)
    elif isinstance(models, dict):
        # plain dicts are ordered too and must not be silently dropped
        out.update(models)
    elif isinstance(models, str):
        out[models] = self.__dict__[models]
    elif isinstance(models, (list, tuple)):
        for item in models:
            if isinstance(item, str):
                out[item] = self.__dict__[item]
            elif isinstance(item, Model):
                out[item.class_name] = item
            else:
                raise TypeError(f'Unknown type {type(item)}')
    elif isinstance(models, Model):
        out[models.class_name] = models

    return out
def _store_tf(self, models):
    """
    Store the time constants associated with equations into ``dae.Tf``.

    For each variable in ``cache.states_and_ext`` of the given models
    that has a ``t_const``, the values ``t_const.v`` are written to
    ``dae.Tf`` at the variable addresses.
    """
    for mdl in models.values():
        for var in mdl.cache.states_and_ext.values():
            t_const = var.t_const
            if t_const is None:
                continue
            np.put(self.dae.Tf, var.a, t_const.v)
def call_models(self, method: str, models: OrderedDict, *args, **kwargs):
    """
    Call a method on each of the given models.

    When ``self.runtime.save_stats`` is truthy, the number of calls per
    model and method is counted in ``self.call_stats``.

    Parameters
    ----------
    method : str
        Name of the model method to be called
    models : OrderedDict
        model name : model instance, on which the method will be called
    args
        Positional arguments to be passed to the model method
    kwargs
        Keyword arguments to be passed to the model method

    Returns
    -------
    OrderedDict
        model name : return value of the method on that model
    """
    results = OrderedDict()

    for name, mdl in models.items():
        bound = getattr(mdl, method)
        results[name] = bound(*args, **kwargs)

        if self.runtime.save_stats:
            if method in self.call_stats[name]:
                self.call_stats[name][method] += 1
            else:
                self.call_stats[name][method] = 1

    return results
def check_group_common(self):
    """
    Forward to ``check_group_common`` of
    :class:`andes.system.registry.RegistryLoader`.
    """
    outcome = self.registry.check_group_common()
    return outcome
def _check_group_common(self):
    """
    Deprecated wrapper for checking that all group common variables
    and parameters are met.

    Emits a ``DeprecationWarning`` attributed to the caller and then
    forwards to ``self.registry.check_group_common()``.
    """
    notice = (
        "System._check_group_common() is deprecated and will be removed in v3.0. "
        "Use system.registry.check_group_common() instead."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)

    return self.registry.check_group_common()
def collect_ref(self):
    """
    Collect indices into `BackRef` for all models.

    The work is done in three passes over all models and groups:

    1. For every ``IdxParam`` flagged with ``status_parent``, make sure
       the destination model/group has a `BackRef` named after the
       source model's group and class name, creating it if missing.
    2. Reset the value of every `BackRef` to one empty list per device.
    3. For every ``IdxParam`` that points to a known model or group,
       register each (source idx, destination idx) pair through
       ``dest.set_backref``, then call ``set_in_use()`` on the source
       if it is a ``Model``.
    """
    models_and_groups = list(self.models.values()) + list(self.groups.values())
    # Auto-create BackRef on destination groups/models for status_parent IdxParams.
    # This allows the framework to propagate status without requiring manual
    # BackRef declarations on every parent (e.g., Bus/ACNode doesn't need to
    # pre-declare BackRef for every child group).
    for model in models_and_groups:
        # objects without ``idx_params`` and empty ones declare no links
        if not hasattr(model, 'idx_params') or model.n == 0:
            continue
        for idxp in model.idx_params.values():
            if not getattr(idxp, 'status_parent', False) or idxp.model is None:
                continue
            # destination is looked up by name as an attribute of the system
            dest = self.__dict__.get(idxp.model)
            if dest is None or dest.n == 0:
                continue
            for ref_name in (model.group, model.class_name):
                if ref_name not in dest.services_ref:
                    br = BackRef(info=f'auto for status from {model.class_name}')
                    # NOTE(review): presumably attribute assignment on ``dest``
                    # also registers the BackRef in ``dest.services_ref`` - confirm
                    setattr(dest, ref_name, br)
    # create an empty list of lists for all `BackRef` instances
    for model in models_and_groups:
        for ref in model.services_ref.values():
            ref.v = [list() for _ in range(model.n)]
    # `model` is the model who stores `IdxParam`s to other models
    # `BackRef` is declared at other models specified by the `model` parameter
    # of `IdxParam`s.
    for model in models_and_groups:
        if model.n == 0:
            continue
        # skip: a group is not allowed to link to other groups
        if not hasattr(model, "idx_params"):
            continue
        for idxp in model.idx_params.values():
            # ignore references whose target is neither a model nor a group
            if (idxp.model not in self.models) and (idxp.model not in self.groups):
                continue
            dest = self.__dict__[idxp.model]
            if dest.n == 0:
                continue
            for name in (model.class_name, model.group):
                # `BackRef` not requested by the linked models or groups
                if name not in dest.services_ref:
                    continue
                for model_idx, dest_idx in zip(model.idx.v, idxp.v):
                    # skip references to devices that do not exist in ``dest``
                    if dest_idx not in dest.uid:
                        continue
                    dest.set_backref(name,
                                     from_idx=model_idx,
                                     to_idx=dest_idx)
        # set model ``in_use`` flag
        if isinstance(model, Model):
            model.set_in_use()
def _collect_ref_index(self, status_parent_only=False):
    """
    Build a reverse index from ``IdxParam`` declarations.

    Models with zero devices, models without ``idx_params`` and
    parameters whose ``model`` is ``None`` are left out.

    Parameters
    ----------
    status_parent_only : bool, optional
        If ``True``, include only ``IdxParam`` instances with
        ``status_parent=True``. Default is ``False`` (all references).

    Returns
    -------
    dict
        ``{target_group_or_model: [(source_model, param_name), ...]}``

    Examples
    --------
    With ``status_parent_only=False`` (all references)::

        {
            'ACNode': [('PQ', 'bus'), ('PV', 'bus'), ('Line', 'bus1'),
                       ('Line', 'bus2'), ('GENROU', 'bus'), ...],
            'SynGen': [('TGOV1', 'syn'), ('ESST3A', 'syn'), ...],
            'Exciter': [('IEEEST', 'avr'), ...],
            'Bus': [('Fault', 'bus'), ('PV', 'busr'), ...],
            ...
        }

    With ``status_parent_only=True`` the result is a subset of the above.
    """
    ref_index = {}

    for mdl_name, mdl in self.models.items():
        if mdl.n == 0 or not hasattr(mdl, 'idx_params'):
            continue

        for p_name, p_instance in mdl.idx_params.items():
            if status_parent_only and not getattr(p_instance, 'status_parent', False):
                continue

            target = p_instance.model
            if target is not None:
                ref_index.setdefault(target, []).append((mdl_name, p_name))

    return ref_index
def _build_status_graph(self):
    """
    Build the status propagation graph from ``IdxParam`` declarations
    with ``status_parent=True``.

    Populates ``self._status_children`` and ``self._status_parent_map``
    used by :meth:`set_status` for recursive propagation::

        _status_parent_map = {
            'GENROU': [('bus', 'ACNode')],
            'Line': [('bus1', 'ACNode'), ('bus2', 'ACNode')],
            ...
        }
        _status_children = {
            'ACNode': {'PQ', 'StaticGen', 'SynGen', 'GENROU', 'Line', ...},
            'SynGen': {'ESST3A', 'TGOV1', 'Exciter', 'TurbineGov', ...},
            ...
        }

    Values in ``_status_children`` are BackRef names present on the
    parent, which may be the child's group name (e.g., ``'Exciter'``)
    or its model name (e.g., ``'ESST3A'``). A target that is not an
    attribute of the system still gets an (empty) entry.
    """
    self._status_children = {}
    self._status_parent_map = {}

    reverse_index = self._collect_ref_index(status_parent_only=True)

    for target, refs in reverse_index.items():
        for mdl_name, p_name in refs:
            # child -> parent mapping (a list, for multi-parent models)
            self._status_parent_map.setdefault(mdl_name, []).append((p_name, target))

            # parent -> names of the BackRefs to follow
            children = self._status_children.setdefault(target, set())

            parent_obj = self.__dict__.get(target)
            if parent_obj is None:
                continue

            mdl = self.models[mdl_name]
            for ref_name in (mdl.group, mdl.class_name):
                if ref_name in parent_obj.services_ref:
                    children.add(ref_name)
def propagate_init_status(self):
    """
    Propagate effective status from parent to child models at init time.

    Starting from every device of every top-level parent group, the
    device's current effective status is read and handed to
    :meth:`_propagate_status`, which recomputes ``ue.v`` of the
    dependents recursively. This replaces the previous ``ug``/``uee``
    ExtParam mechanism that fetched the parent's ``u`` at init.

    Should be called once after ``system.init()`` for TDS models.
    Does nothing when the status graph has no parent entries.
    """
    if not self._status_children:
        return

    # groups of models that are themselves children of some parent
    child_groups = set()
    for child_name in self._status_parent_map:
        if child_name in self.models:
            child_groups.add(self.models[child_name].group)

    # top level: has children but is not a child of another group
    parent_groups = set(self._status_children.keys()) - child_groups

    for grp_name in parent_groups:
        grp = self.groups.get(grp_name)
        if grp is None:
            continue

        for mdl_name in grp.models:
            mdl = self.models.get(mdl_name)
            if mdl is None or mdl.n == 0:
                continue

            # propagate from every device of the model
            for uid in range(mdl.n):
                self._propagate_status(mdl,
                                       mdl.idx.v[uid],
                                       uid,
                                       self._get_effective_status(mdl, uid))
def _get_effective_status(self, mdl, uid):
    """
    Look up the effective status of one device.

    Returns the entry of ``mdl.ue.v`` at position ``uid``.
    """
    effective = mdl.ue.v
    return effective[uid]
def _get_parent_ue(self, mdl, uid):
    """
    Get the combined parent effective status for a device.

    For models with multiple parents (e.g., Line with bus1 and bus2),
    the result is the product of all parents' effective statuses.
    Parents whose idx is ``None`` or whose target is not an attribute
    of the system are skipped.

    Returns 1.0 if the model has no status parent.
    """
    own_name = mdl.class_name
    if own_name not in self._status_parent_map:
        return 1.0

    combined = 1.0
    for p_name, parent_name in self._status_parent_map[own_name]:
        parent_idx = mdl.__dict__[p_name].v[uid]
        if parent_idx is None:
            continue

        parent_obj = self.__dict__.get(parent_name)
        if parent_obj is None:
            continue

        # a group resolves the idx to a concrete model; a model is used as is
        parent_mdl = (parent_obj.idx2model(parent_idx)
                      if hasattr(parent_obj, 'idx2model')
                      else parent_obj)

        combined *= self._get_effective_status(parent_mdl,
                                               parent_mdl.idx2uid(parent_idx))

    return combined
def _propagate_status(self, mdl, idx, uid, ue_val):
    """
    Recursively propagate effective status to children via BackRef.

    For every child device listed in the BackRefs of the group of
    ``mdl``, the effective status is recomputed as the child's own
    ``u`` times the combined effective status of its parents, stored
    in the child's ``ue.v``, and the propagation continues from the
    child.

    When a child's effective status actually changes, cached network
    data is invalidated the same way :meth:`set_status` does it:
    ``conn.invalidate()`` for models with ``flags.topo``, otherwise
    ``conn.invalidate_ybus()`` for models with ``flags.ybus``.

    Parameters
    ----------
    mdl : Model
        Concrete model of the device to propagate from.
    idx : str, int, float
        Idx of that device.
    uid : int
        Position of the device in ``mdl``. Not read here; kept for the
        call signature.
    ue_val : int or float
        Effective status of the device. Not read here; children
        recompute their parents' status via :meth:`_get_parent_ue`.
    """
    group_name = mdl.group
    if group_name not in self._status_children:
        return

    group = self.groups.get(group_name)
    if group is None:
        return

    # BackRef values are stored per group-level device position
    if idx not in group.uid:
        return
    group_uid = group.idx2uid(idx)

    for backref_name in self._status_children[group_name]:
        if backref_name not in group.services_ref:
            continue

        for child_idx in group.services_ref[backref_name].v[group_uid]:
            # resolve the child idx to a concrete model
            if backref_name in self.groups:
                try:
                    child_mdl = self.groups[backref_name].idx2model(child_idx)
                except KeyError:
                    continue
            elif backref_name in self.models:
                child_mdl = self.models[backref_name]
            else:
                continue

            child_uid = child_mdl.idx2uid(child_idx)
            parent_ue = self._get_parent_ue(child_mdl, child_uid)
            child_ue = child_mdl.u.v[child_uid] * parent_ue

            old_ue = child_mdl.ue.v[child_uid]
            child_mdl.ue.v[child_uid] = child_ue

            if child_ue != old_ue:
                if child_mdl.flags.topo:
                    self.conn.invalidate()
                elif getattr(child_mdl.flags, 'ybus', False):
                    # keep the cached admittance matrix in sync, as
                    # ``set_status`` does for a directly switched device
                    self.conn.invalidate_ybus()

            # recurse to children of this child
            self._propagate_status(child_mdl, child_idx, child_uid, child_ue)
def set_status(self, model_or_group, idx, value):
    """
    Set the online status of a device and propagate to dependents.

    Writes ``u.v`` (the device's own status), recomputes ``ue.v``
    (effective status) as ``value`` times the combined effective status
    of the device's parents, and then propagates to all downstream
    dependents recursively via BackRef. If the effective status changed,
    ``conn.invalidate()`` is called for models with ``flags.topo``,
    otherwise ``conn.invalidate_ybus()`` for models with ``flags.ybus``.

    Parameters
    ----------
    model_or_group : str
        Model name or group name containing the device.
    idx : str, int, float
        Device idx.
    value : int or float
        New status value (0 or 1).

    Raises
    ------
    KeyError
        If ``model_or_group`` is neither a group nor a model name.
    """
    # group names take precedence over model names
    if model_or_group in self.groups:
        mdl = self.groups[model_or_group].idx2model(idx)
    elif model_or_group in self.models:
        mdl = self.models[model_or_group]
    else:
        raise KeyError(f"'{model_or_group}' is not a model or group name.")

    uid = mdl.idx2uid(idx)
    previous_ue = mdl.ue.v[uid]

    # the device's own status
    mdl.u.v[uid] = value

    # effective status depends on the parents as well
    ue_val = value * self._get_parent_ue(mdl, uid)
    mdl.ue.v[uid] = ue_val

    changed = ue_val != previous_ue
    if mdl.flags.topo and changed:
        self.conn.invalidate()
    elif mdl.flags.ybus and changed:
        self.conn.invalidate_ybus()

    # hand over to the dependents
    self._propagate_status(mdl, idx, uid, ue_val)
def get_status(self, model_or_group, idx):
    """
    Get the effective status of a device.

    Returns ``ue.v`` (effective online status) for the device.

    Parameters
    ----------
    model_or_group : str
        Model name or group name.
    idx : str, int, float
        Device idx.

    Returns
    -------
    float
        Effective status value (0 or 1).

    Raises
    ------
    KeyError
        If ``model_or_group`` is neither a group nor a model name.
    """
    # group names take precedence over model names
    if model_or_group in self.groups:
        mdl = self.groups[model_or_group].idx2model(idx)
    elif model_or_group in self.models:
        mdl = self.models[model_or_group]
    else:
        raise KeyError(f"'{model_or_group}' is not a model or group name.")

    position = mdl.idx2uid(idx)
    return self._get_effective_status(mdl, position)
def find_connected(self, model_or_group, idx):
    """
    Find all devices connected to a given device.

    Scans ``IdxParam`` references across all models to find devices
    that point to the specified target device. The reverse index used
    for the scan is built on the first call and cached in
    ``self._ref_index``.

    Parameters
    ----------
    model_or_group : str
        Model name (e.g., ``'Bus'``) or group name (e.g., ``'ACNode'``).
        For a model name, references to the model and to its group are
        both considered.
    idx : str, int, float
        Device idx to query.

    Returns
    -------
    OrderedDict
        ``{model_name: [idx, ...]}`` for each model with at least one
        device referencing the target. Empty models are omitted.
        References to the model name come first, followed by references
        to its group, so the key order is the same on every run.

    Raises
    ------
    KeyError
        If ``model_or_group`` is neither a group nor a model name.

    Examples
    --------
    Find all devices connected to Bus 1::

        ss.find_connected('Bus', 1)
        # OrderedDict([('PQ', [1]), ('Line', [1, 4]), ('GENROU', [1])])
    """
    # Build and cache the reverse index on first call
    if not hasattr(self, '_ref_index'):
        self._ref_index = self._collect_ref_index(status_parent_only=False)

    # An ordered list (not a set) keeps the result order independent
    # of string hash randomization.
    if model_or_group in self.groups:
        targets = [model_or_group]
    elif model_or_group in self.models:
        targets = [model_or_group]
        group_name = self.models[model_or_group].group
        if group_name != model_or_group:
            targets.append(group_name)
    else:
        raise KeyError(f"'{model_or_group}' is not a model or group name.")

    result = OrderedDict()
    for target in targets:
        for src_model, param_name in self._ref_index.get(target, ()):
            mdl = self.models.get(src_model)
            if mdl is None or mdl.n == 0:
                continue

            idxp = mdl.idx_params.get(param_name)
            if idxp is None:
                continue

            matched = [mdl.idx.v[uid]
                       for uid, ref_idx in enumerate(idxp.v)
                       if ref_idx == idx]
            if matched:
                result.setdefault(src_model, []).extend(matched)

    return result
def import_groups(self):
    """
    Deprecated wrapper to :class:`andes.system.registry.RegistryLoader`.

    Emits a ``DeprecationWarning`` attributed to the caller and then
    forwards to ``self.registry.import_groups()``.
    """
    notice = (
        "System.import_groups() is deprecated and will be removed in v3.0. "
        "Use system.registry.import_groups() instead."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)

    return self.registry.import_groups()
def import_models(self):
    """
    Deprecated wrapper to :class:`andes.system.registry.RegistryLoader`.

    Emits a ``DeprecationWarning`` attributed to the caller and then
    forwards to ``self.registry.import_models()``.
    """
    notice = (
        "System.import_models() is deprecated and will be removed in v3.0. "
        "Use system.registry.import_models() instead."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)

    return self.registry.import_models()
def import_routines(self):
    """
    Deprecated wrapper to :class:`andes.system.registry.RegistryLoader`.

    Emits a ``DeprecationWarning`` attributed to the caller and then
    forwards to ``self.registry.import_routines()``.
    """
    notice = (
        "System.import_routines() is deprecated and will be removed in v3.0. "
        "Use system.registry.import_routines() instead."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)

    return self.registry.import_routines()
def store_switch_times(self, models, eps=1e-4):
    """
    Store event switching times in the Numpy array ``System.switch_times``
    and in the dict ``System.switch_dict``.

    For every event time ``t`` reported by a model, the three instants
    ``t``, ``t - eps`` and ``t + eps`` are recorded. Instants earlier
    than the current ``dae.t`` are dropped. ``System.switch_dict`` maps
    each remaining instant to a dict of the model names and instances
    associated with it; entries already present are kept and extended.

    Parameters
    ----------
    models : OrderedDict
        model name : model instance
    eps : float
        The small time step size to use immediately before
        and after the event

    Returns
    -------
    array-like
        ``self.switch_times``, or an empty array (without touching the
        stored data) when the ``flat`` option is True.
    """
    out = np.array([], dtype=float)

    if self.options.get('flat') is True:
        return out

    names = []
    for instance in models.values():
        times = np.array(instance.get_times()).ravel()
        # event instants, followed by the instants just before and after
        out = np.concatenate((out, times, times - eps, times + eps))
        names.extend([instance.class_name] * (3 * len(times)))

    # chronological order, model names kept aligned
    order = np.argsort(out).astype(int)
    out = out[order]
    names = [names[pos] for pos in order]

    # keep only instants at or after the current simulation time
    upcoming = np.where(out >= self.dae.t)[0]
    out = out[upcoming]
    names = [names[pos] for pos in upcoming]

    # one entry per unique instant, merging the models that share it
    for t_switch, mdl_name in zip(out, names):
        entry = {mdl_name: self.models[mdl_name]}
        if t_switch in self.switch_dict:
            self.switch_dict[t_switch].update(entry)
        else:
            self.switch_dict[t_switch] = entry

    self.switch_times = np.array(list(self.switch_dict.keys()))
    self.n_switches = len(self.switch_times)

    return self.switch_times
def switch_action(self, models: OrderedDict):
    """
    Invoke the actions associated with switch times.

    Each given model's ``switch_action`` is called with the current
    ``dae.t``; afterwards ``TimeSeries.apply_exact`` is called once with
    the same time.

    This function will not be called if ``flat=True`` is passed to system.
    """
    for mdl in models.values():
        mdl.switch_action(self.dae.t)

    # TODO: generalize below for any models with timeseries data.
    self.TimeSeries.apply_exact(self.dae.t)
def _p_restore(self):
    """
    Restore parameters stored in `vin`.

    Calls ``restore()`` on every numerical parameter of every model.
    """
    for mdl in self.models.values():
        for num_param in mdl.num_params.values():
            num_param.restore()
def e_clear(self, models: OrderedDict):
    """
    Clear equation arrays in DAE and model variables.

    The DAE arrays are cleared through ``dae.clear_fg()`` first, then
    ``e_clear`` is called on each of the given models.

    This step must be called before calling `f_update` or `g_update`
    to flush existing values.
    """
    dae = self.dae
    dae.clear_fg()

    self.call_models('e_clear', models)
def remove_pycapsule(self):
    """
    Remove PyCapsule objects in solvers.

    Calls ``clear()`` on the solver of every routine.
    """
    for routine in self.routines.values():
        solver = routine.solver
        solver.clear()
def _store_calls(self, models: OrderedDict):
    """
    Deprecated wrapper to :class:`andes.system.codegen.CodegenManager`.

    Emits a ``DeprecationWarning`` attributed to the caller and then
    forwards ``models`` to ``self.codegen._store_calls()``.
    """
    notice = (
        "System._store_calls() is deprecated and will be removed in v3.0. "
        "Use system.codegen._store_calls() instead."
    )
    warnings.warn(notice, DeprecationWarning, stacklevel=2)

    return self.codegen._store_calls(models)
def _list2array(self):
    """
    Call the ``list2array`` method of all models, which usually
    performs memory preallocation.
    """
    every_model = self.models
    self.call_models('list2array', every_model)
def _report_param_corrections(self):
    """
    Report parameter corrections for all models.

    Calls ``report_corrections()`` on every model, which emits grouped
    warnings for parameter values that were corrected during device
    addition (e.g., non-zero, non-positive, non-negative violations).
    """
    for model in self.models.values():
        report = model.report_corrections
        report()
def _check_setpoints(self):
    """
    Validate ``_setpoints`` declarations on all models.

    For every model that declares a non-empty ``_setpoints`` dict,
    verify that each target attribute exists in the model's instance
    dict and has a ``.v`` member.

    Called once during :meth:`setup`.

    Raises
    ------
    AttributeError
        If a declared target attribute is missing (or ``None``), or has
        no ``.v`` member.
    """
    for mdl in self.models.values():
        sp_map = getattr(mdl, '_setpoints', None)
        if not sp_map:
            continue

        for sp_name, attr_name in sp_map.items():
            attr = mdl.__dict__.get(attr_name)

            if attr is None:
                problem = f"attribute '{attr_name}' does not exist."
            elif not hasattr(attr, 'v'):
                problem = f"'{attr_name}' has no '.v' array."
            else:
                continue

            raise AttributeError(
                f"Model <{mdl.class_name}> declares _setpoints"
                f"['{sp_name}'] = '{attr_name}', but {problem}"
            )
def set_config(self, config=None):
    """
    Forward to ``set_config`` of
    :class:`andes.system.config_runtime.SystemConfigRuntime`.

    ``config`` is passed through unchanged and the runtime's return
    value is handed back.
    """
    outcome = self.config_runtime.set_config(config=config)
    return outcome
def collect_config(self):
    """
    Forward to ``collect_config`` of
    :class:`andes.system.config_runtime.SystemConfigRuntime`.
    """
    collected = self.config_runtime.collect_config()
    return collected
def save_config(self, file_path=None, overwrite=False):
    """
    Forward to ``save_config`` of
    :class:`andes.system.config_runtime.SystemConfigRuntime`.

    ``file_path`` and ``overwrite`` are passed through unchanged and
    the runtime's return value is handed back.
    """
    options = dict(file_path=file_path, overwrite=overwrite)
    return self.config_runtime.save_config(**options)
def supported_models(self, export='plain'):
    """
    Return the supported group names and model names in a table.

    Groups without models are left out, and a group object that is
    registered under several names is listed once only.

    Parameters
    ----------
    export : str
        Export format handed to ``Tab``. With ``'rest'``, names are
        wrapped as restructuredText references so that they render as
        hyperlinks.

    Returns
    -------
    str
        A table-formatted string for the groups and models
    """
    as_rest = export == 'rest'

    def link(name):
        # restructuredText reference in 'rest' mode, plain name otherwise
        return ":ref:`" + name + '`' if as_rest else name

    visited = set()
    rows = []

    for g_name, group in self.groups.items():
        marker = id(group)
        if marker in visited:
            continue
        visited.add(marker)

        members = [link(m_name) for m_name in group.models]
        if members:
            rows.append((link(g_name), ', '.join(members)))

    table = Tab(title='Supported Groups and Models',
                header=['Group', 'Models'],
                data=rows,
                export=export,
                )

    return table.draw()
def as_dict(self, vin=False, skip_empty=True):
    """
    Return system data as a dict keyed by model name.

    Each value is the dict returned by that model's ``as_dict``, with
    parameter names as keys and corresponding data in an array as
    values.

    Parameters
    ----------
    vin : bool
        Forwarded to each model's ``as_dict``.
    skip_empty : bool
        Leave out models with zero devices.

    Returns
    -------
    OrderedDict
    """
    return OrderedDict(
        (name, instance.as_dict(vin=vin))
        for name, instance in self.models.items()
        if not (skip_empty and instance.n == 0)
    )
def set_output_subidx(self, models):
    """
    Process :py:class:`andes.models.misc.Output` data and store the
    sorted, unique sub-indices into ``Output.xidx`` and ``Output.yidx``.

    Each Output entry names a model and optionally a variable and a
    device. Entries with an unknown model, variable or device are
    logged and skipped.

    Parameters
    ----------
    models : OrderedDict
        Models currently in use for the routine
    """
    collected = {'x': [], 'y': []}  # addresses of exported x and y

    requests = zip(self.Output.model.v,
                   self.Output.varname.v,
                   self.Output.dev.v)

    for model, var, dev in requests:
        # check validity of model name
        if model not in models:
            logger.info("Output model <%s> invalid or contains no device. Skipped.",
                        model)
            continue

        mdl_instance = models[model]
        mdl_all_vars = mdl_instance.cache.all_vars

        # check validity of var name
        if var is not None and (var not in mdl_all_vars):
            logger.info("Output model <%s> contains no variable <%s>. Skipped.",
                        model, var)
            continue

        # check validity of dev idx
        if (dev is not None) and (dev not in mdl_instance.idx.v):
            logger.info("Output model <%s> contains no device <%s>. Skipped.",
                        model, dev)
            continue

        # no variable name selects all variables of the model
        # TODO: dev-based indexing is not fully supported
        # for multi-index variables, such as those in COI.
        selected = mdl_all_vars.values() if var is None else (mdl_all_vars[var],)

        for item in selected:
            if dev is None:
                # all devices
                collected[item.v_code].extend(item.a)
            else:
                # exact device
                uid = mdl_instance.idx2uid(dev)
                collected[item.v_code].append(item.a[uid])

    self.Output.xidx = sorted(np.unique(collected['x']))
    self.Output.yidx = sorted(np.unique(collected['y']))