Source code for pelicun.base

#
# Copyright (c) 2018 Leland Stanford Junior University
# Copyright (c) 2018 The Regents of the University of California
#
# This file is part of pelicun.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# You should have received a copy of the BSD 3-Clause License along with
# pelicun. If not, see <http://www.opensource.org/licenses/>.
#
# Contributors:
# Adam Zsarnóczay
# John Vouvakis Manousakis


"""Constants, basic classes, and methods for pelicun."""

from __future__ import annotations

import argparse
import json
import pprint
import sys
import traceback
import warnings
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Optional, TypeVar, overload

import colorama
import numpy as np
import pandas as pd
from colorama import Fore, Style
from scipy.interpolate import interp1d  # type: ignore

from pelicun.pelicun_warnings import PelicunWarning

if TYPE_CHECKING:
    from collections.abc import Callable
    from types import TracebackType

    from pelicun.assessment import AssessmentBase


colorama.init()
# set printing options
pp = pprint.PrettyPrinter(indent=2, width=80 - 24)

pd.options.display.max_rows = 20
pd.options.display.max_columns = None  # type: ignore
pd.options.display.expand_frame_repr = True
pd.options.display.width = 300

idx = pd.IndexSlice


T = TypeVar('T')


class Options:
    """
    Analysis options and logging configuration.

    Attributes
    ----------
    sampling_method: str
        Sampling method to use. Specified in the user's configuration
        dictionary, otherwise left as provided in the default
        configuration file (see settings/default_config.json in the
        pelicun source code). Can be any of ['LHS', 'LHS_midpoint',
        'MonteCarlo']. The default is 'LHS'.
    units_file: str
        Location of a user-specified units file, which should contain
        the names of supported units and their conversion factors (the
        value by which a quantity of a given unit needs to be
        multiplied to be expressed in the base units). Value specified
        in the user configuration dictionary. Pelicun comes with a set
        of default units which are always loaded (see
        `settings/default_units.json` in the pelicun source code).
        Units specified in the units_file overwrite the default units.
    demand_offset: dict
        Demand offsets are used in the process of mapping a component
        location to its associated EDP. This allows components that
        are sensitive to EDPs of different levels to be specified as
        present at the same location (e.g., think of a desktop computer
        and a suspended ceiling, both at the same story). Each
        component's offset value is specified in the component
        fragility database. This setting applies a supplemental global
        offset to specific EDP types. The value is specified in the
        user's configuration dictionary, otherwise left as provided in
        the default configuration file (see
        settings/default_config.json in the pelicun source code).
    nondir_multi_dict: dict
        Nondirectional components are sensitive to demands coming in
        any direction. Results are typically available in two
        orthogonal directions. FEMA P-58 suggests using the formula
        `max(dir_1, dir_2) * 1.2` to estimate the demand for such
        components. This parameter allows modifying the 1.2 multiplier
        with a user-specified value. The change can be applied to
        "ALL" EDPs, or to specific EDPs, such as "PFA", "PFV", etc.
        The value is specified in the user's configuration dictionary,
        otherwise left as provided in the default configuration file
        (see settings/default_config.json in the pelicun source code).
    rho_cost_time: float
        Specifies the correlation between the repair cost and repair
        time consequences. The value is specified under
        "RepairCostAndTimeCorrelation" in the user's configuration
        dictionary, otherwise left as provided in the default
        configuration file (see settings/default_config.json in the
        pelicun source code).
    eco_scale: dict
        Controls how the effects of economies of scale are handled in
        the damaged component quantity aggregation for loss measure
        estimation. The dictionary is specified in the user's
        configuration dictionary, otherwise left as provided in the
        default configuration file (see settings/default_config.json
        in the pelicun source code).
    log: Logger
        Logger object. Configuration parameters coming from the user's
        configuration dictionary or the default configuration file
        control logging behavior. See Logger class.

    """

    __slots__ = [
        '_asmnt',
        '_rng',
        '_seed',
        'defaults',
        'demand_offset',
        'eco_scale',
        'error_setup',
        'list_all_ds',
        'log',
        'nondir_multi_dict',
        'rho_cost_time',
        'sampling_method',
        'units_file',
    ]

    def __init__(
        self,
        user_config_options: dict[str, Any] | None,
        assessment: AssessmentBase | None = None,
    ) -> None:
        """
        Initialize an Options object.

        Parameters
        ----------
        user_config_options: dict, optional
            User-specified configuration dictionary. Any provided
            user_config_options override the defaults.
        assessment: AssessmentBase, optional
            Assessment object that will be using this Options object.
            If this Options object is not intended to be used for an
            Assessment (e.g., when defining an Options object for UQ
            use), this value should be None.

        """
        self._asmnt = assessment
        self.defaults: dict[str, Any] | None = None
        self.sampling_method: str | None = None
        self.list_all_ds: bool | None = None

        merged_config_options = merge_default_config(user_config_options)

        self.seed = merged_config_options['Seed']
        self.sampling_method = merged_config_options['Sampling']['SamplingMethod']
        self.list_all_ds = merged_config_options['ListAllDamageStates']
        self.units_file = merged_config_options['UnitsFile']
        self.demand_offset = merged_config_options['DemandOffset']
        self.nondir_multi_dict = merged_config_options['NonDirectionalMultipliers']
        self.rho_cost_time = merged_config_options['RepairCostAndTimeCorrelation']
        self.eco_scale = merged_config_options['EconomiesOfScale']
        self.error_setup = merged_config_options['ErrorSetup']

        # instantiate a Logger object with the finalized configuration
        self.log = Logger(
            merged_config_options['LogFile'],
            verbose=merged_config_options['Verbose'],
            log_show_ms=merged_config_options['LogShowMS'],
            print_log=merged_config_options['PrintLog'],
        )

    @property
    def seed(self) -> float | None:
        """
        Seed property.

        Returns
        -------
        float
            Seed value.

        """
        return self._seed

    @seed.setter
    def seed(self, value: float) -> None:
        """Seed property setter."""
        self._seed = value
        self._rng = np.random.default_rng(self._seed)  # type: ignore

    @property
    def rng(self) -> np.random.Generator:
        """
        rng property.

        Returns
        -------
        Generator
            Random generator.

        """
        return self._rng

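# Usage sketch (illustrative addition, not part of the pelicun source):
# an Options object can be built without an Assessment, e.g. for UQ use.
# Note that construction loads settings/default_config.json for every key
# not given by the user and sets up a Logger per the merged settings.
#
# >>> options = Options({'Seed': 42, 'Verbose': False})
# >>> x = options.rng.random()  # reproducible across runs due to the seed
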
# Define a module-level LoggerRegistry
class LoggerRegistry:
    """Registry to manage all logger instances."""

    _loggers: ClassVar[list[Logger]] = []

    # The @classmethod decorator allows this method to be called on
    # the class itself, rather than on instances. It interacts with
    # class-level data (like _loggers), enabling a single registry for
    # all Logger instances without needing an object of LoggerRegistry
    # itself.
    @classmethod
    def register(cls, logger: Logger) -> None:
        """Register a logger instance."""
        cls._loggers.append(logger)
    @classmethod
    def log_exception(
        cls,
        exc_type: type[BaseException],
        exc_value: BaseException,
        exc_traceback: TracebackType | None,
    ) -> None:
        """Log exceptions to all registered loggers."""
        message = (
            f"Unhandled exception occurred:"
            f"\n"
            f"{''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))}"
        )
        for logger in cls._loggers:
            logger.msg(message)

        # Also call the default excepthook to print the exception to
        # the console as is done by default.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)

# Update sys.excepthook to log exceptions in all loggers
# https://docs.python.org/3/library/sys.html#sys.excepthook
sys.excepthook = LoggerRegistry.log_exception

class Logger:
    """Generate log files documenting execution events."""

    __slots__ = [
        'emitted',
        'log_div',
        'log_file',
        'log_show_ms',
        'log_time_format',
        'print_log',
        'spaces',
        'verbose',
        'warning_file',
        'warning_stack',
    ]

    def __init__(
        self,
        log_file: str | None,
        *,
        verbose: bool,
        log_show_ms: bool,
        print_log: bool,
    ) -> None:
        """
        Initialize a Logger object.

        Parameters
        ----------
        log_file: str, optional
            If a value is provided, the log is written to that file.
            The value is specified in the user's configuration
            dictionary, otherwise left as provided in the default
            configuration file (see settings/default_config.json in
            the pelicun source code).
        verbose: bool
            If True, pelicun echoes more information throughout the
            assessment. This can be useful for debugging purposes. The
            value is specified in the user's configuration dictionary,
            otherwise left as provided in the default configuration
            file (see settings/default_config.json in the pelicun
            source code).
        log_show_ms: bool
            If True, the timestamps in the log file are in microsecond
            precision. The value is specified in the user's
            configuration dictionary, otherwise left as provided in
            the default configuration file (see
            settings/default_config.json in the pelicun source code).
        print_log: bool
            If True, the log is also printed to standard output. The
            value is specified in the user's configuration dictionary,
            otherwise left as provided in the default configuration
            file (see settings/default_config.json in the pelicun
            source code).

        """
        self.verbose = verbose
        self.log_show_ms = bool(log_show_ms)

        if log_file is None:
            self.log_file = None
            self.warning_file = None
        else:
            path = Path(log_file)
            self.log_file = str(path.resolve())
            name, extension = split_file_name(self.log_file)
            self.warning_file = (
                path.parent / (name + '_warnings' + extension)
            ).resolve()
            with Path(self.log_file).open('w', encoding='utf-8') as f:
                f.write('')
            with Path(self.warning_file).open('w', encoding='utf-8') as f:
                f.write('')

        self.print_log = str2bool(print_log)
        self.warning_stack: list[str] = []
        self.emitted: set[str] = set()
        self.reset_log_strings()
        control_warnings()
        # Register the logger to the LoggerRegistry in order to
        # capture raised exceptions.
        LoggerRegistry.register(self)
    def reset_log_strings(self) -> None:
        """Populate the string-related attributes of the logger."""
        if self.log_show_ms:
            self.log_time_format = '%H:%M:%S:%f'
            # the length of the time string in the log file
            self.spaces = ' ' * 16
            # to have a total length of 80 with the time added
            self.log_div = '-' * (80 - 17)
        else:
            self.log_time_format = '%H:%M:%S'
            self.spaces = ' ' * 9
            self.log_div = '-' * (80 - 10)
    def msg(
        self,
        msg: str = '',
        *,
        prepend_timestamp: bool = True,
        prepend_blank_space: bool = True,
    ) -> None:
        """
        Write a message in the log file with the current time as prefix.

        The time is in ISO-8601 format, e.g. 2018-06-16T20:24:04Z.

        Parameters
        ----------
        msg: string
            Message to print.
        prepend_timestamp: bool
            Controls whether a timestamp is placed before the message.
        prepend_blank_space: bool
            Controls whether blank space is placed before the message.

        """
        msg_lines = msg.split('\n')

        for msg_i, msg_line in enumerate(msg_lines):
            if prepend_timestamp and (msg_i == 0):
                formatted_msg = (
                    f'{datetime.now().strftime(self.log_time_format)} {msg_line}'  # noqa: DTZ005
                )
            elif prepend_timestamp or prepend_blank_space:
                formatted_msg = self.spaces + msg_line
            else:
                formatted_msg = msg_line

            if self.print_log:
                print(formatted_msg)  # noqa: T201

            if self.log_file is not None:
                with Path(self.log_file).open('a', encoding='utf-8') as f:
                    f.write('\n' + formatted_msg)
    def add_warning(self, msg: str) -> None:
        """
        Add a warning to the warning stack.

        Notes
        -----
        Warnings are only emitted when `emit_warnings` is called.

        Parameters
        ----------
        msg: str
            The warning message.

        """
        msg_lines = msg.split('\n')
        formatted_msg = '\n'
        for msg_line in msg_lines:
            formatted_msg += (
                self.spaces + Fore.RED + msg_line + Style.RESET_ALL + '\n'
            )
        if formatted_msg not in self.warning_stack:
            self.warning_stack.append(formatted_msg)
    def emit_warnings(self) -> None:
        """Issue all warnings and clear the warning stack."""
        for message in self.warning_stack:
            if message not in self.emitted:
                warnings.warn(message, PelicunWarning, stacklevel=3)
                if self.warning_file is not None:
                    with Path(self.warning_file).open(
                        'a', encoding='utf-8'
                    ) as f:
                        f.write(
                            message.replace(Fore.RED, '')
                            .replace(Style.RESET_ALL, '')
                            .replace(self.spaces, '')
                        )

        self.emitted = self.emitted.union(set(self.warning_stack))
        self.warning_stack = []
    def warning(self, msg: str) -> None:
        """
        Add and emit a warning immediately.

        Parameters
        ----------
        msg: str
            Warning message.

        """
        self.add_warning(msg)
        self.emit_warnings()
    def div(self, *, prepend_timestamp: bool = False) -> None:
        """Add a divider line in the log file."""
        msg = self.log_div if prepend_timestamp else '-' * 80
        self.msg(msg, prepend_timestamp=prepend_timestamp)
    def print_system_info(self) -> None:
        """Write system information in the log."""
        self.msg(
            'System Information:', prepend_timestamp=False, prepend_blank_space=False
        )
        start = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')  # noqa: DTZ005
        self.msg(
            f'local time zone: {datetime.now(timezone.utc).astimezone().tzinfo}\n'
            f'start time: {start}\n'
            f'python: {sys.version}\n'
            f'numpy: {np.__version__}\n'
            f'pandas: {pd.__version__}\n',
            prepend_timestamp=False,
        )

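# Usage sketch (illustrative addition, not part of the pelicun source):
# a standalone Logger that only prints to the console. Passing None as
# log_file skips file output entirely.
#
# >>> log = Logger(None, verbose=True, log_show_ms=False, print_log=True)
# >>> log.div()
# >>> log.msg('Assessment started.')  # printed with an HH:MM:SS prefix
# >>> log.warning('Check the demand units.')  # emitted as a PelicunWarning
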
# get the absolute path of the pelicun directory
pelicun_path = Path(__file__).resolve().parent

def split_file_name(file_path: str) -> tuple[str, str]:
    """
    Separate a file name from the extension.

    Separates a file name from the extension accounting for the case
    where the file name itself contains periods.

    Parameters
    ----------
    file_path: str
        Original file path.

    Returns
    -------
    tuple
        name: str
            Name of the file.
        extension: str
            File extension.

    """
    path = Path(file_path)
    name = path.stem
    extension = path.suffix
    return name, extension

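# For example (illustrative addition): only the last suffix is treated
# as the extension, so names containing periods survive intact.
#
# >>> split_file_name('results.tar.gz')
# ('results.tar', '.gz')
# >>> split_file_name('log')
# ('log', '')
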
def control_warnings() -> None:
    """
    Turn warnings on/off.

    See also: `pelicun/pytest.ini`. Devs: make sure to update that
    file when addressing & eliminating warnings.
    """
    if not sys.warnoptions:
        # Here we specify *specific* warnings to ignore.
        # 'message' -- a regex that the warning message must match
        # Note: we ignore known warnings emitted from our dependencies
        # and plan to address them soon.
        warnings.filterwarnings(
            action='ignore', message='.*Use to_numeric without passing `errors`.*'
        )
        warnings.filterwarnings(
            action='ignore', message=".*errors='ignore' is deprecated.*"
        )
        warnings.filterwarnings(
            action='ignore',
            message='.*The previous implementation of stack is deprecated.*',
        )
        warnings.filterwarnings(
            action='ignore',
            message='.*Setting an item of incompatible dtype is deprecated.*',
        )
        warnings.filterwarnings(
            action='ignore',
            message='.*DataFrame.groupby with axis=1 is deprecated.*',
        )

def load_default_options() -> dict:
    """
    Load the default_config.json file to set options to default values.

    Returns
    -------
    dict
        Default options.

    """
    with Path(pelicun_path / 'settings/default_config.json').open(
        encoding='utf-8'
    ) as f:
        default_config = json.load(f)

    return default_config['Options']

def update_vals(
    update_value: dict, primary: dict, update_path: str, primary_path: str
) -> None:
    """
    Transfer values between nested dictionaries.

    Updates the values of the `update_value` nested dictionary with
    those provided in the `primary` nested dictionary. If a key
    already exists in `update_value` and does not map to another
    dictionary, its value is left unchanged.

    Parameters
    ----------
    update_value: dict
        Dictionary -which can contain nested dictionaries- to be
        updated based on the values of `primary`. New keys existing in
        `primary` are added to `update_value`. Values whose keys
        already exist in `update_value` are left unchanged.
    primary: dict
        Dictionary -which can contain nested dictionaries- to be used
        to update the values of `update_value`.
    update_path: str
        Identifier for the update dictionary. Used to make error
        messages more meaningful.
    primary_path: str
        Identifier for the primary dictionary. Used to make error
        messages more meaningful.

    Raises
    ------
    ValueError
        If primary[key] is dict but update_value[key] is not.
    ValueError
        If update_value[key] is dict but primary[key] is not.

    """
    # we go over the keys of `primary`
    for key in primary:  # noqa: PLC0206
        # if `primary[key]` is a dictionary:
        if isinstance(primary[key], dict):
            # if the same `key` does not exist in update_value,
            # we associate it with an empty dictionary.
            if key not in update_value:
                update_value[key] = {}
            # if it exists already, it should map to
            # a dictionary.
            elif not isinstance(update_value[key], dict):
                msg = (
                    f'{update_path}["{key}"] '
                    'should map to a dictionary. '
                    'The specified value is '
                    f'{update_path}["{key}"] = {update_value[key]}, but '
                    f'the default value is '
                    f'{primary_path}["{key}"] = {primary[key]}. '
                    f'Please revise {update_path}["{key}"].'
                )
                raise ValueError(msg)
            # With both being dictionaries, we use recursion.
            update_vals(
                update_value[key],
                primary[key],
                f'{update_path}["{key}"]',
                f'{primary_path}["{key}"]',
            )
        # if `primary[key]` is NOT a dictionary:
        elif key not in update_value:
            update_value[key] = primary[key]
        elif isinstance(update_value[key], dict):
            msg = (
                f'{update_path}["{key}"] '
                'should not map to a dictionary. '
                f'The specified value is '
                f'{update_path}["{key}"] = {update_value[key]}, but '
                f'the default value is '
                f'{primary_path}["{key}"] = {primary[key]}. '
                f'Please revise {update_path}["{key}"].'
            )
            raise ValueError(msg)

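# Behavior sketch (illustrative addition, not part of the pelicun source;
# 'SampleSize' below is a hypothetical default key): existing leaf values
# win, and missing keys are filled in from `primary`.
#
# >>> defaults = {'Seed': None,
# ...             'Sampling': {'SamplingMethod': 'LHS', 'SampleSize': 1000}}
# >>> user = {'Sampling': {'SamplingMethod': 'MonteCarlo'}}
# >>> update_vals(user, defaults, 'user_settings', 'default_settings')
# >>> user
# {'Sampling': {'SamplingMethod': 'MonteCarlo', 'SampleSize': 1000}, 'Seed': None}
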
def merge_default_config(user_config: dict | None) -> dict:
    """
    Merge default config with user's options.

    Merge the user-specified config with the configuration defined in
    the default_config.json file. If the user-specified config does
    not include some option available in the default options, then the
    default option is used in the merged config.

    Parameters
    ----------
    user_config: dict
        User-specified configuration dictionary.

    Returns
    -------
    dict
        Merged configuration dictionary.

    """
    config = user_config  # start from the user's config
    default_config = load_default_options()

    if config is None:
        config = {}

    # We fill out the user's config with the values available in the
    # default config that were not set.
    # We use a recursive function to handle nesting.
    update_vals(config, default_config, 'user_settings', 'default_settings')

    return config

# https://stackoverflow.com/questions/52445559/
# how-can-i-type-hint-a-function-where-the-
# return-type-depends-on-the-input-type-o
@overload
def convert_to_SimpleIndex(
    data: pd.DataFrame, axis: int = 0, *, inplace: bool = False
) -> pd.DataFrame: ...
@overload
def convert_to_SimpleIndex(
    data: pd.Series, axis: int = 0, *, inplace: bool = False
) -> pd.Series: ...

def convert_to_SimpleIndex(  # noqa: N802
    data: pd.DataFrame | pd.Series, axis: int = 0, *, inplace: bool = False
) -> pd.DataFrame | pd.Series:
    """
    Convert the index of a DataFrame to a simple, one-level index.

    The target index uses standard SimCenter convention to identify
    different levels: a dash character ('-') is used to separate each
    level of the index.

    Parameters
    ----------
    data: DataFrame
        The DataFrame that will be modified.
    axis: int, optional, default: 0
        Identifies if the index (0) or the columns (1) shall be
        edited.
    inplace: bool, optional, default: False
        If yes, the operation is performed directly on the input
        DataFrame and not on a copy of it.

    Returns
    -------
    DataFrame
        The modified DataFrame.

    Raises
    ------
    ValueError
        When an invalid axis parameter is specified.

    """
    if axis in {0, 1}:
        data_mod = data if inplace else data.copy()

        if axis == 0:
            # only perform this if there are multiple levels
            if data.index.nlevels > 1:
                simple_name = '-'.join(
                    [n if n is not None else '' for n in data.index.names]
                )
                simple_index = [
                    '-'.join([str(id_i) for id_i in idx]) for idx in data.index
                ]

                data_mod.index = pd.Index(simple_index, name=simple_name)
                data_mod.index.name = simple_name

        elif axis == 1:
            # only perform this if there are multiple levels
            if data.columns.nlevels > 1:
                simple_name = '-'.join(
                    [n if n is not None else '' for n in data.columns.names]
                )
                simple_index = [
                    '-'.join([str(id_i) for id_i in idx]) for idx in data.columns
                ]

                data_mod.columns = pd.Index(simple_index, name=simple_name)
                data_mod.columns.name = simple_name

    else:
        msg = f'Invalid axis parameter: {axis}'
        raise ValueError(msg)

    return data_mod

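# For example (illustrative addition): a two-level index collapses to
# dash-joined labels.
#
# >>> df = pd.DataFrame(
# ...     [1.0, 2.0],
# ...     index=pd.MultiIndex.from_tuples([('B.10', '1'), ('B.10', '2')]),
# ... )
# >>> convert_to_SimpleIndex(df).index.tolist()
# ['B.10-1', 'B.10-2']
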
@overload
def convert_to_MultiIndex(
    data: pd.DataFrame, axis: int = 0, *, inplace: bool = False
) -> pd.DataFrame: ...
@overload
def convert_to_MultiIndex(
    data: pd.Series, axis: int = 0, *, inplace: bool = False
) -> pd.Series: ...

def convert_to_MultiIndex(  # noqa: N802
    data: pd.DataFrame | pd.Series, axis: int = 0, *, inplace: bool = False
) -> pd.DataFrame | pd.Series:
    """
    Convert the index of a DataFrame to a MultiIndex.

    We assume that the index uses standard SimCenter convention to
    identify different levels: a dash character ('-') is expected to
    separate each level of the index.

    Parameters
    ----------
    data: DataFrame
        The DataFrame that will be modified.
    axis: int, optional, default: 0
        Identifies if the index (0) or the columns (1) shall be
        edited.
    inplace: bool, optional, default: False
        If yes, the operation is performed directly on the input
        DataFrame and not on a copy of it.

    Returns
    -------
    DataFrame
        The modified DataFrame.

    Raises
    ------
    ValueError
        If an invalid axis is specified.

    """
    # check if the requested axis is already a MultiIndex
    if ((axis == 0) and (isinstance(data.index, pd.MultiIndex))) or (
        (axis == 1) and (isinstance(data.columns, pd.MultiIndex))
    ):
        # if yes, return the data unchanged
        return data

    if axis == 0:
        index_labels = [str(label).split('-') for label in data.index]
    elif axis == 1:
        index_labels = [str(label).split('-') for label in data.columns]
    else:
        msg = f'Invalid axis parameter: {axis}'
        raise ValueError(msg)

    max_lbl_len = np.max([len(labels) for labels in index_labels])

    for l_i, labels in enumerate(index_labels):
        if len(labels) != max_lbl_len:
            labels += [''] * (max_lbl_len - len(labels))  # noqa: PLW2901
            index_labels[l_i] = labels

    index_labels_np = np.array(index_labels)

    if index_labels_np.shape[1] > 1:
        data_mod = data if inplace else data.copy()

        if axis == 0:
            data_mod.index = pd.MultiIndex.from_arrays(index_labels_np.T)
        else:
            data_mod.columns = pd.MultiIndex.from_arrays(index_labels_np.T)

        return data_mod

    return data

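# For example (illustrative addition): the inverse of
# convert_to_SimpleIndex, splitting labels on the dash separator.
#
# >>> df = pd.DataFrame([1.0, 2.0], index=['PFA-0-1', 'PFA-1-1'])
# >>> convert_to_MultiIndex(df).index.tolist()
# [('PFA', '0', '1'), ('PFA', '1', '1')]
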
def convert_dtypes(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Convert columns to a numeric datatype whenever possible.

    The function replaces None with NA, since otherwise columns
    containing None would retain the `object` type.

    Parameters
    ----------
    dataframe: DataFrame
        The DataFrame that will be modified.

    Returns
    -------
    DataFrame
        The modified DataFrame.

    """
    with (
        pd.option_context('future.no_silent_downcasting', True),  # noqa: FBT003
        pd.option_context('mode.copy_on_write', True),  # noqa: FBT003
    ):
        dataframe = dataframe.fillna(value=np.nan).infer_objects()

    # note: `axis=0` applies the function to the columns
    # note: ignoring errors is a bad idea and should never be done. In
    # this case, however, that's not what we do, despite the name of
    # this parameter. We simply don't convert the dtype of columns
    # that cannot be interpreted as numeric. That's what
    # `errors='ignore'` does.
    # See:
    # https://pandas.pydata.org/docs/reference/api/pandas.to_numeric.html
    return dataframe.apply(
        lambda x: pd.to_numeric(x, errors='ignore'),  # type: ignore
        axis=0,
    )

def show_matrix(
    data: np.ndarray | pd.DataFrame, *, use_describe: bool = False
) -> None:
    """
    Print a matrix in a nice way using a DataFrame.

    Parameters
    ----------
    data: array-like
        The matrix data to display. Can be any array-like structure
        that pandas can convert to a DataFrame.
    use_describe: bool, default: False
        If True, provides a descriptive statistical summary of the
        matrix including specified percentiles. If False, simply
        prints the matrix as is.

    """
    if use_describe:
        pp.pprint(
            pd.DataFrame(data).describe(percentiles=[0.01, 0.1, 0.5, 0.9, 0.99])
        )
    else:
        pp.pprint(pd.DataFrame(data))

def multiply_factor_multiple_levels(
    df: pd.DataFrame,
    conditions: dict,
    factor: float,
    axis: int = 0,
    *,
    raise_missing: bool = True,
) -> None:
    """
    Multiply selected rows by a factor, in place.

    Multiplies selected rows of a DataFrame that is indexed with a
    hierarchical index (pd.MultiIndex) by a factor. The change is done
    in place.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be modified.
    conditions: dict
        A dictionary mapping level names to a single value. Only the
        rows where the index levels have the provided values will be
        affected. The dictionary can be empty, in which case all rows
        will be affected, or contain only some levels and values, in
        which case only the matching rows will be affected.
    factor: float
        Scaling factor to use.
    axis: int
        With 0 the condition is checked against the DataFrame's index,
        with 1 it is checked against the DataFrame's columns.
    raise_missing: bool
        Raise an error if no rows match the given conditions.

    Raises
    ------
    ValueError
        If the provided `axis` value is not either 0 or 1.
    ValueError
        If there are no rows matching the conditions and
        `raise_missing` is True.

    """
    if axis == 0:
        idx_to_use = df.index
    elif axis == 1:
        idx_to_use = df.columns
    else:
        msg = f'Invalid axis: `{axis}`'
        raise ValueError(msg)

    mask = pd.Series(data=True, index=idx_to_use)

    # Apply each condition to update the mask
    for level, value in conditions.items():
        mask &= idx_to_use.get_level_values(level) == value

    if np.all(mask == False) and raise_missing:  # noqa: E712
        msg = f'No rows found matching the conditions: `{conditions}`'
        raise ValueError(msg)

    if axis == 0:
        df.iloc[mask.to_numpy()] *= factor
    else:
        df.iloc[:, mask.to_numpy()] *= factor

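# Usage sketch (illustrative addition, not part of the pelicun source):
# scale all 'PFA' rows of a demand table, matching on one index level.
#
# >>> demands = pd.DataFrame(
# ...     {'value': [0.5, 0.8, 0.01]},
# ...     index=pd.MultiIndex.from_tuples(
# ...         [('PFA', '1'), ('PFA', '2'), ('PID', '1')], names=['type', 'loc']
# ...     ),
# ... )
# >>> multiply_factor_multiple_levels(demands, {'type': 'PFA'}, 2.0)
# >>> demands['value'].tolist()
# [1.0, 1.6, 0.01]
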
def _warning(
    message: str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: Any = None,  # noqa: ARG001, ANN401
    line: Any = None,  # noqa: ARG001, ANN401
) -> None:
    """
    Display warnings in a custom format.

    Custom warning function to format and print warnings more
    attractively. This function modifies how warning messages are
    displayed, emphasizing the file path and line number from where
    the warning originated.

    Parameters
    ----------
    message: str
        The warning message to be displayed.
    category: Warning
        The category of the warning (unused, but required for
        compatibility with standard warning signature).
    filename: str
        The path of the file from which the warning is issued. The
        function simplifies the path for display.
    lineno: int
        The line number in the file at which the warning is issued.
    file: file-like object, optional
        The target file object to write the warning to (unused, but
        required for compatibility with standard warning signature).
    line: str, optional
        Line of code causing the warning (unused, but required for
        compatibility with standard warning signature).

    """
    # pylint:disable = unused-argument
    if category != PelicunWarning:
        if '\\' in filename:
            file_path = filename.split('\\')
        elif '/' in filename:
            file_path = filename.split('/')
        else:
            file_path = None

        python_file = (
            '/'.join(file_path[-3:]) if file_path is not None else filename
        )
        print(f'WARNING in {python_file} at line {lineno}\n{message}\n')  # noqa: T201
    else:
        print(message)  # noqa: T201


warnings.showwarning = _warning  # type: ignore

def describe(
    data: pd.DataFrame | pd.Series | np.ndarray,
    percentiles: tuple[float, ...] = (
        0.001,
        0.023,
        0.10,
        0.159,
        0.5,
        0.841,
        0.90,
        0.977,
        0.999,
    ),
) -> pd.DataFrame:
    """
    Extend descriptive statistics.

    Provides extended descriptive statistics for given data, including
    percentiles and log standard deviation for applicable columns.
    This function accepts both pandas Series and DataFrame objects
    directly, or any array-like structure which can be converted to
    them. It calculates common descriptive statistics and optionally
    adds log standard deviation for columns where all values are
    positive.

    Parameters
    ----------
    data: pd.Series, pd.DataFrame, or array-like
        The data to describe. If array-like, it is converted to a
        DataFrame or Series before analysis.
    percentiles: tuple of float, optional
        Specific percentiles to include in the output. Default
        includes an extensive range tailored to provide a detailed
        summary.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the descriptive statistics of the input
        data, transposed so that each descriptive statistic is a row.

    """
    if isinstance(data, np.ndarray):
        vals = data
        if vals.ndim == 1:
            data = pd.Series(vals, name=0)
        else:
            cols = np.arange(vals.shape[1])
            data = pd.DataFrame(vals, columns=cols)

    # convert Series to a DataFrame
    if isinstance(data, pd.Series):
        data = pd.DataFrame(data)

    desc = pd.DataFrame(data.describe(list(percentiles)).T)

    # add log standard deviation to the stats
    desc.insert(3, 'log_std', np.nan)
    desc = desc.T

    for col in desc.columns:
        if np.min(data[col]) > 0.0:
            desc.loc['log_std', col] = np.std(np.log(data[col]), ddof=1)

    return desc

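# For example (illustrative addition): the returned frame has statistics
# as rows, and an all-positive column receives a 'log_std' entry.
#
# >>> sample = pd.DataFrame({'PFA': [0.2, 0.3, 0.5, 0.8]})
# >>> stats = describe(sample)
# >>> 'log_std' in stats.index
# True
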
def str2bool(v: str | bool) -> bool:  # noqa: FBT001
    """
    Convert a string representation of truth to boolean True or False.

    This function is designed to convert string inputs that represent
    boolean values into actual Python boolean types. It handles
    typical representations of truthiness and falsiness, and is case
    insensitive.

    Parameters
    ----------
    v: str or bool
        The value to convert into a boolean. This can be a boolean
        itself (in which case it is simply returned) or a string that
        is expected to represent a boolean value.

    Returns
    -------
    bool
        The boolean value corresponding to the input.

    Raises
    ------
    argparse.ArgumentTypeError
        If `v` is a string that does not correspond to a boolean
        value, an error is raised indicating that a boolean value was
        expected.

    """
    # courtesy of Maxim @ Stackoverflow
    if isinstance(v, bool):
        return v
    # compare against lowercase forms only, since `v` is lowercased first
    if v.lower() in {'yes', 'true', 't', 'y', '1'}:
        return True
    if v.lower() in {'no', 'false', 'f', 'n', '0'}:
        return False
    msg = 'Boolean value expected.'
    raise argparse.ArgumentTypeError(msg)

def float_or_None(string: str) -> float | None:  # noqa: N802
    """
    Convert strings to float or None.

    Parameters
    ----------
    string: str
        A string.

    Returns
    -------
    float or None
        A float, if the given string can be converted to a float.
        Otherwise, it returns None.

    """
    try:
        return float(string)
    except ValueError:
        return None

def int_or_None(string: str) -> int | None:  # noqa: N802
    """
    Convert strings to int or None.

    Parameters
    ----------
    string: str
        A string.

    Returns
    -------
    int or None
        An int, if the given string can be converted to an int.
        Otherwise, it returns None.

    """
    try:
        return int(string)
    except ValueError:
        return None

def check_if_str_is_na(string: Any) -> bool:  # noqa: ANN401
    """
    Check if the provided string can be interpreted as N/A.

    Parameters
    ----------
    string: object
        The string to evaluate.

    Returns
    -------
    bool
        The evaluation result. Yes, if the string is considered N/A.

    """
    na_vals = {
        '',
        'N/A',
        '-1.#QNAN',
        'null',
        'None',
        '<NA>',
        'nan',
        '-NaN',
        '1.#IND',
        'NaN',
        '#NA',
        '1.#QNAN',
        'NULL',
        '-nan',
        '#N/A',
        '#N/A N/A',
        'n/a',
        '-1.#IND',
        'NA',
    }
    # obtained from Pandas' internal STR_NA_VALUES variable.
    return isinstance(string, str) and string in na_vals

def with_parsed_str_na_values(df: pd.DataFrame) -> pd.DataFrame:
    """
    Identify string values interpretable as N/A.

    Given a dataframe, this function identifies values that have
    string type and can be interpreted as N/A, and replaces them with
    actual NA's.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe to process.

    Returns
    -------
    pd.DataFrame
        The dataframe with proper N/A values.

    """
    # Replace string NA values with actual NaNs
    return df.apply(
        lambda col: col.map(lambda x: np.nan if check_if_str_is_na(x) else x)
    )

def dedupe_index(dataframe: pd.DataFrame, dtype: type = str) -> pd.DataFrame:
    """
    Add a `uid` level to the index.

    Modifies the index of a DataFrame to ensure all index elements are
    unique by adding an extra level. Assumes that the DataFrame's
    original index is a MultiIndex with specified names. A unique
    identifier ('uid') is added as an additional index level based on
    the cumulative count of occurrences of the original index
    combinations.

    Parameters
    ----------
    dataframe: pd.DataFrame
        The DataFrame whose index is to be modified. It must have a
        MultiIndex.
    dtype: type, optional
        The data type for the new index level 'uid'. Defaults to str.

    Returns
    -------
    dataframe: pd.DataFrame
        The original dataframe with an additional `uid` level at the
        index.

    """
    inames = dataframe.index.names
    dataframe = dataframe.reset_index()
    dataframe['uid'] = (dataframe.groupby([*inames]).cumcount()).astype(dtype)
    dataframe = dataframe.set_index([*inames, 'uid'])
    return dataframe.sort_index()

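# For example (illustrative addition): repeated index rows get distinct
# uid values 0, 1, ... so the resulting index is unique.
#
# >>> df = pd.DataFrame(
# ...     {'qty': [1, 2]},
# ...     index=pd.MultiIndex.from_tuples(
# ...         [('B.10', '1'), ('B.10', '1')], names=['cmp', 'loc']
# ...     ),
# ... )
# >>> dedupe_index(df).index.tolist()
# [('B.10', '1', '0'), ('B.10', '1', '1')]
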
# Input specs

EDP_to_demand_type = {
    # Drifts
    'Story Drift Ratio': 'PID',
    'Peak Interstory Drift Ratio': 'PID',
    'Roof Drift Ratio': 'PRD',
    'Peak Roof Drift Ratio': 'PRD',
    'Damageable Wall Drift': 'DWD',
    'Racking Drift Ratio': 'RDR',
    'Mega Drift Ratio': 'PMD',
    'Residual Drift Ratio': 'RID',
    'Residual Interstory Drift Ratio': 'RID',
    'Peak Effective Drift Ratio': 'EDR',
    # Floor response
    'Peak Floor Acceleration': 'PFA',
    'Peak Floor Velocity': 'PFV',
    'Peak Floor Displacement': 'PFD',
    # Component response
    'Peak Link Rotation Angle': 'LR',
    'Peak Link Beam Chord Rotation': 'LBR',
    # Wind Intensity
    'Peak Gust Wind Speed': 'PWS',
    # Wind Demands
    'Peak Wind Force': 'PWF',
    'Peak Internal Force': 'PIF',
    'Peak Line Force': 'PLF',
    'Peak Wind Pressure': 'PWP',
    # Inundation Intensity
    'Peak Inundation Height': 'PIH',
    # Shaking Intensity
    'Peak Ground Acceleration': 'PGA',
    'Peak Ground Velocity': 'PGV',
    'Spectral Acceleration': 'SA',
    'Spectral Velocity': 'SV',
    'Spectral Displacement': 'SD',
    'Peak Spectral Acceleration': 'SA',
    'Peak Spectral Velocity': 'SV',
    'Peak Spectral Displacement': 'SD',
    'Permanent Ground Deformation': 'PGD',
    # Placeholder for advanced calculations
    'One': 'ONE',
}

def dict_raise_on_duplicates(ordered_pairs: list[tuple]) -> dict:
    """
    Construct a dictionary from a list of key-value pairs.

    Constructs a dictionary from a list of key-value pairs, raising an
    exception if duplicate keys are found. This function ensures that
    no two pairs have the same key. It is particularly useful when
    parsing JSON-like data where unique keys are expected but not
    enforced by standard parsing methods.

    Parameters
    ----------
    ordered_pairs: list of tuples
        A list of tuples, each containing a key and a value. Keys are
        expected to be unique across the list.

    Returns
    -------
    dict
        A dictionary constructed from the ordered_pairs without any
        duplicates.

    Raises
    ------
    ValueError
        If a duplicate key is found in the input list, a ValueError is
        raised with a message indicating the duplicate key.

    Examples
    --------
    >>> dict_raise_on_duplicates(
    ...     [("key1", "value1"), ("key2", "value2"), ("key1", "value3")]
    ... )
    ValueError: duplicate key: key1

    Notes
    -----
    This implementation is useful for contexts in which data integrity
    is crucial and key uniqueness must be ensured.

    """
    d = {}
    for k, v in ordered_pairs:
        if k in d:
            msg = f'duplicate key: {k}'
            raise ValueError(msg)
        d[k] = v
    return d

def parse_units(  # noqa: C901
    custom_file: str | None = None, *, preserve_categories: bool = False
) -> dict:
    """
    Parse the unit conversion factor JSON file and return a dictionary.

    Parameters
    ----------
    custom_file: str, optional
        If a custom file is provided, only the units specified in the
        custom file are used.
    preserve_categories: bool, optional
        If True, maintains the original data types of category values
        from the JSON file. If False, converts all values to floats
        and flattens the dictionary structure, ensuring that each unit
        name is globally unique across categories.

    Returns
    -------
    dict
        A dictionary where keys are unit names and values are their
        corresponding conversion factors. If `preserve_categories` is
        True, the dictionary may maintain its original nested
        structure based on the JSON file. If `preserve_categories` is
        False, the dictionary is flattened to have globally unique
        unit names.

    """

    def get_contents(  # noqa: C901
        file_path: Path, *, preserve_categories: bool = False
    ) -> dict:
        """
        Map unit names to conversion factors.

        Parses a unit conversion factors JSON file and returns a
        dictionary mapping unit names to conversion factors. This
        function allows the use of a custom JSON file for defining
        unit conversion factors or defaults to a predefined file. It
        ensures that each unit name is unique and that all conversion
        factors are float values. Additionally, it supports the option
        to preserve the original data types of category values from
        the JSON.

        Parameters
        ----------
        file_path: str
            The file path to a JSON file containing unit conversion
            factors. If not provided, a default file is used.
        preserve_categories: bool, optional
            If True, maintains the original data types of category
            values from the JSON file. If False, converts all values
            to floats and flattens the dictionary structure, ensuring
            that each unit name is globally unique across categories.

        Returns
        -------
        dict
            A dictionary where keys are unit names and values are
            their corresponding conversion factors. If
            `preserve_categories` is True, the dictionary may maintain
            its original nested structure based on the JSON file.

        Raises
        ------
        FileNotFoundError
            If the specified file does not exist.
        ValueError
            If a unit name is duplicated or other JSON structure
            issues are present.
        TypeError
            If a conversion factor is not a float.
        TypeError
            If any value that needs to be converted to float cannot be
            converted.

        Examples
        --------
        >>> parse_units('custom_units.json')
        { 'm': 1.0, 'cm': 0.01, 'mm': 0.001 }

        >>> parse_units('custom_units.json', preserve_categories=True)
        { 'Length': {'m': 1.0, 'cm': 0.01, 'mm': 0.001} }

        """
        try:
            with Path(file_path).open(encoding='utf-8') as f:
                dictionary = json.load(
                    f, object_pairs_hook=dict_raise_on_duplicates
                )
        except FileNotFoundError as exc:
            msg = f'{file_path} was not found.'
            raise FileNotFoundError(msg) from exc
        except json.decoder.JSONDecodeError as exc:
            msg = f'{file_path} is not a valid JSON file.'
            raise ValueError(msg) from exc

        for category_dict in list(dictionary.values()):
            # ensure all first-level keys point to a dictionary
            if not isinstance(category_dict, dict):
                msg = (
                    f'{file_path} contains first-level keys '
                    "that don't point to a dictionary"
                )
                raise TypeError(msg)

            # convert values to float
            try:
                for key, val in category_dict.items():
                    category_dict[key] = float(val)
            except (ValueError, TypeError) as exc:
                msg = (
                    f'Unit {key} has a value of {val} '
                    'which cannot be interpreted as a float'
                )
                raise type(exc)(msg) from exc

        if preserve_categories:
            return dictionary

        flattened = {}
        for category in dictionary:
            for unit_name, factor in dictionary[category].items():
                if unit_name in flattened:
                    msg = f'{unit_name} defined twice in {file_path}.'
                    raise ValueError(msg)
                flattened[unit_name] = factor

        return flattened

    if custom_file:
        return get_contents(
            Path(custom_file), preserve_categories=preserve_categories
        )

    return get_contents(
        pelicun_path / 'settings/default_units.json',
        preserve_categories=preserve_categories,
    )

def convert_units(  # noqa: C901
    values: float | list[float] | np.ndarray,
    unit: str,
    to_unit: str,
    category: str | None = None,
) -> float | list[float] | np.ndarray:
    """
    Convert numeric values between different units.

    Supports conversion within a specified category of units and
    automatically infers the category if not explicitly provided. It
    maintains the type of the input in the output.

    Parameters
    ----------
    values: float | list[float] | np.ndarray
        The numeric value(s) to convert.
    unit: str
        The current unit of the values.
    to_unit: str
        The target unit to convert the values into.
    category: str, optional
        The category of the units (e.g., 'length', 'pressure'). If
        not provided, the category will be inferred based on the
        provided units.

    Returns
    -------
    float or list[float] or np.ndarray
        The converted value(s) in the target unit, in the same data
        type as the input values.

    Raises
    ------
    TypeError
        If the input `values` are not of type float, list, or
        np.ndarray.
    ValueError
        If the `unit`, `to_unit`, or `category` is unknown or if
        `unit` and `to_unit` are not in the same category.

    """
    if isinstance(values, (float, list)):
        vals = np.atleast_1d(values)
    elif isinstance(values, np.ndarray):
        vals = values
    else:
        msg = 'Invalid input type for `values`'
        raise TypeError(msg)

    # load default units
    all_units = parse_units(preserve_categories=True)

    # if a category is given use it, otherwise try to determine it
    if category:
        if category not in all_units:
            msg = f'Unknown category: `{category}`'
            raise ValueError(msg)
        units = all_units[category]
        for unt in unit, to_unit:
            if unt not in units:
                msg = f'Unknown unit: `{unt}`'
                raise ValueError(msg)
    else:
        unit_category: str | None = None
        for key in all_units:
            units = all_units[key]
            if unit in units:
                unit_category = key
                break
        if not unit_category:
            msg = f'Unknown unit `{unit}`'
            raise ValueError(msg)
        units = all_units[unit_category]
        if to_unit not in units:
            msg = (
                f'`{unit}` is a `{unit_category}` unit, but `{to_unit}` '
                f'is not specified in that category.'
            )
            raise ValueError(msg)

    # convert units
    from_factor = units[unit]
    to_factor = units[to_unit]
    new_values = vals * float(from_factor) / float(to_factor)

    # return the results in the same type as that of the provided
    # values
    if isinstance(values, float):
        return new_values[0]
    if isinstance(values, list):
        return new_values.tolist()
    return new_values

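# Usage sketch (illustrative addition, not part of the pelicun source):
# unit names and categories come from settings/default_units.json; the
# 'm', 'cm', and 'length' entries below assume that file defines them.
#
# >>> convert_units(100.0, unit='cm', to_unit='m')
# 1.0
# >>> convert_units([1.0, 2.0], unit='m', to_unit='cm', category='length')
# [100.0, 200.0]
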
def stringterpolation(
    arguments: str,
) -> Callable[[np.ndarray], np.ndarray]:
    """
    Linear interpolation from strings.

    Turns a string of specially formatted arguments into a multilinear
    interpolating function.

    Parameters
    ----------
    arguments: str
        String of arguments containing Y values and X values,
        separated by a pipe symbol (`|`). Individual values are
        separated by commas (`,`). Example:
        arguments = 'y1,y2,y3|x1,x2,x3'

    Returns
    -------
    Callable
        A callable interpolating function.

    """
    split = arguments.split('|')
    x_vals = split[1].split(',')
    y_vals = split[0].split(',')
    x = np.array(x_vals, dtype=float)
    y = np.array(y_vals, dtype=float)

    return interp1d(x=x, y=y, kind='linear')

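# For example (illustrative addition): 'y|x' strings become a linear
# interpolant over the given support.
#
# >>> f = stringterpolation('0,1,0|0,0.5,1')
# >>> float(f(0.25))
# 0.5
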
def invert_mapping(original_dict: dict) -> dict:
    """
    Invert a dictionary mapping from key to list of values.

    Parameters
    ----------
    original_dict: dict
        Dictionary with values that are lists of hashable items.

    Returns
    -------
    dict
        New dictionary where each item in the original value lists
        becomes a key and the original key becomes the corresponding
        value.

    Raises
    ------
    ValueError
        If any value in the original dictionary's value lists appears
        more than once.

    """
    inverted_dict = {}
    for key, value_list in original_dict.items():
        for value in value_list:
            if value in inverted_dict:
                msg = 'Cannot invert mapping with duplicate values.'
                raise ValueError(msg)
            inverted_dict[value] = key
    return inverted_dict

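# For example (illustrative addition):
#
# >>> invert_mapping({'PFA': ['cmp.A', 'cmp.B'], 'PID': ['cmp.C']})
# {'cmp.A': 'PFA', 'cmp.B': 'PFA', 'cmp.C': 'PID'}
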
def get(
    d: dict | None,
    path: str,
    default: Any | None = None,  # noqa: ANN401
) -> Any:  # noqa: ANN401
    """
    Path-like dictionary value retrieval.

    Retrieves a value from a nested dictionary using a path with '/'
    as the separator.

    Parameters
    ----------
    d: dict
        The dictionary to search.
    path: str
        The path to the desired value, with keys separated by '/'.
    default: Any, optional
        The value to return if the path is not found. Defaults to
        None.

    Returns
    -------
    Any
        The value found at the specified path, or the default value if
        the path is not found.

    Examples
    --------
    >>> config = {
    ...     "DL": {
    ...         "Outputs": {
    ...             "Format": {
    ...                 "JSON": "desired_value"
    ...             }
    ...         }
    ...     }
    ... }
    >>> get(config, '/DL/Outputs/Format/JSON', default='default_value')
    'desired_value'
    >>> get(config, '/DL/Outputs/Format/XML', default='default_value')
    'default_value'

    """
    if d is None:
        return default
    keys = path.strip('/').split('/')
    current_dict = d
    try:
        for key in keys:
            current_dict = current_dict[key]
        return current_dict  # noqa: TRY300
    except (KeyError, TypeError):
        return default

def update(
    d: dict[str, Any],
    path: str,
    value: Any,  # noqa: ANN401
    *,
    only_if_empty_or_none: bool = False,
) -> None:
    """
    Set a value in a nested dictionary using a path with '/' as the separator.

    Parameters
    ----------
    d: dict
        The dictionary to update.
    path: str
        The path to the desired value, with keys separated by '/'.
    value: Any
        The value to set at the specified path.
    only_if_empty_or_none: bool, optional
        If True, only update the value if it is None or an empty
        dictionary. Defaults to False.

    Examples
    --------
    >>> d = {}
    >>> update(d, 'x/y/z', 1)
    >>> d
    {'x': {'y': {'z': 1}}}

    >>> update(d, 'x/y/z', 2, only_if_empty_or_none=True)
    >>> d
    {'x': {'y': {'z': 1}}}  # value remains 1 since it is not empty or None

    >>> update(d, 'x/y/z', 2)
    >>> d
    {'x': {'y': {'z': 2}}}  # value is updated to 2

    """
    keys = path.strip('/').split('/')
    current_dict = d
    for key in keys[:-1]:
        if key not in current_dict or not isinstance(current_dict[key], dict):
            current_dict[key] = {}
        current_dict = current_dict[key]
    if only_if_empty_or_none:
        if is_unspecified(current_dict, keys[-1]):
            current_dict[keys[-1]] = value
    else:
        current_dict[keys[-1]] = value

def is_unspecified(d: dict[str, Any], path: str) -> bool:
    """
    Check if something is specified.

    Checks if a value in a nested dictionary is either non-existent,
    None, NaN, or an empty dictionary or list.

    Parameters
    ----------
    d: dict
        The dictionary to search.
    path: str
        The path to the desired value, with keys separated by '/'.

    Returns
    -------
    bool
        True if the value is non-existent, None, or an empty
        dictionary or list. False otherwise.

    Examples
    --------
    >>> config = {
    ...     "DL": {
    ...         "Outputs": {
    ...             "Format": {
    ...                 "JSON": "desired_value",
    ...                 "EmptyDict": {}
    ...             }
    ...         }
    ...     }
    ... }
    >>> is_unspecified(config, '/DL/Outputs/Format/JSON')
    False
    >>> is_unspecified(config, '/DL/Outputs/Format/XML')
    True
    >>> is_unspecified(config, '/DL/Outputs/Format/EmptyDict')
    True

    """
    value = get(d, path, default=None)
    if value is None:
        return True
    if pd.isna(value):
        return True
    if value == {}:
        return True
    return value == []

def is_specified(d: dict[str, Any], path: str) -> bool:
    """
    Opposite of `is_unspecified()`.

    Parameters
    ----------
    d: dict
        The dictionary to search.
    path: str
        The path to the desired value, with keys separated by '/'.

    Returns
    -------
    bool
        True if the value is specified, False otherwise.

    """
    return not is_unspecified(d, path)

def ensure_value(value: T | None) -> T:
    """
    Ensure a variable is not None.

    This function checks that the provided variable is not None. It is
    used to assist with type hinting by avoiding repetitive `assert
    value is not None` statements throughout the code.

    Parameters
    ----------
    value: Optional[T]
        The variable to check, which can be of any type or None.

    Returns
    -------
    T
        The same variable, guaranteed to be non-None.

    Raises
    ------
    TypeError
        If the provided variable is None.

    """
    if value is None:
        raise TypeError
    return value