Source code for andes.routines.se

"""
Static and dynamic state estimation routine.
"""

import logging
from collections import OrderedDict

import numpy as np

from andes.routines.base import BaseRoutine
from andes.se.measurement import Measurements, StaticEvaluator
from andes.se.algorithms import wls
from andes.utils.misc import elapsed

logger = logging.getLogger(__name__)


[docs]class SE(BaseRoutine): """ State estimation routine. Supports static SE (single snapshot, WLS by default) and will support dynamic SE (time-stepping with EKF/UKF) in a future phase. Static SE estimates bus voltage magnitudes and angles from a set of measurements. Requires a converged power flow as starting point. Examples -------- Basic usage with auto-generated measurements:: ss = andes.load('ieee14.raw') ss.PFlow.run() ss.SE.run() With custom measurements:: from andes.se import Measurements m = Measurements(ss) m.add('Bus', 'v', idx=[1, 2, 6, 8], sigma=0.005) m.add_bus_injection(sigma_p=0.02, sigma_q=0.03) m.generate_from_pflow(seed=42) ss.SE.run(measurements=m) """
[docs] def __init__(self, system=None, config=None): super().__init__(system, config) self.config.add(OrderedDict((('tol', 1e-4), ('max_iter', 100), ('flat_start', 0), ('report', 1), ))) self.config.add_extra("_help", tol="convergence tolerance", max_iter="max number of SE iterations", flat_start="use flat start (1.0 pu, 0 rad) as initial guess", report="write output report", ) self.config.add_extra("_alt", tol="float", max_iter=">=1", flat_start=(0, 1), report=(0, 1), ) self.converged = False self.result = None self.measurements = None self.evaluator = None self._algorithm = None
def summary(self, algo=None): """Print SE configuration summary.""" if algo is None: algo = wls algo_name = getattr(algo, '__name__', str(algo)).upper() out = ['', '-> State Estimation', f'{"Method":>16s}: {algo_name}', f'{"Tolerance":>16s}: {self.config.tol}', f'{"Max iterations":>16s}: {self.config.max_iter}', ] logger.info('\n'.join(out)) def init(self, measurements=None, algorithm=None): """ Initialize state estimation. Parameters ---------- measurements : Measurements or None If None, auto-generates measurements from converged PFlow with default noise levels. algorithm : callable or None Estimation algorithm. Defaults to WLS. """ system = self.system if not system.PFlow.converged: logger.error("Power flow has not converged. Run PFlow first.") return False # Set up measurements if measurements is not None: self.measurements = measurements else: self.measurements = self._default_measurements() # Add reference bus angle constraint if no angle measurements exist self._ensure_angle_reference() if not self.measurements._finalized: self.measurements.finalize() # Build evaluator self.evaluator = StaticEvaluator(system, self.measurements) return True def run(self, measurements=None, algorithm=None, **kwargs): """ Run static state estimation. Parameters ---------- measurements : Measurements or None Measurement data. If None, auto-generates from PFlow. algorithm : callable or None Algorithm function with signature ``f(evaluator, x0, tol, max_iter) -> dict``. Defaults to WLS. Returns ------- bool Convergence status. """ system = self.system algo = algorithm if algorithm is not None else wls self._algorithm = algo self.summary(algo) t0, _ = elapsed() if not self.init(measurements=measurements, algorithm=algorithm): return False # Initial state guess nb = system.Bus.n if self.config.flat_start: x0 = np.zeros(2 * nb, dtype=float) x0[nb:] = 1.0 # Vm = 1.0 pu else: # Use PFlow solution as starting point x0 = np.concatenate([ np.array(system.Bus.a.v, dtype=float), np.array(system.Bus.v.v, dtype=float), ]) self.result = algo( self.evaluator, x0, tol=self.config.tol, max_iter=self.config.max_iter, ) self.converged = self.result['converged'] t1, s1 = elapsed(t0) self.exec_time = t1 - t0 if self.converged: logger.info('SE converged in %d iterations in %s.', self.result['n_iter'], s1) else: logger.warning('SE did not converge after %d iterations.', self.result['n_iter']) if self.config.report: self.report() system.exit_code = 0 if self.converged else 1 return self.converged def report(self): """Log a summary of SE results.""" if self.result is None: return nb = self.system.Bus.n nm = self.measurements.nm r = self.result out = ['', '-> SE Report', f'{"Converged":>16s}: {r["converged"]}', f'{"Iterations":>16s}: {r["n_iter"]}', f'{"Objective J":>16s}: {r["J"]:.6g}', f'{"Measurements":>16s}: {nm}', f'{"States":>16s}: {2 * nb}', f'{"Redundancy":>16s}: {nm / (2 * nb):.2f}x', ] if r['converged']: v_err = np.abs(self.v_est - np.array(self.system.Bus.v.v)) a_err = np.abs(self.a_est - np.array(self.system.Bus.a.v)) out.append(f'{"Max |dV|":>16s}: {np.max(v_err):.6g} pu') out.append(f'{"Max |da|":>16s}: {np.max(a_err):.6g} rad') logger.info('\n'.join(out)) def chi_squared_test(self, confidence=0.95): """ Perform chi-squared test on the SE result. Returns ------- tuple (passed, J, threshold, dof) where dof = nm - n_state. Notes ----- The chi-squared test is only valid for WLS, where the objective J follows a chi-squared distribution. Using it after a non-WLS algorithm (e.g. LAV) will log a warning and return ``passed=False``. """ from scipy.stats import chi2 if self.result is None: raise RuntimeError("No SE result available. Run SE first.") if getattr(self, '_algorithm', wls) is not wls: logger.warning("Chi-squared test is only valid for WLS. " "The last SE run used a different algorithm.") return (False, self.result['J'], float('inf'), 0) nm = self.measurements.nm n_state = 2 * self.system.Bus.n dof = nm - n_state if dof <= 0: logger.warning("Degrees of freedom <= 0 (nm=%d, n_state=%d). " "System may be unobservable.", nm, n_state) return (False, self.result['J'], float('inf'), dof) threshold = chi2.ppf(confidence, dof) passed = self.result['J'] < threshold return (passed, self.result['J'], threshold, dof) # ------------------------------------------------------------------ # Properties for convenient result access # ------------------------------------------------------------------ @property def v_est(self): """Estimated bus voltage magnitudes.""" if self.result is None: return None return self.result['x_est'][self.system.Bus.n:] @property def a_est(self): """Estimated bus voltage angles (radians).""" if self.result is None: return None return self.result['x_est'][:self.system.Bus.n] # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _default_measurements(self, seed=None): """Generate a default measurement set from PFlow results.""" m = Measurements(self.system) m.add_bus_voltage(sigma=0.01) m.add_bus_injection(sigma_p=0.02, sigma_q=0.03) m.generate_from_pflow(seed=seed) logger.info("No measurements provided; using default set " "(all bus voltages + all bus injections, %d measurements).", m.nm) return m def _ensure_angle_reference(self): """ Ensure each island has at least one angle reference measurement. For multi-island networks, each connected component needs its own angle reference to make the WLS gain matrix non-singular. Uses ``Bus.island_sets`` from connectivity analysis. """ system = self.system m = self.measurements Bus = system.Bus # Collect bus UIDs that already have angle measurements angle_uids = set() for i in range(m.nm): if m._kind[i] == 'direct' and m._models[i] == 'Bus' and m._vars[i] == 'a': angle_uids.add(Bus.idx2uid(m._idx[i])) # Get island topology (list of lists of bus UIDs) island_sets = Bus.island_sets if len(island_sets) == 0: island_sets = [list(range(Bus.n))] # Build slack bus UID → bus idx mapping slack_uid_to_idx = {} if hasattr(system, 'Slack') and system.Slack.n > 0: for i in range(system.Slack.n): bus_idx = system.Slack.bus.v[i] uid = Bus.idx2uid(bus_idx) slack_uid_to_idx[uid] = bus_idx for island in island_sets: island_set = set(island) # Skip if this island already has an angle measurement if angle_uids & island_set: continue # Pick a reference bus: prefer the slack bus in this island ref_bus = None for uid, bus_idx in slack_uid_to_idx.items(): if uid in island_set: ref_bus = bus_idx break if ref_bus is None: ref_bus = Bus.idx.v[island[0]] logger.info("Adding angle reference at bus %s for island " "with %d buses.", ref_bus, len(island)) # Use public API to add the pseudo-measurement m.add('Bus', 'a', idx=[ref_bus], sigma=1e-6) # If z was already generated, append the PFlow angle value if m.z is not None and len(m.z) == m.nm - 1: pflow_angle = float(Bus.a.v[Bus.idx2uid(ref_bus)]) m.z = np.append(m.z, pflow_angle)
# If z hasn't been set yet, it will be set by generate_from_pflow or finalize