"""
System class for power system data and methods
"""

#  [ANDES] (C)2015-2021 Hantao Cui
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  File name: system.py
#  Last modified: 8/16/20, 7:26 PM

import configparser
import importlib
import importlib.util
import logging
import os
import sys
import inspect
import dill

from collections import OrderedDict
from typing import Dict, Tuple, Union, Optional

import andes.io
from andes.models import file_classes
from andes.models.group import GroupBase
from andes.variables import FileMan, DAE
from andes.routines import all_routines
from andes.utils.tab import Tab
from andes.utils.misc import elapsed
from andes.utils.paths import confirm_overwrite
from andes.utils.paths import get_config_path, get_dot_andes_path
from andes.utils.paths import get_pycode_path, get_pkl_path
from andes.core import Config, Model, AntiWindup
from andes.io.streaming import Streaming

from andes.shared import np, jac_names, dilled_vars, IP_ADD
from andes.shared import matrix, spmatrix, sparse

logger = logging.getLogger(__name__)
dill.settings['recurse'] = True
pycode = None


class ExistingModels:
    """
    Storage class for existing models
    """

    def __init__(self):
        self.pflow = OrderedDict()
        self.tds = OrderedDict()

        # if a model needs to be initialized before TDS, set `flags.tds = True`
        self.pflow_tds = OrderedDict()


class System:
    """
    System contains models and routines for modeling and simulation.

    System contains several special `OrderedDict` member attributes for
    housekeeping. These attributes include `models`, `groups`, `routines` and
    `calls` for loaded models, groups, analysis routines, and generated
    numerical function calls, respectively.

    Notes
    -----
    System stores model and routine instances as attributes.
    Model and routine attribute names are the same as their class names.
    For example, `Bus` is stored at ``system.Bus``, the power flow calculation
    routine is at ``system.PFlow``, and the numerical DAE instance is at
    ``system.dae``. See attributes for the list of attributes.

    Attributes
    ----------
    dae : andes.variables.dae.DAE
        Numerical DAE storage
    files : andes.variables.fileman.FileMan
        File path storage
    config : andes.core.Config
        System config storage
    models : OrderedDict
        model name and instance pairs
    groups : OrderedDict
        group name and instance pairs
    routines : OrderedDict
        routine name and instance pairs
    """

    def __init__(self,
                 case: Optional[str] = None,
                 name: Optional[str] = None,
                 config: Optional[Dict] = None,
                 config_path: Optional[str] = None,
                 default_config: Optional[bool] = False,
                 options: Optional[Dict] = None,
                 **kwargs
                 ):
        self.name = name
        self.options = {}
        if options is not None:
            self.options.update(options)
        if kwargs:
            self.options.update(kwargs)

        self.calls = OrderedDict()        # a dictionary with model names (keys) and their ``calls`` instance
        self.models = OrderedDict()       # model names and instances
        self.groups = OrderedDict()       # group names and instances
        self.routines = OrderedDict()     # routine names and instances
        self.switch_times = np.array([])  # an array of ordered event switching times
        self.switch_dict = OrderedDict()  # time: OrderedDict of associated models
        self.with_calls = False           # if generated function calls have been loaded
        self.n_switches = 0               # number of elements in `self.switch_times`
        self.exit_code = 0                # command-line exit code; 0 - normal, others - error

        # get and load default config file
        self._config_path = get_config_path()
        if config_path is not None:
            self._config_path = config_path
        if default_config is True:
            self._config_path = None

        self._config_object = self.load_config(self._config_path)
        self.config = Config(self.__class__.__name__, dct=config)
        self.config.load(self._config_object)

        # custom configuration for system goes after this line
        self.config.add(OrderedDict((('freq', 60),
                                     ('mva', 100),
                                     ('store_z', 0),
                                     ('ipadd', 1),
                                     ('seed', 'None'),
                                     ('diag_eps', 1e-8),
                                     ('warn_limits', 1),
                                     ('warn_abnormal', 1),
                                     ('dime_enabled', 0),
                                     ('dime_name', 'andes'),
                                     ('dime_address', 'ipc:///tmp/dime2'),
                                     ('numba', 0),
                                     ('numba_parallel', 0),
                                     ('yapf_pycode', 0),
                                     ('np_divide', 'warn'),
                                     ('np_invalid', 'warn'),
                                     ('pickle_path', get_pkl_path()),
                                     )))

        self.config.add_extra("_help",
                              freq='base frequency [Hz]',
                              mva='system base MVA',
                              store_z='store limiter status in TDS output',
                              ipadd='use spmatrix.ipadd if available',
                              seed='seed (or None) for random number generator',
                              diag_eps='small value for Jacobian diagonals',
                              warn_limits='warn variables initialized at limits',
                              warn_abnormal='warn initialization out of normal values',
                              numba='use numba for JIT compilation',
                              numba_parallel='enable parallel for numba.jit',
                              yapf_pycode='format generated code with yapf',
                              np_divide='treatment for division by zero',
                              np_invalid='treatment for invalid floating-point ops.',
                              pickle_path='path to/from which models are (un)dilled',
                              )

        self.config.add_extra("_alt",
                              freq="float",
                              mva="float",
                              store_z=(0, 1),
                              ipadd=(0, 1),
                              seed='int or None',
                              warn_limits=(0, 1),
                              warn_abnormal=(0, 1),
                              numba=(0, 1),
                              numba_parallel=(0, 1),
                              yapf_pycode=(0, 1),
                              np_divide={'ignore', 'warn', 'raise', 'call', 'print', 'log'},
                              np_invalid={'ignore', 'warn', 'raise', 'call', 'print', 'log'},
                              )

        self.config.check()
        self._set_numpy()

        self.exist = ExistingModels()

        self.files = FileMan(case=case, **self.options)  # file path manager
        self.dae = DAE(system=self)                      # numerical DAE storage
        self.streaming = Streaming(self)                 # Dime2 streaming

        # dynamic imports of groups, models and routines
        self.import_groups()
        self.import_models()
        self.import_routines()  # routine imports come after models

        self._getters = dict(f=list(), g=list(), x=list(), y=list())
        self._adders = dict(f=list(), g=list(), x=list(), y=list())
        self._setters = dict(f=list(), g=list(), x=list(), y=list())
        self.antiwindups = list()
        self.no_check_init = list()  # states for which the initialization check is omitted

        # internal flags
        self.is_setup = False  # if system has been set up

    def _set_numpy(self):
        """
        Configure NumPy based on Config.
        """
        # set up the numpy random seed
        if isinstance(self.config.seed, int):
            np.random.seed(self.config.seed)
            logger.debug("Random seed set to <%d>.", self.config.seed)

        np.seterr(divide=self.config.np_divide,
                  invalid=self.config.np_invalid,
                  )
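
    # A minimal usage sketch for constructing a System. The case name
    # ``kundur_full.xlsx`` and the ``no_output`` option are illustrative;
    # any supported case file and option work the same way. Extra keyword
    # arguments are collected into ``self.options``, and parsing the case
    # is a separate step (see ``reload`` below, which calls
    # ``andes.io.parse`` followed by ``setup``):
    #
    #     import andes
    #     ss = andes.System(case='kundur_full.xlsx', no_output=True)
    #     andes.io.parse(ss)
    #     ss.setup()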

    def reload(self, case, **kwargs):
        """
        Reload a new case in the same System object.
        """
        self.options.update(kwargs)
        self.files.set(case=case, **kwargs)

        # TODO: clear all flags and empty data
        andes.io.parse(self)
        self.setup()

    def _clear_adder_setter(self):
        """
        Clear adders and setters storage.
        """
        self._getters = dict(f=list(), g=list(), x=list(), y=list())
        self._adders = dict(f=list(), g=list(), x=list(), y=list())
        self._setters = dict(f=list(), g=list(), x=list(), y=list())

    def prepare(self, quick=False, incremental=False, models=None):
        """
        Generate numerical functions from symbolically defined models.

        All procedures in this function must be independent of test case.

        Parameters
        ----------
        quick : bool, optional
            True to skip pretty-print generation to reduce code generation time.
        incremental : bool, optional
            True to generate only for modified models, incrementally.
        models : list, OrderedDict, None
            List or OrderedDict of models to prepare

        Notes
        -----
        Option ``incremental`` compares the md5 checksum of all var and
        service strings, and only regenerates for updated models.

        Examples
        --------
        If one needs to print out LaTeX-formatted equations in a Jupyter
        Notebook, one needs to generate such equations with ::

            import andes
            sys = andes.prepare()

        Alternatively, one can explicitly create a System and generate the code ::

            import andes
            sys = andes.System()
            sys.prepare()

        Warnings
        --------
        Generated lambda functions will be serialized to file, but pretty
        prints (SymPy objects) can only exist in the System instance on which
        ``prepare`` is called.
        """
        # info
        if incremental is True:
            mode_text = 'rapid incremental mode'
        elif quick is True:
            mode_text = 'quick mode'
        else:
            mode_text = 'full mode'

        logger.info('Numerical code generation (%s) started...', mode_text)

        t0, _ = elapsed()

        # consistency check for group parameters and variables
        self._check_group_common()

        # get `pycode` folder path without automatic creation
        pycode_path = get_pycode_path(self.options.get("pycode_path"), mkdir=False)

        # determine which models to prepare based on mode and the `models` list
        if incremental and models is None:
            if not self.with_calls:
                self._load_calls()
            models = self._find_stale_models()
        elif not incremental and models is None:
            models = self.models
        else:
            models = self._to_orddct(models)

        total = len(models)
        width = len(str(total))
        for idx, (name, model) in enumerate(models.items()):
            print(f"\r\x1b[K Generating code for {name} ({idx+1:>{width}}/{total:>{width}}).",
                  end='\r', flush=True)

            # generate code for a single model
            model.prepare(quick=quick, pycode_path=pycode_path)

        if len(models) > 0:
            self._finalize_pycode(pycode_path)
            self._store_calls(models)
            self.dill()

        _, s = elapsed(t0)
        logger.info('Generated numerical code for %d models in %s.', len(models), s)
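
    # Typical calls, following the docstring above. Full generation covers all
    # models; incremental generation only touches models whose md5 checksum
    # has changed:
    #
    #     import andes
    #     ss = andes.System()
    #     ss.prepare()                  # full generation
    #     ss.prepare(incremental=True)  # regenerate stale models only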

    def _finalize_pycode(self, pycode_path):
        """
        Helper function for finalizing pycode generation by writing
        ``__init__.py`` and reloading the ``pycode`` package.
        """
        init_path = os.path.join(pycode_path, '__init__.py')

        with open(init_path, 'w') as f:
            f.write(f"__version__ = '{andes.__version__}'\n\n")
            for name in self.models.keys():
                f.write(f"from . import {name:20s} # NOQA\n")
            f.write('\n')

        logger.info('Saved generated pycode to "%s"', pycode_path)

        # RELOAD REQUIRED as the generated Jacobian arguments may be in a different order
        if pycode is not None:
            self._call_from_pycode()

    def _find_stale_models(self):
        """
        Find models whose ModelCall instances are stale, using the md5 checksum.
        """
        out = OrderedDict()
        for model in self.models.values():
            calls_md5 = getattr(model.calls, 'md5', None)
            if calls_md5 != model.get_md5():
                out[model.class_name] = model

        return out

    def _to_orddct(self, model_list):
        """
        Helper function to convert a list of model names to an OrderedDict
        with names as keys and model instances as values.
        """
        if isinstance(model_list, OrderedDict):
            return model_list

        if isinstance(model_list, list):
            out = OrderedDict()
            for name in model_list:
                if name not in self.models:
                    logger.error("Model <%s> does not exist. Check your inputs.", name)
                    continue
                out[name] = self.models[name]
            return out

        raise TypeError("Type %s not recognized" % type(model_list))

    def _prepare_mp(self, quick=False):
        """
        Code generation with multiprocessing. NOT WORKING NOW.

        Warnings
        --------
        Function is not working. Serialization failed for `conj`.
        """
        from andes.shared import Pool
        dill.settings['recurse'] = True

        # consistency check for group parameters and variables
        self._check_group_common()

        def _prep_model(model: Model):
            model.prepare(quick=quick)
            return model

        model_list = list(self.models.values())

        # TODO: failed when serializing
        ret = Pool().map(_prep_model, model_list)
        for idx, name in enumerate(self.models.keys()):
            self.models[name] = ret[idx]

        self._store_calls(self.models)
        self.dill()

    def setup(self):
        """
        Set up system for studies.

        This function is to be called after adding all device data.
        """
        ret = True
        t0, _ = elapsed()

        if self.is_setup:
            logger.warning('System has been set up. Calling setup twice is not allowed.')
            ret = False
            return ret

        self.collect_ref()
        self._list2array()     # `list2array` must come before `link_ext_param`
        if not self.link_ext_param():
            ret = False
        self.find_devices()    # find or add required devices

        # === no device addition or removal after this point ===

        self.calc_pu_coeff()   # calculate parameters in system per units
        self.store_existing()  # store models with routine flags

        # assign addresses at the end, after all devices are added
        # and parameters are processed
        self.set_address(self.exist.pflow)
        self.set_dae_names(self.exist.pflow)
        self.store_sparse_pattern(self.exist.pflow)
        self.store_adder_setter(self.exist.pflow)

        if ret is True:
            self.is_setup = True  # set `is_setup` if no error occurred
        else:
            logger.error("System setup failed. Please resolve the reported issue(s).")
            self.exit_code += 1

        _, s = elapsed(t0)
        logger.info('System internal structure set up in %s.', s)

        return ret

    def store_existing(self):
        """
        Store existing models in ``System.exist``.

        TODO: Models with `TimerParam` will need to be stored anyway.
        This will allow adding switches on the fly.
        """
        self.exist.pflow = self.find_models('pflow')
        self.exist.tds = self.find_models('tds')
        self.exist.pflow_tds = self.find_models(('tds', 'pflow'))

    def reset(self, force=False):
        """
        Reset to the state after reading data and setup (before power flow).

        Warnings
        --------
        If TDS is initialized, reset will lead to an unpredictable state.
        """
        if self.TDS.initialized is True and not force:
            logger.error('Reset failed because TDS is initialized. \n'
                         'Please reload the test case to start over.')
            return

        self.dae.reset()
        self.call_models('a_reset', models=self.models)
        self.e_clear(models=self.models)
        self._p_restore()
        self.is_setup = False
        self.setup()

    def add(self, model, param_dict=None, **kwargs):
        """
        Add a device instance for an existing model.

        This method calls the ``add`` method of `model` and registers the
        device `idx` to the group.
        """
        if model not in self.models:
            logger.warning("<%s> is not an existing model.", model)
            return
        if self.is_setup:
            raise NotImplementedError("Adding devices is not allowed after setup.")

        group_name = self.__dict__[model].group
        group = self.groups[group_name]

        if param_dict is None:
            param_dict = {}
        if kwargs is not None:
            param_dict.update(kwargs)

        idx = param_dict.pop('idx', None)
        if idx is np.nan:
            idx = None

        idx = group.get_next_idx(idx=idx, model_name=model)
        self.__dict__[model].add(idx=idx, **param_dict)
        group.add(idx=idx, model=self.__dict__[model])

        return idx
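
    # A build-from-scratch sketch using ``add`` and ``setup``. All parameter
    # values below are illustrative only:
    #
    #     import andes
    #     ss = andes.System()
    #     ss.add('Bus', dict(idx=1, Vn=110))
    #     ss.add('Bus', dict(idx=2, Vn=110))
    #     ss.add('Line', dict(bus1=1, bus2=2, x=0.1))
    #     ss.setup()    # device addition is disallowed afterwards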

    def find_devices(self):
        """
        Add dependent devices for all models based on `DeviceFinder`.
        """
        for mdl in self.models.values():
            if len(mdl.services_fnd) == 0:
                continue

            for fnd in mdl.services_fnd.values():
                fnd.find_or_add(self)

    def set_address(self, models):
        """
        Set addresses for differential and algebraic variables.
        """
        for mdl in models.values():
            if mdl.flags.address is True:
                logger.debug('%s internal address exists', mdl.class_name)
                continue
            if mdl.n == 0:
                continue

            # set internal variable addresses
            logger.debug('Setting internal address for %s', mdl.class_name)
            n = mdl.n
            m0 = self.dae.m
            n0 = self.dae.n
            m_end = m0 + len(mdl.algebs) * n
            n_end = n0 + len(mdl.states) * n
            collate = mdl.flags.collate

            if not collate:
                for idx, item in enumerate(mdl.algebs.values()):
                    item.set_address(np.arange(m0 + idx * n, m0 + (idx + 1) * n),
                                     contiguous=True)
                for idx, item in enumerate(mdl.states.values()):
                    item.set_address(np.arange(n0 + idx * n, n0 + (idx + 1) * n),
                                     contiguous=True)
            else:
                for idx, item in enumerate(mdl.algebs.values()):
                    item.set_address(np.arange(m0 + idx, m_end, len(mdl.algebs)),
                                     contiguous=False)
                for idx, item in enumerate(mdl.states.values()):
                    item.set_address(np.arange(n0 + idx, n_end, len(mdl.states)),
                                     contiguous=False)

            self.dae.m = m_end
            self.dae.n = n_end

        # set external variable addresses
        for mdl in models.values():
            # handle external groups
            for name, instance in mdl.cache.vars_ext.items():
                ext_name = instance.model
                try:
                    ext_model = self.__dict__[ext_name]
                except KeyError:
                    raise KeyError('<%s> is not a model or group name.' % ext_name)

                try:
                    instance.link_external(ext_model)
                except (IndexError, KeyError) as e:
                    logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s',
                                 mdl.class_name, instance.name, instance.model,
                                 instance.indexer.name, repr(e))

        # set external variable RHS addresses
        for mdl in models.values():
            if mdl.flags.address is True:
                logger.debug('%s RHS address exists', mdl.class_name)
                continue
            if mdl.n == 0:
                continue

            for item in mdl.states_ext.values():
                item.set_address(np.arange(self.dae.p, self.dae.p + item.n))
                self.dae.p = self.dae.p + item.n
            for item in mdl.algebs_ext.values():
                item.set_address(np.arange(self.dae.q, self.dae.q + item.n))
                self.dae.q = self.dae.q + item.n

            mdl.flags.address = True

        # allocate memory for DAE arrays
        self.dae.resize_arrays()

        # set `v` and `e` in variables
        self.set_var_arrays(models=models)

        # pre-allocate for names
        if len(self.dae.y_name) == 0:
            self.dae.x_name = [''] * self.dae.n
            self.dae.y_name = [''] * self.dae.m
            self.dae.h_name = [''] * self.dae.p
            self.dae.i_name = [''] * self.dae.q
            self.dae.x_tex_name = [''] * self.dae.n
            self.dae.y_tex_name = [''] * self.dae.m
            self.dae.h_tex_name = [''] * self.dae.p
            self.dae.i_tex_name = [''] * self.dae.q
        else:
            self.dae.x_name.extend([''] * (self.dae.n - len(self.dae.x_name)))
            self.dae.y_name.extend([''] * (self.dae.m - len(self.dae.y_name)))
            self.dae.h_name.extend([''] * (self.dae.p - len(self.dae.h_name)))
            self.dae.i_name.extend([''] * (self.dae.q - len(self.dae.i_name)))
            self.dae.x_tex_name.extend([''] * (self.dae.n - len(self.dae.x_tex_name)))
            self.dae.y_tex_name.extend([''] * (self.dae.m - len(self.dae.y_tex_name)))
            self.dae.h_tex_name.extend([''] * (self.dae.p - len(self.dae.h_tex_name)))
            self.dae.i_tex_name.extend([''] * (self.dae.q - len(self.dae.i_tex_name)))
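
    # Addressing sketch for the two layouts above: for a model with n = 2
    # devices, two algebraic variables, and m0 = 0, sequential addressing
    # (``collate=False``) assigns
    #     var0 -> [0, 1],  var1 -> [2, 3]
    # whereas interleaved addressing (``collate=True``) assigns
    #     var0 -> [0, 2],  var1 -> [1, 3]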

    def set_dae_names(self, models):
        """
        Set variable names for differential and algebraic variables,
        and discrete flags.
        """

        def append_model_name(model_name, idx):
            out = ''
            if isinstance(idx, str) and (model_name in idx):
                out = idx
            else:
                out = f'{model_name} {idx}'

            # replace `_` with space for LaTeX
            out = out.replace('_', ' ')
            return out

        for mdl in models.values():
            mdl_name = mdl.class_name
            idx = mdl.idx

            for name, item in mdl.algebs.items():
                for id, addr in zip(idx.v, item.a):
                    self.dae.y_name[addr] = f'{name} {append_model_name(mdl_name, id)}'
                    self.dae.y_tex_name[addr] = rf'${item.tex_name}$ {append_model_name(mdl_name, id)}'

            for name, item in mdl.states.items():
                for id, addr in zip(idx.v, item.a):
                    self.dae.x_name[addr] = f'{name} {append_model_name(mdl_name, id)}'
                    self.dae.x_tex_name[addr] = rf'${item.tex_name}$ {append_model_name(mdl_name, id)}'

            # add discrete flag names
            if self.config.store_z == 1:
                for item in mdl.discrete.values():
                    if mdl.flags.initialized:
                        continue
                    for name, tex_name in zip(item.get_names(), item.get_tex_names()):
                        for id in idx.v:
                            self.dae.z_name.append(f'{name} {append_model_name(mdl_name, id)}')
                            self.dae.z_tex_name.append(rf'${tex_name}$ {append_model_name(mdl_name, id)}')
                            self.dae.o += 1

    def set_var_arrays(self, models, inplace=True, alloc=True):
        """
        Set arrays (`v` and `e`) in internal variables.

        Parameters
        ----------
        models : OrderedDict, list, Model, optional
            Models to process
        inplace : bool
            True to retrieve arrays that share memory with dae
        alloc : bool
            True to allocate arrays internally
        """
        for mdl in models.values():
            if mdl.n == 0:
                continue

            for var in mdl.cache.vars_int.values():
                var.set_arrays(self.dae, inplace=inplace, alloc=alloc)

            for var in mdl.cache.vars_ext.values():
                var.set_arrays(self.dae, inplace=inplace, alloc=alloc)

    def _init_numba(self, models: OrderedDict):
        """
        Helper function to compile all functions with Numba before init.
        """
        if self.config.numba:
            use_parallel = bool(self.config.numba_parallel == 1)
            use_cache = bool(pycode is not None)
            logger.info("Numba compilation initiated, parallel=%s, cache=%s.",
                        use_parallel, use_cache)

            for mdl in models.values():
                mdl.numba_jitify(parallel=use_parallel, cache=use_cache)

    def init(self, models: OrderedDict, routine: str):
        """
        Initialize the variables for each of the specified models.

        For each model, the initialization procedure is:

        - Get values for all `ExtService`.
        - Call the model `init()` method, which initializes internal variables.
        - Copy variables to DAE and then back to the model.
        """
        try:
            self._init_numba(models)
        except ImportError:
            logger.warning("Numba not found. JIT compilation is skipped.")

        for mdl in models.values():
            # link externals first
            for instance in mdl.services_ext.values():
                ext_name = instance.model
                try:
                    ext_model = self.__dict__[ext_name]
                except KeyError:
                    raise KeyError('<%s> is not a model or group name.' % ext_name)

                try:
                    instance.link_external(ext_model)
                except (IndexError, KeyError) as e:
                    logger.error('Error: <%s> cannot retrieve <%s> from <%s> using <%s>:\n %s',
                                 mdl.class_name, instance.name, instance.model,
                                 instance.indexer.name, repr(e))

            # initialize variables second
            mdl.init(routine=routine)

            self.vars_to_dae(mdl)
            self.vars_to_models()

        self.s_update_post(models)

        # store the inverse of time constants
        self._store_tf(models)

    def store_adder_setter(self, models):
        """
        Store non-inplace adders and setters for variables and equations.
        """
        self._clear_adder_setter()

        for mdl in models.values():
            # Note:
            #   We assume that a Model with no device is not addressed and,
            #   therefore, contains no value in each variable.
            #   It is always true for the current architecture.
            if not mdl.n:
                continue

            # ``getters`` that retrieve variable values from DAE
            for var in mdl.cache.v_getters.values():
                self._getters[var.v_code].append(var)

            # ``adders`` that add variable values to the DAE array
            for var in mdl.cache.v_adders.values():
                self._adders[var.v_code].append(var)
            for var in mdl.cache.e_adders.values():
                self._adders[var.e_code].append(var)

            # ``setters`` that force set variable values in the DAE array
            for var in mdl.cache.v_setters.values():
                self._setters[var.v_code].append(var)
            for var in mdl.cache.e_setters.values():
                self._setters[var.e_code].append(var)

            # ``antiwindups`` stores all AntiWindup instances
            for item in mdl.discrete.values():
                if isinstance(item, AntiWindup):
                    self.antiwindups.append(item)

        return

    def store_no_check_init(self, models):
        """
        Store differential variables with ``check_init == False``.
        """
        self.no_check_init = list()

        for mdl in models.values():
            if mdl.n == 0:
                continue

            for var in mdl.states.values():
                if var.check_init is False:
                    self.no_check_init.extend(var.a)

    def calc_pu_coeff(self):
        """
        Perform per unit value conversion.

        This function calculates the per unit conversion factors, stores input
        parameters to `vin`, and performs the conversion.
        """
        Sb = self.config.mva

        for mdl in self.models.values():
            # default Sn to Sb if not provided. Some controllers might not have Sn or Vn.
            if 'Sn' in mdl.__dict__:
                Sn = mdl.Sn.v
            else:
                Sn = Sb

            # If both Vn and Vn1 are not provided, default to Vn = Vb = 1.
            # Test if the model is shunt-connected or series-connected to a bus,
            # or unconnected to any bus.
            Vb, Vn = 1, 1
            if 'bus' in mdl.__dict__:
                Vb = self.Bus.get(src='Vn', idx=mdl.bus.v, attr='v')
                Vn = mdl.Vn.v if 'Vn' in mdl.__dict__ else Vb
            elif 'bus1' in mdl.__dict__:
                Vb = self.Bus.get(src='Vn', idx=mdl.bus1.v, attr='v')
                Vn = mdl.Vn1.v if 'Vn1' in mdl.__dict__ else Vb

            Zn = Vn ** 2 / Sn
            Zb = Vb ** 2 / Sb

            # process dc parameter pu conversion
            Vdcb, Vdcn, Idcn = 1, 1, 1
            if 'node' in mdl.__dict__:
                Vdcb = self.Node.get(src='Vdcn', idx=mdl.node.v, attr='v')
                Vdcn = mdl.Vdcn.v if 'Vdcn' in mdl.__dict__ else Vdcb
                Idcn = mdl.Idcn.v if 'Idcn' in mdl.__dict__ else (Sb / Vdcb)
            elif 'node1' in mdl.__dict__:
                Vdcb = self.Node.get(src='Vdcn', idx=mdl.node1.v, attr='v')
                Vdcn = mdl.Vdcn1.v if 'Vdcn1' in mdl.__dict__ else Vdcb
                Idcn = mdl.Idcn.v if 'Idcn' in mdl.__dict__ else (Sb / Vdcb)

            Idcb = Sb / Vdcb
            Rb = Vdcb / Idcb
            Rn = Vdcn / Idcn

            coeffs = {'voltage': Vn / Vb,
                      'power': Sn / Sb,
                      'ipower': Sb / Sn,
                      'current': (Sn / Vn) / (Sb / Vb),
                      'z': Zn / Zb,
                      'y': Zb / Zn,
                      'dc_voltage': Vdcn / Vdcb,
                      'dc_current': Idcn / Idcb,
                      'r': Rn / Rb,
                      'g': Rb / Rn,
                      }

            for prop, coeff in coeffs.items():
                for p in mdl.find_param(prop).values():
                    p.set_pu_coeff(coeff)
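
    # Worked example for the coefficients above: with a system base
    # Sb = 100 MVA and a device with Sn = 50 MVA and Vn = Vb = 10 kV,
    #     Zn = Vn ** 2 / Sn = 2.0 ohm,  Zb = Vb ** 2 / Sb = 1.0 ohm
    # so impedance-type parameters ('z') are scaled by Zn / Zb = 2.0 when
    # converted from the device base to the system base.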

    def l_update_var(self, models: OrderedDict, niter=None, err=None):
        """
        Update variable-based limiter discrete states by calling
        ``l_update_var`` of models.

        This function must be called before any equation evaluation.
        """
        self.call_models('l_update_var', models, dae_t=self.dae.t, niter=niter, err=err)

    def l_update_eq(self, models: OrderedDict):
        """
        Update equation-dependent limiter discrete components by calling
        ``l_check_eq`` of models. Force-set equations after evaluating equations.

        This function must be called after the differential equation updates.
        """
        self.call_models('l_check_eq', models)

    def s_update_var(self, models: OrderedDict):
        """
        Update variable services by calling ``s_update_var`` of models.

        This function must be called before any equation evaluation,
        after the limiter update function `l_update_var`.
        """
        self.call_models('s_update_var', models)

    def s_update_post(self, models: OrderedDict):
        """
        Update variable services by calling ``s_update_post`` of models.

        This function is called at the end of `System.init()`.
        """
        self.call_models('s_update_post', models)

    def fg_to_dae(self):
        """
        Collect equation values into the DAE arrays.

        Additionally, the function resets the differential equations associated
        with variables pegged by anti-windup limiters.
        """
        self._e_to_dae(('f', 'g'))

        # reset mismatches for islanded buses
        self.g_islands()

        # update variable values set by anti-windup limiters
        for item in self.antiwindups:
            if len(item.x_set) > 0:
                for key, val, _ in item.x_set:
                    np.put(self.dae.x, key, val)

    def f_update(self, models: OrderedDict):
        """
        Call the differential equation update method for models in sequence.

        Notes
        -----
        Updated equation values remain in models and have not been collected
        into DAE at the end of this step.
        """
        try:
            self.call_models('f_update', models)
        except TypeError as e:
            logger.error("f_update failed. Have you run `andes prepare -i` after updating?")
            raise e

    def g_update(self, models: OrderedDict):
        """
        Call the algebraic equation update method for models in sequence.

        Notes
        -----
        Like `f_update`, updated values have not been collected into DAE
        at the end of this step.
        """
        try:
            self.call_models('g_update', models)
        except TypeError as e:
            logger.error("g_update failed. Have you run `andes prepare -i` after updating?")
            raise e

    def g_islands(self):
        """
        Reset algebraic mismatches for islanded buses.
        """
        if self.Bus.n_islanded_buses == 0:
            return

        self.dae.g[self.Bus.islanded_a] = 0.0
        self.dae.g[self.Bus.islanded_v] = 0.0

    def j_update(self, models: OrderedDict, info=None):
        """
        Call the Jacobian update method for models in sequence.

        The procedure is:

        - Restore the sparsity pattern with
          :py:func:`andes.variables.dae.DAE.restore_sparse`
        - For each sparse matrix in (fx, fy, gx, gy), evaluate the Jacobian
          function calls and add values.

        Notes
        -----
        Updated Jacobians are immediately reflected in the DAE sparse matrices
        (fx, fy, gx, gy).
        """
        self.call_models('j_update', models)

        self.dae.restore_sparse()

        # collect sparse values into sparse structures
        for j_name in jac_names:
            j_size = self.dae.get_size(j_name)

            for mdl in models.values():
                for rows, cols, vals in mdl.triplets.zip_ijv(j_name):
                    try:
                        if self.config.ipadd and IP_ADD:
                            self.dae.__dict__[j_name].ipadd(vals, rows, cols)
                        else:
                            self.dae.__dict__[j_name] += spmatrix(vals, rows, cols, j_size, 'd')
                    except TypeError as e:
                        logger.error("Error adding Jacobian triplets to existing sparsity pattern.")
                        logger.error(f'{mdl.class_name}: j_name {j_name}, row={rows}, col={cols}, '
                                     f'val={vals}, j_size={j_size}')
                        raise e

        self.j_islands()

        if info:
            logger.debug("Jacobian updated at t=%.6f: %s.", self.dae.t, info)
        else:
            logger.debug("Jacobian updated at t=%.6f.", self.dae.t)

    def j_islands(self):
        """
        Set gy diagonals to eps for `a` and `v` variables of islanded buses.
        """
        if self.Bus.n_islanded_buses == 0:
            return

        aidx = self.Bus.islanded_a
        vidx = self.Bus.islanded_v

        if self.config.ipadd and IP_ADD:
            self.dae.gy.ipset(self.config.diag_eps, aidx, aidx)
            self.dae.gy.ipset(self.config.diag_eps, vidx, vidx)
        else:
            avals = [-self.dae.gy[int(idx), int(idx)] + self.config.diag_eps for idx in aidx]
            vvals = [-self.dae.gy[int(idx), int(idx)] + self.config.diag_eps for idx in vidx]
            self.dae.gy += spmatrix(avals, aidx, aidx, self.dae.gy.size, 'd')
            self.dae.gy += spmatrix(vvals, vidx, vidx, self.dae.gy.size, 'd')

    def store_sparse_pattern(self, models: OrderedDict):
        """
        Collect and store the sparsity pattern of Jacobian matrices.

        This is a runtime function specific to cases.

        Notes
        -----
        For the `gy` matrix, always make sure the diagonal is reserved.
        It is a safeguard if the modeling user omitted the diagonal
        term in the equations.
        """
        self.call_models('store_sparse_pattern', models)

        # add variable jacobian values
        for jname in jac_names:
            ii, jj, vv = list(), list(), list()

            # for `gy`, reserve memory for the main diagonal
            if jname == 'gy':
                ii.extend(np.arange(self.dae.m))
                jj.extend(np.arange(self.dae.m))
                vv.extend(np.zeros(self.dae.m))

            for mdl in models.values():
                for row, col, val in mdl.triplets.zip_ijv(jname):
                    ii.extend(row)
                    jj.extend(col)
                    vv.extend(np.zeros_like(row))
                for row, col, val in mdl.triplets.zip_ijv(jname + 'c'):
                    # process constant Jacobians separately
                    ii.extend(row)
                    jj.extend(col)
                    vv.extend(val * np.ones_like(row))

            if len(ii) > 0:
                ii = np.array(ii, dtype=int)
                jj = np.array(jj, dtype=int)
                vv = np.array(vv, dtype=float)

            self.dae.store_sparse_ijv(jname, ii, jj, vv)
            self.dae.build_pattern(jname)

    def vars_to_dae(self, model):
        """
        Copy variable values from models to `System.dae`.

        This function clears `DAE.x` and `DAE.y` and collects values from models.
        """
        self._v_to_dae('x', model)
        self._v_to_dae('y', model)

    def vars_to_models(self):
        """
        Copy variable values from `System.dae` to models.
        """
        for var in self._getters['y']:
            if var.n > 0:
                var.v[:] = self.dae.y[var.a]

        for var in self._getters['x']:
            if var.n > 0:
                var.v[:] = self.dae.x[var.a]

    def connectivity(self, info=True):
        """
        Perform a connectivity check for the system.

        Parameters
        ----------
        info : bool
            True to log connectivity summary.
        """
        logger.debug("Entering connectivity check.")

        self.Bus.n_islanded_buses = 0
        self.Bus.islanded_buses = list()
        self.Bus.island_sets = list()
        self.Bus.nosw_island = list()
        self.Bus.msw_island = list()
        self.Bus.islands = list()

        n = self.Bus.n

        # collect from-bus and to-bus indices
        fr, to, u = list(), list(), list()

        # collect from Line
        fr.extend(self.Line.a1.a.tolist())
        to.extend(self.Line.a2.a.tolist())
        u.extend(self.Line.u.v.tolist())

        zeros = [0] * len(u)

        # find islanded buses
        diag = list(matrix(spmatrix(u, to, zeros, (n, 1), 'd') +
                           spmatrix(u, fr, zeros, (n, 1), 'd')))

        nib = self.Bus.n_islanded_buses = diag.count(0)
        for idx in range(n):
            if diag[idx] == 0:
                self.Bus.islanded_buses.append(idx)

        # store `a` and `v` indices for zeroing out residuals
        self.Bus.islanded_a = np.array(self.Bus.islanded_buses)
        self.Bus.islanded_v = self.Bus.n + self.Bus.islanded_a

        # find islanded areas - Goderya's algorithm
        temp = spmatrix(list(u) * 4,
                        fr + to + fr + to,
                        to + fr + fr + to,
                        (n, n), 'd')

        cons = temp[0, :]
        nelm = len(cons.J)
        conn = spmatrix([], [], [], (1, n), 'd')
        enum = idx = islands = 0

        while True:
            while True:
                cons = cons * temp
                cons = sparse(cons)  # remove zero values
                new_nelm = len(cons.J)
                if new_nelm == nelm:
                    break
                nelm = new_nelm

            # started with an islanded bus
            if len(cons.J) == 0:
                enum += 1
            # all buses are interconnected
            elif len(cons.J) == n:
                break

            self.Bus.island_sets.append(list(cons.J))
            conn += cons
            islands += 1

            nconn = len(conn.J)
            if nconn >= (n - nib):
                self.Bus.island_sets = [i for i in self.Bus.island_sets if len(i) > 0]
                break

            for element in conn.J[idx:]:
                if not diag[idx]:
                    enum += 1  # skip islanded buses

                if element <= enum:
                    idx += 1
                    enum += 1
                else:
                    break

            cons = temp[enum, :]

        # --- check if all areas have a slack generator ---
        if len(self.Bus.island_sets) > 0:
            for idx, island in enumerate(self.Bus.island_sets):
                nosw = 1
                slack_bus_uid = self.Bus.idx2uid(self.Slack.bus.v)
                slack_u = self.Slack.u.v
                for u, item in zip(slack_u, slack_bus_uid):
                    if (u == 1) and (item in island):
                        nosw -= 1
                if nosw == 1:
                    self.Bus.nosw_island.append(idx)
                elif nosw < 0:
                    self.Bus.msw_island.append(idx)

        # --- post-processing ---
        # extend islanded buses, each in a list
        if len(self.Bus.islanded_buses) > 0:
            self.Bus.islands.extend([[item] for item in self.Bus.islanded_buses])

        if len(self.Bus.island_sets) == 0:
            self.Bus.islands.append(list(range(n)))
        else:
            self.Bus.islands.extend(self.Bus.island_sets)

        if info is True:
            self.summary()
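
    # Usage sketch (assuming ``ss`` is a parsed and set-up System):
    #
    #     ss.connectivity()        # logs a summary via ``summary()``
    #     ss.Bus.islanded_buses    # 0-based indices of isolated buses
    #     ss.Bus.island_sets       # lists of bus indices, one per island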

    def to_ipysheet(self, model: str, vin: bool = False):
        """
        Return an ipysheet object for editing in Jupyter Notebook.
        """
        from ipysheet import from_dataframe

        return from_dataframe(self.models[model].as_df(vin=vin))

    def from_ipysheet(self, model: str, sheet, vin: bool = False):
        """
        Set an ipysheet object back to the model.
        """
        from ipysheet import to_dataframe

        df = to_dataframe(sheet)
        self.models[model].update_from_df(df, vin=vin)

    def summary(self):
        """
        Print out the system summary.
        """
        island_sets = self.Bus.island_sets
        nosw_island = self.Bus.nosw_island
        msw_island = self.Bus.msw_island
        n_islanded_buses = self.Bus.n_islanded_buses

        logger.info("-> System connectivity check results:")
        if n_islanded_buses == 0:
            logger.info("  No islanded bus detected.")
        else:
            logger.info("  %d islanded bus(es) detected.", n_islanded_buses)
            logger.debug("  Islanded bus indices (0-based): %s", self.Bus.islanded_buses)

        if len(island_sets) == 0:
            logger.info("  No island detected.")
        else:
            logger.info("  A total of %d island(s) detected.", len(island_sets))
            logger.debug("  Bus indices in islanded areas (0-based): %s", island_sets)

        if len(nosw_island) > 0:
            logger.warning('  Slack generator is not defined/enabled for %d island(s).',
                           len(nosw_island))
            logger.debug("  Bus indices in no-Slack areas (0-based): %s",
                         [island_sets[item] for item in nosw_island])

        if len(msw_island) > 0:
            logger.warning('  Multiple slack generators are defined/enabled for %d island(s).',
                           len(msw_island))
            logger.debug("  Bus indices in multiple-Slack areas (0-based): %s",
                         [island_sets[item] for item in msw_island])

        if len(self.Bus.nosw_island) == 0 and len(self.Bus.msw_island) == 0:
            logger.info('  Each island has a slack bus correctly defined and enabled.')

    def _v_to_dae(self, v_code, model):
        """
        Helper function for collecting variable values into ``dae``
        structures `x` and `y`.

        This function must be called with ``dae.x`` and ``dae.y`` both being
        zeros. Otherwise, adders will be summed again, causing incorrect values.

        Parameters
        ----------
        v_code : 'x' or 'y'
            Variable type name
        """
        if model.n == 0:
            return
        if model.flags.initialized is False:
            return

        for var in model.cache.v_adders.values():
            if var.v_code != v_code:
                continue
            np.add.at(self.dae.__dict__[v_code], var.a, var.v)

        for var in self._setters[v_code]:
            if var.owner.flags.initialized is False:
                continue
            if var.n > 0:
                np.put(self.dae.__dict__[v_code], var.a, var.v)

    def _e_to_dae(self, eq_name: Union[str, Tuple] = ('f', 'g')):
        """
        Helper function for collecting equation values into
        `System.dae.f` and `System.dae.g`.

        Parameters
        ----------
        eq_name : 'f' or 'g' or tuple
            Equation type name
        """
        if isinstance(eq_name, str):
            eq_name = [eq_name]

        for name in eq_name:
            for var in self._adders[name]:
                np.add.at(self.dae.__dict__[name], var.a, var.e)
            for var in self._setters[name]:
                np.put(self.dae.__dict__[name], var.a, var.e)

    def get_z(self, models: OrderedDict):
        """
        Get all discrete status flags in a numpy array.
        Values are written to ``dae.z`` in place.

        Returns
        -------
        numpy.ndarray
        """
        if self.config.store_z != 1:
            return None

        if len(self.dae.z) != self.dae.o:
            self.dae.z = np.zeros(self.dae.o, dtype=float)

        ii = 0
        for mdl in models.values():
            if mdl.n == 0 or len(mdl._input_z) == 0:
                continue
            for zz in mdl._input_z.values():
                self.dae.z[ii:ii + mdl.n] = zz
                ii += mdl.n

        return self.dae.z

    def get_ext_fg(self, models: OrderedDict):
        """
        Get the right-hand side of the external equations.
        """
        pass

    def find_models(self, flag: Optional[Union[str, Tuple]], skip_zero: bool = True):
        """
        Find models with at least one of the given flags set to True.

        Warnings
        --------
        Checking the number of devices has been centralized into this function.
        ``models`` passed to most System calls must be retrieved from here.

        Parameters
        ----------
        flag : list, str
            Flags to find
        skip_zero : bool
            Skip models with zero devices

        Returns
        -------
        OrderedDict
            model name : model instance
        """
        if isinstance(flag, str):
            flag = [flag]

        out = OrderedDict()
        for name, mdl in self.models.items():
            if skip_zero is True:
                if (mdl.n == 0) or (mdl.in_use is False):
                    continue
            for f in flag:
                if mdl.flags.__dict__[f] is True:
                    out[name] = mdl
                    break

        return out
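
    # Usage sketch: because the flags are OR-ed, a tuple returns models with
    # at least one of the given flags set.
    #
    #     ss.find_models('tds')              # models with ``flags.tds == True``
    #     ss.find_models(('tds', 'pflow'))   # models with either flag set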

    def dill(self):
        """
        Serialize generated numerical functions in ``System.calls`` with the
        package ``dill``.

        The serialized file will be stored to ``~/.andes/calls.pkl``, where `~`
        is the home directory path.

        Notes
        -----
        This function sets `dill.settings['recurse'] = True` to serialize the
        function calls recursively.
        """
        np_ver = np.__version__.split('.')
        np_ver = tuple([int(i) for i in np_ver])

        if np_ver < (1, 20):
            logger.debug("Dumping calls to calls.pkl with dill")
            dill.settings['recurse'] = True

            with open(self.config.pickle_path, 'wb') as f:
                dill.dump(self.calls, f)
        else:
            logger.debug("Dumping calls to calls.pkl is not supported with NumPy 1.20+.")
            logger.debug("ANDES is fully functional with generated Python code.")

    def undill(self):
        """
        Deserialize the function calls from ``~/.andes/calls.pkl`` with ``dill``.

        If no change is made to models, future calls to ``prepare()`` can be
        replaced with ``undill()`` for acceleration.
        """
        # load equations and Jacobians from saved code
        loaded = self._load_calls()
        stale_models = self._find_stale_models()

        if loaded is False:
            self.prepare(quick=True, incremental=False)
            loaded = True
        elif len(stale_models) > 0:
            logger.info("Generated code for <%s> is stale.", ', '.join(stale_models.keys()))
            self.prepare(quick=True, incremental=True, models=stale_models)
            loaded = True

        return loaded
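
    # Usage sketch: load previously generated code instead of regenerating.
    # ``undill`` falls back to ``prepare`` when nothing can be loaded or when
    # some generated code is stale.
    #
    #     import andes
    #     ss = andes.System()
    #     ss.undill()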

    def _load_calls(self):
        """
        Helper function for loading calls from the pkl file or the
        pycode module.
        """
        loaded = self._call_from_pycode()

        if loaded is False:
            logger.debug("Pycode not found. Trying to load from `calls.pkl`.")
            loaded = self._call_from_pkl()

        self.with_calls = loaded
        return loaded

    def _load_pkl(self):
        """
        Helper function to open and load dill-pickled functions.
        """
        loaded_calls = None

        if os.path.isfile(self.config.pickle_path):
            with open(self.config.pickle_path, 'rb') as f:
                try:
                    loaded_calls = dill.load(f)
                    logger.info('Loaded generated code from pkl file "%s"',
                                self.config.pickle_path)
                except (IOError, EOFError, AttributeError):
                    logger.debug('Cannot open pkl file at "%s"', self.config.pickle_path)

        return loaded_calls

    def _call_from_pkl(self):
        """
        Helper function for loading ModelCall from the pickle file.
        """
        self.calls = self._load_pkl()
        loaded = False

        if self.calls is not None:
            loaded = True
            for name, model_call in self.calls.items():
                if name in self.__dict__:
                    self.__dict__[name].calls = model_call

        return loaded

    def _call_from_pycode(self):
        """
        Helper function to import generated pycode.
        """
        loaded = False

        pycode = load_pycode_dot_andes()

        if not pycode:
            # or use `pycode` in the andes source folder
            try:
                pycode = importlib.import_module("andes.pycode")
                logger.info("Loaded generated Python code in andes source dir.")
            except ImportError:
                pass

        if pycode is not None:
            self._expand_pycode(pycode)
            loaded = True

        return loaded

    def _expand_pycode(self, pycode_module):
        """
        Expand the imported ``pycode`` module into model calls.

        Parameters
        ----------
        pycode_module : module
            The module containing generated code for models.
        """
        for name, model in self.models.items():
            if name not in pycode_module.__dict__:
                logger.debug("Model %s does not exist in pycode", name)
                continue
            pycode_model = pycode_module.__dict__[model.class_name]

            # md5
            model.calls.md5 = pycode_model.md5

            # reload stored variables
            for item in dilled_vars:
                model.calls.__dict__[item] = pycode_model.__dict__[item]

            # equations
            model.calls.f = pycode_model.__dict__.get("f_update")
            model.calls.g = pycode_model.__dict__.get("g_update")

            # services
            for instance in model.services.values():
                if instance.v_str is not None:
                    sv_name = f'{instance.name}_svc'
                    model.calls.s[instance.name] = pycode_model.__dict__[sv_name]

            # load initialization: assignment
            for instance in model.cache.all_vars.values():
                if instance.v_str is not None:
                    ia_name = f'{instance.name}_ia'
                    model.calls.ia[instance.name] = pycode_model.__dict__[ia_name]

            # load initialization: iterative
            for item in model.calls.init_seq:
                if isinstance(item, list):
                    name_concat = '_'.join(item)
                    model.calls.ii[name_concat] = pycode_model.__dict__[name_concat + '_ii']
                    model.calls.ij[name_concat] = pycode_model.__dict__[name_concat + '_ij']

            # load Jacobian functions
            for jname in model.calls.j_names:
                model.calls.j[jname] = pycode_model.__dict__.get(f'{jname}_update')

    def _get_models(self, models):
        """
        Helper function for sanitizing the ``models`` input.

        The output is an OrderedDict of model names and instances.
""" if models is None: models = self.exist.pflow if isinstance(models, str): models = {models: self.__dict__[models]} elif isinstance(models, Model): models = {models.class_name: models} elif isinstance(models, list): models = OrderedDict() for item in models: if isinstance(item, Model): models[item.class_name] = item elif isinstance(item, str): models[item] = self.__dict__[item] else: raise TypeError(f'Unknown type {type(item)}') # do nothing for OrderedDict type return models def _store_tf(self, models): """ Store the inverse time constant associated with equations. """ for mdl in models.values(): for var in mdl.cache.states_and_ext.values(): if var.t_const is not None: np.put(self.dae.Tf, var.a, var.t_const.v)

    def call_models(self, method: str, models: OrderedDict, *args, **kwargs):
        """
        Call methods on the given models.

        Parameters
        ----------
        method : str
            Name of the model method to be called
        models : OrderedDict, list, str
            Models on which the method will be called
        args
            Positional arguments to be passed to the model method
        kwargs
            Keyword arguments to be passed to the model method

        Returns
        -------
        OrderedDict
            The return values of the model methods, keyed by model name
        """
        ret = OrderedDict()
        for name, mdl in models.items():
            ret[name] = getattr(mdl, method)(*args, **kwargs)

        return ret
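
    # Usage sketch, using methods that are called elsewhere in this file:
    #
    #     ss.call_models('list2array', ss.models)    # see ``_list2array``
    #     ss.call_models('a_reset', ss.models)       # see ``reset``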

    def _check_group_common(self):
        """
        Check if all group common variables and parameters are provided.

        This function is called at the end of code generation by `prepare`.

        Raises
        ------
        KeyError if any parameter or variable is not provided
        """
        for group in self.groups.values():
            for item in group.common_params:
                for model in group.models.values():
                    # the below includes all of BaseParam (NumParam, DataParam and ExtParam)
                    if item not in model.__dict__:
                        raise KeyError(f'Group <{group.class_name}> common param <{item}> does not exist '
                                       f'in model <{model.class_name}>')

            for item in group.common_vars:
                for model in group.models.values():
                    if item not in model.cache.all_vars:
                        raise KeyError(f'Group <{group.class_name}> common var <{item}> does not exist '
                                       f'in model <{model.class_name}>')

    def collect_ref(self):
        """
        Collect indices into `BackRef` for all models.
        """
        models_and_groups = list(self.models.values()) + list(self.groups.values())

        for model in models_and_groups:
            for ref in model.services_ref.values():
                ref.v = [list() for _ in range(model.n)]

        for model in models_and_groups:
            if model.n == 0:
                continue
            if not hasattr(model, "idx_params"):
                # skip: group does not link to another group
                continue

            for ref in model.idx_params.values():
                if (ref.model not in self.models) and (ref.model not in self.groups):
                    continue
                dest_model = self.__dict__[ref.model]

                if dest_model.n == 0:
                    continue

                for n in (model.class_name, model.group):
                    if n not in dest_model.services_ref:
                        continue

                    for model_idx, dest_idx in zip(model.idx.v, ref.v):
                        if dest_idx not in dest_model.uid:
                            continue
                        uid = dest_model.idx2uid(dest_idx)
                        dest_model.services_ref[n].v[uid].append(model_idx)

            # set model ``in_use`` flag
            if isinstance(model, Model):
                model.set_in_use()

    def import_groups(self):
        """
        Import all group classes defined in ``models/group.py``.

        Groups will be stored as instances with the name as class names.
        All groups will be stored to the dictionary ``System.groups``.
        """
        module = importlib.import_module('andes.models.group')

        for m in inspect.getmembers(module, inspect.isclass):
            name, cls = m
            if name == 'GroupBase':
                continue
            if not issubclass(cls, GroupBase):
                # skip other imported classes such as `OrderedDict`
                continue

            self.__dict__[name] = cls()
            self.groups[name] = self.__dict__[name]

    def import_models(self):
        """
        Import and instantiate models as System member attributes.

        Models defined in ``models/__init__.py`` will be instantiated `sequentially`
        as attributes with the same name as the class name.
        In addition, all models will be stored in the dictionary ``System.models``
        with model names as keys and the corresponding instances as values.

        Examples
        --------
        ``system.Bus`` stores the `Bus` object, and ``system.GENCLS`` stores the
        classical generator object.

        ``system.models['Bus']`` points to the same instance as ``system.Bus``.
        """
        for file, cls_list in file_classes.items():
            for model_name in cls_list:
                the_module = importlib.import_module('andes.models.' + file)
                the_class = getattr(the_module, model_name)
                self.__dict__[model_name] = the_class(system=self, config=self._config_object)
                self.models[model_name] = self.__dict__[model_name]
                self.models[model_name].config.check()

                # link to the group
                group_name = self.__dict__[model_name].group
                self.__dict__[group_name].add_model(model_name, self.__dict__[model_name])

    def import_routines(self):
        """
        Import routines as defined in ``routines/__init__.py``.

        Routines will be stored as instances with the name as class names.
        All routines will be stored to the dictionary ``System.routines``.

        Examples
        --------
        ``System.PFlow`` is the power flow routine instance, and ``System.TDS``
        and ``System.EIG`` are time-domain analysis and eigenvalue analysis
        routines, respectively.
        """
        for file, cls_list in all_routines.items():
            for cls_name in cls_list:
                the_module = importlib.import_module('andes.routines.' + file)
                the_class = getattr(the_module, cls_name)
                attr_name = cls_name
                self.__dict__[attr_name] = the_class(system=self, config=self._config_object)
                self.routines[attr_name] = self.__dict__[attr_name]
                self.routines[attr_name].config.check()

    def store_switch_times(self, models, eps=1e-4):
        """
        Store event switching times in a sorted Numpy array in
        ``System.switch_times`` and in an OrderedDict ``System.switch_dict``.

        ``System.switch_dict`` has keys as event times and values as the
        OrderedDict of model names and instances associated with the event.

        Parameters
        ----------
        models : OrderedDict
            model name : model instance
        eps : float
            The small time-step size to use immediately before
            and after the event

        Returns
        -------
        array-like
            self.switch_times
        """
        out = np.array([], dtype=float)

        if self.options.get('flat') is True:
            return out

        names = []
        for instance in models.values():
            times = np.array(instance.get_times()).ravel()
            out = np.append(out, times)
            out = np.append(out, times - eps)
            out = np.append(out, times + eps)
            names.extend([instance.class_name] * (3 * len(times)))

        # sort
        sort_idx = np.argsort(out).astype(int)
        out = out[sort_idx]
        names = [names[i] for i in sort_idx]

        # select times greater than or equal to the current simulation time
        ltzero_idx = np.where(out >= self.dae.t)[0]
        out = out[ltzero_idx]
        names = [names[i] for i in ltzero_idx]

        # make into an OrderedDict with unique keys and model names combined
        for i, j in zip(out, names):
            if i not in self.switch_dict:
                self.switch_dict[i] = {j: self.models[j]}
            else:
                self.switch_dict[i].update({j: self.models[j]})

        self.switch_times = np.array(list(self.switch_dict.keys()))
        self.n_switches = len(self.switch_times)

        return self.switch_times

    def switch_action(self, models: OrderedDict):
        """
        Invoke the actions associated with switch times.

        Switch actions will be disabled if `flat=True` is passed to the system.
        """
        for instance in models.values():
            instance.switch_action(self.dae.t)

    def _p_restore(self):
        """
        Restore parameters stored in `pin`.
        """
        for model in self.models.values():
            for param in model.num_params.values():
                param.restore()

    def e_clear(self, models: OrderedDict):
        """
        Clear equation arrays in DAE and model variables.

        This step must be called before calling `f_update` or `g_update` to
        flush existing values.
        """
        self.dae.clear_fg()
        self.call_models('e_clear', models)

    def remove_pycapsule(self):
        """
        Remove PyCapsule objects in solvers.
        """
        for r in self.routines.values():
            r.solver.clear()

    def _store_calls(self, models: OrderedDict):
        """
        Collect and store model calls into the System.
        """
        logger.debug("Collecting Model.calls into System.")

        self.calls['__version__'] = andes.__version__

        for name, mdl in models.items():
            self.calls[name] = mdl.calls

    def _list2array(self):
        """
        Helper function to call models' ``list2array`` method, which usually
        performs memory preallocation.
        """
        self.call_models('list2array', self.models)

    def set_config(self, config=None):
        """
        Set configuration for the System object.

        Configs for models and routines are passed directly to their constructors.
        """
        if config is not None:
            # set config for system
            if self.__class__.__name__ in config:
                self.config.add(config[self.__class__.__name__])
                logger.debug("Config: set for System")

    def get_config(self):
        """
        Collect config data from models.

        Returns
        -------
        dict
            a dict containing the config from devices; class names are the keys
            and configs in a dict are the values.
        """
        config_dict = configparser.ConfigParser()
        config_dict[self.__class__.__name__] = self.config.as_dict()

        all_with_config = OrderedDict(list(self.routines.items()) +
                                      list(self.models.items()))

        for name, instance in all_with_config.items():
            cfg = instance.config.as_dict()
            if len(cfg) > 0:
                config_dict[name] = cfg

        return config_dict

    @staticmethod
    def load_config(conf_path=None):
        """
        Load config from an rc-formatted file.

        Parameters
        ----------
        conf_path : None or str
            Path to the config file. If `None`, the function body will not run.

        Returns
        -------
        configparser.ConfigParser
        """
        if conf_path is None:
            return

        conf = configparser.ConfigParser()
        conf.read(conf_path)
        logger.info('Loaded config from file "%s"', conf_path)

        return conf

    def save_config(self, file_path=None, overwrite=False):
        """
        Save all system, model, and routine configurations to an rc-formatted file.

        Parameters
        ----------
        file_path : str, optional
            path to the configuration file. The default is `~/.andes/andes.rc`.
        overwrite : bool, optional
            If the file exists, True to overwrite without confirmation.
            Otherwise prompt for confirmation.

        Warnings
        --------
        Saved config is loaded back and populated *at system instance creation time*.
        Configs from the config file take precedence over default config values.
        """
        if file_path is None:
            andes_path = os.path.join(os.path.expanduser('~'), '.andes')
            os.makedirs(andes_path, exist_ok=True)
            file_path = os.path.join(andes_path, 'andes.rc')

        elif os.path.isfile(file_path):
            if not confirm_overwrite(file_path, overwrite=overwrite):
                return

        conf = self.get_config()
        with open(file_path, 'w') as f:
            conf.write(f)

        logger.info('Config written to "%s"', file_path)

        return file_path
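
    # Usage sketch:
    #
    #     ss = andes.System()
    #     ss.save_config()                # writes to ~/.andes/andes.rc
    #     ss.save_config('/tmp/andes.rc', overwrite=True)   # path is illustrative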

    def supported_models(self, export='plain'):
        """
        Return the supported group names and model names in a table.

        Returns
        -------
        str
            A table-formatted string for the groups and models
        """
        pairs = list()
        for g in self.groups:
            models = list()
            for m in self.groups[g].models:
                models.append(m)
            if len(models) > 0:
                pairs.append((g, ', '.join(models)))

        tab = Tab(title='Supported Groups and Models',
                  header=['Group', 'Models'],
                  data=pairs,
                  export=export,
                  )

        return tab.draw()
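
    # Usage sketch:
    #
    #     print(ss.supported_models())    # plain-text table of groups and models
    #
    # Other ``export`` values are forwarded to ``Tab`` as-is; which formats are
    # accepted depends on the ``Tab`` implementation.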

    def as_dict(self, vin=False, skip_empty=True):
        """
        Return system data as a dict where the keys are model names and the
        values are dicts. Each dict has parameter names as keys and the
        corresponding data in an array as values.

        Returns
        -------
        OrderedDict
        """
        out = OrderedDict()

        for name, instance in self.models.items():
            if skip_empty and instance.n == 0:
                continue
            out[name] = instance.as_dict(vin=vin)

        return out
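
    # Usage sketch: export loaded data for inspection. Each value is a dict of
    # parameter arrays, e.g., ``data['Bus']['Vn']`` for bus nominal voltages
    # (assuming the case contains Bus devices):
    #
    #     data = ss.as_dict()
    #     data['Bus']['Vn']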


def load_pycode_dot_andes():
    """
    Helper function to load pycode from ``~/.andes``.
    """
    pycode = None
    MODULE_PATH = get_dot_andes_path() + '/pycode/__init__.py'
    MODULE_NAME = 'pycode'

    if os.path.isfile(MODULE_PATH):
        try:
            spec = importlib.util.spec_from_file_location(MODULE_NAME, MODULE_PATH)
            pycode = importlib.util.module_from_spec(spec)  # NOQA
            sys.modules[spec.name] = pycode
            spec.loader.exec_module(pycode)
            logger.info('Loaded generated Python code in "~/.andes/pycode".')
        except ImportError:
            pass

    return pycode