"""
Module for power flow calculation.
"""
import logging
from collections import OrderedDict
from andes.utils.misc import elapsed
from andes.routines.base import BaseRoutine
from andes.variables.report import Report
from andes.shared import np, matrix, sparse, newton_krylov
logger = logging.getLogger(__name__)
[docs]class PFlow(BaseRoutine):
"""
Power flow calculation routine.
Power flow analysis currently supports limiting reactive power (needs to to be
turned on via `config.pv2pq`) but does not enforce voltage limits.
"""
[docs] def __init__(self, system=None, config=None):
super().__init__(system, config)
self.config.add(OrderedDict((('tol', 1e-6),
('max_iter', 25),
('method', 'NR'),
('n_factorize', 4),
('report', 1),
('degree', 0),
('init_tds', 0),
('linesearch', 1),
)))
self.config.add_extra("_help",
tol="convergence tolerance",
max_iter="max. number of iterations",
method="calculation method",
n_factorize="first N iterations to factorize Jacobian in dishonest method",
report="write output report",
degree='use degree in report',
init_tds="initialize TDS after PFlow",
linesearch="backtracking line search for NR",
)
self.config.add_extra("_alt",
tol="float",
method=("NR", "dishonest", "NK"),
max_iter=">=10",
n_factorize=">0",
report=(0, 1),
degree=(0, 1),
init_tds=(0, 1),
linesearch=(0, 1),
)
self.converged = False
self.inc = None
self.A = None
self.niter = 0
self.mis = [1]
self.models = OrderedDict()
self.x_sol = None
self.y_sol = None
def init(self):
"""
Initialize variables for power flow.
"""
system = self.system
t0, _ = elapsed()
self.models = system.find_models('pflow')
self.converged = False
self.res = matrix(0, (system.dae.n + system.dae.m, 1), 'd')
self.A = None
self.niter = 0
self.mis = [1]
self.exec_time = 0.0
self.x_sol = None
self.y_sol = None
self.system.set_var_arrays(self.models, inplace=True, alloc=False)
self.system.init(self.models, routine='pflow')
_, s1 = elapsed(t0)
logger.info('Power flow initialized in %s.', s1)
# force compile if numba is on - improves timing accuracy
if system.runtime.numba:
t0, _ = elapsed()
system.f_update(self.models)
system.g_update(self.models)
system.j_update(models=self.models)
_, s1 = elapsed(t0)
logger.info('Numba compilation for power flow finished in %s.', s1)
return system.dae.xy
def nr_step(self):
"""
Solve a single iteration step using the Newton-Raphson method.
Returns
-------
float
maximum absolute mismatch
"""
system = self.system
# ---------- Build numerical DAE----------
self.fg_update()
# ---------- update the Jacobian on conditions ----------
if self.config.method != 'dishonest' or (self.niter < self.config.n_factorize):
system.j_update(self.models)
self.solver.worker.new_A = True
# ---------- prepare and solve linear equations ----------
self.res[:system.dae.n] = -system.dae.f[:]
self.res[system.dae.n:] = -system.dae.g[:]
self.A = sparse([[system.dae.fx, system.dae.gx],
[system.dae.fy, system.dae.gy]])
if not self.config.linsolve:
self.inc = self.solver.solve(self.A, self.res)
else:
self.inc = self.solver.linsolve(self.A, self.res)
system.dae.x += np.ravel(self.inc[:system.dae.n])
system.dae.y += np.ravel(self.inc[system.dae.n:])
mis = self._max_mis()
system.vars_to_models()
return mis
def _max_mis(self):
"""
Compute max absolute mismatch from current dae.f and dae.g.
"""
system = self.system
fmax = 0
if system.dae.n > 0:
fmax_idx = np.argmax(np.abs(system.dae.f))
fmax = system.dae.f[fmax_idx]
logger.debug("Max. diff mismatch %.10g on %s", fmax, system.dae.x_name[fmax_idx])
gmax_idx = np.argmax(np.abs(system.dae.g))
gmax = system.dae.g[gmax_idx]
logger.debug("Max. algeb mismatch %.10g on %s", gmax, system.dae.y_name[gmax_idx])
return max(abs(fmax), abs(gmax))
def nr_solve(self):
"""
Solve the power flow problem using Newton-Raphson iteration.
When ``config.linesearch`` is enabled, applies nonmonotone backtracking
line search (Grippo-Lampariello-Lucidi). The loop is structured so that
the trial-point evaluation at the end of iteration *k* doubles as the
initial evaluation for iteration *k+1*, making the line search zero-cost
when the full step is accepted.
"""
system = self.system
self.niter = 0
use_ls = self.config.linesearch
if use_ls:
x0 = np.empty_like(system.dae.x)
y0 = np.empty_like(system.dae.y)
merit_history = []
# initial residual (reused by first iteration)
self.fg_update()
while True:
mis = self._max_mis()
logger.info('%d: |F(x)| = %.10g', self.niter, mis)
if self.niter == 0:
self.mis[0] = mis
else:
self.mis.append(mis)
if mis < self.config.tol:
self.converged = True
system.vars_to_models()
break
if self.niter > self.config.max_iter:
break
if np.isnan(mis).any():
logger.error('NaN found in solution. Convergence is not likely')
break
if mis > 1e4 * self.mis[0]:
logger.error('Mismatch increased too fast. Convergence is not likely.')
break
# --- Jacobian update ---
if self.config.method != 'dishonest' or self.niter < self.config.n_factorize:
system.j_update(self.models)
self.solver.worker.new_A = True
# --- solve linear system ---
self.res[:system.dae.n] = -system.dae.f[:]
self.res[system.dae.n:] = -system.dae.g[:]
self.A = sparse([[system.dae.fx, system.dae.gx],
[system.dae.fy, system.dae.gy]])
if not self.config.linsolve:
self.inc = self.solver.solve(self.A, self.res)
else:
self.inc = self.solver.linsolve(self.A, self.res)
inc_x = np.ravel(self.inc[:system.dae.n])
inc_y = np.ravel(self.inc[system.dae.n:])
# --- apply step ---
if use_ls:
merit_cur = np.dot(system.dae.f, system.dae.f) + np.dot(system.dae.g, system.dae.g)
merit_history.append(merit_cur)
merit_ref = max(merit_history[-3:])
x0[:] = system.dae.x
y0[:] = system.dae.y
alpha = 1.0
for _ in range(4):
system.dae.x[:] = x0 + alpha * inc_x
system.dae.y[:] = y0 + alpha * inc_y
system.vars_to_models()
self.fg_update()
merit_new = np.dot(system.dae.f, system.dae.f) + np.dot(system.dae.g, system.dae.g)
if merit_new < merit_ref:
break
alpha *= 0.5
logger.debug("Line search backtrack: alpha=%.4g", alpha)
else:
system.dae.x[:] += inc_x
system.dae.y[:] += inc_y
system.vars_to_models()
self.fg_update()
self.niter += 1
return self.converged
def summary(self):
"""
Output a summary for the PFlow routine.
"""
# extract package name of the solver
sp_module = sparse.__module__
if '.' in sp_module:
sp_module = sp_module.split('.')[0]
out = list()
out.append('')
out.append('-> Power flow calculation')
out.append(f'{"Numba":>16s}: {"On" if self.system.runtime.numba else "Off"}')
out.append(f'{"Sparse solver":>16s}: {self.solver.sparselib.upper()}')
out.append(f'{"Solution method":>16s}: {self.config.method} method')
out_str = '\n'.join(out)
logger.info(out_str)
def run(self, **kwargs):
"""
Solve the power flow using the selected method.
Returns
-------
bool
convergence status
"""
system = self.system
self.summary()
self.init()
if system.dae.m == 0:
logger.error("Loaded case contains no power flow element.")
system.exit_code = 1
return False
method = self.config.method.lower()
t0, _ = elapsed()
# ---------- Call solution methods ----------
if method in ('nr', 'dishonest'):
self.nr_solve()
elif method == 'nk':
self.newton_krylov()
t1, s1 = elapsed(t0)
self.exec_time = t1 - t0
if not self.converged:
if abs(self.mis[-1] - self.mis[-2]) < self.config.tol:
max_idx = np.argmax(np.abs(system.dae.xy))
name = system.dae.xy_name[max_idx]
logger.error('Mismatch is not correctable possibly due to large load-generation imbalance.')
logger.error('Largest mismatch on equation associated with <%s>', name)
else:
logger.error('Power flow failed after %d iterations for "%s".', self.niter + 1, system.files.case)
else:
logger.info('Converged in %d iterations in %s.', self.niter + 1, s1)
# make a copy of power flow solutions
self.x_sol = system.dae.x.copy()
self.y_sol = system.dae.y.copy()
# evaluate observable variables post-solve
system.b_update(self.models)
if self.config.init_tds:
system.TDS.init()
if self.config.report:
system.PFlow.report()
system.exit_code = 0 if self.converged else 1
return self.converged
def report(self):
"""
Write power flow report to a plain-text file.
Returns
-------
bool
True if report was written, False otherwise.
"""
if self.system.files.no_output is False:
r = Report(self.system)
r.write()
return True
return False
def _set_xy(self, xy):
"""
Helper function to set values for variables.
"""
system = self.system
system.dae.x[:] = xy[:system.dae.n]
system.dae.y[:] = xy[system.dae.n:]
system.vars_to_models()
def _fg_wrapper(self, xy):
"""
Wrapper for algebraic equations to be used with Newton-Krylov general solver
Parameters
----------
xy
Returns
-------
"""
self._set_xy(xy)
self.fg_update()
return self.system.dae.fg
def fg_update(self):
"""
Evaluate the limiters and residual equations.
"""
system = self.system
system.dae.clear_fg()
system.l_update_var(self.models, niter=self.niter, err=self.mis[-1])
system.s_update_var(self.models)
system.f_update(self.models)
system.g_update(self.models)
system.l_update_eq(self.models, niter=0)
system.fg_to_dae()
def newton_krylov(self, verbose=True):
"""
Full Newton-Krylov method from SciPy.
Warnings
--------
The result might be wrong if discrete are in use!
Parameters
----------
verbose
True if verbose.
Returns
-------
bool
Convergence status
"""
system = self.system
v0 = system.dae.xy
try:
ret = newton_krylov(self._fg_wrapper, v0, verbose=verbose)
self._set_xy(ret)
self.converged = True
except ValueError as e:
logger.error('Mismatch is not correctable. Equations may be unsolvable.')
logger.error(e)
self.converged = False
return self.converged