Source code for andes.system.dae_compactor

"""
DAE compaction helpers for System.
"""

#  [ANDES] (C)2015-2024 Hantao Cui
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.

import logging

from andes.shared import np

logger = logging.getLogger(__name__)


[docs]class DAECompactor: """ Handle DAE algebraic-variable compaction for replaced static devices. """
[docs] def __init__(self, system): self.system = system
@staticmethod def _break_var_contiguity(var): """Mark a variable as non-contiguous so ``set_var_arrays`` allocates standalone arrays.""" var._contiguous = False var.v_inplace = False var.e_inplace = False def compact_dae(self): """ Detect replaced static devices and compact ``dae.y`` by removing their algebraic variable slots. Called during TDS initialization after ``init()``. """ excluded = self._detect_replaced_devices() if not excluded: return old_to_new, new_m, need_sink, all_replaced_models = \ self._build_y_compaction_map(excluded) self._compact_dae_y(old_to_new, new_m, need_sink, all_replaced_models) def _detect_replaced_devices(self): """ Scan TDS models for ``IdxParam`` with ``replaces=True``, mark referenced static devices as ``_replaced``, and set ``u=0``. Returns ------- dict ``{model_name: set_of_replaced_uids}`` """ system = self.system excluded = {} for mdl in system.exist.tds.values(): if mdl.n == 0: continue for p_instance in mdl.idx_params.values(): if not p_instance.replaces: continue group = system.groups[p_instance.model] for i in range(mdl.n): # only consider online dynamic devices as replacements if hasattr(mdl, 'u') and mdl.u.v[i] != 1: continue target_idx = p_instance.v[i] if target_idx is None: continue target_model = group.idx2model(target_idx) target_uid = target_model.idx2uid(target_idx) model_name = target_model.class_name if target_model._replaced is None: target_model._replaced = np.zeros(target_model.n, dtype=bool) target_model._replaced[target_uid] = True excluded.setdefault(model_name, set()).add(target_uid) # set u=0 and ue=0 on all replaced devices for model_name, uids in excluded.items(): mdl = system.__dict__[model_name] for uid in uids: mdl.u.v[uid] = 0 mdl.ue.v[uid] = 0 return excluded def _build_y_compaction_map(self, excluded): """ Build mapping from old ``dae.y`` indices to new compact indices. Parameters ---------- excluded : dict ``{model_name: set_of_replaced_uids}`` from ``_detect_replaced_devices`` Returns ------- old_to_new : np.ndarray Mapping array (``-1`` for removed indices). new_m : int New algebraic variable count (excluding sink). need_sink : bool Whether a sink slot is needed for partial replacement. all_replaced_models : set Model names where every device is replaced. """ system = self.system old_m = system.dae.m remove_indices = set() all_replaced_models = set() for model_name, uids in excluded.items(): mdl = system.__dict__[model_name] if len(uids) == mdl.n: all_replaced_models.add(model_name) # collect internal Algeb addresses for replaced devices for var in mdl.algebs.values(): for uid in uids: remove_indices.add(var.a[uid]) # build old-to-new mapping old_to_new = np.full(old_m, -1, dtype=int) new_idx = 0 for old_idx in range(old_m): if old_idx not in remove_indices: old_to_new[old_idx] = new_idx new_idx += 1 new_m = new_idx # need sink if any model has partial replacement need_sink = len(all_replaced_models) < len(excluded) return old_to_new, new_m, need_sink, all_replaced_models def _compact_dae_y(self, old_to_new, new_m, need_sink, all_replaced_models): """ Compact ``dae.y`` by removing replaced devices' algebraic variable slots, remap addresses, and refresh model bindings. Parameters ---------- old_to_new : np.ndarray Mapping from old to new indices (``-1`` for removed). new_m : int New algebraic variable count (excluding sink). need_sink : bool Whether a sink slot is needed. all_replaced_models : set Models where every device is replaced. """ system = self.system old_m = system.dae.m keep = old_to_new >= 0 # --- resize / compact arrays --- if need_sink: sink_idx = new_m system.dae.m = new_m + 1 else: sink_idx = None system.dae.m = new_m system._y_sink_idx = sink_idx system._all_replaced_models = all_replaced_models new_y = np.zeros(system.dae.m) new_y[old_to_new[keep]] = system.dae.y[keep] system.dae.y = new_y system.dae.g = np.zeros(system.dae.m) old_y_name = system.dae.y_name[:] old_y_tex_name = system.dae.y_tex_name[:] system.dae.y_name = [''] * system.dae.m system.dae.y_tex_name = [''] * system.dae.m old_y_map = dict(system.dae.y_map) system.dae.y_map = {} for old_idx in np.where(keep)[0]: ni = old_to_new[old_idx] system.dae.y_name[ni] = old_y_name[old_idx] system.dae.y_tex_name[ni] = old_y_tex_name[old_idx] if old_idx in old_y_map: system.dae.y_map[ni] = old_y_map[old_idx] # --- remap var.a and break contiguity --- def remap_a(a): new_a = old_to_new[a] if need_sink: new_a[new_a == -1] = sink_idx return new_a for mdl in system.exist.pflow_tds.values(): if mdl.n == 0 or mdl.class_name in all_replaced_models: continue for var in mdl.cache.algebs_and_ext.values(): var.a = remap_a(var.a) self._break_var_contiguity(var) # break contiguity for all-replaced models so set_var_arrays # allocates fresh arrays instead of in-place views into dae.y for model_name in all_replaced_models: mdl = system.__dict__[model_name] for var in mdl.cache.vars_int.values(): self._break_var_contiguity(var) # --- rebind views and refresh model inputs --- system.set_var_arrays(models=system.exist.pflow_tds) for model in system.exist.pflow_tds.values(): if model.n > 0: model.get_inputs(refresh=True) # --- log --- n_removed = np.count_nonzero(old_to_new == -1) logger.info("DAE compaction: removed %d algebraic variable slots " "(m: %d -> %d)", n_removed, old_m, system.dae.m) if need_sink: logger.debug(" Sink slot at index %d", sink_idx) if all_replaced_models: logger.debug(" All-replaced models: %s", ', '.join(sorted(all_replaced_models)))