#
# Copyright (c) 2018 Leland Stanford Junior University
# Copyright (c) 2018 The Regents of the University of California
#
# This file is part of pelicun.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# You should have received a copy of the BSD 3-Clause License along with
# pelicun. If not, see <http://www.opensource.org/licenses/>.
#
# Contributors:
# Adam Zsarnóczay
# John Vouvakis Manousakis
"""Loss model objects and associated methods."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict
from itertools import product
from pathlib import Path
from typing import TYPE_CHECKING, Any, overload
import numpy as np
import pandas as pd
from scipy.interpolate import RegularGridInterpolator
from pelicun import base, file_io, uq
from pelicun.model.demand_model import (
_assemble_required_demand_data,
_get_required_demand_type,
_verify_edps_available,
)
from pelicun.model.pelicun_model import PelicunModel
from pelicun.pelicun_warnings import InconsistentUnitsError
if TYPE_CHECKING:
from collections.abc import Callable
from pelicun.assessment import AssessmentBase
idx = base.idx
class LossModel(PelicunModel):
"""
Manages loss information used in assessments.
Contains a loss model for components with Damage States (DS) and
one for components with Loss Functions (LF).
"""
__slots__ = ['ds_model', 'dv_units', 'lf_model']
def __init__(
self,
assessment: AssessmentBase,
decision_variables: tuple[str, ...] = ('Cost', 'Time'),
dv_units: dict[str, str] | None = None,
) -> None:
"""
Initialize LossModel objects.
Parameters
----------
assessment: pelicun.AssessmentBase
Parent assessment
decision_variables: tuple
Defines the decision variables to be included in the loss
calculations. Defaults to those supported, but fewer can be
used if desired. When fewer are used, the loss parameters for
those not used will not be required.
"""
super().__init__(assessment)
self.ds_model: RepairModel_DS = RepairModel_DS(assessment)
self.lf_model: RepairModel_LF = RepairModel_LF(assessment)
self._loss_map = None
self.decision_variables = decision_variables
self.dv_units = dv_units
@property
def sample(self) -> pd.DataFrame | None:
"""
Combines the samples of the ds_model and lf_model sub-models.
Returns
-------
pd.DataFrame
The combined loss sample.
"""
# Handle `None` cases
if self.ds_model.sample is None and self.lf_model.sample is None:
return None
if self.ds_model.sample is None:
return self.lf_model.sample
if self.lf_model.sample is None:
return self.ds_model.sample
# If both are not None, combine
ds_model_levels = self.ds_model.sample.columns.names
# add a `ds` level to the lf_model sample
new_index = self.lf_model.sample.columns.to_frame(index=False)
# add
new_index['ds'] = 'N/A'
# reorder
new_index = new_index[ds_model_levels]
new_multiindex = pd.MultiIndex.from_frame(new_index)
self.lf_model.sample.columns = new_multiindex
return pd.concat((self.ds_model.sample, self.lf_model.sample), axis=1)
@property
def decision_variables(self) -> tuple[str, ...]:
"""
Retrieve the decision variables.
Returns
-------
tuple
Decision variables.
"""
# pick the object from one of the models
# it's the same for the other(s).
return self.ds_model.decision_variables
@decision_variables.setter
def decision_variables(self, decision_variables: tuple[str, ...]) -> None:
"""
Set the decision variables.
Supported: {`Cost`, `Time`, `Energy`, `Carbon`}.
Any other string can also be used, as long as the provided loss
parameters contain that decision variable.
"""
# assign the same DVs to the included loss models.
for model in self._loss_models:
model.decision_variables = decision_variables
def add_loss_map(
self,
loss_map_path: str | pd.DataFrame | None = None,
loss_map_policy: str | None = None,
) -> None:
"""
Add a loss map to the loss model.
A loss map defines what loss parameter definition should be
used for each component ID in the asset model.
Parameters
----------
loss_map_path: str or pd.DataFrame or None
Path to a csv file or DataFrame object that maps
component IDs to their loss parameter definitions.
loss_map_policy: str or None
If None, does not modify the loss map.
If set to `fill`, each component ID that is present in
the asset model but not in the loss map is mapped to
itself, but `excessiveRID` is excluded.
If set to `fill_all`, each component ID that is present in
the asset model but not in the loss map is mapped to
itself without exceptions.
Raises
------
ValueError
If both arguments are None.
"""
self.log.msg('Loading loss map...')
# If no loss map is provided and no default is requested,
# there is no loss map and we can't proceed.
if loss_map_path is None and loss_map_policy is None:
msg = 'Please provide a loss map and/or a loss map extension policy.'
raise ValueError(msg)
# get a list of unique component IDs
cmp_set = set(self._asmnt.asset.list_unique_component_ids())
if loss_map_path is not None:
self.log.msg('Loss map is provided.', prepend_timestamp=False)
# Read the loss map into a variable
loss_map = file_io.load_data(
loss_map_path,
None,
orientation=1,
reindex=False,
log=self._asmnt.log,
)
assert isinstance(loss_map, pd.DataFrame)
# <backwards compatibility>
if np.any(['DMG' in x for x in loss_map.index]): # type: ignore
self.log.warning(
'The `DMG-` flag in the loss_map index is deprecated '
'and no longer necessary. '
'Please do not prepend `DMG-` before the component '
'names in the loss map.'
)
loss_map.index = pd.Index([x[1] for x in loss_map.index])
else:
self.log.msg('Using default loss map.', prepend_timestamp=False)
# Instantiate an empty loss map.
loss_map = pd.DataFrame({'Repair': pd.Series(dtype='object')})
loss_map.index = loss_map.index.astype('object')
if loss_map_policy in {'fill', 'fill_all'}:
# Populate missing rows with cmp_id -> cmp_id
for component in cmp_set:
if component not in loss_map.index:
if loss_map_policy == 'fill' and component == 'excessiveRID':
continue
loss_map.loc[component, :] = component
elif loss_map_policy is None:
# Don't do anything.
pass
# TODO(AZ): add other loss map policies.
else:
msg = f'Unknown loss map policy: `{loss_map_policy}`.'
raise ValueError(msg)
# Assign the loss map to the available loss models
self._loss_map = loss_map
self.log.msg('Loss map loaded successfully.', prepend_timestamp=True)
def load_model(
self,
data_paths: list[str | pd.DataFrame],
loss_map: str | pd.DataFrame,
decision_variables: tuple[str, ...] | None = None,
) -> None:
"""<backwards compatibility>."""
self.log.warning(
'`load_model` is deprecated and will be dropped in '
'future versions of pelicun. '
'Please use `load_model_parameters` instead.'
)
self.add_loss_map(loss_map)
self.load_model_parameters(data_paths, decision_variables)
def load_model_parameters(
self,
data_paths: list[str | pd.DataFrame],
decision_variables: tuple[str, ...] | None = None,
) -> None:
"""
Load loss model parameters.
Parameters
----------
data_paths: list of (string | DataFrame)
List of paths to data or files with loss model
information. Default XY datasets can be accessed as
PelicunDefault/XY. Order matters. Parameters defined in
prior elements in the list take precedence over the same
parameters in subsequent data paths, so place the default
datasets at the end of the list.
decision_variables: tuple
Defines the decision variables to be included in the loss
calculations. Defaults to those supported, but fewer can be
used if desired. When fewer are used, the loss parameters for
those not used will not be required.
"""
if decision_variables is not None:
# <backwards compatibility>
self.decision_variables = decision_variables
self.log.warning(
'The `decision_variables` argument has been removed. '
'Please set your desired decision variables like so: '
'{assessment object}.loss.decision_variables '
"= ('dv1', 'dv2', ...) before calling "
'{assessment object}.add_loss_map().'
)
self.log.div()
self.log.msg('Loading loss parameters...')
# replace `PelicunDefault/` flag with default data path
data_paths = file_io.substitute_default_path(data_paths)
#
# load loss parameter data into the models
#
for data_path in data_paths:
self._load_from_data_path(data_path)
self.log.msg(
'Loss model parameters loaded successfully.', prepend_timestamp=False
)
#
# remove items
#
self.log.msg(
'Removing unused loss model parameters.', prepend_timestamp=False
)
assert self._loss_map is not None
for loss_model in self._loss_models:
# drop unused loss parameter definitions
loss_model.drop_unused_loss_parameters(self._loss_map)
# remove components with incomplete loss parameters
loss_model.remove_incomplete_components()
# drop unused damage state columns
self.ds_model.drop_unused_damage_states()
#
# obtain DV units
#
dv_units: dict = {}
if self.ds_model.loss_params is not None:
dv_units.update(
self.ds_model.loss_params['DV', 'Unit']
.groupby(level=[1])
.first()
.to_dict()
)
if self.lf_model.loss_params is not None:
dv_units.update(
self.lf_model.loss_params['DV', 'Unit']
.groupby(level=[1])
.first()
.to_dict()
)
self.dv_units = dv_units
#
# convert units
#
self.log.msg(
'Converting loss model parameter units.', prepend_timestamp=False
)
for loss_model in self._loss_models:
loss_model.convert_loss_parameter_units()
#
# verify loss parameter availability
#
self.log.msg(
'Checking loss model parameter '
'availability for all components in the asset model.',
prepend_timestamp=False,
)
self._ensure_loss_parameter_availability()
def _load_from_data_path(self, data_path: str | pd.DataFrame) -> None:
if 'bldg_repair_DB' in data_path:
data_path = data_path.replace('bldg_repair_DB', 'loss_repair_DB')
self.log.warning(
'`bldg_repair_DB` is deprecated and will '
'be dropped in future versions of pelicun. '
'Please use `loss_repair_DB` instead.'
)
data = file_io.load_data(
data_path, None, orientation=1, reindex=False, log=self._asmnt.log
)
assert isinstance(data, pd.DataFrame)
# Check for unit consistency
data.index.names = ['cmp', 'dv']
units_isolated = data.reset_index()[[('dv', ''), ('DV', 'Unit')]]
units_isolated.columns = pd.Index(['dv', 'Units'])
units_isolated_grp = units_isolated.groupby('dv')['Units']
unit_counts = units_isolated_grp.nunique()
more_than_one = unit_counts[unit_counts > 1]
if not more_than_one.empty:
raise InconsistentUnitsError
# determine if the loss model parameters are for damage
# states or loss functions
if _is_for_ds_model(data):
self.ds_model.load_model_parameters(data)
elif _is_for_lf_model(data):
self.lf_model.load_model_parameters(data)
else:
msg = f'Invalid loss model parameters: {data_path}'
raise ValueError(msg)
def calculate(self) -> None:
"""
Calculate the loss of each component block.
Note: This method simply calculates the loss of each component
block without any special treatment of `replacement`
consequences. That treatment can be applied at a later step
with the `aggregate_losses` method.
Raises
------
ValueError
If the size of the demand sample and the damage sample
don't match.
"""
self.log.div()
self.log.msg('Calculating losses...')
# Get the damaged quantities in each damage state for each
# component of interest.
demand = self._asmnt.demand.sample
assert demand is not None
demand_offset = self._asmnt.options.demand_offset
assert demand_offset is not None
nondirectional_multipliers = self._asmnt.options.nondir_multi_dict
assert nondirectional_multipliers is not None
assert self._asmnt.asset.cmp_sample is not None
cmp_sample = self._asmnt.asset.cmp_sample.to_dict('series')
cmp_marginal_params = self._asmnt.asset.cmp_marginal_params
assert cmp_marginal_params is not None
if self._asmnt.damage.ds_model.sample is not None:
# TODO(JVM): FIND A WAY to avoid making a copy of this.
dmg_quantities = self._asmnt.damage.ds_model.sample.copy()
if len(demand) != len(dmg_quantities):
msg = (
f'The demand sample contains {len(demand)} realizations, '
f'but the damage sample contains {len(dmg_quantities)}. '
f'Loss calculation cannot proceed when '
f'these numbers are different. '
)
raise ValueError(msg)
self.ds_model.calculate(dmg_quantities)
self.lf_model.calculate(
demand,
cmp_sample,
cmp_marginal_params,
demand_offset,
nondirectional_multipliers,
)
self.log.msg('Loss calculation successful.')
def consequence_scaling(self, scaling_specification: str) -> None:
"""
Apply scale factors to losses.
Applies scale factors to the loss sample according to the
given scaling specification. The scaling specification should
be a path to a CSV file. It should contain a `Decision
Variable` column with a specified decision variable in each
row. Other optional columns are `Component`, `Location`,
`Direction`. Each row acts as an independent scaling
operation, with the scale factor defined in the `Scale Factor`
column, which is required. If any value is missing in the
optional columns, it is assumed that the scale factor should
be applied to all entries of the loss sample where the other
column values match. For example, if the specification has a
single row with `Decision Variable` set to 'Cost', `Scale
Factor` set to 2.0, and no other columns, this will double the
'Cost' DV. If instead `Location` was also set to `1`, it would
double the Cost of all components that have that location. The
columns `Location` and `Direction` can contain ranges, like
this: `1--3` means `1`, `2`, and `3`. If a range is used in
both `Location` and `Direction`, the factor of that row will
be applied once to all combinations.
Parameters
----------
scaling_specification: str
Path to a CSV file containing the scaling specification.
Raises
------
ValueError
If required columns are missing or contain NaNs.
"""
# Specify expected dtypes from the start.
dtypes = {
'Decision Variable': 'str',
'Component': 'str',
'Location': 'str',
'Direction': 'str',
'Scale Factor': 'float64',
}
scaling_specification_df = pd.read_csv(scaling_specification, dtype=dtypes)
if (
'Decision Variable' not in scaling_specification_df.columns
or scaling_specification_df['Decision Variable'].isna().any()
):
msg = (
'The `Decision Variable` column is missing '
'from the scaling specification or contains NaN values.'
)
raise ValueError(msg)
if (
'Scale Factor' not in scaling_specification_df.columns
or scaling_specification_df['Scale Factor'].isna().any()
):
msg = (
'The `Scale Factor` column is missing '
'from the scaling specification or contains NaN values.'
)
raise ValueError(msg)
# Add missing optional columns with NaN values
optional_cols = ['Component', 'Location', 'Direction']
for col in optional_cols:
if col not in scaling_specification_df.columns:
scaling_specification_df[col] = np.nan
# Rename the columns to the internally used values
name_map = {
'Decision Variable': 'dv',
'Component': 'dmg',
'Location': 'loc',
'Direction': 'dir',
'Scale Factor': 'scaling',
}
scaling_specification_df = scaling_specification_df.rename(columns=name_map)
# Expand ranges in 'loc' and 'dir'
def _expand_range(col): # noqa: ANN001, ANN202
if pd.isna(col):
return [col]
if '--' in col:
start, end = (int(x) for x in col.split('--'))
return [str(x) for x in range(start, end + 1)]
return [col]
# Generate all combinations of loc and dir ranges
expanded_df = scaling_specification_df.apply( # type: ignore
lambda row: pd.DataFrame(
list(product(_expand_range(row['loc']), _expand_range(row['dir']))),
columns=['loc', 'dir'],
).assign(dv=row['dv'], dmg=row['dmg'], scaling=row['scaling']),
axis=1,
)
expanded_df = pd.concat(expanded_df.values)
# Now, for each unique combination in expanded_df, apply
# consequence scaling
for _, row in expanded_df.iterrows():
scaling_conditions = {
k: row[k] for k in ('dv', 'dmg', 'loc', 'dir') if not pd.isna(row[k])
}
self._apply_consequence_scaling(
scaling_conditions, row['scaling'], raise_missing=False
)
def _apply_consequence_scaling(
self,
scaling_conditions: dict,
scale_factor: float,
*,
raise_missing: bool = True,
) -> None:
"""
Apply a scale factor to selected loss sample columns.
The scaling conditions are passed as a dictionary mapping
level names with their required value for the condition to be
met. It has to contain `dv` as one of its keys, defining the
decision variable where the factors should be applied. Other
valid levels include:
- `dmg`: containing a source component name,
- `loc`: containing a location,
- `dir`: containing a direction,
- `uid`: containing a Unique Component ID (UID).
If any of the keys is missing, it is assumed that the scaling
factor should be applied to all relevant consequences (those
matching the remaining values of the hierarchical index).
Parameters
----------
scaling_conditions: dict
A dictionary mapping level names with a single value. Only the
rows where the index levels have the provided values will be
affected. The dictionary can be empty, in which case all rows
will be affected, or contain only some levels and values, in
which case only the matching rows will be affected.
scale_factor: float
Scale factor to use.
raise_missing: bool
Raise an error if no rows match the given conditions.
Raises
------
ValueError
If the scaling_conditions dictionary does not contain a
`dv` key.
"""
# make sure we won't apply the same factor to all DVs at once,
# highly unlikely anyone would actually want to do this.
if 'dv' not in scaling_conditions:
msg = (
'The index of the `scaling_conditions` dictionary '
'should contain a level named `dv` listing the '
'relevant decision variable.'
)
raise ValueError(msg)
for model in self._loss_models:
# check if it's empty
if model.sample is None:
continue
# ensure the levels exist (but don't check if specified
# values exist yet)
for name in scaling_conditions:
if name not in model.sample.columns.names:
msg = (
f'`scaling_conditions` contains an unknown level: `{name}`.'
)
raise ValueError(msg)
# apply scale factors
base.multiply_factor_multiple_levels(
model.sample,
scaling_conditions,
scale_factor,
axis=1,
raise_missing=raise_missing,
)
def save_sample(
self, filepath: str | None = None, *, save_units: bool = False
) -> None | pd.DataFrame | tuple[pd.DataFrame, pd.Series]:
"""
<backwards compatibility>.
Saves the sample of the `ds_model`.
Returns
-------
tuple
The output of {loss model}.ds_model.save_sample.
"""
self.log.warning(
'`{loss model}.save_sample` is deprecated and will raise '
'in future versions of pelicun. Please use '
'{loss model}.ds_model.save_sample instead.'
)
return self.ds_model.save_sample(filepath=filepath, save_units=save_units)
def load_sample(self, filepath: str | pd.DataFrame) -> None:
"""
<backwards compatibility>.
Loads the sample of the `ds_model`.
"""
self.log.warning(
'`{loss model}.load_sample` is deprecated and will raise '
'in future versions of pelicun. Please use '
'{loss model}.ds_model.load_sample instead.'
)
dv_units = self.ds_model.load_sample(filepath=filepath)
self.dv_units = dv_units
def aggregate_losses( # noqa: C901
self,
replacement_configuration: (
tuple[uq.RandomVariableRegistry, dict[str, float]] | None
) = None,
loss_combination: dict | None = None,
*,
future: bool = False,
) -> pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame]:
"""
Aggregate the losses produced by each component.
Parameters
----------
replacement_configuration: Tuple, optional
Tuple containing a RandomVariableRegistry and a
dictionary. The RandomVariableRegistry defines
building replacement consequence RVs for the active
decision variables. The dictionary defines exceedance
thresholds. If the aggregated value for a decision
variable (conditioned on no replacement) exceeds the
threshold, then replacement is triggered. This can happen
for multiple decision variables at the same
realization. The consequence keyword `replacement` is
reserved to represent exclusive triggering of the
replacement consequences, and other consequences are
ignored for those realizations where replacement is
triggered. When set to None, `replacement` is
still treated as an exclusive consequence (other
consequences are set to zero when replacement is nonzero),
but it is not additionally triggered by the
exceedance of any thresholds. The aggregated loss sample
contains an additional column with information on whether
replacement was already present or triggered by a
threshold exceedance for each realization.
loss_combination: dict, optional
Dictionary defining how losses for specific components
should be aggregated for a given decision variable. It has
the following structure: {`dv`: {(`c1`, `c2`): `arr`,
...}, ...}, where `dv` is some decision variable, (`c1`,
`c2`) is a tuple defining a component pair, `arr` is a NxN
numpy array defining a combination table, and `...` means
that more key-value pairs with the same schema can exist
in the dictionaries. The loss sample is expected to
contain columns that include both `c1` and `c2` listed as
the component. The combination is applied to all pairs of
columns where the components are `c1` and `c2`, and all of
the rest of the multiindex levels match (`loc`, `dir`,
`uid`). This means, for example, that when combining wind
and flood losses, the asset model should contain both a
wind and a flood component defined at the same
location-direction. `arr` can also be an M-dimensional
numpy array where each dimension has length N (NxNx...xN).
This structure allows for the loss combination of M
components. In this case the (`c1`, `c2`) tuple should
contain M elements instead of two.
future: bool, optional
Defaults to False. When set to True, it enables the
updated return type.
Notes
-----
Regardless of the value of the arguments, this method does not
alter the state of the loss model, i.e., it does not modify
the values of the `.sample` attributes.
Returns
-------
dataframe or tuple
Dataframe with the aggregated loss of each realization,
and another boolean dataframe with information on which DV
thresholds were exceeded in each realization, triggering
replacement. If no thresholds are specified it only
contains False values. The second dataframe is only
returned with `future` set to True.
"""
# TODO(JVM): When we start working on the documentation,
# simplify the docstring above and point the relevant detailed
# section in the documentation.
# validate input
if replacement_configuration is not None:
self._validate_input_replacement_thresholds(replacement_configuration)
# validate loss_combination input
if loss_combination is not None:
self._validate_input_loss_combination(loss_combination)
#
# operate on copies of the loss samples to avoid altering them.
#
if self.ds_model.sample is not None:
ds_sample = self.ds_model.sample.copy()
else:
ds_sample = None
if self.lf_model.sample is not None:
lf_sample = self.lf_model.sample.copy()
else:
lf_sample = None
def _construct_columns() -> list[str]:
columns = [
f'repair_{x.lower()}' for x in self.decision_variables if x != 'Time'
]
# Note: The `Time` DV gets special treatment.
# create the summary DF
if 'Time' in self.decision_variables:
columns.extend(('repair_time-sequential', 'repair_time-parallel'))
return columns
if ds_sample is None and lf_sample is None:
self.log.msg('There are no losses.')
return pd.DataFrame(0.00, index=[0], columns=_construct_columns())
#
# handle `replacement`, regardless of whether
# `replacement_thresholds` is empty. (if `replacement`
# occurs, we ignore the losses from other components)
#
if ds_sample is not None:
self._make_replacement_exclusive(ds_sample, lf_sample)
#
# combine samples
#
# levels to preserve (this aggregates `ds` for the ds_model)
column_levels = ['dv', 'loss', 'dmg', 'loc', 'dir', 'uid']
combined_sample = self.sample
sample = (
combined_sample.groupby(by=column_levels, axis=1) # type: ignore
.sum()
.sort_index(axis=1)
)
#
# perform loss combinations (special non-additive
# aggregations, e.g., Wind + Flood)
#
if loss_combination is not None:
sample = self._apply_loss_combinations(loss_combination, sample)
#
# consider replacement threshold values
#
sample, exceedance_bool_df = self._apply_replacement_thresholds(
sample, replacement_configuration
)
# Sum-up component losses
df_agg = self._aggregate_sample(sample, _construct_columns())
if not future:
self.log.warning(
'`aggregate_losses` has been expanded to support the '
'consideration of the exceedance of loss threshold '
'values leading to asset replacement '
'(like excessive repair costs). The new implementation '
'returns a tuple where the first element is the '
'aggregated losses and the second contains information '
'on which decision variables triggered replacement '
'considering the specified replacement thresholds. '
'To obtain the new output and silence this warning, '
'please specify `future=True` as an argument to this method.'
)
return df_agg
return df_agg, exceedance_bool_df
def _validate_input_loss_combination(self, loss_combination: dict) -> None:
for dv, combinations in loss_combination.items():
if dv not in self.decision_variables:
msg = (
f'`loss_combination` contains the key '
f'`{dv}` which is not found in the active '
f'decision variables. These are: '
f'{self.decision_variables}.'
)
raise ValueError(msg)
for components, array in combinations.items():
if not isinstance(components, tuple):
msg = (
f'Invalid type for components in loss combination '
f'for `{dv}`: {type(components)}. It should be a tuple.'
)
raise TypeError(msg)
if not all(isinstance(c, str) for c in components):
msg = (
f'All elements of the components tuple in loss '
f'combination for `{dv}` should be strings.'
)
raise ValueError(msg)
if not isinstance(array, np.ndarray):
msg = (
f'Invalid type for array in loss combination '
f'for `{dv}`: {type(array)}. It should be a numpy array.'
)
raise TypeError(msg)
def _validate_input_replacement_thresholds(
self,
replacement_configuration: tuple[
uq.RandomVariableRegistry, dict[str, float]
],
) -> None:
replacement_consequence_rv_reg, replacement_ratios = (
replacement_configuration
)
if not isinstance(replacement_consequence_rv_reg, uq.RandomVariableRegistry):
msg = (
f'Invalid type for replacement consequence RV registry: '
f'{type(replacement_consequence_rv_reg)}. It should be '
f'uq.RandomVariableRegistry.'
)
raise TypeError(msg)
for key in replacement_consequence_rv_reg.RV:
if key not in self.decision_variables:
msg = (
f'`replacement_consequence_RV_reg` contains the key '
f'`{key}` which is not found in the active '
f'decision variables. These are: '
f'{self.decision_variables}.'
)
if self.query_error_setup(
'Loss/ReplacementThreshold/RaiseOnUnknownKeys'
):
raise ValueError(msg)
self.log.warning(msg)
for key in replacement_ratios:
if key not in self.decision_variables:
msg = (
f'`replacement_ratios` contains the key '
f'`{key}` which is not found in the active '
f'decision variables. These are: '
f'{self.decision_variables}.'
)
if self.query_error_setup(
'Loss/ReplacementThreshold/RaiseOnUnknownKeys'
):
raise ValueError(msg)
self.log.warning(msg)
# The replacement_consequence_RV_reg should contain an RV for
# all active DVs, regardless of whether there is a replacement
# threshold for that DV, because when replacement is
# triggered, we need to assign a consequence for all DVs.
for key in self.decision_variables:
if key not in replacement_consequence_rv_reg.RV:
msg = f'Missing replacement consequence RV for `{key}`.'
raise ValueError(msg)
def _apply_loss_combinations(
self, loss_combination: dict, sample: pd.DataFrame
) -> pd.DataFrame:
"""
Perform loss combinations of specified components.
This function deconstructs the loss combination arrays,
identifies the combinable components, and applies the
specified loss combinations to the sample data. The
transformed sample, including the combined columns, is
returned as a new DataFrame.
Parameters
----------
loss_combination: dict
A dictionary containing the loss combination
information. The structure is nested dictionaries where
the outer keys are decision variables, inner keys are
components to combine, and the values are array objects
representing the combination data.
sample: pandas.DataFrame
The input DataFrame containing the sample data. The
columns are assumed to be a MultiIndex with at least the
levels (decision_variable, loss_id, component_id,
location, direction, uid).
Returns
-------
pandas.DataFrame
A new DataFrame with the combined loss data.
"""
# deconstruct combination arrays to extract the input domains
loss_combination_converted = self._deconstruct_loss_combination_arrays(
loss_combination
)
# initialize variables
# sample as dictionary for fast column retrieval
dsample = {col: sample[col] for col in sample.columns}
# Transformed sample (includes the combined columns), as
# dictionary. Will be turned into a dataframe in the end.
# This avoids manipulating the original sample dataframe which
# would be slow.
dcsample: dict = {}
# add columns to the new sample dictionary.
# those that should be combined
self._loss_combination_add_combinable(
dsample, loss_combination_converted, dcsample
)
# and the remaining
for col, val in dsample.items():
dcsample[col] = val # noqa: PERF403
# turn into a dataframe
return pd.DataFrame(dcsample).rename_axis(columns=sample.columns.names)
def _loss_combination_add_combinable(
self, dsample: dict, loss_combination_converted: dict, dcsample: dict
) -> None:
"""
Add combinable loss data.
This function identifies groups of `loc`-`dir`-`uid` that can
be combined for each decision variable and computes the
combined loss using interpolation functions. It modifies the
given datasets `dsample` and `dcsample` in-place, removing
combinable columns from dsample and adding the combined losses
to dcsample.
Parameters
----------
dsample: dict
A dictionary representing the loss sample data, where keys
are tuples of the form (decision_variable, loss_id,
component_id, location, direction, uid) and values are the
corresponding data arrays.
loss_combination_converted: dict
A dictionary containing loss combination data. The
structure is nested dictionaries where the outer keys are
decision variables, inner keys are components to combine,
and the values are tuples of combination parameters
(domains and reference values).
dcsample: dict
A dictionary to store the combined loss data, where keys
are tuples of the form (decision_variable, 'combination',
combined_component_string, location, direction, uid) and
values are the combined loss data arrays.
"""
dmg_to_loss = self._map_component_ids_to_loss_ids(dsample)
# identify all `loc`-`dir`-`uid`s that can be grouped for each
# decision variable.
potential_groups = self._identify_potential_groups(dsample)
# cache already defined interpolation functions. This obviates
# the need to define all of them and we can just define them
# on the spot when needed, and reuse them if available.
interpolation_function_cache: dict = {}
for (
decision_variable,
combination_data,
) in loss_combination_converted.items():
for (
components_to_combine,
combination_parameters,
) in combination_data.items():
# determine if the components to combine are part of
# an available group
target_group: dict | None = None
for available_group in potential_groups[decision_variable]:
# check if `components_to_combine` is a subset of
# that available group
if frozenset(components_to_combine) <= available_group:
target_group = available_group
break
assert target_group is not None
# construct relevant loss sample columns
for loc_dir_uid in potential_groups[decision_variable][target_group]:
cols = [
(decision_variable, dmg_to_loss[x], x, *loc_dir_uid)
for x in target_group
]
values = np.column_stack([dsample[col] for col in cols])
# define/get interpolation function
if (
interpolation_function_cache.get(components_to_combine)
is not None
):
interp_func = interpolation_function_cache.get(
components_to_combine
)
else:
domains, reference_values = combination_parameters
interp_func = RegularGridInterpolator(
domains, reference_values
)
assert interp_func is not None
combined_loss = interp_func(values)
combined_loss_col = (
decision_variable,
'combination',
'(' + ', '.join(components_to_combine) + ')',
*loc_dir_uid,
)
dcsample[combined_loss_col] = combined_loss
for col in cols:
dsample.pop(col)
def _identify_potential_groups(self, dsample: dict) -> dict: # noqa: PLR6301
"""
Identify potential groups of `loc`-`dir`-`uid` for each DV.
This function identifies all combinations of `loc`-`dir`-`uid`
that can be grouped for each decision variable based on the
provided data sample.
Parameters
----------
dsample: dict
Dictionary whose keys are tuples containing information
about the components and their attributes. The
expected format of each tuple is (decision_variable,
loss_id, component_id, location, direction, uid).
Returns
-------
dict
A dictionary where keys are decision variables and values
are nested dictionaries. The nested dictionaries map
frozensets of component IDs to lists of (location,
direction, uid) tuples.
"""
grouped: defaultdict = defaultdict(defaultdict(list).copy)
for col in dsample:
c_dv, _, c_dmg, c_loc, c_dir, c_uid = col
grouped[c_dv][c_loc, c_dir, c_uid].append(c_dmg)
# invert so that we have component sets mapped to
# `loc`-`dir`-`uid`s.
inverted: defaultdict = defaultdict(defaultdict(list).copy)
for c_dv in grouped:
for loc_dir_uid, component_set in grouped[c_dv].items():
inverted[c_dv][frozenset(component_set)].append(loc_dir_uid)
return inverted
def _map_component_ids_to_loss_ids(self, dsample: dict) -> dict: # noqa: PLR6301
"""
Map component IDs to loss IDs.
This function maps components to losses based on the loss
sample's columns. It assumes that multiple component IDs can
have the same loss ID, but the same component ID cannot have
multiple loss IDs.
Parameters
----------
dsample: dict
Dictionary whose keys are tuples containing information
about the components and the corresponding losses.
Returns
-------
dict
A dictionary where keys are component IDs and values are
loss IDs.
"""
dmg_to_loss = {}
for col in dsample:
c_loss = col[1]
c_dmg = col[2]
dmg_to_loss[c_dmg] = c_loss
return dmg_to_loss
def _deconstruct_loss_combination_arrays(self, loss_combination: dict) -> dict: # noqa: PLR6301
"""
Deconstruct loss combination arrays.
This function converts a nested dictionary of loss combination
arrays into a format suitable for further processing. It
extracts the array values and the index information from each
array.
Parameters
----------
loss_combination: dict
A dictionary where keys are decision variables and values
are another dictionary. The inner dictionary has keys as
components to combine and values as numpy array
objects representing the combination data.
Returns
-------
dict
A dictionary with the same structure as the input
`loss_combination`. For each decision variable and
component combination, the array is replaced with a
tuple containing the combination domain and the combination
array itself.
"""
loss_combination_converted: dict = {}
for decision_variable, combination_data in loss_combination.items():
loss_combination_converted[decision_variable] = {}
for (
components_to_combine,
combination_array,
) in combination_data.items():
combination_index = (
combination_array[:, 0],
combination_array[0, :],
)
loss_combination_converted[decision_variable][
components_to_combine
] = (
combination_index,
combination_array,
)
return loss_combination_converted
def _aggregate_sample(self, sample: pd.DataFrame, columns: list) -> pd.DataFrame:
"""
Sum up component losses.
Returns
-------
pd.DataFrame
Dataframe with the aggregated losses.
"""
df_agg = pd.DataFrame(index=sample.index, columns=columns)
# group results by DV type and location
aggregated = sample.groupby(
level=['dv', 'loc'],
axis=1, # type: ignore
).sum()
for decision_variable in self.decision_variables:
# Time
if (
decision_variable == 'Time'
and 'Time' in aggregated.columns.get_level_values('dv')
):
df_agg['repair_time-sequential'] = aggregated['Time'].sum(axis=1)
df_agg['repair_time-parallel'] = aggregated['Time'].max(axis=1)
elif (
decision_variable == 'Time'
and 'Time' not in aggregated.columns.get_level_values('dv')
):
df_agg = df_agg.drop(
['repair_time-parallel', 'repair_time-sequential'], axis=1
)
# All other
elif decision_variable in aggregated.columns.get_level_values('dv'):
df_agg[f'repair_{decision_variable.lower()}'] = aggregated[
decision_variable
].sum(axis=1)
else:
df_agg = df_agg.drop(f'repair_{decision_variable.lower()}', axis=1)
# Convert units.
column_measures = [
x.replace('repair_', '')
.replace('-sequential', '')
.replace('-parallel', '')
for x in df_agg.columns.get_level_values(0)
]
assert self.dv_units is not None
column_units = [self.dv_units[x.title()] for x in column_measures]
dv_units = pd.Series(column_units, index=df_agg.columns, name='Units')
res = file_io.save_to_csv(
df_agg,
None,
units=dv_units,
unit_conversion_factors=self._asmnt.unit_conversion_factors,
use_simpleindex=False,
log=self._asmnt.log,
)
assert isinstance(res, pd.DataFrame)
df_agg = res
df_agg = df_agg.drop('Units')
df_agg = df_agg.astype(float)
df_agg_mi = base.convert_to_MultiIndex(df_agg, axis=1)
assert isinstance(df_agg_mi, pd.DataFrame)
df_agg = df_agg_mi
df_agg = df_agg.sort_index(axis=1)
df_agg = df_agg.reset_index(drop=True)
assert isinstance(df_agg, pd.DataFrame)
return df_agg
def _apply_replacement_thresholds( # noqa: PLR6301
self,
sample: pd.DataFrame,
replacement_configuration: (
tuple[uq.RandomVariableRegistry, dict[str, float]] | None
),
) -> tuple[pd.DataFrame, pd.DataFrame]:
# If there is no `replacement_configuration`, simply return.
if replacement_configuration is None:
# `exceedance_bool_df` is empty in this case.
exceedance_bool_df = pd.DataFrame(index=sample.index, dtype=bool)
return sample, exceedance_bool_df
replacement_consequence_rv_reg, replacement_ratios = (
replacement_configuration
)
exceedance_bool_df = pd.DataFrame( # type: ignore
data=False,
index=sample.index,
columns=replacement_consequence_rv_reg.RV.keys(),
dtype=bool,
)
# Sample replacement consequences from the registry
replacement_consequence_rv_reg.generate_sample(len(sample), 'MonteCarlo')
sample_dvs = replacement_consequence_rv_reg.RV.keys()
for sample_dv in sample_dvs:
sub_sample = sample.loc[:, sample_dv]
if 'replacement' in sub_sample.columns.get_level_values('loss'):
# If `replacement` already exists as a consequence,
# determine the realizations where it is non-zero.
no_replacement_mask = (
~(sub_sample['replacement'] > 0.00).any(axis=1).to_numpy()
)
no_replacement_columns = (
sub_sample.columns.get_level_values('loss') != 'replacement'
)
else:
# Otherwise there is no row where we already have replacement
no_replacement_mask = np.full(len(sub_sample), fill_value=True)
no_replacement_columns = np.full(
len(sub_sample.columns), fill_value=True
)
# Get the sum to compare with the thresholds
consequence_sum_given_no_replacement = sub_sample.iloc[ # type: ignore
no_replacement_mask, no_replacement_columns
].sum(axis=1)
if not consequence_sum_given_no_replacement.empty:
consequence_values = replacement_consequence_rv_reg.RV[
sample_dv
].sample
assert consequence_values is not None
exceedance_mask = (
consequence_sum_given_no_replacement
> consequence_values[no_replacement_mask]
* replacement_ratios[sample_dv]
)
# Fill the remaining rows with False
if len(exceedance_mask) < len(sub_sample):
exceedance_mask = exceedance_mask.reindex(
sub_sample.index, fill_value=False
)
else:
exceedance_mask = pd.Series(
np.full(len(sub_sample), fill_value=False),
index=sub_sample.index,
)
# Monitor triggering of replacement
exceedance_bool_df[sample_dv] = exceedance_mask
# Assign replacement consequences where the threshold has been
# exceeded.
exceedance_realizations = exceedance_bool_df.any(axis=1)
# Assign replacement consequences: needs to include all DVs
for other_dv in replacement_consequence_rv_reg.RV:
col = (
other_dv,
'replacement',
'threshold_exceedance',
'0',
'1',
'0',
)
# If it doesn't exist, initialize and set to 0.00
if col not in sample:
sample[col] = 0.00
sample = sample.sort_index(axis=1)
# Assign replacement consequences
other_sample = replacement_consequence_rv_reg.RV[other_dv].sample
assert other_sample is not None
sample.loc[exceedance_realizations, col] = other_sample[
exceedance_realizations
]
# Remove all other realized consequences from the realizations
# where the threshold was exceeded.
sample.loc[
exceedance_realizations,
sample.columns.get_level_values('dmg') != 'threshold_exceedance',
] = 0.00
return sample, exceedance_bool_df
def _make_replacement_exclusive( # noqa: PLR6301
self, ds_sample: pd.DataFrame, lf_sample: pd.DataFrame | None
) -> None:
"""
Make the replacement consequence exclusive.
If `replacement` columns exist in `ds_sample`, this method
treats all nonzero loss values driven by `replacement` as
exclusive and zeroes out the loss values of all other columns
for the applicable rows.
"""
# rows where replacement is non-zero
replacement_rows: list = []
# columns that correspond to the replacement consequence
replacement_columns = (
ds_sample.columns.get_level_values('loss') == 'replacement'
)
rows_df = ds_sample.iloc[:, replacement_columns]
if not rows_df.empty:
replacement_rows = (
np.argwhere(np.any(rows_df.to_numpy() > 0.0, axis=1))
.reshape(-1)
.tolist()
)
ds_sample.iloc[replacement_rows, ~replacement_columns] = 0.00 # type: ignore
if lf_sample is not None:
lf_sample.iloc[replacement_rows, :] = 0.00
@property
def _loss_models(self) -> tuple[RepairModel_DS, RepairModel_LF]:
return (self.ds_model, self.lf_model)
@property
def _loss_map(self) -> pd.DataFrame | None:
"""
Returns the loss map.
Returns
-------
pd.DataFrame
The loss map.
"""
# Retrieve the DataFrame from one of the included loss models.
# We use a single loss map for all.
return self.ds_model.loss_map
@_loss_map.setter
def _loss_map(self, loss_map: pd.DataFrame) -> None:
"""
Set the loss map.
Parameters
----------
loss_map: pd.DataFrame
The loss map.
"""
# Add the DataFrame to the included loss models.
# We use a single loss map for all.
for model in self._loss_models:
model.loss_map = loss_map
@property
def _missing(self) -> set[tuple[str, str]]:
"""
Returns the missing components.
Returns
-------
set
Set containing tuples identifying missing loss parameter
definitions.
"""
return self.ds_model.missing
@_missing.setter
def _missing(self, missing: set[tuple[str, str]]) -> None:
"""
Assign missing parameter definitions to the loss models.
Parameters
----------
missing: set
Set containing tuples identifying missing loss parameter
definitions.
"""
for model in self._loss_models:
model.missing = missing
def _ensure_loss_parameter_availability(self) -> None:
"""Make sure that all components have loss parameters."""
#
# Repair Models (currently the only type supported)
#
required = []
assert self._loss_map is not None
for dv in self.decision_variables:
required.extend(
[(component, dv) for component in self._loss_map['Repair']]
)
missing_set = set(required)
for model in (self.ds_model, self.lf_model):
missing_set -= model.get_available()
if missing_set:
self.log.warning(
f'The loss model does not provide '
f'loss information for the following component(s) '
f'in the asset model: {sorted(missing_set)}.'
)
self._missing = missing_set
class RepairModel_Base(PelicunModel):
"""Base class for loss models."""
__slots__ = [
'consequence',
'decision_variables',
'loss_map',
'loss_params',
'missing',
'sample',
]
def __init__(self, assessment: AssessmentBase) -> None:
"""
Initialize RepairModel_Base objects.
Parameters
----------
assessment: pelicun.AssessmentBase
Parent assessment
"""
super().__init__(assessment)
self.loss_params: pd.DataFrame | None = None
self.sample: pd.DataFrame | None = None
self.consequence = 'Repair'
self.decision_variables: tuple[str, ...] = ()
self.loss_map: pd.DataFrame | None = None
self.missing: set = set()
def load_model_parameters(self, data: pd.DataFrame) -> None:
"""
Load model parameters from a DataFrame.
Extends those already available. Parameters already defined
take precedence, i.e. redefinitions of parameters are ignored.
Parameters
----------
data: DataFrame
Data with loss model information.
"""
data.index.names = ['Loss Driver', 'Decision Variable']
if self.loss_params is not None:
data = pd.concat((self.loss_params, data), axis=0)
# drop redefinitions of components
data = (
data.groupby(level=[0, 1]).first().transform(lambda x: x.fillna(np.nan))
)
# note: .groupby introduces None entries. We replace them with
# NaN for consistency.
self.loss_params = data
def drop_unused_loss_parameters(self, loss_map: pd.DataFrame) -> None:
"""
Remove loss parameter definitions.
Applicable to component IDs not present in the loss map.
Parameters
----------
loss_map: str or pd.DataFrame or None
Path to a csv file or DataFrame object that maps
component IDs to their loss parameter definitions.
Components in the asset model that are omitted from the
provided loss map are mapped to the same ID.
"""
if self.loss_params is None:
return
# <backwards compatibility>
if 'BldgRepair' in loss_map.columns:
loss_map['Repair'] = loss_map['BldgRepair']
loss_map = loss_map.drop('BldgRepair', axis=1)
self.log.warning(
'`BldgRepair` as a loss map column name is '
'deprecated and will be dropped in '
'future versions of pelicun. Please use `Repair` instead.'
)
# get a list of unique component IDs
cmp_set = set(loss_map['Repair'].unique())
cmp_mask = self.loss_params.index.get_level_values(0).isin(cmp_set, level=0)
self.loss_params = self.loss_params.iloc[cmp_mask, :]
def remove_incomplete_components(self) -> None:
"""
Remove incomplete components.
Removes components that have incomplete loss model
definitions from the loss model parameters.
"""
if self.loss_params is None:
return
if ('Incomplete', '') not in self.loss_params.columns:
return
cmp_incomplete_idx = self.loss_params.loc[
self.loss_params['Incomplete', ''] == 1
].index
self.loss_params = self.loss_params.drop(cmp_incomplete_idx)
if len(cmp_incomplete_idx) > 0:
self.log.msg(
f'\n'
f'WARNING: Loss model information is incomplete for '
f'the following component(s) '
f'{cmp_incomplete_idx.to_list()}. They '
f'were removed from the analysis.'
f'\n',
prepend_timestamp=False,
)
def get_available(self) -> set:
"""
Get a set of components with available loss parameters.
Returns
-------
set
Set of components with available loss parameters.
"""
if self.loss_params is not None:
cmp_list = self.loss_params.index.to_list()
return set(cmp_list)
return set()
@abstractmethod
def convert_loss_parameter_units(self) -> None:
"""Convert previously loaded loss parameters to base units."""
class RepairModel_DS(RepairModel_Base):
"""Repair consequences for components with damage states."""
__slots__ = ['RV_reg']
def save_sample(
self, filepath: str | None = None, *, save_units: bool = False
) -> None | pd.DataFrame | tuple[pd.DataFrame, pd.Series]:
"""
Save or return the loss sample.
This method handles the storage of a sample of loss estimates,
which can either be saved directly to a file or returned as a
DataFrame for further manipulation. When saving to a file,
additional information such as unit conversion factors and
column units can be included. If the data is not being saved
to a file, the method can return the DataFrame with or without
units as specified.
Parameters
----------
filepath: str, optional
The path to the file where the loss sample should be
saved. If not provided, the sample is not saved to disk
but returned.
save_units: bool, default: False
Indicates whether to include a row with unit information
in the returned DataFrame. This parameter is ignored if a
file path is provided.
Returns
-------
None or tuple
If `filepath` is provided, the function returns None after
saving the data.
If no `filepath` is specified, returns:
* DataFrame containing the loss sample.
* Optionally, a Series containing the units for each
column if `save_units` is True.
"""
self.log.div()
if filepath is not None:
self.log.msg('Saving loss sample...')
assert self.sample is not None
assert self.loss_params is not None
cmp_units = self.loss_params['DV', 'Unit'].sort_index()
dv_units = pd.Series(
index=self.sample.columns, name='Units', dtype='object'
).sort_index()
valid_dv_types = dv_units.index.unique(level=0)
valid_cmp_ids = dv_units.index.unique(level=1)
for cmp_id, dv_type in cmp_units.index:
if (dv_type in valid_dv_types) and (cmp_id in valid_cmp_ids):
dv_units.loc[dv_type, cmp_id] = cmp_units.loc[cmp_id, dv_type]
res = file_io.save_to_csv(
self.sample,
Path(filepath) if filepath is not None else None,
units=dv_units,
unit_conversion_factors=self._asmnt.unit_conversion_factors,
use_simpleindex=(filepath is not None),
log=self._asmnt.log,
)
if filepath is not None:
self.log.msg('Loss sample successfully saved.', prepend_timestamp=False)
return None
assert isinstance(res, pd.DataFrame)
units = res.loc['Units']
res = res.drop('Units')
res = res.astype(float)
assert isinstance(res, pd.DataFrame)
assert isinstance(units, pd.Series)
if save_units:
return res, units
return res
def load_sample(self, filepath: str | pd.DataFrame) -> dict[str, str]:
"""
Load loss sample data.
Parameters
----------
filepath: str
Path to an existing sample stored in a file, or dataframe
containing the existing sample.
Returns
-------
dict[str, str]
Dictionary mapping each decision variable to its assigned
unit.
Raises
------
ValueError
If the columns have an invalid number of levels.
"""
names = ['dv', 'loss', 'dmg', 'ds', 'loc', 'dir', 'uid']
self.log.div()
self.log.msg('Loading loss sample...')
sample, units = file_io.load_data(
filepath,
self._asmnt.unit_conversion_factors,
log=self._asmnt.log,
return_units=True,
)
assert isinstance(sample, pd.DataFrame)
assert isinstance(units, pd.Series)
units.index.names = names
# Obtain the DV units
# Note: we don't need to check for consistency (all rows
# having the same unit) since the units are extracted from a
# single row in the CSV, affecting all subsequent rows.
units_isolated = (
units.reset_index()[['dv', 'Units']]
.set_index('dv')
.groupby('dv')['Units']
)
dv_units = units_isolated.first().to_dict()
# check if `uid` level was provided
num_levels = len(sample.columns.names)
num_levels_without_uid = 6
num_levels_with_uid = num_levels_without_uid + 1
if num_levels == num_levels_without_uid:
sample.columns.names = names[:-1]
sample = base.dedupe_index(sample.T).T
elif num_levels == num_levels_with_uid:
sample.columns.names = names
else:
msg = (
f'Invalid loss sample: Column MultiIndex '
f'has an unexpected length: {num_levels}'
)
raise ValueError(msg)
self.sample = sample
self.log.msg('Loss sample successfully loaded.', prepend_timestamp=False)
return dv_units
def calculate(self, dmg_quantities: pd.DataFrame) -> None: # noqa: C901
"""
Calculate damage consequences.
Parameters
----------
dmg_quantities: DataFrame
A table with the quantity of damage experienced in each
damage state of each performance group at each location
and direction. You can use the prepare_dmg_quantities
method in the DamageModel to get such a DF.
"""
assert self.loss_map is not None
sample_size = len(dmg_quantities)
# If everything is undamaged there are no losses
if set(dmg_quantities.columns.get_level_values('ds')) == {'0'}:
self.sample = None
self.log.msg(
'There is no damage---DV sample is set to None.',
prepend_timestamp=False,
)
return
# calculate the quantities for economies of scale
self.log.msg('\nAggregating damage quantities...', prepend_timestamp=False)
if self._asmnt.options.eco_scale['AcrossFloors']:
if self._asmnt.options.eco_scale['AcrossDamageStates']:
eco_levels = [0]
eco_columns = ['cmp']
else:
eco_levels = [0, 4]
eco_columns = ['cmp', 'ds']
elif self._asmnt.options.eco_scale['AcrossDamageStates']:
eco_levels = [0, 1]
eco_columns = ['cmp', 'loc']
else:
eco_levels = [0, 1, 4]
eco_columns = ['cmp', 'loc', 'ds']
eco_group = dmg_quantities.groupby(level=eco_levels, axis=1) # type: ignore
eco_qnt = eco_group.sum().mask(eco_group.count() == 0, np.nan)
assert eco_qnt.columns.names == eco_columns
self.log.msg(
'Successfully aggregated damage quantities.', prepend_timestamp=False
)
# apply the median functions, if needed, to get median consequences for
# each realization
self.log.msg(
'\nCalculating the median repair consequences...',
prepend_timestamp=False,
)
medians = self._calc_median_consequence(eco_qnt)
self.log.msg(
'Successfully determined median repair consequences.',
prepend_timestamp=False,
)
# combine the median consequences with the samples of deviation from the
# median to get the consequence realizations.
self.log.msg(
'\nConsidering deviations from the median values to obtain '
'random DV sample...'
)
self.log.msg(
'Preparing random variables for repair consequences...',
prepend_timestamp=False,
)
self.RV_reg = self._create_DV_RVs(dmg_quantities.columns) # type: ignore
if self.RV_reg is not None:
assert self._asmnt.options.sampling_method is not None
self.RV_reg.generate_sample(
sample_size=sample_size, method=self._asmnt.options.sampling_method
)
std_sample = base.convert_to_MultiIndex(
pd.DataFrame(self.RV_reg.RV_sample), axis=1
)
std_sample.columns.names = ['dv', 'cmp', 'ds', 'loc', 'dir', 'uid']
std_sample = std_sample.sort_index(axis=1)
else:
std_sample = None
self.log.msg(
f'\nSuccessfully generated {sample_size} realizations of '
'deviation from the median consequences.',
prepend_timestamp=False,
)
res_list = []
key_list: list[tuple[Any, ...]] = []
dmg_quantities.columns = dmg_quantities.columns.reorder_levels( # type: ignore
['cmp', 'ds', 'loc', 'dir', 'uid']
)
dmg_quantities = dmg_quantities.sort_index(axis=1)
if std_sample is not None:
std_dvs = std_sample.columns.unique(level=0)
else:
std_dvs = []
for decision_variable in self.decision_variables: # noqa: PLR1702
if decision_variable in std_dvs:
assert isinstance(std_sample, pd.DataFrame)
prob_cmp_list = std_sample[decision_variable].columns.unique(level=0)
else:
prob_cmp_list = []
cmp_list: list[tuple[Any, ...]] = []
if decision_variable not in medians:
continue
for component in medians[decision_variable].columns.unique(level=0):
# check if there is damage in the component
consequence = self.loss_map.loc[component, 'Repair']
if component not in dmg_quantities.columns.get_level_values('cmp'):
continue
ds_list = []
for ds in (
medians[decision_variable]
.loc[:, component]
.columns.unique(level=0)
):
loc_list = []
for loc_id, loc in enumerate(
dmg_quantities.loc[
:, (component, ds) # type: ignore
].columns.unique(level=0)
):
if (
self._asmnt.options.eco_scale['AcrossFloors'] is True
) and (loc_id > 0):
break
if self._asmnt.options.eco_scale['AcrossFloors'] is True:
median_i = medians[decision_variable].loc[
:, (component, ds)
]
dmg_i = dmg_quantities.loc[
:, (component, ds) # type: ignore
]
if component in prob_cmp_list:
assert std_sample is not None
std_i = std_sample.loc[
:,
(
decision_variable,
component,
ds,
), # type: ignore
]
else:
std_i = None
else:
median_i = medians[decision_variable].loc[
:, (component, ds, loc)
]
dmg_i = dmg_quantities.loc[
:, (component, ds, loc) # type: ignore
]
if component in prob_cmp_list:
assert std_sample is not None
std_i = std_sample.loc[
:,
(
decision_variable,
component,
ds,
loc,
), # type: ignore
]
else:
std_i = None
if std_i is not None:
res_list.append(dmg_i.mul(median_i, axis=0) * std_i)
else:
res_list.append(dmg_i.mul(median_i, axis=0))
loc_list.append(loc)
if self._asmnt.options.eco_scale['AcrossFloors'] is True:
ds_list += [
ds,
]
else:
ds_list += [(ds, loc) for loc in loc_list]
if self._asmnt.options.eco_scale['AcrossFloors'] is True:
cmp_list += [(consequence, component, ds) for ds in ds_list]
else:
cmp_list += [
(consequence, component, ds, loc) for ds, loc in ds_list
]
if self._asmnt.options.eco_scale['AcrossFloors'] is True:
key_list += [
(decision_variable, loss_cmp_i, dmg_cmp_i, ds)
for loss_cmp_i, dmg_cmp_i, ds in cmp_list
]
else:
key_list += [
(decision_variable, loss_cmp_i, dmg_cmp_i, ds, loc)
for loss_cmp_i, dmg_cmp_i, ds, loc in cmp_list
]
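# Assemble the DV sample: each concat key (4 or 5 elements)
# combined with the remaining column levels of the
# corresponding result frame (loc/dir/uid or dir/uid) yields
# the full seven-level column header defined below.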
lvl_names = ['dv', 'loss', 'dmg', 'ds', 'loc', 'dir', 'uid']
dv_sample = pd.concat(res_list, axis=1, keys=key_list, names=lvl_names)
dv_sample = dv_sample.fillna(0).convert_dtypes()
self.log.msg('Successfully obtained DV sample.', prepend_timestamp=False)
self.sample = dv_sample
def convert_loss_parameter_units(self) -> None:
"""Convert previously loaded loss parameters to base units."""
if self.loss_params is None:
return
units = self.loss_params['DV', 'Unit']
arg_units = self.loss_params['Quantity', 'Unit']
for column in self.loss_params.columns.unique(level=0):
if not column.startswith('DS'):
continue
params = self.loss_params.loc[:, column].copy()
assert isinstance(params, pd.DataFrame)
self.loss_params.loc[:, column] = self._convert_marginal_params(
params, units, arg_units
).to_numpy()
def drop_unused_damage_states(self) -> None:
"""
Remove unused columns.
Remove columns from the loss model parameters corresponding
to unused damage states.
"""
if self.loss_params is None:
return
first_level = self.loss_params.columns.get_level_values(0).unique().to_list()
ds_list = [x for x in first_level if x.startswith('DS')]
ds_to_drop = []
for damage_state in ds_list:
if (
np.all(
pd.isna(
self.loss_params.loc[
:, # type: ignore
idx[damage_state, :],
].values
)
)
# Note: `np.all` returns a `numpy.bool_`, which is a
# different object from the built-in `True`. An identity
# check (`is True`) therefore evaluates to False even when
# the value is truthy, so we rely on truthiness here
# instead of an identity comparison.
):
ds_to_drop.append(damage_state) # noqa: PERF401
self.loss_params = self.loss_params.drop(columns=ds_to_drop, level=0)
def _create_DV_RVs( # noqa: N802, C901
self, cases: pd.MultiIndex
) -> uq.RandomVariableRegistry | None:
"""
Prepare the random variables.
Prepare the random variables associated with decision
variables, such as repair cost and time.
Parameters
----------
cases: MultiIndex
Index with cmp-loc-uid-dir-ds descriptions that identify
the RVs we need for the simulation.
Returns
-------
RandomVariableRegistry or None
A RandomVariableRegistry containing all the generated
random variables necessary for the simulation. If no
random variables are generated (due to missing parameters
or conditions), returns None.
"""
# Convert the MultiIndex to a DataFrame
case_df = pd.DataFrame(index=cases).reset_index()
# maps `cmp` to array of damage states
damage_states = case_df.groupby('cmp')['ds'].unique().to_dict()
# maps `cmp`-`ds` to tuple of `loc`-`dir`-`uid` tuples
loc_dir_uids = (
case_df.groupby(['cmp', 'ds'])
.apply(
lambda x: tuple( # type: ignore
zip(
x['loc'],
x['dir'],
x['uid'],
)
)
)
.to_dict()
)
damaged_components = set(cases.get_level_values('cmp'))
rv_reg = uq.RandomVariableRegistry(self._asmnt.options.rng)
rv_count = 0
# for each component in the loss map
assert isinstance(self.loss_map, pd.DataFrame)
for component, consequence in self.loss_map['Repair'].items():
# if that component does not have realized damage states,
# skip it (e.g., this can happen when there is only
# `collapse`).
if component not in damaged_components:
continue
# for each DV
for decision_variable in self.decision_variables:
# If loss parameters are missing for that consequence,
# don't estimate losses for it. A warning has already
# been issued for what is missing.
if (consequence, decision_variable) in self.missing:
continue
# If the loss parameters for that consequence are missing
# from this particular loss model, they are handled by the
# other loss model (DS vs. LF), so skip them here.
assert self.loss_params is not None
if (consequence, decision_variable) not in self.loss_params.index:
continue
# load the corresponding parameters
parameters = (
self.loss_params.loc[(consequence, decision_variable), :]
.dropna()
.to_dict()
)
for ds in damage_states[component]:
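# DS '0' represents the undamaged state and carries no
# consequence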
if ds == '0':
continue
ds_family = parameters.get((f'DS{ds}', 'Family'))
ds_theta = [
theta
for t_i in range(3)
if (theta := parameters.get((f'DS{ds}', f'Theta_{t_i}')))
is not None
]
# If there is no RV family we don't need an RV
if ds_family is None:
continue
# If the first parameter is controlled by a function, we use
# 1.0 in its place and will scale the results in a later
# step
if isinstance(ds_theta[0], str) and '|' in ds_theta[0]:
ds_theta[0] = 1.0
loc_dir_uid = loc_dir_uids[component, ds]
for loc, direction, uid in loc_dir_uid:
# assign RVs
rv_reg.add_RV(
uq.rv_class_map(ds_family)( # type: ignore
name=(
f'{decision_variable}-{component}-'
f'{ds}-{loc}-{direction}-{uid}'
),
theta=np.array(ds_theta),
truncation_limits=np.array([0.0, np.nan]),
)
)
rv_count += 1
# assign Time-Cost correlation whenever applicable
rho = self._asmnt.options.rho_cost_time
if rho != 0.0:
for rv_tag in rv_reg.RV:
if not rv_tag.startswith('Cost'):
continue
component = rv_tag.split('-')[1]
ds = rv_tag.split('-')[2]
loc = rv_tag.split('-')[3]
direction = rv_tag.split('-')[4]
uid = rv_tag.split('-')[5]
time_rv_tag = rv_tag.replace('Cost', 'Time')
if time_rv_tag in rv_reg.RV:
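# couple the Cost and Time RVs of the same case through a
# 2x2 correlation matrix with off-diagonal terms rho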
rv_reg.add_RV_set(
uq.RandomVariableSet(
f'DV-{component}-{ds}-{loc}-{direction}-{uid}_set',
list(rv_reg.RVs([rv_tag, time_rv_tag]).values()),
np.array([[1.0, rho], [rho, 1.0]]),
)
)
self.log.msg(
f'\n{rv_count} random variables created.', prepend_timestamp=False
)
if rv_count > 0:
return rv_reg
return None
def _calc_median_consequence(self, eco_qnt: pd.DataFrame) -> dict: # noqa: C901
"""
Calculate median repair consequences.
Calculate the median repair consequences for each loss
component based on its quantity realizations and the
associated loss parameters.
Parameters
----------
eco_qnt: DataFrame
A DataFrame containing quantity realizations for various
components and damage states, appropriately grouped to
account for economies of scale.
Returns
-------
dict
A dictionary where keys are the types of decision
variables (DV), such as 'Cost' or 'Time', and values are
DataFrames containing the median consequences for each
component and damage state. These DataFrames are structured
with MultiIndex columns that may include 'cmp' (component),
'ds' (damage state), and potentially 'loc' (location),
depending on the way economies of scale are handled.
Raises
------
ValueError
If any loss driver types or distribution types are not
recognized, or if the parameters are incomplete or
unsupported.
"""
medians = {}
for decision_variable in self.decision_variables:
cmp_list = []
median_list = []
assert self.loss_map is not None
for loss_cmp_id, loss_cmp_name in self.loss_map['Repair'].items():
if (loss_cmp_name, decision_variable) in self.missing:
continue
if loss_cmp_id not in eco_qnt.columns.get_level_values(0).unique():
continue
ds_list = []
sub_medians = []
assert self.loss_params is not None
for ds in self.loss_params.columns.get_level_values(0).unique():
if not ds.startswith('DS'):
continue
ds_id = ds[2:]
if ds_id == '0':
continue
loss_params_ds = self.loss_params.loc[
(loss_cmp_name, decision_variable), ds
]
# check if theta_0 is defined
theta_0 = loss_params_ds.get('Theta_0', np.nan)
if pd.isna(theta_0):
continue
# check if the distribution type is supported
family = loss_params_ds.get('Family', np.nan)
if (not pd.isna(family)) and (
family not in {'normal', 'lognormal', 'deterministic'}
):
msg = f'Loss Distribution of type {family} not supported.'
raise ValueError(msg)
# If theta_0 is a scalar
try:
theta_0 = float(theta_0)
if pd.isna(loss_params_ds.get('Family', np.nan)):
# if theta_0 is constant, then use it directly
f_median = _prep_constant_median_DV(theta_0)
else:
# otherwise use a constant 1.0 as the median
# The random variable will be generated as a
# variation from this 1.0 and added in a later step.
f_median = _prep_constant_median_DV(1.0)
except ValueError:
# otherwise, use the multilinear function
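# Theta_0 strings have the form 'm1,m2,...|q1,q2,...': the
# values before the `|` are medians and those after it are
# the damaged quantities at which those medians apply.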
all_vals = np.array(
[val.split(',') for val in theta_0.split('|')],
dtype=float,
)
medns = all_vals[0]
qnts = all_vals[1]
f_median = _prep_bounded_multilinear_median_DV(medns, qnts)
# get the corresponding aggregate damage quantities
# to consider economies of scale
if 'ds' in eco_qnt.columns.names:
avail_ds = eco_qnt.loc[
:, # type: ignore
loss_cmp_id,
].columns.unique(level=0)
if ds_id not in avail_ds:
continue
eco_qnt_i = eco_qnt.loc[
:, # type: ignore
(loss_cmp_id, ds_id),
].copy()
else:
eco_qnt_i = eco_qnt.loc[
:, # type: ignore
loss_cmp_id,
].copy()
if isinstance(eco_qnt_i, pd.Series):
eco_qnt_i = eco_qnt_i.to_frame()
eco_qnt_i.columns = ['X']
eco_qnt_i.columns.name = 'del'
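# the placeholder 'del' column level is dropped after
# concatenation below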
# generate the median values for each realization
eco_qnt_i.loc[:, :] = f_median(eco_qnt_i.values) # type: ignore
sub_medians.append(eco_qnt_i)
ds_list.append(ds_id)
if len(ds_list) > 0:
# combine medians across damage states into one DF
median_list.append(pd.concat(sub_medians, axis=1, keys=ds_list))
cmp_list.append(loss_cmp_id)
if len(cmp_list) > 0:
# combine medians across components into one DF
result = pd.concat(median_list, axis=1, keys=cmp_list)
# remove the extra column header level
if 'del' in result.columns.names:
result.columns = result.columns.droplevel('del')
# name the remaining column header levels
if eco_qnt.columns.names == ['cmp', 'ds', 'loc']:
result.columns.names = ['cmp', 'ds', 'loc']
else:
result.columns.names = ['cmp', 'ds']
# save the results to the returned dictionary
medians.update({decision_variable: result})
return medians
class RepairModel_LF(RepairModel_Base):
"""Repair consequences for components with loss functions."""
__slots__ = []
def calculate(
self,
demand_sample: pd.DataFrame,
cmp_sample: dict,
cmp_marginal_params: pd.DataFrame,
demand_offset: dict,
nondirectional_multipliers: dict,
) -> None:
"""
Calculate repair consequences.
Parameters
----------
demand_sample: pd.DataFrame
The sample of the demand model to be used for the inputs
of the loss functions.
cmp_sample: dict
Dict mapping each `cmp`-`loc`-`dir`-`uid` to the component
quantity realizations in the asset model in the form of
pd.Series objects.
cmp_marginal_params: pd.DataFrame
Dataframe containing component marginal distribution
parameters.
demand_offset: dict
Dictionary specifying the demand offset.
nondirectional_multipliers: dict
Dictionary specifying the nondirectional multipliers used
to combine the directional demands.
"""
if self.loss_params is None:
return
assert self.loss_map is not None
loss_map = self.loss_map['Repair'].to_dict()
sample_size = len(demand_sample)
# TODO(JVM): this can be taken out and simply passed as blocks in
# the arguments, and cast to a dict in here. Index can be
# obtained from there.
index = [
x
for x in cmp_marginal_params.index.get_level_values(0)
if loss_map.get(x) in self.loss_params.index
]
# If `Blocks` information is unspecified add one block per
# component.
if 'Blocks' not in cmp_marginal_params.columns:
cmp_marginal_params['Blocks'] = 1
blocks = cmp_marginal_params.loc[index, 'Blocks'].to_dict()
performance_group_dict = {}
for (component, location, direction, uid), num_blocks in blocks.items():
for decision_variable in self.decision_variables:
if (component, decision_variable) in self.missing:
continue
performance_group_dict[
(component, decision_variable), location, direction, uid
] = num_blocks
if not performance_group_dict:
self.log.msg(
'No loss function-driven components---LF sample is set to None.',
prepend_timestamp=False,
)
return
performance_group = pd.DataFrame( # type: ignore
performance_group_dict.values(),
index=performance_group_dict.keys(),
columns=['Blocks'],
)
performance_group.index.names = ['cmp', 'loc', 'dir', 'uid']
required_edps = base.invert_mapping(
_get_required_demand_type(
self.loss_params, performance_group, demand_offset
)
)
available_edps = (
pd.DataFrame(index=demand_sample.columns)
.reset_index()
.groupby(['type', 'loc'])['dir']
.agg(lambda x: list(set(x)))
.to_dict()
)
# Raise an error if demand sample is missing necessary entries.
_verify_edps_available(available_edps, set(required_edps.values()))
demand_dict = _assemble_required_demand_data(
set(required_edps.values()),
nondirectional_multipliers,
demand_sample,
)
self.log.msg(
'\nCalculating the median repair consequences...',
prepend_timestamp=False,
)
medians = self._calc_median_consequence(
performance_group, loss_map, required_edps, demand_dict, cmp_sample
)
self.log.msg(
'Successfully determined median repair consequences.',
prepend_timestamp=False,
)
self.log.msg(
'\nConsidering deviations from the median values to obtain '
'random DV sample...'
)
self.log.msg(
'Preparing random variables for repair cost and time...',
prepend_timestamp=False,
)
rv_reg = self._create_DV_RVs(medians.columns) # type: ignore
if rv_reg is not None:
assert self._asmnt.options.sampling_method is not None
rv_reg.generate_sample(
sample_size=sample_size, method=self._asmnt.options.sampling_method
)
std_sample = base.convert_to_MultiIndex(
pd.DataFrame(rv_reg.RV_sample), axis=1
)
std_sample.columns.names = [
'dv',
'loss',
'dmg',
'loc',
'dir',
'uid',
'block',
]
std_sample = std_sample.sort_index(axis=1)
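# scale the medians by the sampled deviations; columns
# without a matching RV (deterministic consequences) keep
# their median values through `combine_first`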
sample = (medians * std_sample).combine_first(medians)
else:
sample = medians
self.log.msg(
f'\nSuccessfully generated {sample_size} realizations of '
'deviation from the median consequences.',
prepend_timestamp=False,
)
# sum up the block losses
sample = sample.groupby( # type: ignore
by=['dv', 'loss', 'dmg', 'loc', 'dir', 'uid'], axis=1
).sum()
self.log.msg('Successfully obtained DV sample.', prepend_timestamp=False)
self.sample = sample
return
def convert_loss_parameter_units(self) -> None:
"""Convert previously loaded loss parameters to base units."""
if self.loss_params is None:
return
units = self.loss_params['DV', 'Unit']
arg_units = self.loss_params['Demand', 'Unit']
column = 'LossFunction'
params = self.loss_params[column].copy()
assert isinstance(params, pd.DataFrame)
self.loss_params.loc[:, column] = self._convert_marginal_params(
params,
units,
arg_units,
divide_units=False,
).to_numpy()
return
def _calc_median_consequence(
self,
performance_group: pd.DataFrame,
loss_map: dict,
required_edps: dict,
demand_dict: dict,
cmp_sample: dict,
) -> pd.DataFrame:
"""
Calculate median repair consequences.
Calculates the median repair consequences for each loss
function-driven component based on its quantity realizations
and the associated loss parameters.
Parameters
----------
performance_group: pd.DataFrame
Dataframe with a single column `Blocks` containing an
integer for the number of blocks of each
(`cmp`-`decision_variable`)-`loc`-`dir`-`uid`.
loss_map: dict
Dictionary mapping component IDs, `cmp`, to their repair
consequences.
required_edps: dict
Dictionary mapping each (`cmp`-`decision_variable`)-`loc`-`dir`-`uid`
entry of the `performance_group`'s index to the EDP name
(e.g., `PFA-1-1`) that should be used as its loss
function input.
demand_dict: dict
Dictionary mapping each EDP name to the values in the
demand sample in the form of numpy arrays.
cmp_sample: dict
Dict mapping each `cmp`-`loc`-`dir`-`uid` to the component
quantity realizations in the asset model in the form of
pd.Series objects.
Returns
-------
pd.DataFrame
Dataframe with median losses for loss function-driven
components.
Raises
------
ValueError
If loss function interpolation fails.
"""
medians_dict = {}
# for each component in the asset model
component: str
decision_variable: str
location: str
direction: str
uid: str
blocks: int
for ( # type: ignore
(component, decision_variable),
location,
direction,
uid,
), blocks in performance_group['Blocks'].items():
consequence = loss_map[component]
edp = required_edps[
(consequence, decision_variable), location, direction, uid
]
edp_values = demand_dict[edp]
assert self.loss_params is not None
loss_function_str = self.loss_params.loc[
(component, decision_variable), ('LossFunction', 'Theta_0')
]
assert isinstance(loss_function_str, str)
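# `Theta_0` of a loss function is a string of the form
# 'Y1,Y2,...|X1,X2,...': the X values (after the `|`) are EDP
# levels and the Y values are the corresponding losses per
# unit of component quantity.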
try:
median_loss = base.stringterpolation(loss_function_str)(edp_values)
except ValueError as exc:
msg = (
f'Loss function interpolation for consequence '
f'`{consequence}-{decision_variable}` has failed. '
f'Ensure a sufficient interpolation domain '
f'for the X values (those after the `|` symbol) '
f'and verify the X-value and Y-value lengths match.'
)
raise ValueError(msg) from exc
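# distribute the consequence equally across the component's
# blocks; the per-block losses are summed back up in
# `calculate`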
for block in range(blocks):
medians_dict[
decision_variable,
consequence,
component,
location,
direction,
uid,
str(block),
] = (
median_loss
* cmp_sample[component, location, direction, uid].to_numpy()
/ float(blocks)
)
medians = pd.DataFrame(medians_dict)
medians.columns.names = ['dv', 'loss', 'dmg', 'loc', 'dir', 'uid', 'block']
return medians.sort_index(axis=1)
def _create_DV_RVs( # noqa: N802
self, cases: pd.MultiIndex
) -> uq.RandomVariableRegistry | None:
"""
Prepare the decision variable random variables.
Prepares the random variables associated with decision
variables, such as repair cost and time.
Parameters
----------
cases: MultiIndex
Index with `dv`-`loss`-`dmg`-`loc`-`dir`-`uid`
entries that identify the RVs we need for the
simulation (columns of the `medians` dataframe).
Returns
-------
RandomVariableRegistry or None
A RandomVariableRegistry containing all the generated
random variables necessary for the simulation. If no
random variables are generated (due to missing parameters
or conditions), returns None.
"""
rv_reg = uq.RandomVariableRegistry(self._asmnt.options.rng)
rv_count = 0
# for each component in the loss map
for (
decision_variable,
consequence,
component,
location,
direction,
uid,
block,
) in cases:
# load the corresponding parameters
assert self.loss_params is not None
parameters = self.loss_params.loc[(consequence, decision_variable), :]
if ('LossFunction', 'Family') not in parameters:
# Everything is deterministic, no need to create RVs.
continue
family = parameters.loc['LossFunction', 'Family']
theta = [
theta_value
for t_i in range(3)
if (theta_value := parameters.get(('LossFunction', f'Theta_{t_i}')))
is not None
]
# If there is no RV family we don't need an RV
if pd.isna(family):
continue
# Since the first parameter is controlled by a function,
# we use 1.0 in its place and will scale the results in a
# later step.
theta[0] = 1.0 # type: ignore
# assign RVs
rv_reg.add_RV(
uq.rv_class_map(family)( # type: ignore
name=(
f'{decision_variable}-{consequence}-'
f'{component}-{location}-{direction}-{uid}-{block}'
),
theta=np.array(theta),
truncation_limits=np.array([0.0, np.nan]),
)
)
rv_count += 1
# assign Time-Cost correlation whenever applicable
rho = self._asmnt.options.rho_cost_time
if rho != 0.0:
for rv_tag in rv_reg.RV:
if not rv_tag.startswith('Cost'):
continue
split = rv_tag.split('-')
consequence = split[1]
component = split[2]
location = split[3]
direction = split[4]
uid = split[5]
block = split[6]
time_rv_tag = rv_tag.replace('Cost', 'Time')
if time_rv_tag in rv_reg.RV:
rv_reg.add_RV_set(
uq.RandomVariableSet(
(
f'DV-{consequence}-{component}-'
f'{location}-{direction}-{uid}-{block}_set'
),
list(rv_reg.RVs([rv_tag, time_rv_tag]).values()),
np.array([[1.0, rho], [rho, 1.0]]),
)
)
self.log.msg(
f'\n{rv_count} random variables created.', prepend_timestamp=False
)
if rv_count > 0:
return rv_reg
return None
def _prep_constant_median_DV(median: float) -> Callable: # noqa: N802
"""
Return a constant median Decision Variable (DV) function.
Parameters
----------
median: float
The median DV for a consequence function with fixed median.
Returns
-------
callable
A function that returns the constant median DV for all component
quantities.
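Examples
--------
A minimal illustration of the returned callable, which ignores
its arguments and always yields the supplied median:
>>> f = _prep_constant_median_DV(100.0)
>>> f(25)
100.0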
"""
def f(*args): # noqa: ANN002, ANN202, ARG001
return median
return f
def _prep_bounded_multilinear_median_DV( # noqa: N802
medians: np.ndarray, quantities: np.ndarray
) -> Callable:
"""
Return a bounded multilinear median Decision Variable (DV) function.
The median DV equals the min and max values when the quantity is
outside of the prescribed quantity bounds. When the quantity is within the
bounds, the returned median is calculated by linear interpolation.
Parameters
----------
medians: ndarray
Series of values that define the y coordinates of the multilinear DV
function.
quantities: ndarray
Series of values that define the component quantities corresponding to
the series of medians and serving as the x coordinates of the
multilinear DV function.
Returns
-------
callable
A function that returns the median DV given the quantity of damaged
components.
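Examples
--------
A minimal illustration, assuming `numpy` is imported as `np`;
quantities outside the bounds are clamped to the nearest
median:
>>> f = _prep_bounded_multilinear_median_DV(
...     np.array([300.0, 200.0, 100.0]), np.array([1.0, 5.0, 10.0])
... )
>>> float(f(3.0))
250.0
>>> float(f(20.0))
100.0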
"""
def f(quantity): # noqa: ANN001, ANN202
if quantity is None:
msg = (
'A bounded linear median Decision Variable function called '
'without specifying the quantity of damaged components'
)
raise ValueError(msg)
q_array = np.asarray(quantity, dtype=np.float64)
# calculate the median consequence given the quantity of damaged
# components
return np.interp(q_array, quantities, medians)
return f
def _is_for_lf_model(data: pd.DataFrame) -> bool:
"""
Determine if the data are for the lf_model.
Parameters
----------
data: pd.DataFrame
Data to be checked.
Returns
-------
bool
Whether the data are for the lf_model.
"""
return 'LossFunction' in data.columns.get_level_values(0)
def _is_for_ds_model(data: pd.DataFrame) -> bool:
"""
Determine if the data are for the ds_model.
Parameters
----------
data: pd.DataFrame
Data to be checked.
Returns
-------
bool
Whether the data are for the ds_model.
"""
return 'DS1' in data.columns.get_level_values(0)