Source code for andes.system

"""
System class for power system data and methods.
"""

#  [ANDES] (C)2015-2024 Hantao Cui
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  File name: system.py

import configparser
import importlib
import importlib.util
import inspect
import logging
import os
import sys
import time
from collections import OrderedDict, defaultdict
from typing import Dict, Optional, Tuple, Union

import andes.io
from andes.core import AntiWindup, Config, Model
from andes.io.streaming import Streaming
from andes.models import file_classes
from andes.models.group import GroupBase
from andes.routines import all_routines
from andes.shared import (NCPUS_PHYSICAL, Pool, Process, dilled_vars,
                          jac_names, matrix, np, sparse, spmatrix, numba)
from andes.utils.misc import elapsed
from andes.utils.paths import (andes_root, confirm_overwrite, get_config_path,
                               get_pycode_path)
from andes.utils.tab import Tab
from andes.variables import DAE, FileMan

logger = logging.getLogger(__name__)


[docs]class ExistingModels: """ Storage class for existing models """
[docs] def __init__(self): self.pflow = OrderedDict() self.tds = OrderedDict() # if a model needs to be initialized before TDS, set `flags.tds = True` self.pflow_tds = OrderedDict()
[docs]class System: """ System contains models and routines for modeling and simulation. System contains a several special `OrderedDict` member attributes for housekeeping. These attributes include `models`, `groups`, `routines` and `calls` for loaded models, groups, analysis routines, and generated numerical function calls, respectively. Parameters ---------- no_undill : bool, optional, default=False True to disable the call to ``System.undill()`` at the end of object creation. False by default. autogen_stale : bool, optional, default=True True to automatically generate code for stale models. Notes ----- System stores model and routine instances as attributes. Model and routine attribute names are the same as their class names. For example, `Bus` is stored at ``system.Bus``, the power flow calculation routine is at ``system.PFlow``, and the numerical DAE instance is at ``system.dae``. See attributes for the list of attributes. Attributes ---------- dae : andes.variables.dae.DAE Numerical DAE storage files : andes.variables.fileman.FileMan File path storage config : andes.core.Config System config storage models : OrderedDict model name and instance pairs groups : OrderedDict group name and instance pairs routines : OrderedDict routine name and instance pairs """
[docs] def __init__(self, case: Optional[str] = None, name: Optional[str] = None, config: Optional[Dict] = None, config_path: Optional[str] = None, default_config: Optional[bool] = False, options: Optional[Dict] = None, no_undill: Optional[bool] = False, autogen_stale: Optional[bool] = True, **kwargs ): self.name = name self.options = {} if options is not None: self.options.update(options) if kwargs: self.options.update(kwargs) self.calls = OrderedDict() # a dictionary with model names (keys) and their ``calls`` instance self.models = OrderedDict() # model names and instances self.model_aliases = OrderedDict() # alias: model instance self.groups = OrderedDict() # group names and instances self.routines = OrderedDict() # routine names and instances self.switch_times = np.array([]) # an array of ordered event switching times self.switch_dict = OrderedDict() # time: OrderedDict of associated models self.with_calls = False # if generated function calls have been loaded self.n_switches = 0 # number of elements in `self.switch_times` self.exit_code = 0 # command-line exit code, 0 - normal, others - error. # get and load default config file self._config_path = get_config_path() if config_path is not None: self._config_path = config_path if default_config is True: self._config_path = None self._config_object = load_config_rc(self._config_path) self._update_config_object() self.config = Config(self.__class__.__name__, dct=config) self.config.load(self._config_object) # custom configuration for system goes after this line self.config.add(OrderedDict((('freq', 60), ('mva', 100), ('ipadd', 1), ('seed', 'None'), ('diag_eps', 1e-8), ('warn_limits', 1), ('warn_abnormal', 1), ('dime_enabled', 0), ('dime_name', 'andes'), ('dime_address', 'ipc:///tmp/dime2'), ('numba', 0), ('numba_parallel', 0), ('numba_nopython', 1), ('yapf_pycode', 0), ('save_stats', 0), ('np_divide', 'warn'), ('np_invalid', 'warn'), ))) self.config.add_extra("_help", freq='base frequency [Hz]', mva='system base MVA', ipadd='use spmatrix.ipadd if available', seed='seed (or None) for random number generator', diag_eps='small value for Jacobian diagonals', warn_limits='warn variables initialized at limits', warn_abnormal='warn initialization out of normal values', numba='use numba for JIT compilation', numba_parallel='enable parallel for numba.jit', numba_nopython='nopython mode for numba', yapf_pycode='format generated code with yapf', save_stats='store statistics of function calls', np_divide='treatment for division by zero', np_invalid='treatment for invalid floating-point ops.', ) self.config.add_extra("_alt", freq="float", mva="float", ipadd=(0, 1), seed='int or None', warn_limits=(0, 1), warn_abnormal=(0, 1), numba=(0, 1), numba_parallel=(0, 1), numba_nopython=(0, 1), yapf_pycode=(0, 1), save_stats=(0, 1), np_divide={'ignore', 'warn', 'raise', 'call', 'print', 'log'}, np_invalid={'ignore', 'warn', 'raise', 'call', 'print', 'log'}, ) self.config.check() _config_numpy(seed=self.config.seed, divide=self.config.np_divide, invalid=self.config.np_invalid, ) self.exist = ExistingModels() self.files = FileMan(case=case, **self.options) # file path manager self.dae = DAE(system=self) # numerical DAE storage self.streaming = Streaming(self) # Dime2 streaming # dynamic imports of groups, models and routines self.import_groups() self.import_models() self.import_routines() # routine imports come after models self._getters = dict(f=list(), g=list(), x=list(), y=list()) self._adders = dict(f=list(), g=list(), x=list(), y=list()) self._setters = dict(f=list(), g=list(), x=list(), y=list()) self.antiwindups = list() self.no_check_init = list() # states for which initialization check is omitted self.call_stats = defaultdict(dict) # call statistics storage # internal flags self.is_setup = False # if system has been setup if not no_undill: self.undill(autogen_stale=autogen_stale)
def _update_config_object(self): """ Change config on the fly based on command-line options. """ config_option = self.options.get('config_option', None) if config_option is None: return if len(config_option) == 0: return newobj = False if self._config_object is None: self._config_object = configparser.ConfigParser() newobj = True for item in config_option: # check the validity of the config field # each field follows the format `SECTION.FIELD = VALUE` if item.count('=') != 1: raise ValueError('config_option "{}" must be an assignment expression'.format(item)) field, value = item.split("=") if field.count('.') != 1: raise ValueError('config_option left-hand side "{}" must use format SECTION.FIELD'.format(field)) section, key = field.split(".") section = section.strip() key = key.strip() value = value.strip() if not newobj: self._config_object.set(section, key, value) logger.debug("Existing config option set: %s.%s=%s", section, key, value) else: self._config_object.add_section(section) self._config_object.set(section, key, value) logger.debug("New config option added: %s.%s=%s", section, key, value)
[docs] def reload(self, case, **kwargs): """ Reload a new case in the same System object. """ self.options.update(kwargs) self.files.set(case=case, **kwargs) # TODO: clear all flags and empty data andes.io.parse(self) self.setup()
def _clear_adder_setter(self): """ Clear adders and setters storage """ self._getters = dict(f=list(), g=list(), x=list(), y=list()) self._adders = dict(f=list(), g=list(), x=list(), y=list()) self._setters = dict(f=list(), g=list(), x=list(), y=list())
[docs] def prepare(self, quick=False, incremental=False, models=None, nomp=False, ncpu=NCPUS_PHYSICAL): """ Generate numerical functions from symbolically defined models. All procedures in this function must be independent of test case. Parameters ---------- quick : bool, optional True to skip pretty-print generation to reduce code generation time. incremental : bool, optional True to generate only for modified models, incrementally. models : list, OrderedDict, None List or OrderedList of models to prepare nomp : bool True to disable multiprocessing Notes ----- Option ``incremental`` compares the md5 checksum of all var and service strings, and only regenerate for updated models. Examples -------- If one needs to print out LaTeX-formatted equations in a Jupyter Notebook, one need to generate such equations with :: import andes sys = andes.prepare() Alternatively, one can explicitly create a System and generate the code :: import andes sys = andes.System() sys.prepare() Warnings -------- Generated lambda functions will be serialized to file, but pretty prints (SymPy objects) can only exist in the System instance on which prepare is called. """ if incremental is True: mode_text = 'rapid incremental mode' elif quick is True: mode_text = 'quick mode' else: mode_text = 'full mode' logger.info('Numerical code generation (%s) started...', mode_text) t0, _ = elapsed() # consistency check for group parameters and variables self._check_group_common() # get `pycode` folder path without automatic creation pycode_path = get_pycode_path(self.options.get("pycode_path"), mkdir=False) # determine which models to prepare based on mode and `models` list. if incremental and models is None: if not self.with_calls: self._load_calls() models = self._find_stale_models() elif not incremental and models is None: models = self.models else: models = self._get_models(models) total = len(models) width = len(str(total)) if nomp is False: print(f"Generating code for {total} models on {ncpu} processes.") self._mp_prepare(models, quick, pycode_path, ncpu=ncpu) else: for idx, (name, model) in enumerate(models.items()): print(f"\r\x1b[K Generating code for {name} ({idx+1:>{width}}/{total:>{width}}).", end='\r', flush=True) model.prepare(quick=quick, pycode_path=pycode_path) if len(models) > 0: self._finalize_pycode(pycode_path) self._store_calls(models) _, s = elapsed(t0) logger.info('Generated numerical code for %d models in %s.', len(models), s)
def _mp_prepare(self, models, quick, pycode_path, ncpu): """ Wrapper for multiprocessed code generation. Parameters ---------- models : OrderedDict model name : model instance pairs quick : bool True to skip LaTeX string generation pycode_path : str Path to store `pycode` folder ncpu : int Number of processors to use """ # create empty models without dependency if len(models) == 0: return model_names = list(models.keys()) model_list = list() for fname, cls_list in file_classes: for model_name in cls_list: if model_name not in model_names: continue the_module = importlib.import_module('andes.models.' + fname) the_class = getattr(the_module, model_name) model_list.append(the_class(system=None, config=self._config_object)) yapf_pycode = self.config.yapf_pycode def _prep_model(model: Model, ): """ Wrapper function to call prepare on a model. """ model.prepare(quick=quick, pycode_path=pycode_path, yapf_pycode=yapf_pycode ) Pool(ncpu).map(_prep_model, model_list) def _finalize_pycode(self, pycode_path): """ Helper function for finalizing pycode generation by writing ``__init__.py`` and reloading ``pycode`` package. """ init_path = os.path.join(pycode_path, '__init__.py') with open(init_path, 'w') as f: f.write(f"__version__ = '{andes.__version__}'\n\n") for name in self.models.keys(): f.write(f"from . import {name:20s} # NOQA\n") f.write('\n') logger.info('Saved generated pycode to "%s"', pycode_path) # RELOAD REQUIRED as the generated Jacobian arguments may be in a different order self._load_calls() def _find_stale_models(self): """ Find models whose ModelCall are stale using md5 checksum. """ out = OrderedDict() for model in self.models.values(): calls_md5 = getattr(model.calls, 'md5', None) if calls_md5 != model.get_md5(): out[model.class_name] = model return out def _to_orddct(self, model_list): """ Helper function to convert a list of model names to OrderedDict with name as keys and model instances as values. """ if isinstance(model_list, OrderedDict): return model_list if isinstance(model_list, list): out = OrderedDict() for name in model_list: if name not in self.models: logger.error("Model <%s> does not exist. Check your inputs.", name) continue out[name] = self.models[name] return out else: raise TypeError("Type %s not recognized" % type(model_list))
[docs] def setup(self): """ Set up system for studies. This function is to be called after adding all device data. """ ret = True t0, _ = elapsed() if self.is_setup: logger.warning('System has been setup. Calling setup twice is not allowed.') ret = False return ret self.collect_ref() self._list2array() # `list2array` must come before `link_ext_param` if not self.link_ext_param(): ret = False self.find_devices() # find or add required devices # === no device addition or removal after this point === self.calc_pu_coeff() # calculate parameters in system per units self.store_existing() # store models with routine flags # assign address at the end before adding devices and processing parameters self.set_address(self.exist.pflow) self.set_dae_names(self.exist.pflow) # needs perf. optimization self.store_sparse_pattern(self.exist.pflow) self.store_adder_setter(self.exist.pflow) if ret is True: self.is_setup = True # set `is_setup` if no error occurred else: logger.error("System setup failed. Please resolve the reported issue(s).") self.exit_code += 1 _, s = elapsed(t0) logger.info('System internal structure set up in %s.', s) return ret
[docs] def store_existing(self): """ Store existing models in `System.existing`. TODO: Models with `TimerParam` will need to be stored anyway. This will allow adding switches on the fly. """ self.exist.pflow = self.find_models('pflow') self.exist.tds = self.find_models('tds') self.exist.pflow_tds = self.find_models(('tds', 'pflow'))
[docs] def reset(self, force=False): """ Reset to the state after reading data and setup (before power flow). Warnings -------- If TDS is initialized, reset will lead to unpredictable state. """ if self.TDS.initialized is True and not force: logger.error('Reset failed because TDS is initialized. \nPlease reload the test case to start over.') return self.dae.reset() self.call_models('a_reset', models=self.models) self.e_clear(models=self.models) self._p_restore() self.is_setup = False self.setup()
[docs] def add(self, model, param_dict=None, **kwargs): """ Add a device instance for an existing model. This methods calls the ``add`` method of `model` and registers the device `idx` to group. """ if model not in self.models and (model not in self.model_aliases): logger.warning("<%s> is not an existing model.", model) return if self.is_setup: raise NotImplementedError("Adding devices are not allowed after setup.") group_name = self.__dict__[model].group group = self.groups[group_name] if param_dict is None: param_dict = {} if kwargs is not None: param_dict.update(kwargs) # remove `uid` field param_dict.pop('uid', None) idx = param_dict.pop('idx', None) if idx is not None and (not isinstance(idx, str) and np.isnan(idx)): idx = None idx = group.get_next_idx(idx=idx, model_name=model) self.__dict__[model].add(idx=idx, **param_dict) group.add(idx=idx, model=self.__dict__[model]) return idx
[docs] def find_devices(self): """ Add dependent devices for all model based on `DeviceFinder`. """ for mdl in self.models.values(): if len(mdl.services_fnd) == 0: continue for fnd in mdl.services_fnd.values(): fnd.find_or_add(self)
[docs] def set_address(self, models): """ Set addresses for differential and algebraic variables. """ # --- Phase 1: set internal variable addresses --- for mdl in models.values(): if mdl.flags.address is True: logger.debug('%s internal address exists', mdl.class_name) continue if mdl.n == 0: continue logger.debug('Setting internal address for %s', mdl.class_name) collate = mdl.flags.collate ndevice = mdl.n # get and set internal variable addresses xaddr = self.dae.request_address('x', ndevice=ndevice, nvar=len(mdl.states), collate=mdl.flags.collate, ) yaddr = self.dae.request_address('y', ndevice=ndevice, nvar=len(mdl.algebs), collate=mdl.flags.collate, ) for idx, item in enumerate(mdl.states.values()): item.set_address(xaddr[idx], contiguous=not collate) for idx, item in enumerate(mdl.algebs.values()): item.set_address(yaddr[idx], contiguous=not collate) # --- Phase 2: set external variable addresses --- # NOTE: # This step will retrieve the number of variables (item.n) for Phase 3. for mdl in models.values(): # handle external groups for instance in mdl.cache.vars_ext.values(): ext_name = instance.model try: ext_model = self.__dict__[ext_name] except KeyError: raise KeyError('<%s> is not a model or group name.' % ext_name) try: instance.link_external(ext_model) except (IndexError, KeyError) as e: logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s', mdl.class_name, instance.name, instance.model, instance.indexer.name, repr(e)) # --- Phase 3: set external variable RHS addresses --- for mdl in models.values(): if mdl.flags.address is True: logger.debug('%s RHS address exists', mdl.class_name) continue if mdl.n == 0: continue for item in mdl.states_ext.values(): # skip if no equation, i.e., no RHS value if item.e_str is None: continue item.set_address(np.arange(self.dae.p, self.dae.p + item.n)) self.dae.p += item.n for item in mdl.algebs_ext.values(): if item.e_str is None: continue item.set_address(np.arange(self.dae.q, self.dae.q + item.n)) self.dae.q += item.n mdl.flags.address = True # allocate memory for DAE arrays self.dae.resize_arrays() # set `v` and `e` in variables self.set_var_arrays(models=models) self.dae.alloc_or_extend_names()
[docs] def set_dae_names(self, models): """ Set variable names for differential and algebraic variables, right-hand side of external equations, and discrete flags. """ for mdl in models.values(): _set_xy_name(mdl, mdl.states, (self.dae.x_name, self.dae.x_tex_name)) _set_xy_name(mdl, mdl.algebs, (self.dae.y_name, self.dae.y_tex_name)) _set_hi_name(mdl, mdl.states_ext, (self.dae.h_name, self.dae.h_tex_name)) _set_hi_name(mdl, mdl.algebs_ext, (self.dae.i_name, self.dae.i_tex_name)) # add discrete flag names if self.TDS.config.store_z == 1: _set_z_name(mdl, self.dae, (self.dae.z_name, self.dae.z_tex_name))
[docs] def set_var_arrays(self, models, inplace=True, alloc=True): """ Set arrays (`v` and `e`) for internal variables to access dae arrays in place. This function needs to be called after de-serializing a System object, where the internal variables are incorrectly assigned new memory. Parameters ---------- models : OrderedDict, list, Model, optional Models to execute. inplace : bool True to retrieve arrays that share memory with dae alloc : bool True to allocate for arrays internally """ for mdl in models.values(): if mdl.n == 0: continue for var in mdl.cache.vars_int.values(): var.set_arrays(self.dae, inplace=inplace, alloc=alloc) for var in mdl.cache.vars_ext.values(): var.set_arrays(self.dae, inplace=inplace, alloc=alloc)
def _init_numba(self, models: OrderedDict): """ Helper function to compile all functions with Numba before init. """ if not self.config.numba: return try: getattr(numba, '__version__') except ImportError: # numba not installed logger.warning("numba is enabled but not installed. Please install numba manually.") self.config.numba = 0 return False use_parallel = bool(self.config.numba_parallel) nopython = bool(self.config.numba_nopython) logger.info("Numba compilation initiated with caching.") for mdl in models.values(): mdl.numba_jitify(parallel=use_parallel, nopython=nopython, ) return True
[docs] def precompile(self, models: Union[OrderedDict, None] = None, nomp: bool = False, ncpu: int = NCPUS_PHYSICAL): """ Trigger precompilation for the given models. Arguments are the same as ``prepare``. """ t0, _ = elapsed() if models is None: models = self.models else: models = self._get_models(models) # turn on numba for precompilation self.config.numba = 1 self.setup() numba_ok = self._init_numba(models) if not numba_ok: return def _precompile_model(model: Model): model.precompile() logger.info("Compilation in progress. This might take a minute...") if nomp is True: for name, mdl in models.items(): _precompile_model(mdl) logger.debug("Model <%s> compiled.", name) # multi-processed implementation. `Pool.map` runs very slow somehow. else: jobs = [] for idx, (name, mdl) in enumerate(models.items()): job = Process( name='Process {0:d}'.format(idx), target=_precompile_model, args=(mdl,), ) jobs.append(job) job.start() if (idx % ncpu == ncpu - 1) or (idx == len(models) - 1): time.sleep(0.02) for job in jobs: job.join() jobs = [] _, s = elapsed(t0) logger.info('Numba compiled %d model%s in %s.', len(models), '' if len(models) == 1 else 's', s)
[docs] def init(self, models: OrderedDict, routine: str): """ Initialize the variables for each of the specified models. For each model, the initialization procedure is: - Get values for all `ExtService`. - Call the model `init()` method, which initializes internal variables. - Copy variables to DAE and then back to the model. """ self._init_numba(models) for mdl in models.values(): # link externals services first for instance in mdl.services_ext.values(): ext_name = instance.model try: ext_model = self.__dict__[ext_name] except KeyError: raise KeyError('<%s> is not a model or group name.' % ext_name) try: instance.link_external(ext_model) except (IndexError, KeyError) as e: logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s', mdl.class_name, instance.name, instance.model, instance.indexer.name, repr(e)) # initialize variables second mdl.init(routine=routine) self.vars_to_dae(mdl) self.vars_to_models() self.s_update_post(models) # store time constants associated with differential equations self._store_tf(models)
[docs] def store_adder_setter(self, models): """ Store non-inplace adders and setters for variables and equations. """ self._clear_adder_setter() for mdl in models.values(): # Note: # We assume that a Model with no device is not addressed and, therefore, # contains no value in each variable. # It is always true for the current architecture. if not mdl.n: continue # Fixes an issue if the cache was manually built but stale # after assigning addresses for simulation # Assigning memory will affect the cache of `v_adders` and `e_adders`. mdl.cache.refresh() # ``getters` that retrieve variable values from DAE for var in mdl.cache.v_getters.values(): self._getters[var.v_code].append(var) # ``adders`` that add variable values to the DAE array for var in mdl.cache.v_adders.values(): self._adders[var.v_code].append(var) for var in mdl.cache.e_adders.values(): self._adders[var.e_code].append(var) # ``setters`` that force set variable values in the DAE array for var in mdl.cache.v_setters.values(): self._setters[var.v_code].append(var) for var in mdl.cache.e_setters.values(): self._setters[var.e_code].append(var) # ``antiwindups`` stores all AntiWindup instances for item in mdl.discrete.values(): if isinstance(item, AntiWindup): self.antiwindups.append(item) return
[docs] def store_no_check_init(self, models): """ Store differential variables with ``check_init == False``. """ self.no_check_init = list() for mdl in models.values(): if mdl.n == 0: continue for var in mdl.states.values(): if var.check_init is False: self.no_check_init.extend(var.a)
[docs] def calc_pu_coeff(self): """ Perform per unit value conversion. This function calculates the per unit conversion factors, stores input parameters to `vin`, and perform the conversion. """ # `Sb`, `Vb` and `Zb` are the system base, bus base values # `Sn`, `Vn` and `Zn` are the device bases Sb = self.config.mva for mdl in self.models.values(): # default Sn to Sb if not provided. Some controllers might not have Sn or Vn. if 'Sn' in mdl.__dict__: Sn = mdl.Sn.v else: Sn = Sb # If both Vn and Vn1 are not provided, default to Vn = Vb = 1 # test if is shunt-connected or series-connected to bus, or unconnected to bus Vb, Vn = 1, 1 if 'bus' in mdl.__dict__: Vb = self.Bus.get(src='Vn', idx=mdl.bus.v, attr='v') Vn = mdl.Vn.v if 'Vn' in mdl.__dict__ else Vb elif 'bus1' in mdl.__dict__: Vb = self.Bus.get(src='Vn', idx=mdl.bus1.v, attr='v') Vn = mdl.Vn1.v if 'Vn1' in mdl.__dict__ else Vb Zn = Vn ** 2 / Sn Zb = Vb ** 2 / Sb # process dc parameter pu conversion Vdcb, Vdcn, Idcn = 1, 1, 1 if 'node' in mdl.__dict__: Vdcb = self.Node.get(src='Vdcn', idx=mdl.node.v, attr='v') Vdcn = mdl.Vdcn.v if 'Vdcn' in mdl.__dict__ else Vdcb Idcn = mdl.Idcn.v if 'Idcn' in mdl.__dict__ else (Sb / Vdcb) elif 'node1' in mdl.__dict__: Vdcb = self.Node.get(src='Vdcn', idx=mdl.node1.v, attr='v') Vdcn = mdl.Vdcn1.v if 'Vdcn1' in mdl.__dict__ else Vdcb Idcn = mdl.Idcn.v if 'Idcn' in mdl.__dict__ else (Sb / Vdcb) Idcb = Sb / Vdcb Rb = Vdcb / Idcb Rn = Vdcn / Idcn coeffs = {'voltage': Vn / Vb, 'power': Sn / Sb, 'ipower': Sb / Sn, 'current': (Sn / Vn) / (Sb / Vb), 'z': Zn / Zb, 'y': Zb / Zn, 'dc_voltage': Vdcn / Vdcb, 'dc_current': Idcn / Idcb, 'r': Rn / Rb, 'g': Rb / Rn, } for prop, coeff in coeffs.items(): for p in mdl.find_param(prop).values(): p.set_pu_coeff(coeff) # store coeffs and bases back in models. mdl.coeffs = coeffs mdl.bases = {'Sn': Sn, 'Sb': Sb, 'Vn': Vn, 'Vb': Vb, 'Zn': Zn, 'Zb': Zb}
[docs] def l_update_var(self, models: OrderedDict, niter=0, err=None): """ Update variable-based limiter discrete states by calling ``l_update_var`` of models. This function is must be called before any equation evaluation. """ self.call_models('l_update_var', models, dae_t=self.dae.t, niter=niter, err=err)
[docs] def l_update_eq(self, models: OrderedDict, init=False, niter=0): """ Update equation-dependent limiter discrete components by calling ``l_check_eq`` of models. Force set equations after evaluating equations. This function is must be called after differential equation updates. """ self.call_models('l_check_eq', models, init=init, niter=niter)
[docs] def s_update_var(self, models: OrderedDict): """ Update variable services by calling ``s_update_var`` of models. This function is must be called before any equation evaluation after limiter update function `l_update_var`. """ self.call_models('s_update_var', models)
[docs] def s_update_post(self, models: OrderedDict): """ Update variable services by calling ``s_update_post`` of models. This function is called at the end of `System.init()`. """ self.call_models('s_update_post', models)
[docs] def fg_to_dae(self): """ Collect equation values into the DAE arrays. Additionally, the function resets the differential equations associated with variables pegged by anti-windup limiters. """ self._e_to_dae(('f', 'g')) # reset mismatches for islanded buses self.g_islands() # update variable values set by anti-windup limiters for item in self.antiwindups: if len(item.x_set) > 0: for key, val, _ in item.x_set: np.put(self.dae.x, key, val)
[docs] def f_update(self, models: OrderedDict): """ Call the differential equation update method for models in sequence. Notes ----- Updated equation values remain in models and have not been collected into DAE at the end of this step. """ try: self.call_models('f_update', models) except TypeError as e: logger.error("f_update failed. Have you run `andes prepare -i` after updating?") raise e
[docs] def g_update(self, models: OrderedDict): """ Call the algebraic equation update method for models in sequence. Notes ----- Like `f_update`, updated values have not collected into DAE at the end of the step. """ try: self.call_models('g_update', models) except TypeError as e: logger.error("g_update failed. Have you run `andes prepare -i` after updating?") raise e
[docs] def g_islands(self): """ Reset algebraic mismatches for islanded buses. """ if self.Bus.n_islanded_buses == 0: return self.dae.g[self.Bus.islanded_a] = 0.0 self.dae.g[self.Bus.islanded_v] = 0.0
[docs] def j_update(self, models: OrderedDict, info=None): """ Call the Jacobian update method for models in sequence. The procedure is - Restore the sparsity pattern with :py:func:`andes.variables.dae.DAE.restore_sparse` - For each sparse matrix in (fx, fy, gx, gy), evaluate the Jacobian function calls and add values. Notes ----- Updated Jacobians are immediately reflected in the DAE sparse matrices (fx, fy, gx, gy). """ self.call_models('j_update', models) self.dae.restore_sparse() # collect sparse values into sparse structures for j_name in jac_names: j_size = self.dae.get_size(j_name) for mdl in models.values(): for rows, cols, vals in mdl.triplets.zip_ijv(j_name): try: if self.config.ipadd: self.dae.__dict__[j_name].ipadd(vals, rows, cols) else: self.dae.__dict__[j_name] += spmatrix(vals, rows, cols, j_size, 'd') except TypeError as e: logger.error("Error adding Jacobian triplets to existing sparsity pattern.") logger.error(f'{mdl.class_name}: j_name {j_name}, row={rows}, col={cols}, val={vals}, ' f'j_size={j_size}') raise e self.j_islands() if info: logger.debug("Jacobian updated at t=%.6f: %s.", self.dae.t, info) else: logger.debug("Jacobian updated at t=%.6f.", self.dae.t)
[docs] def j_islands(self): """ Set gy diagonals to eps for `a` and `v` variables of islanded buses. """ if self.Bus.n_islanded_buses == 0: return aidx = self.Bus.islanded_a vidx = self.Bus.islanded_v if self.config.ipadd: self.dae.gy.ipset(self.config.diag_eps, aidx, aidx) self.dae.gy.ipset(0.0, aidx, vidx) self.dae.gy.ipset(self.config.diag_eps, vidx, vidx) self.dae.gy.ipset(0.0, vidx, aidx) else: avals = [-self.dae.gy[int(idx), int(idx)] + self.config.diag_eps for idx in aidx] vvals = [-self.dae.gy[int(idx), int(idx)] + self.config.diag_eps for idx in vidx] self.dae.gy += spmatrix(avals, aidx, aidx, self.dae.gy.size, 'd') self.dae.gy += spmatrix(vvals, vidx, vidx, self.dae.gy.size, 'd')
[docs] def store_sparse_pattern(self, models: OrderedDict): """ Collect and store the sparsity pattern of Jacobian matrices. This is a runtime function specific to cases. Notes ----- For `gy` matrix, always make sure the diagonal is reserved. It is a safeguard if the modeling user omitted the diagonal term in the equations. """ self.call_models('store_sparse_pattern', models) # add variable jacobian values for jname in jac_names: ii, jj, vv = list(), list(), list() # for `gy`, reserve memory for the main diagonal if jname == 'gy': ii.extend(np.arange(self.dae.m)) jj.extend(np.arange(self.dae.m)) vv.extend(np.zeros(self.dae.m)) for mdl in models.values(): for row, col, val in mdl.triplets.zip_ijv(jname): ii.extend(row) jj.extend(col) vv.extend(np.zeros_like(row)) for row, col, val in mdl.triplets.zip_ijv(jname + 'c'): # process constant Jacobians separately ii.extend(row) jj.extend(col) vv.extend(val * np.ones_like(row)) if len(ii) > 0: ii = np.array(ii, dtype=int) jj = np.array(jj, dtype=int) vv = np.array(vv, dtype=float) self.dae.store_sparse_ijv(jname, ii, jj, vv) self.dae.build_pattern(jname)
[docs] def vars_to_dae(self, model): """ Copy variables values from models to `System.dae`. This function clears `DAE.x` and `DAE.y` and collects values from models. """ self._v_to_dae('x', model) self._v_to_dae('y', model)
[docs] def vars_to_models(self): """ Copy variable values from `System.dae` to models. """ for var in self._getters['y']: if var.n > 0: var.v[:] = self.dae.y[var.a] for var in self._getters['x']: if var.n > 0: var.v[:] = self.dae.x[var.a]
[docs] def connectivity(self, info=True): """ Perform connectivity check for system. Parameters ---------- info : bool True to log connectivity summary. """ logger.debug("Entering connectivity check.") self.Bus.n_islanded_buses = 0 self.Bus.islanded_buses = list() self.Bus.island_sets = list() self.Bus.nosw_island = list() self.Bus.msw_island = list() self.Bus.islands = list() n = self.Bus.n # collect from-bus and to-bus indices fr, to, u = list(), list(), list() # TODO: generalize it to all serial devices # collect from Line fr.extend(self.Line.a1.a.tolist()) to.extend(self.Line.a2.a.tolist()) u.extend(self.Line.u.v.tolist()) # collect from Jumper fr.extend(self.Jumper.a1.a.tolist()) to.extend(self.Jumper.a2.a.tolist()) u.extend(self.Jumper.u.v.tolist()) # collect from Fortescue fr.extend(self.Fortescue.a.a.tolist()) to.extend(self.Fortescue.aa.a.tolist()) u.extend(self.Fortescue.u.v.tolist()) fr.extend(self.Fortescue.a.a.tolist()) to.extend(self.Fortescue.ab.a.tolist()) u.extend(self.Fortescue.u.v.tolist()) fr.extend(self.Fortescue.a.a.tolist()) to.extend(self.Fortescue.ac.a.tolist()) u.extend(self.Fortescue.u.v.tolist()) os = [0] * len(u) # find islanded buses diag = list(matrix(spmatrix(u, to, os, (n, 1), 'd') + spmatrix(u, fr, os, (n, 1), 'd'))) for idx in range(n): if diag[idx] == 0: self.Bus.islanded_buses.append(idx) # store `a` and `v` indices for zeroing out residuals self.Bus.islanded_a = np.array(self.Bus.islanded_buses) self.Bus.islanded_v = self.Bus.n + self.Bus.islanded_a # `self.Bus.n_islanded_buses` is used to determine if `g_islands` # needs to be call self.Bus.n_islanded_buses = len(self.Bus.islanded_a) # find islanded areas - Goderya's algorithm temp = spmatrix(list(u) * 4, fr + to + fr + to, to + fr + fr + to, (n, n), 'd') temp = sparse(temp) # need to drop allocated zero values # Translated find_islanded_areas from goderya.jl into Python conn = spmatrix([], [], [], (1, n), 'd') island_sets = [] starting_bus = 0 visit_idx = 0 while True: if starting_bus in self.Bus.islanded_buses: starting_bus += 1 continue cons = temp[starting_bus, :] nelm = len(cons.J) while True: cons = cons * temp cons = sparse(cons) # remove zero values new_nelm = len(cons.J) if new_nelm == nelm: break nelm = new_nelm island_sets.append(list(cons.J)) conn += cons if len(conn.J) >= (n - len(self.Bus.islanded_buses)): break # Increment `starting_bus` until it's not in `conn.J` and # `self.Bus.islanded_buses` for i in range(visit_idx, self.Bus.n): if i in conn.J or i in self.Bus.islanded_buses: i += 1 else: visit_idx = i break starting_bus = visit_idx self.Bus.island_sets = island_sets # --- check if all areas have a slack generator --- if len(self.Bus.island_sets) > 0: for idx, island in enumerate(self.Bus.island_sets): nosw = 1 slack_bus_uid = self.Bus.idx2uid(self.Slack.bus.v) slack_u = self.Slack.u.v for u, item in zip(slack_u, slack_bus_uid): if (u == 1) and (item in island): nosw -= 1 if nosw == 1: self.Bus.nosw_island.append(idx) elif nosw < 0: self.Bus.msw_island.append(idx) # --- Post processing --- # 1. extend islanded buses, each in a list if len(self.Bus.islanded_buses) > 0: self.Bus.islands.extend([[item] for item in self.Bus.islanded_buses]) if len(self.Bus.island_sets) == 0: self.Bus.islands.append(list(range(n))) else: self.Bus.islands.extend(self.Bus.island_sets) # 2. find generators in the largest island if self.TDS.config.criteria and self.TDS.initialized: lg_island = None for item in self.Bus.islands: if lg_island is None: lg_island = item continue if len(item) > len(lg_island): lg_island = item lg_bus_idx = [self.Bus.idx.v[ii] for ii in lg_island] self.SynGen.store_idx_island(lg_bus_idx) if info is True: self.summary()
[docs] def to_ipysheet(self, model: str, vin: bool = False): """ Return an ipysheet object for editing in Jupyter Notebook. """ from ipysheet import from_dataframe return from_dataframe(self.models[model].as_df(vin=vin))
[docs] def from_ipysheet(self, model: str, sheet, vin: bool = False): """ Set an ipysheet object back to model. """ from ipysheet import to_dataframe df = to_dataframe(sheet) self.models[model].update_from_df(df, vin=vin)
[docs] def summary(self): """ Print out system summary. """ island_sets = self.Bus.island_sets nosw_island = self.Bus.nosw_island msw_island = self.Bus.msw_island n_islanded_buses = self.Bus.n_islanded_buses logger.info("-> System connectivity check results:") if n_islanded_buses == 0: logger.info(" No islanded bus detected.") else: logger.info(" %d islanded bus detected.", n_islanded_buses) logger.debug(" Islanded Bus indices (0-based): %s", self.Bus.islanded_buses) if len(island_sets) == 0: logger.info(" No island detected.") elif len(island_sets) == 1: logger.info(" System is interconnected.") logger.debug(" Bus indices in interconnected system (0-based): %s", island_sets) else: logger.info(" System contains %d island(s).", len(island_sets)) logger.debug(" Bus indices in islanded areas (0-based): %s", island_sets) if len(nosw_island) > 0: logger.warning(' Slack generator is not defined/enabled for %d island(s).', len(nosw_island)) logger.debug(" Bus indices in no-Slack areas (0-based): %s", [island_sets[item] for item in nosw_island]) if len(msw_island) > 0: logger.warning(' Multiple slack generators are defined/enabled for %d island(s).', len(msw_island)) logger.debug(" Bus indices in multiple-Slack areas (0-based): %s", [island_sets[item] for item in msw_island]) if len(self.Bus.nosw_island) == 0 and len(self.Bus.msw_island) == 0: logger.info(' Each island has a slack bus correctly defined and enabled.')
def _v_to_dae(self, v_code, model): """ Helper function for collecting variable values into ``dae`` structures `x` and `y`. This function must be called with ``dae.x`` and ``dae.y`` both being zeros. Otherwise, adders will be summed again, causing an error. Parameters ---------- v_code : 'x' or 'y' Variable type name """ if model.n == 0: return if model.flags.initialized is False: return for var in model.cache.v_adders.values(): if var.v_code != v_code: continue np.add.at(self.dae.__dict__[v_code], var.a, var.v) for var in self._setters[v_code]: if var.owner.flags.initialized is False: continue if var.n > 0: np.put(self.dae.__dict__[v_code], var.a, var.v) def _e_to_dae(self, eq_name: Union[str, Tuple] = ('f', 'g')): """ Helper function for collecting equation values into `System.dae.f` and `System.dae.g`. Parameters ---------- eq_name : 'x' or 'y' or tuple Equation type name """ if isinstance(eq_name, str): eq_name = [eq_name] for name in eq_name: for var in self._adders[name]: np.add.at(self.dae.__dict__[name], var.a, var.e) for var in self._setters[name]: np.put(self.dae.__dict__[name], var.a, var.e)
[docs] def get_z(self, models: OrderedDict): """ Get all discrete status flags in a numpy array. Values are written to ``dae.z`` in place. Returns ------- numpy.array """ if self.TDS.config.store_z != 1: return None if len(self.dae.z) != self.dae.o: self.dae.z = np.zeros(self.dae.o, dtype=float) ii = 0 for mdl in models.values(): if mdl.n == 0 or len(mdl._input_z) == 0: continue for zz in mdl._input_z.values(): self.dae.z[ii:ii + mdl.n] = zz ii += mdl.n return self.dae.z
[docs] def find_models(self, flag: Optional[Union[str, Tuple]], skip_zero: bool = True): """ Find models with at least one of the flags as True. Warnings -------- Checking the number of devices has been centralized into this function. ``models`` passed to most System calls must be retrieved from here. Parameters ---------- flag : list, str Flags to find skip_zero : bool Skip models with zero devices Returns ------- OrderedDict model name : model instance """ if isinstance(flag, str): flag = [flag] out = OrderedDict() for name, mdl in self.models.items(): if skip_zero is True: if (mdl.n == 0) or (mdl.in_use is False): continue for f in flag: if mdl.flags.__dict__[f] is True: out[name] = mdl break return out
[docs] def undill(self, autogen_stale=True): """ Reload generated function functions, from either the ``$HOME/.andes/pycode`` folder. If no change is made to models, future calls to ``prepare()`` can be replaced with ``undill()`` for acceleration. Parameters ---------- autogen_stale: bool True to automatically call code generation if stale code is detected. Regardless of this option, codegen is trigger if importing existing code fails. """ # load equations and jacobian from saved code loaded = self._load_calls() stale_models = self._find_stale_models() if loaded is False: self.prepare(quick=True, incremental=False) loaded = True elif autogen_stale is False: # NOTE: incremental code generation may be triggered due to Python # not invalidating ``.pyc`` caches. If multiprocessing is being # used, code generation will cause nested multiprocessing, which is # not allowed. # The flag ``autogen_stale=False`` is used to prevent nested codegen # and is intended to be used only in multiprocessing. logger.info("Generated code for <%s> is stale.", ', '.join(stale_models.keys())) logger.info("Automatic code re-generation manually skipped") loaded = True elif len(stale_models) > 0: logger.info("Generated code for <%s> is stale.", ', '.join(stale_models.keys())) self.prepare(quick=True, incremental=True, models=stale_models) loaded = True return loaded
def _load_calls(self): """ Helper function for loading generated numerical functions from the ``pycode`` module. """ loaded = False user_pycode_path = self.options.get("pycode_path") pycode = import_pycode(user_pycode_path=user_pycode_path) if pycode: try: self._expand_pycode(pycode) loaded = True except KeyError: logger.error("Your generated pycode is broken. Run `andes prep` to re-generate. ") return loaded def _expand_pycode(self, pycode_module): """ Expand imported ``pycode`` module to model calls. Parameters ---------- pycode : module The module for generated code for models. """ for name, model in self.models.items(): if name not in pycode_module.__dict__: logger.debug("Model %s does not exist in pycode", name) continue pycode_model = pycode_module.__dict__[model.class_name] # md5 model.calls.md5 = getattr(pycode_model, 'md5', None) # reload stored variables for item in dilled_vars: model.calls.__dict__[item] = pycode_model.__dict__[item] # equations model.calls.f = pycode_model.__dict__.get("f_update") model.calls.g = pycode_model.__dict__.get("g_update") # services for instance in model.services.values(): if (instance.v_str is not None) and instance.sequential is True: sv_name = f'{instance.name}_svc' model.calls.s[instance.name] = pycode_model.__dict__[sv_name] # services - non sequential model.calls.sns = pycode_model.__dict__.get("sns_update") # load initialization; assignment for instance in model.cache.all_vars.values(): if instance.v_str is not None: ia_name = f'{instance.name}_ia' model.calls.ia[instance.name] = pycode_model.__dict__[ia_name] # load initialization: iterative for item in model.calls.init_seq: if isinstance(item, list): name_concat = '_'.join(item) model.calls.ii[name_concat] = pycode_model.__dict__[name_concat + '_ii'] model.calls.ij[name_concat] = pycode_model.__dict__[name_concat + '_ij'] # load Jacobian functions for jname in model.calls.j_names: model.calls.j[jname] = pycode_model.__dict__.get(f'{jname}_update') def _get_models(self, models): """ Helper function for sanitizing the ``models`` input. The output is an OrderedDict of model names and instances. """ out = OrderedDict() if isinstance(models, OrderedDict): out.update(models) elif models is None: out.update(self.exist.pflow) elif isinstance(models, str): out[models] = self.__dict__[models] elif isinstance(models, Model): out[models.class_name] = models elif isinstance(models, list): for item in models: if isinstance(item, Model): out[item.class_name] = item elif isinstance(item, str): out[item] = self.__dict__[item] else: raise TypeError(f'Unknown type {type(item)}') return out def _store_tf(self, models): """ Store the inverse time constant associated with equations. """ for mdl in models.values(): for var in mdl.cache.states_and_ext.values(): if var.t_const is not None: np.put(self.dae.Tf, var.a, var.t_const.v)
[docs] def call_models(self, method: str, models: OrderedDict, *args, **kwargs): """ Call methods on the given models. Parameters ---------- method : str Name of the model method to be called models : OrderedDict, list, str Models on which the method will be called args Positional arguments to be passed to the model method kwargs Keyword arguments to be passed to the model method Returns ------- The return value of the models in an OrderedDict """ ret = OrderedDict() for name, mdl in models.items(): ret[name] = getattr(mdl, method)(*args, **kwargs) if self.config.save_stats: if method not in self.call_stats[name]: self.call_stats[name][method] = 1 else: self.call_stats[name][method] += 1 return ret
def _check_group_common(self): """ Check if all group common variables and parameters are met. This function is called at the end of code generation by `prepare`. Raises ------ KeyError if any parameter or value is not provided """ for group in self.groups.values(): for item in group.common_params: for model in group.models.values(): # the below includes all of BaseParam (NumParam, DataParam and ExtParam) if item not in model.__dict__: if item in model.group_param_exception: continue raise KeyError(f'Group <{group.class_name}> common param <{item}> does not exist ' f'in model <{model.class_name}>') for item in group.common_vars: for model in group.models.values(): if item not in model.cache.all_vars: if item in model.group_var_exception: continue raise KeyError(f'Group <{group.class_name}> common var <{item}> does not exist ' f'in model <{model.class_name}>')
[docs] def collect_ref(self): """ Collect indices into `BackRef` for all models. """ models_and_groups = list(self.models.values()) + list(self.groups.values()) # create an empty list of lists for all `BackRef` instances for model in models_and_groups: for ref in model.services_ref.values(): ref.v = [list() for _ in range(model.n)] # `model` is the model who stores `IdxParam`s to other models # `BackRef` is declared at other models specified by the `model` parameter # of `IdxParam`s. for model in models_and_groups: if model.n == 0: continue # skip: a group is not allowed to link to other groups if not hasattr(model, "idx_params"): continue for idxp in model.idx_params.values(): if (idxp.model not in self.models) and (idxp.model not in self.groups): continue dest = self.__dict__[idxp.model] if dest.n == 0: continue for name in (model.class_name, model.group): # `BackRef` not requested by the linked models or groups if name not in dest.services_ref: continue for model_idx, dest_idx in zip(model.idx.v, idxp.v): if dest_idx not in dest.uid: continue dest.set_backref(name, from_idx=model_idx, to_idx=dest_idx) # set model ``in_use`` flag if isinstance(model, Model): model.set_in_use()
[docs] def import_groups(self): """ Import all groups classes defined in ``models/group.py``. Groups will be stored as instances with the name as class names. All groups will be stored to dictionary ``System.groups``. """ module = importlib.import_module('andes.models.group') for m in inspect.getmembers(module, inspect.isclass): name, cls = m if name == 'GroupBase': continue elif not issubclass(cls, GroupBase): # skip other imported classes such as `OrderedDict` continue self.__dict__[name] = cls() self.groups[name] = self.__dict__[name]
[docs] def import_models(self): """ Import and instantiate models as System member attributes. Models defined in ``models/__init__.py`` will be instantiated `sequentially` as attributes with the same name as the class name. In addition, all models will be stored in dictionary ``System.models`` with model names as keys and the corresponding instances as values. Examples -------- ``system.Bus`` stores the `Bus` object, and ``system.GENCLS`` stores the classical generator object, ``system.models['Bus']`` points the same instance as ``system.Bus``. """ for fname, cls_list in file_classes: for model_name in cls_list: the_module = importlib.import_module('andes.models.' + fname) the_class = getattr(the_module, model_name) self.__dict__[model_name] = the_class(system=self, config=self._config_object) self.models[model_name] = self.__dict__[model_name] self.models[model_name].config.check() # link to the group group_name = self.__dict__[model_name].group self.__dict__[group_name].add_model(model_name, self.__dict__[model_name]) for key, val in andes.models.model_aliases.items(): self.model_aliases[key] = self.models[val] self.__dict__[key] = self.models[val]
[docs] def import_routines(self): """ Import routines as defined in ``routines/__init__.py``. Routines will be stored as instances with the name as class names. All routines will be stored to dictionary ``System.routines``. Examples -------- ``System.PFlow`` is the power flow routine instance, and ``System.TDS`` and ``System.EIG`` are time-domain analysis and eigenvalue analysis routines, respectively. """ for file, cls_list in all_routines.items(): for cls_name in cls_list: file = importlib.import_module('andes.routines.' + file) the_class = getattr(file, cls_name) attr_name = cls_name self.__dict__[attr_name] = the_class(system=self, config=self._config_object) self.routines[attr_name] = self.__dict__[attr_name] self.routines[attr_name].config.check()
[docs] def store_switch_times(self, models, eps=1e-4): """ Store event switching time in a sorted Numpy array in ``System.switch_times`` and an OrderedDict ``System.switch_dict``. ``System.switch_dict`` has keys as event times and values as the OrderedDict of model names and instances associated with the event. Parameters ---------- models : OrderedDict model name : model instance eps : float The small time step size to use immediately before and after the event Returns ------- array-like self.switch_times """ out = np.array([], dtype=float) if self.options.get('flat') is True: return out names = [] for instance in models.values(): times = np.array(instance.get_times()).ravel() out = np.append(out, times) out = np.append(out, times - eps) out = np.append(out, times + eps) names.extend([instance.class_name] * (3 * len(times))) # sort sort_idx = np.argsort(out).astype(int) out = out[sort_idx] names = [names[i] for i in sort_idx] # select t > current time ltzero_idx = np.where(out >= self.dae.t)[0] out = out[ltzero_idx] names = [names[i] for i in ltzero_idx] # make into an OrderedDict with unique keys and model names combined for i, j in zip(out, names): if i not in self.switch_dict: self.switch_dict[i] = {j: self.models[j]} else: self.switch_dict[i].update({j: self.models[j]}) self.switch_times = np.array(list(self.switch_dict.keys())) # self.switch_times = out self.n_switches = len(self.switch_times) return self.switch_times
[docs] def switch_action(self, models: OrderedDict): """ Invoke the actions associated with switch times. This function will not be called if ``flat=True`` is passed to system. """ for instance in models.values(): instance.switch_action(self.dae.t) # TODO: generalize below for any models with timeseries data. self.TimeSeries.apply_exact(self.dae.t)
def _p_restore(self): """ Restore parameters stored in `pin`. """ for model in self.models.values(): for param in model.num_params.values(): param.restore()
[docs] def e_clear(self, models: OrderedDict): """ Clear equation arrays in DAE and model variables. This step must be called before calling `f_update` or `g_update` to flush existing values. """ self.dae.clear_fg() self.call_models('e_clear', models)
[docs] def remove_pycapsule(self): """ Remove PyCapsule objects in solvers. """ for r in self.routines.values(): r.solver.clear()
def _store_calls(self, models: OrderedDict): """ Collect and store model calls into system. """ logger.debug("Collecting Model.calls into System.") self.calls['__version__'] = andes.__version__ for name, mdl in models.items(): self.calls[name] = mdl.calls def _list2array(self): """ Helper function to call models' ``list2array`` method, which usually performs memory preallocation. """ self.call_models('list2array', self.models)
[docs] def set_config(self, config=None): """ Set configuration for the System object. Config for models are routines are passed directly to their constructors. """ if config is not None: # set config for system if self.__class__.__name__ in config: self.config.add(config[self.__class__.__name__]) logger.debug("Config: set for System")
[docs] def collect_config(self): """ Collect config data from models. Returns ------- dict a dict containing the config from devices; class names are keys and configs in a dict are values. """ config_dict = configparser.ConfigParser() config_dict[self.__class__.__name__] = self.config.as_dict() all_with_config = OrderedDict(list(self.routines.items()) + list(self.models.items())) for name, instance in all_with_config.items(): cfg = instance.config.as_dict() if len(cfg) > 0: config_dict[name] = cfg return config_dict
[docs] def save_config(self, file_path=None, overwrite=False): """ Save all system, model, and routine configurations to an rc-formatted file. Parameters ---------- file_path : str, optional path to the configuration file default to `~/andes/andes.rc`. overwrite : bool, optional If file exists, True to overwrite without confirmation. Otherwise prompt for confirmation. Warnings -------- Saved config is loaded back and populated *at system instance creation time*. Configs from the config file takes precedence over default config values. """ if file_path is None: andes_path = os.path.join(os.path.expanduser('~'), '.andes') os.makedirs(andes_path, exist_ok=True) file_path = os.path.join(andes_path, 'andes.rc') elif os.path.isfile(file_path): if not confirm_overwrite(file_path, overwrite=overwrite): return conf = self.collect_config() with open(file_path, 'w') as f: conf.write(f) logger.info('Config written to "%s"', file_path) return file_path
[docs] def supported_models(self, export='plain'): """ Return the support group names and model names in a table. Returns ------- str A table-formatted string for the groups and models """ def rst_ref(name, export): """ Refer to the model in restructuredText mode so that it renders as a hyperlink. """ if export == 'rest': return ":ref:`" + name + '`' else: return name pairs = list() for g in self.groups: models = list() for m in self.groups[g].models: models.append(rst_ref(m, export)) if len(models) > 0: pairs.append((rst_ref(g, export), ', '.join(models))) tab = Tab(title='Supported Groups and Models', header=['Group', 'Models'], data=pairs, export=export, ) return tab.draw()
[docs] def as_dict(self, vin=False, skip_empty=True): """ Return system data as a dict where the keys are model names and values are dicts. Each dict has parameter names as keys and corresponding data in an array as values. Returns ------- OrderedDict """ out = OrderedDict() for name, instance in self.models.items(): if skip_empty and instance.n == 0: continue out[name] = instance.as_dict(vin=vin) return out
[docs] def set_output_subidx(self, models): """ Process :py:class:`andes.models.misc.Output` data and store the sub-indices into ``dae.xy``. Parameters ---------- models : OrderedDict Models currently in use for the routine """ export_vars = dict(x=list(), y=list()) # indices of export x and y for model, var, dev in zip(self.Output.model.v, self.Output.varname.v, self.Output.dev.v): # check validity of model name if model not in models: logger.info("Output model <%s> invalid or contains no device. Skipped.", model) continue mdl_instance = models[model] mdl_all_vars = mdl_instance.cache.all_vars # check validity of var name if var is not None and (var not in mdl_all_vars): logger.info("Output model <%s> contains no variable <%s>. Skipped.", model, var) continue # check validity of dev idx if (dev is not None) and (dev not in mdl_instance.idx.v): logger.info("Output model <%s> contains no device <%s>. Skipped.", model, dev) continue # TODO: dev-based indexing is not fully supported # for multi-index variables, such as those in COI. if var is None: for item in mdl_all_vars.values(): if dev is None: export_vars[item.v_code].extend(item.a) else: uid = mdl_instance.idx2uid(dev) export_vars[item.v_code].append(item.a[uid]) else: # with variable name item = mdl_all_vars[var] if dev is None: export_vars[item.v_code].extend(item.a) else: # with exact index uid = mdl_instance.idx2uid(dev) export_vars[item.v_code].append(item.a[uid]) self.Output.xidx = sorted(np.unique(export_vars['x'])) self.Output.yidx = sorted(np.unique(export_vars['y']))
# --------------- Helper Functions --------------- def _config_numpy(seed='None', divide='warn', invalid='warn'): """ Configure NumPy based on Config. """ # set up numpy random seed if isinstance(seed, int): np.random.seed(seed) logger.debug("Random seed set to <%d>.", seed) # set levels np.seterr(divide=divide, invalid=invalid, )
[docs]def load_config_rc(conf_path=None): """ Load config from an rc-formatted file. Parameters ---------- conf_path : None or str Path to the config file. If is `None`, the function body will not run. Returns ------- configparse.ConfigParser """ if conf_path is None: return conf = configparser.ConfigParser() conf.read(conf_path) logger.info('> Loaded config from file "%s"', conf_path) return conf
[docs]def fix_view_arrays(system): """ Point NumPy arrays without OWNDATA (termed "view arrays" here) to the source array. This function properly sets ``v`` and ``e`` arrays of internal variables as views of the corresponding DAE arrays. Inputs will be refreshed for each model. Parameters ---------- system : andes.system.System System object to be fixed """ system.set_var_arrays(system.models) for model in system.models.values(): model.get_inputs(refresh=True) return True
[docs]def import_pycode(user_pycode_path=None): """ Helper function to import generated pycode in the following priority: 1. a user-provided path from CLI. Currently, this is only for specifying the path to store the generated pycode via ``andes prepare``. 2. ``~/.andes/pycode``. This is where pycode is stored by default. 3. ``<andes_package_root>/pycode``. One can store pycode in the ANDES package folder and ship a full package, which does not require code generation. """ # below are executed serially because of priority pycode = reload_submodules('pycode') if not pycode: pycode_path = get_pycode_path(user_pycode_path, mkdir=False) pycode = _import_pycode_from(pycode_path) if not pycode: pycode = reload_submodules('andes.pycode') if not pycode: pycode = _import_pycode_from(os.path.join(andes_root(), 'pycode')) return pycode
def _import_pycode_from(pycode_path): """ Helper function to load pycode from ``.andes``. """ MODULE_PATH = os.path.join(pycode_path, '__init__.py') MODULE_NAME = 'pycode' pycode = None if os.path.isfile(MODULE_PATH): try: spec = importlib.util.spec_from_file_location(MODULE_NAME, MODULE_PATH) pycode = importlib.util.module_from_spec(spec) # NOQA sys.modules[spec.name] = pycode spec.loader.exec_module(pycode) logger.info('> Loaded generated Python code in "%s".', pycode_path) except ImportError: logger.debug('> Failed loading generated Python code in "%s".', pycode_path) return pycode
[docs]def reload_submodules(module_name): """ Helper function for reloading an existing module and its submodules. It is used to reload the ``pycode`` module after regenerating code. """ if module_name in sys.modules: pycode = sys.modules[module_name] for _, m in inspect.getmembers(pycode, inspect.ismodule): importlib.reload(m) logger.info('> Reloaded generated Python code of module "%s".', module_name) return pycode return None
def _append_model_name(model_name, idx): """ Helper function for appending ``idx`` to model names. Removes duplicate model name strings. """ out = '' if isinstance(idx, str) and (model_name in idx): out = idx else: out = f'{model_name} {idx}' # replaces `_` with space for LaTeX to continue out = out.replace('_', ' ') return out def _set_xy_name(mdl, vars_dict, dests): """ Helper function for setting algebraic and state variable names. """ mdl_name = mdl.class_name idx = mdl.idx for name, item in vars_dict.items(): for idx_item, addr in zip(idx.v, item.a): dests[0][addr] = f'{name} {_append_model_name(mdl_name, idx_item)}' dests[1][addr] = rf'${item.tex_name}$ {_append_model_name(mdl_name, idx_item)}' def _set_hi_name(mdl, vars_dict, dests): """ Helper function for setting names of external equations. """ mdl_name = mdl.class_name idx = mdl.idx for item in vars_dict.values(): if len(item.r) != len(idx.v): idxall = item.indexer.v else: idxall = idx.v for idx_item, addr in zip(idxall, item.r): dests[0][addr] = f'{item.ename} {_append_model_name(mdl_name, idx_item)}' dests[1][addr] = rf'${item.tex_ename}$ {_append_model_name(mdl_name, idx_item)}' def _set_z_name(mdl, dae, dests): """ Helper function for addng and setting discrete flag names. """ for item in mdl.discrete.values(): if mdl.flags.initialized: continue mdl_name = mdl.class_name for name, tex_name in zip(item.get_names(), item.get_tex_names()): for idx_item in mdl.idx.v: dests[0].append(f'{name} {_append_model_name(mdl_name, idx_item)}') dests[1].append(rf'${item.tex_name}$ {_append_model_name(mdl_name, idx_item)}') dae.o += 1
[docs]def example(setup=True, no_output=True, **kwargs): """ Return an :py:class:`andes.system.System` object for the ``ieee14_linetrip.xlsx`` as an example. This function is useful when a user wants to quickly get a System object for testing. Returns ------- System An example :py:class:`andes.system.System` object. """ return andes.load(andes.get_case("ieee14/ieee14_linetrip.xlsx"), setup=setup, no_output=no_output, **kwargs)