Source code for andes.system.codegen

"""
Code generation and pycode loading helpers for System.
"""

#  [ANDES] (C)2015-2024 Hantao Cui
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.

import importlib
import importlib.util
import inspect
import logging
import os
import sys
import time
from collections import OrderedDict
from typing import Union

from andes.core import Model
from andes.models import file_classes
from andes.shared import NCPUS, Pool, Process, dilled_vars, numba
from andes.utils.misc import elapsed
from andes.utils.paths import andes_root, get_pycode_path

logger = logging.getLogger(__name__)


[docs]class CodegenManager: """ Manage symbolic code generation, loading and pre-compilation. """
[docs] def __init__(self, system): self.system = system
def prepare(self, quick=False, incremental=False, models=None, nomp=False, ncpu=NCPUS): """ Generate numerical functions from symbolically defined models. All procedures in this function must be independent of test case. Parameters ---------- quick : bool, optional True to skip pretty-print generation to reduce code generation time. incremental : bool, optional True to generate only for modified models, incrementally. models : list, OrderedDict, None List or OrderedList of models to prepare nomp : bool True to disable multiprocessing Notes ----- Option ``incremental`` compares the md5 checksum of all var and service strings, and only regenerate for updated models. Examples -------- If one needs to print out LaTeX-formatted equations in a Jupyter Notebook, one need to generate such equations with :: import andes sys = andes.prepare() Alternatively, one can explicitly create a System and generate the code :: import andes sys = andes.System() sys.prepare() Warnings -------- Generated lambda functions will be serialized to file, but pretty prints (SymPy objects) can only exist in the System instance on which prepare is called. """ system = self.system if incremental is True: mode_text = 'rapid incremental mode' elif quick is True: mode_text = 'quick mode' else: mode_text = 'full mode' logger.info('Numerical code generation (%s) started...', mode_text) t0, _ = elapsed() # consistency check for group parameters and variables system.registry.check_group_common() # get `pycode` folder path without automatic creation pycode_path = get_pycode_path(system.options.get("pycode_path"), mkdir=False) # determine which models to prepare based on mode and `models` list. if incremental and models is None: if not system.with_calls: self._load_calls() models = self._find_stale_models() elif not incremental and models is None: models = system.models else: models = system._get_models(models) total = len(models) if nomp is False: logger.info("Generating code for %d models on %d processes.", total, ncpu) self._mp_prepare(models, quick, pycode_path, ncpu=ncpu) else: for idx, (name, model) in enumerate(models.items()): logger.info("Generating code for %s (%d/%d).", name, idx + 1, total) model.prepare(quick=quick, pycode_path=pycode_path) if len(models) > 0: self._finalize_pycode(pycode_path) self._store_calls(models) _, s = elapsed(t0) logger.info('Generated numerical code for %d models in %s.', len(models), s) def _mp_prepare(self, models, quick, pycode_path, ncpu): """ Wrapper for multiprocessed code generation. Parameters ---------- models : OrderedDict model name : model instance pairs quick : bool True to skip LaTeX string generation pycode_path : str Path to store `pycode` folder ncpu : int Number of processors to use """ # create empty models without dependency if len(models) == 0: return system = self.system model_names = list(models.keys()) model_list = list() for fname, cls_list in file_classes: for model_name in cls_list: if model_name not in model_names: continue the_module = importlib.import_module('andes.models.' + fname) the_class = getattr(the_module, model_name) model_list.append(the_class(system=None, config=system._config_object)) yapf_pycode = system.runtime.yapf_pycode def _prep_model(model: Model): """ Wrapper function to call prepare on a model. """ model.prepare(quick=quick, pycode_path=pycode_path, yapf_pycode=yapf_pycode ) Pool(ncpu).map(_prep_model, model_list) def _finalize_pycode(self, pycode_path): """ Helper function for finalizing pycode generation by writing ``__init__.py`` and reloading ``pycode`` package. """ import andes system = self.system init_path = os.path.join(pycode_path, '__init__.py') with open(init_path, 'w') as f: f.write(f"__version__ = '{andes.__version__}'\n\n") for name in system.models.keys(): f.write(f"from . import {name:20s} # NOQA\n") f.write('\n') logger.info('Saved generated pycode to "%s"', pycode_path) # RELOAD REQUIRED as the generated Jacobian arguments may be in a different order importlib.invalidate_caches() self._load_calls() def _find_stale_models(self): """ Find models whose ModelCall are stale using md5 checksum. """ out = OrderedDict() for model in self.system.models.values(): calls_md5 = getattr(model.calls, 'md5', None) if calls_md5 != model.get_md5(): out[model.class_name] = model return out def _init_numba(self, models: OrderedDict): """ Helper function to compile all functions with Numba before init. """ system = self.system if not system.runtime.numba: return try: getattr(numba, '__version__') except ImportError: # numba not installed logger.warning("numba is enabled but not installed. Please install numba manually.") system.runtime.numba = 0 return False use_parallel = bool(system.runtime.numba_parallel) nopython = bool(system.runtime.numba_nopython) logger.info("Numba compilation initiated with caching.") for mdl in models.values(): mdl.numba_jitify(parallel=use_parallel, nopython=nopython, ) return True def precompile(self, models: Union[OrderedDict, None] = None, nomp: bool = False, ncpu: int = NCPUS): """ Trigger precompilation for the given models. Arguments are the same as ``prepare``. """ system = self.system t0, _ = elapsed() if models is None: models = system.models else: models = system._get_models(models) # turn on numba for precompilation system.runtime.numba = 1 system.setup() numba_ok = self._init_numba(models) if not numba_ok: return def _precompile_model(model: Model): model.precompile() logger.info("Compilation in progress. This might take a minute...") if nomp is True: for name, mdl in models.items(): _precompile_model(mdl) logger.debug("Model <%s> compiled.", name) # multi-processed implementation. `Pool.map` runs very slow somehow. else: jobs = [] for idx, (name, mdl) in enumerate(models.items()): job = Process( name='Process {0:d}'.format(idx), target=_precompile_model, args=(mdl,), ) jobs.append(job) job.start() if (idx % ncpu == ncpu - 1) or (idx == len(models) - 1): time.sleep(0.02) for job in jobs: job.join() jobs = [] _, s = elapsed(t0) logger.info('Numba compiled %d model%s in %s.', len(models), '' if len(models) == 1 else 's', s) def undill(self, autogen_stale=True): """ Reload generated function functions, from either the ``$HOME/.andes/pycode`` folder. If no change is made to models, future calls to ``prepare()`` can be replaced with ``undill()`` for acceleration. Parameters ---------- autogen_stale: bool True to automatically call code generation if stale code is detected. Regardless of this option, codegen is trigger if importing existing code fails. """ # load equations and jacobian from saved code loaded = self._load_calls() stale_models = self._find_stale_models() if loaded is False: self.prepare(quick=True, incremental=False) loaded = True elif len(stale_models) == 0: pass elif autogen_stale is False: # NOTE: incremental code generation may be triggered due to Python # not invalidating ``.pyc`` caches. If multiprocessing is being # used, code generation will cause nested multiprocessing, which is # not allowed. # The flag ``autogen_stale=False`` is used to prevent nested codegen # and is intended to be used only in multiprocessing. logger.info("Generated code for <%s> is stale.", ', '.join(stale_models.keys())) logger.info("Automatic code re-generation manually skipped") loaded = True else: logger.info("Generated code for <%s> is stale.", ', '.join(stale_models.keys())) self.prepare(quick=True, incremental=True, models=stale_models) loaded = True return loaded def _load_calls(self): """ Helper function for loading generated numerical functions from the ``pycode`` module. """ loaded = False user_pycode_path = self.system.options.get("pycode_path") pycode = import_pycode(user_pycode_path=user_pycode_path) if pycode: try: self._expand_pycode(pycode) loaded = True except KeyError: logger.error("Your generated pycode is broken. Run `andes prep` to re-generate. ") self.system.with_calls = loaded return loaded def _expand_pycode(self, pycode_module): """ Expand imported ``pycode`` module to model calls. Parameters ---------- pycode : module The module for generated code for models. """ system = self.system for name, model in system.models.items(): if name not in pycode_module.__dict__: logger.debug("Model %s does not exist in pycode", name) continue pycode_model = pycode_module.__dict__[model.class_name] # md5 model.calls.md5 = getattr(pycode_model, 'md5', None) # reload stored variables for item in dilled_vars: model.calls.__dict__[item] = pycode_model.__dict__[item] # equations model.calls.f = pycode_model.__dict__.get("f_update") model.calls.g = pycode_model.__dict__.get("g_update") # observables model.calls.b = pycode_model.__dict__.get("b_update") # services for instance in model.services.values(): if (instance.v_str is not None) and instance.sequential is True: sv_name = f'{instance.name}_svc' model.calls.s[instance.name] = pycode_model.__dict__[sv_name] # services - non sequential model.calls.sns = pycode_model.__dict__.get("sns_update") # load initialization; assignment for instance in model.cache.all_vars.values(): if instance.v_str is not None: ia_name = f'{instance.name}_ia' model.calls.ia[instance.name] = pycode_model.__dict__[ia_name] # load initialization: iterative for item in model.calls.init_seq: if isinstance(item, list): name_concat = '_'.join(item) model.calls.ii[name_concat] = pycode_model.__dict__[name_concat + '_ii'] model.calls.ij[name_concat] = pycode_model.__dict__[name_concat + '_ij'] # load Jacobian functions for jname in model.calls.j_names: model.calls.j[jname] = pycode_model.__dict__.get(f'{jname}_update') def _store_calls(self, models: OrderedDict): """ Collect and store model calls into system. """ import andes system = self.system logger.debug("Collecting Model.calls into System.") system.calls['__version__'] = andes.__version__ for name, mdl in models.items(): system.calls[name] = mdl.calls
[docs]def import_pycode(user_pycode_path=None): """ Helper function to import generated pycode in the following priority: 1. a user-provided path from CLI. Currently, this is only for specifying the path to store the generated pycode via ``andes prepare``. 2. ``~/.andes/pycode``. This is where pycode is stored by default. 3. ``<andes_package_root>/pycode``. One can store pycode in the ANDES package folder and ship a full package, which does not require code generation. """ pycode_path = get_pycode_path(user_pycode_path, mkdir=False) sources = ( lambda: reload_submodules('pycode'), lambda: _import_pycode_from(pycode_path), lambda: reload_submodules('andes.pycode'), lambda: _import_pycode_from(os.path.join(andes_root(), 'pycode')), ) for source in sources: pycode = source() if pycode: return pycode return None
def _import_pycode_from(pycode_path): """ Helper function to load pycode from ``.andes``. """ module_path = os.path.join(pycode_path, '__init__.py') module_name = 'pycode' pycode = None if os.path.isfile(module_path): try: spec = importlib.util.spec_from_file_location(module_name, module_path) pycode = importlib.util.module_from_spec(spec) # NOQA sys.modules[spec.name] = pycode spec.loader.exec_module(pycode) logger.info('> Loaded generated Python code in "%s".', pycode_path) except ImportError: logger.debug('> Failed loading generated Python code in "%s".', pycode_path) return pycode
[docs]def reload_submodules(module_name): """ Helper function for reloading an existing module and its submodules. It is used to reload the ``pycode`` module after regenerating code. """ if module_name in sys.modules: pycode = sys.modules[module_name] for _, m in inspect.getmembers(pycode, inspect.ismodule): importlib.reload(m) logger.info('> Reloaded generated Python code of module "%s".', module_name) return pycode return None