Source code for andes.routines.eig

"""
Module for eigenvalue analysis.
"""

import logging
import scipy.io
import numpy as np

from math import ceil, pi
from scipy.linalg import solve

from andes.io.txt import dump_data
from andes.utils.misc import elapsed
from andes.routines.base import BaseRoutine
from andes.variables.report import report_info

from andes.plot import set_latex

from andes.shared import matrix, spmatrix, sparse, plt, mpl
from andes.shared import div, spdiag


logger = logging.getLogger(__name__)


[docs]class EIG(BaseRoutine): """ Eigenvalue analysis routine """
[docs] def __init__(self, system, config): super().__init__(system=system, config=config) self.config.add(plot=0, tol=1e-6) self.config.add_extra("_help", plot="show plot after computation", tol="numerical tolerance to treat eigenvalues as zeros") self.config.add_extra("_alt", plot=(0, 1)) # internal flags and storage self.As = None # state matrix after removing the ones associated with zero T consts self.Asc = None # the original complete As without reordering self.mu = None # eigenvalues self.N = None # right eigenvectors self.W = None # left eigenvectors self.pfactors = None # --- related to states with zero time constants (zs) --- self.zstate_idx = np.array([], dtype=int) self.nz_counts = None # --- statistics -- self.n_positive = 0 self.n_zeros = 0 self.n_negative = 0 self.x_name = []
[docs] def calc_As(self, dense=True): r""" Return state matrix and store to ``self.As``. Notes ----- For systems in the mass-matrix formulation, .. math :: T \dot{x} = f(x, y) \\ 0 = g(x, y) Assume `T` is non-singular, the state matrix is calculated from .. math :: A_s = T^{-1} (f_x - f_y * g_y^{-1} * g_x) Returns ------- kvxopt.matrix state matrix """ dae = self.system.dae self.find_zero_states() self.x_name = np.array(dae.x_name) self.As = self._reduce(dae.fx, dae.fy, dae.gx, dae.gy, dae.Tf, dense=dense) if len(self.zstate_idx) > 0: self.Asc = self.As self.As = self._reduce(*self._reorder()) return self.As
def _reduce(self, fx, fy, gx, gy, Tf, dense=True): """ Reduce algebraic equations (or states associated with zero time constants). Returns ------- spmatrix The reduced state matrix """ gyx = matrix(gx) self.solver.linsolve(gy, gyx) Tfnz = Tf + np.ones_like(Tf) * np.equal(Tf, 0.0) iTf = spdiag((1 / Tfnz).tolist()) if dense: return iTf * (fx - fy * gyx) else: return sparse(iTf * (fx - fy * gyx)) def _reorder(self): """ reorder As by moving rows and cols associated with zero time constants to the end. Returns `fx`, `fy`, `gx`, `gy`, `Tf`. """ dae = self.system.dae rows = np.arange(dae.n, dtype=int) cols = np.arange(dae.n, dtype=int) vals = np.ones(dae.n, dtype=float) swaps = [] bidx = self.nz_counts for ii in range(dae.n - self.nz_counts): if ii in self.zstate_idx: while (bidx in self.zstate_idx): bidx += 1 cols[ii] = bidx rows[bidx] = ii swaps.append((ii, bidx)) # swap the variable names for fr, bk in swaps: bk_name = self.x_name[bk] self.x_name[fr] = bk_name self.x_name = self.x_name[:self.nz_counts] # compute the permutation matrix for `As` containing non-states perm = spmatrix(matrix(vals), matrix(rows), matrix(cols)) As_perm = perm * sparse(self.As) * perm self.As_perm = As_perm nfx = As_perm[:self.nz_counts, :self.nz_counts] nfy = As_perm[:self.nz_counts, self.nz_counts:] ngx = As_perm[self.nz_counts:, :self.nz_counts] ngy = As_perm[self.nz_counts:, self.nz_counts:] nTf = np.delete(self.system.dae.Tf, self.zstate_idx) return nfx, nfy, ngx, ngy, nTf
[docs] def calc_eig(self, As=None): """ Calculate eigenvalues and right eigen vectors. This function is a wrapper to ``np.linalg.eig``. Results are returned but not stored to ``EIG``. Returns ------- np.array(dtype=complex) eigenvalues np.array() right eigenvectors """ if As is None: As = self.As # `mu`: eigenvalues, `N`: right eigenvectors with each column corr. to one eigvalue mu, N = np.linalg.eig(As) return mu, N
def _store_stats(self): """ Count and store the number of eigenvalues with positive, zero, and negative real parts using ``self.mu``. """ mu_real = self.mu.real self.n_positive = np.count_nonzero(mu_real > self.config.tol) self.n_zeros = np.count_nonzero(abs(mu_real) <= self.config.tol) self.n_negative = np.count_nonzero(mu_real < self.config.tol) return True
[docs] def calc_pfactor(self, As=None): """ Compute participation factor of states in eigenvalues. Each row in the participation factor correspond to one state, and each column correspond to one mode. Parameters ---------- As : np.array or None State matrix to process. If None, use ``self.As``. Returns ------- np.array(dtype=complex) eigenvalues np.array participation factor matrix """ mu, N = self.calc_eig(As) n_state = len(mu) # --- calculate the left eig vector and store to ``W``` # based on orthogonality that `W.T @ N = I`, # left eigenvector is `inv(N)^T` Weye = np.eye(n_state) WT = solve(N, Weye, overwrite_b=True) W = WT.T # --- calculate participation factor --- pfactor = np.abs(W) * np.abs(N) b = np.ones(n_state) W_abs = b @ pfactor pfactor = pfactor.T # --- normalize participation factor --- for item in range(n_state): pfactor[:, item] /= W_abs[item] pfactor = np.round(pfactor, 5) return mu, pfactor, N, W
[docs] def summary(self): """ Print out a summary to ``logger.info``. """ out = list() out.append('') out.append('-> Eigenvalue Analysis:') out_str = '\n'.join(out) logger.info(out_str)
[docs] def find_zero_states(self): """ Find the indices of states associated with zero time constants in ``x``. """ system = self.system self.zstate_idx = np.array([], dtype=int) if sum(system.dae.Tf != 0) != len(system.dae.Tf): self.zstate_idx = np.where(system.dae.Tf == 0)[0] logger.info("%d states are associated with zero time constants. ", len(self.zstate_idx)) logger.debug([system.dae.x_name[i] for i in self.zstate_idx]) self.nz_counts = system.dae.n - len(self.zstate_idx)
def _pre_check(self): """ Helper function for pre-computation checks. """ system = self.system status = True if system.PFlow.converged is False: logger.warning('Power flow not solved. Eig analysis will not continue.') status = False if system.TDS.initialized is False: system.TDS.init() system.TDS.itm_step() elif system.dae.n == 0: logger.error('No dynamic model. Eig analysis will not continue.') status = False return status
[docs] def run(self, **kwargs): """ Run small-signal stability analysis. """ succeed = False system = self.system if not self._pre_check(): system.exit_code += 1 return False self.summary() t1, s = elapsed() self.calc_As() self.mu, self.pfactors, self.N, self.W = self.calc_pfactor() self._store_stats() _, s = elapsed(t1) logger.info(' Positive %6g', self.n_positive) logger.info(' Zeros %6g', self.n_zeros) logger.info(' Negative %6g', self.n_negative) logger.info('Eigenvalue analysis finished in {:s}.'.format(s)) if not self.system.files.no_output: self.report() if system.options.get('state_matrix'): self.export_mat() if self.config.plot: self.plot() succeed = True if not succeed: system.exit_code += 1 return succeed
[docs] def plot(self, mu=None, fig=None, ax=None, left=-6, right=0.5, ymin=-8, ymax=8, damping=0.05, line_width=0.5, dpi=100, figsize=None, base_color='black', show=True, latex=True): """ Plot utility for eigenvalues in the S domain. Parameters ---------- mu : array, optional an array of complex eigenvalues fig : figure handl, optional existing matplotlib figure handle ax : axis handle, optional existing axis handle left : int, optional left tick for the x-axis, by default -6 right : float, optional right tick, by default 0.5 ymin : int, optional bottom tick, by default -8 ymax : int, optional top tick, by default 8 damping : float, optional damping value for which the dash plots are drawn line_width : float, optional default line width, by default 0.5 dpi : int, optional figure dpi, by default 100 figsize : [type], optional default figure size, by default None base_color : str, optional base color for negative eigenvalues show : bool, optional True to show figure after plot, by default True latex : bool, optional True to use latex, by default True Returns ------- figure matplotlib figure object axis matplotlib axis object """ mpl.rc('font', family='Times New Roman', size=12) if mu is None: mu = self.mu mu_real = mu.real mu_imag = mu.imag p_mu_real, p_mu_imag = list(), list() z_mu_real, z_mu_imag = list(), list() n_mu_real, n_mu_imag = list(), list() for re, im in zip(mu_real, mu_imag): if abs(re) <= self.config.tol: z_mu_real.append(re) z_mu_imag.append(im) elif re > self.config.tol: p_mu_real.append(re) p_mu_imag.append(im) elif re < -self.config.tol: n_mu_real.append(re) n_mu_imag.append(im) if latex: set_latex() if fig is None or ax is None: fig = plt.figure(dpi=dpi, figsize=figsize) ax = plt.gca() ax.scatter(z_mu_real, z_mu_imag, marker='o', s=40, linewidth=0.5, facecolors='none', edgecolors='green') ax.scatter(n_mu_real, n_mu_imag, marker='x', s=40, linewidth=0.5, color=base_color) ax.scatter(p_mu_real, p_mu_imag, marker='x', s=40, linewidth=0.5, color='red') # axes lines ax.axhline(linewidth=0.5, color='grey', linestyle='--') ax.axvline(linewidth=0.5, color='grey', linestyle='--') # TODO: Improve the damping and range # --- plot 5% damping lines --- xin = np.arange(left, 0, 0.01) yneg = xin / damping ypos = - xin / damping ax.plot(xin, yneg, color='grey', linewidth=line_width, linestyle='--') ax.plot(xin, ypos, color='grey', linewidth=line_width, linestyle='--') # --- damping lines end --- if latex: ax.set_xlabel('Real [$s^{-1}$]') ax.set_ylabel('Imaginary [$s^{-1}$]') else: ax.set_xlabel('Real [s -1]') ax.set_ylabel('Imaginary [s -1]') ax.set_xlim(left=left, right=right) ax.set_ylim(ymin, ymax) if show is True: plt.show() return fig, ax
[docs] def export_mat(self): """ Export state matrix to a ``<CaseName>_As.mat`` file with the variable name ``As``, where ``<CaseName>`` is the test case name. State variable names are stored in variables ``x_name`` and ``x_tex_name``. Returns ------- bool True if successful """ system = self.system out = {'As': self.As, 'Asc': self.Asc, 'x_name': np.array(system.dae.x_name, dtype=object), 'x_tex_name': np.array(system.dae.x_tex_name, dtype=object), } scipy.io.savemat(system.files.mat, mdict=out) logger.info('State matrix saved to "%s"', system.files.mat) return True
[docs] def post_process(self): """ Post processing of eigenvalues. """ # --- statistics --- n_states = len(self.mu) mu_real = self.mu.real numeral = [''] * n_states for idx, item in enumerate(range(n_states)): if mu_real[idx] == 0: marker = '*' elif mu_real[idx] > 0: marker = '**' else: marker = '' numeral[idx] = '#' + str(idx + 1) + marker # compute frequency, un-damped frequency and damping freq = np.zeros(n_states) ufreq = np.zeros(n_states) damping = np.zeros(n_states) for idx, item in enumerate(self.mu): if item.imag == 0: freq[idx] = 0 ufreq[idx] = 0 damping[idx] = 0 else: ufreq[idx] = abs(item) / 2 / pi freq[idx] = abs(item.imag / 2 / pi) damping[idx] = -div(item.real, abs(item)) * 100 return freq, ufreq, damping, numeral
[docs] def report(self, x_name=None, **kwargs): """ Save eigenvalue analysis reports. Returns ------- None """ if x_name is None: x_name = self.x_name n_states = len(self.mu) mu_real = self.mu.real mu_imag = self.mu.imag freq, ufreq, damping, numeral = self.post_process() # obtain most associated variables var_assoc = [] for prow in range(n_states): temp_row = self.pfactors[prow, :] name_idx = list(temp_row).index(max(temp_row)) var_assoc.append(x_name[name_idx]) text, header, rowname, data = list(), list(), list(), list() # opening info section text.append(report_info(self.system)) header.append(None) rowname.append(None) data.append(None) text.append('') text.append('EIGENVALUE ANALYSIS REPORT') header.append([]) rowname.append([]) data.append([]) text.append('STATISTICS\n') header.append(['']) rowname.append(['Positives', 'Zeros', 'Negatives']) data.append((self.n_positive, self.n_zeros, self.n_negative)) text.append('EIGENVALUE DATA\n') header.append([ 'Most Associated', 'Real', 'Imag.', 'Damped Freq.', 'Frequency', 'Damping [%]']) rowname.append(numeral) data.append( [var_assoc, list(mu_real), list(mu_imag), freq, ufreq, damping]) n_cols = 7 # columns per block n_block = int(ceil(n_states / n_cols)) if n_block <= 100: for idx in range(n_block): start = n_cols * idx end = n_cols * (idx + 1) text.append('PARTICIPATION FACTORS [{}/{}]\n'.format( idx + 1, n_block)) header.append(numeral[start:end]) rowname.append(x_name) data.append(self.pfactors[start:end, :]) dump_data(text, header, rowname, data, self.system.files.eig) logger.info('Report saved to "%s".', self.system.files.eig)