#
# Copyright (c) 2018 Leland Stanford Junior University
# Copyright (c) 2018 The Regents of the University of California
#
# This file is part of pelicun.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# You should have received a copy of the BSD 3-Clause License along with
# pelicun. If not, see <http://www.opensource.org/licenses/>.
"""Constants, classes and methods for uncertainty quantification."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
import colorama
import numpy as np
import pandas as pd
from scipy.linalg import cholesky, svd # type: ignore
from scipy.optimize import minimize # type: ignore
from scipy.stats import multivariate_normal as mvn # type: ignore
from scipy.stats import norm, uniform, weibull_min # type: ignore
from scipy.stats._mvn import (
mvndst, # type: ignore # noqa: PLC2701
)
if TYPE_CHECKING:
from collections.abc import Callable
from pelicun.base import Logger
colorama.init()
FIRST_POSITIVE_NUMBER = np.nextafter(0, 1)
def scale_distribution(
scale_factor: float,
family: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray | None]:
"""
Scale parameters of a random distribution.
Parameters
----------
scale_factor: float
Value by which to scale the parameters.
family: {'normal' (or 'normal_cov'), 'normal_std', 'lognormal',
'uniform', 'deterministic', 'multilinear_CDF'}
Defines the type of probability distribution for the random
variable.
theta: float ndarray of length 2
Set of parameters that define the cumulative distribution
function of the variable given its distribution type. See the
expected parameters explained in the RandomVariable
class. Each parameter can be defined by one or more values. If
a set of values are provided for one parameter, they define
ordinates of a multilinear function that is used to get the
parameter values given an independent variable.
truncation_limits: float ndarray of length 2, default: None
Defines the [a,b] truncation limits for the distribution. Use
None to assign no limit in one direction.
Returns
-------
tuple
A tuple containing the scaled parameters and truncation
limits:
* theta_new (float ndarray of length 2): Scaled parameters of
the distribution.
* truncation_limits (float ndarray of length 2 or None):
Scaled truncation limits for the distribution, or None if no
truncation is applied.
Raises
------
ValueError
If the specified distribution family is unsupported.
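Examples
--------
An illustrative sketch of the scaling rules implemented below; the
values follow directly from those rules:

>>> import numpy as np
>>> theta_new, _ = scale_distribution(2.0, 'normal_std', np.array([1.0, 0.5]))
>>> # theta_new -> array([2., 1.]): the mean and the std both scale
>>> theta_new, _ = scale_distribution(2.0, 'normal_cov', np.array([1.0, 0.5]))
>>> # theta_new -> array([2., 0.5]): the CoV is scale-invariant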
"""
if truncation_limits is not None:
truncation_limits = truncation_limits.copy()
truncation_limits *= scale_factor
# undefined family is considered deterministic
if pd.isna(family):
family = 'deterministic'
theta_new = np.full_like(theta, np.nan)
if family == 'normal_std':
theta_new[0] = theta[0] * scale_factor # mean
theta_new[1] = theta[1] * scale_factor # STD
elif family in {'normal', 'normal_cov'}:
theta_new[0] = theta[0] * scale_factor
theta_new[1] = theta[1] # because it is CoV
elif family == 'lognormal':
theta_new[0] = theta[0] * scale_factor
theta_new[1] = theta[1] # because it is log std
elif family == 'uniform':
theta_new[0] = theta[0] * scale_factor
theta_new[1] = theta[1] * scale_factor
elif family in {'deterministic', 'multilinear_CDF'}:
theta_new[0] = theta[0] * scale_factor
else:
msg = f'Unsupported distribution: {family}'
raise ValueError(msg)
return theta_new, truncation_limits
def mvn_orthotope_density(
mu: float | np.ndarray,
cov: np.ndarray,
lower: float | np.ndarray = np.nan,
upper: float | np.ndarray = np.nan,
) -> tuple[float, float]:
"""
Estimate the probability density within a hyperrectangle for an MVN distribution.
Use the method of Alan Genz (1992) to estimate the probability density
of a multivariate normal distribution within an n-orthotope (i.e.,
hyperrectangle) defined by its lower and upper bounds. Limits can be
relaxed in any direction by assigning infinite bounds (i.e. numpy.inf).
Parameters
----------
mu: float scalar or ndarray
Mean(s) of the non-truncated distribution.
cov: float ndarray
Covariance matrix of the non-truncated distribution.
lower: float vector, optional, default: np.nan
Lower bound(s) for the truncated distributions. A scalar value can be
used for a univariate case, while a list of bounds is expected in
multivariate cases. If the distribution is non-truncated from below
in a subset of the dimensions, use either `np.nan` or assign an infinite
value (i.e. -numpy.inf) to those dimensions.
upper: float vector, optional, default: np.nan
Upper bound(s) for the truncated distributions. A scalar value can be
used for a univariate case, while a list of bounds is expected in
multivariate cases. If the distribution is non-truncated from above
in a subset of the dimensions, use either `np.nan` or assign an infinite
value (i.e. numpy.inf) to those dimensions.
Returns
-------
tuple
alpha: float
Estimate of the probability density within the hyperrectangle.
eps_alpha: float
Estimate of the error in the calculated probability density.
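Examples
--------
An illustrative call for a single standard normal variable; roughly
68.3% of the probability mass lies within one standard deviation:

>>> import numpy as np
>>> alpha, eps_alpha = mvn_orthotope_density(
...     0.0, np.array([[1.0]]), lower=-1.0, upper=1.0
... )
>>> # alpha -> approximately 0.6827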
"""
# process the inputs and get the number of dimensions
mu = np.atleast_1d(mu)
cov = np.atleast_2d(cov)
sig = np.sqrt(np.diag(cov))
corr = cov / np.outer(sig, sig)
ndim = mu.size
if np.all(np.isnan(lower)):
lower = -np.ones(ndim) * np.inf
else:
lower = np.atleast_1d(lower)
if np.all(np.isnan(upper)):
upper = np.ones(ndim) * np.inf
else:
upper = np.atleast_1d(upper)
# replace NaN with np.inf
lower[np.where(np.isnan(lower))[0]] = -np.inf
lower = lower.astype(np.float64)
upper[np.where(np.isnan(upper))[0]] = np.inf
upper = upper.astype(np.float64)
# standardize the truncation limits
lower = (lower - mu) / sig
upper = (upper - mu) / sig
# prepare the flags for infinite bounds (these are needed for the mvndst
# function)
lowinf = np.isneginf(lower)
uppinf = np.isposinf(upper)
infin = 2.0 * np.ones(ndim)
np.putmask(infin, lowinf, 0)
np.putmask(infin, uppinf, 1)
np.putmask(infin, lowinf * uppinf, -1)
# prepare the correlation coefficients
correl = np.array([0.0]) if ndim == 1 else corr[np.tril_indices(ndim, -1)]
# estimate the density
eps_alpha, alpha, _ = mvndst(lower, upper, infin, correl)
return alpha, eps_alpha
def _get_theta(
params: np.ndarray, inits: np.ndarray, dist_list: np.ndarray
) -> np.ndarray:
"""
Return the parameters of the target distributions.
Uses the parameter values from the optimization algorithm (that
are relative to the initial values) and the initial values to
transform them to the parameters of the target distributions.
Parameters
----------
params: float ndarray, Nx2
Numpy array containing the parameter values.
inits: float ndarray, Nx2
Numpy array containing the initial values.
dist_list: str ndarray
Array of strings containing the names of the distributions.
Returns
-------
theta: float ndarray
The estimated parameters.
Raises
------
ValueError
If any of the distributions is unsupported.
"""
theta = np.zeros(inits.shape)
for i, (params_i, inits_i, dist_i) in enumerate(zip(params, inits, dist_list)):
if dist_i in {'normal', 'normal_std', 'lognormal'}:
# Standard deviation is used directly for 'normal' and
# 'lognormal'
sig = (
np.exp(np.log(inits_i[1]) + params_i[1])
if dist_i == 'lognormal'
else inits_i[1] + params_i[1]
)
# The mean uses the standard transformation
mu = inits_i[0] + params_i[0]
theta[i, 0] = mu
theta[i, 1] = sig
elif dist_i == 'normal_cov':
# Note that the CoV is used for 'normal_cov'
sig = np.exp(np.log(inits_i[1]) + params_i[1])
# The mean uses the standard transformation
mu = inits_i[0] + params_i[0] * sig
theta[i, 0] = mu
theta[i, 1] = sig
else:
msg = f'Unsupported distribution: {dist_i}'
raise ValueError(msg)
return theta
def _get_limit_probs(
limits: np.ndarray, distribution: str, theta: np.ndarray
) -> tuple[float, float]:
"""
Get the CDF value at the specified limits.
Parameters
----------
limits: float ndarray
The limits on which to return the CDF value.
distribution: str
The distribution to be used.
theta: float ndarray
The parameters of the specified distribution.
Returns
-------
tuple
The CDF values.
Raises
------
ValueError
If any of the distributions is unsupported.
"""
if distribution in {'normal', 'normal_std', 'normal_cov', 'lognormal'}:
a, b = limits
mu = theta[0]
# theta[1] already holds the standard deviation here for all
# supported families; for 'normal_cov', _get_theta converts the
# CoV to a standard deviation before this function is called.
sig = theta[1]
p_a = 0.0 if np.isnan(a) else norm.cdf((a - mu) / sig)
p_b = 1.0 if np.isnan(b) else norm.cdf((b - mu) / sig)
else:
msg = f'Unsupported distribution: {distribution}'
raise ValueError(msg)
return p_a, p_b
def _get_std_samples(
samples: np.ndarray,
theta: np.ndarray,
tr_limits: np.ndarray,
dist_list: np.ndarray,
) -> np.ndarray:
"""
Transform samples to standard normal space.
Parameters
----------
samples: float ndarray DxN
2D array of samples. Each row contains the N samples of one of
the D variables.
theta: float ndarray Dx2
2D array of theta values that represent each dimension of the
samples.
tr_limits: float ndarray Dx2
2D array with rows that represent [a, b] pairs of truncation
limits.
dist_list: str ndarray of length D
1D array containing the names of the distributions.
Returns
-------
ndarray
float DxN ndarray of the samples transformed to standard normal
space, with each row containing the transformed samples of one
variable.
Raises
------
ValueError
If any of the distributions is unsupported.
"""
std_samples = np.zeros(samples.shape)
for i, (samples_i, theta_i, tr_lim_i, dist_i) in enumerate(
zip(samples, theta, tr_limits, dist_list)
):
if dist_i in {'normal', 'normal_std', 'normal_cov', 'lognormal'}:
lim_low = tr_lim_i[0]
lim_high = tr_lim_i[1]
if np.any(samples_i > lim_high) or np.any(samples_i < lim_low):
msg = (
'One or more sample values lie outside '
'of the specified truncation limits.'
)
raise ValueError(msg)
# first transform from normal to uniform
uni_sample = norm.cdf(samples_i, loc=theta_i[0], scale=theta_i[1])
# replace 0 and 1 values with the nearest float
uni_sample[uni_sample == 0] = FIRST_POSITIVE_NUMBER
uni_sample[uni_sample == 1] = np.nextafter(1, -1)
# consider truncation if needed
p_a, p_b = _get_limit_probs(tr_lim_i, dist_i, theta_i)
uni_sample = (uni_sample - p_a) / (p_b - p_a)
# then transform from uniform to standard normal
std_samples[i] = norm.ppf(uni_sample, loc=0.0, scale=1.0)
else:
msg = f'Unsupported distribution: {dist_i}'
raise ValueError(msg)
return std_samples
def _get_std_corr_matrix(std_samples: np.ndarray) -> np.ndarray | None:
"""
Estimate the correlation matrix.
Estimate the correlation matrix of the given standard normal
samples. Ensure that the correlation matrix is positive
semidefinite.
Parameters
----------
std_samples: float ndarray, DxN
Array containing the standard normal samples. Each column is a
sample. It should not contain Inf or NaN values.
Returns
-------
ndarray
Correlation matrix. float ndarray, DxD
Raises
------
ValueError
If any of the elements of std_samples is np.inf or np.nan
"""
if np.any(np.isinf(std_samples)) or np.any(np.isnan(std_samples)):
msg = 'std_samples array must not contain inf or NaN values'
raise ValueError(msg)
n_dims, n_samples = std_samples.shape
# initialize the correlation matrix estimate
rho_hat = np.zeros((n_dims, n_dims))
np.fill_diagonal(rho_hat, 1.0)
# take advantage of having standard normal samples
for dim_i in range(n_dims):
for dim_j in np.arange(dim_i + 1, n_dims):
rho_hat[dim_i, dim_j] = (
np.sum(std_samples[dim_i] * std_samples[dim_j]) / n_samples
)
rho_hat[dim_j, dim_i] = rho_hat[dim_i, dim_j]
# make sure rho_hat is positive semidefinite
try:
cholesky(rho_hat, lower=True) # if this works, we're good
# otherwise, we can try to fix the matrix using SVD
except np.linalg.LinAlgError:
try:
u_matrix, s_vector, _ = svd(
rho_hat,
)
except np.linalg.LinAlgError:
# if this also fails, we give up
return None
s_diag = np.diagflat(s_vector)
rho_hat = u_matrix @ s_diag @ u_matrix.T
np.fill_diagonal(rho_hat, 1.0)
# check if we introduced any unreasonable values
vmax = 1.01
vmin = -1.01
if (np.max(rho_hat) > vmax) or (np.min(rho_hat) < vmin):
return None
# round values to 1.0 and -1.0, if needed
if np.max(rho_hat) > 1.0:
rho_hat /= np.max(rho_hat)
if np.min(rho_hat) < -1.0:
rho_hat /= np.abs(np.min(rho_hat))
return rho_hat
def _mvn_scale(x: np.ndarray, rho: np.ndarray) -> np.ndarray:
"""
Scaling utility function.
Parameters
----------
x: ndarray
Input array
rho: ndarray
Covariance matrix
Returns
-------
ndarray
Scaled values
"""
x = np.atleast_2d(x)
n_dims = x.shape[1]
# create an uncorrelated covariance matrix
rho_0 = np.eye(n_dims, n_dims)
a = mvn.pdf(x, mean=np.zeros(n_dims), cov=rho_0)
a[a < FIRST_POSITIVE_NUMBER] = FIRST_POSITIVE_NUMBER
b = mvn.pdf(x, mean=np.zeros(n_dims), cov=rho)
return b / a
def _neg_log_likelihood( # noqa: C901
params: np.ndarray,
inits: np.ndarray,
bnd_lower: np.ndarray,
bnd_upper: np.ndarray,
samples: np.ndarray,
dist_list: np.ndarray,
tr_limits: np.ndarray,
det_limits: list[np.ndarray],
censored_count: int,
enforce_bounds: bool = False, # noqa: FBT001, FBT002
) -> float:
"""
Calculate negative log likelihood.
Calculate the negative log likelihood of the given data samples
given the parameter values and distribution information.
This function is used as an objective function in optimization
algorithms to estimate the parameters of the distribution of the
input data.
Parameters
----------
params: ndarray
1D array with the parameter values to be assessed.
inits: ndarray
1D array with the initial estimates for the distribution
parameters.
bnd_lower: ndarray
1D array with the lower bounds for the distribution
parameters.
bnd_upper: ndarray
1D array with the upper bounds for the distribution
parameters.
samples: ndarray
2D array with the data samples. Each row corresponds to a
different random variable.
dist_list: str ndarray of length D
1D array containing the names of the distributions
tr_limits: float ndarray Dx2
2D array with rows that represent [a, b] pairs of truncation
limits.
det_limits: list
List with the detection limits for each random variable.
censored_count: int
Number of censored samples in the data.
enforce_bounds: bool, optional
If True, the parameters are only considered valid if they are
within the bounds defined by bnd_lower and bnd_upper. The
default value is False.
Returns
-------
float
The negative log likelihood of the data given the distribution parameters.
"""
# First, check if the parameters are within the pre-defined bounds
# TODO(AZ): check if it is more efficient to use a bounded
# minimization algo
if enforce_bounds:
if not ((params > bnd_lower) & (params < bnd_upper)).all(0):
# if they are not, then return a large value to discourage the
# optimization algorithm from going in that direction
return 1e10
# If there is nan in params, return a large value
if np.isnan(np.sum(params)):
return 1e10
params = np.reshape(params, inits.shape)
n_dims, _ = samples.shape
theta = _get_theta(params, inits, dist_list)
likelihoods = np.zeros(samples.shape)
# calculate the marginal likelihoods
for i, (theta_i, samples_i, tr_lim_i, dist_i) in enumerate(
zip(theta, samples, tr_limits, dist_list)
):
# consider truncation if needed
p_a, p_b = _get_limit_probs(tr_lim_i, dist_i, theta_i)
# this is the probability mass within the
# truncation limits
tr_alpha = p_b - p_a
# Calculate the likelihood for each available sample
# Note that we are performing this without any transformation to be able
# to respect truncation limits
if dist_i in {'normal', 'normal_std', 'normal_cov', 'lognormal'}:
likelihoods[i] = (
norm.pdf(samples_i, loc=theta_i[0], scale=theta_i[1]) / tr_alpha
)
# transform every sample into standard normal space
std_samples = _get_std_samples(samples, theta, tr_limits, dist_list)
# if the problem is more than one dimensional, get the correlation matrix
if n_dims > 1:
rho_hat = _get_std_corr_matrix(std_samples)
if rho_hat is None:
return 1e10
else:
rho_hat = np.atleast_2d([1.0])
# likelihoods related to censoring need to be handled together
if censored_count > 0:
det_lower = np.zeros(n_dims)
det_upper = np.zeros(n_dims)
for i, (theta_i, tr_lim_i, det_lim_i, dist_i) in enumerate(
zip(theta, tr_limits, det_limits, dist_list)
):
# prepare the standardized truncation and detection limits
p_a, p_b = _get_limit_probs(tr_lim_i, dist_i, theta_i)
p_l, p_u = _get_limit_probs(det_lim_i, dist_i, theta_i)
# rescale detection limits to consider truncation
p_l, p_u = (np.min([np.max([lim, p_a]), p_b]) for lim in (p_l, p_u))
p_l, p_u = ((lim - p_a) / (p_b - p_a) for lim in (p_l, p_u))
# transform limits to standard normal space
det_lower[i], det_upper[i] = norm.ppf([p_l, p_u], loc=0.0, scale=1.0)
# get the likelihood of getting a non-censored sample given the
# detection limits and the correlation matrix
det_alpha, eps_alpha = mvn_orthotope_density(
np.zeros(n_dims), rho_hat, det_lower, det_upper
)
# Make sure det_alpha is estimated with sufficient accuracy
if det_alpha <= 100.0 * eps_alpha:
return 1e10
# make sure that the likelihood of censoring a sample is positive
cen_likelihood = max(1.0 - det_alpha, FIRST_POSITIVE_NUMBER)
else:
# If the data is not censored, use 1.0 for cen_likelihood to get a
# zero log-likelihood later. Note that although this is
# theoretically not correct, it does not alter the solution and
# it is numerically much more convenient than working around the
# log of zero likelihood.
cen_likelihood = 1.0
# take the product of likelihoods calculated in each dimension
scale = _mvn_scale(std_samples.T, rho_hat)
# TODO(AZ): We can almost surely replace the product of likelihoods
# with a call to mvn()
likelihoods = np.prod(likelihoods, axis=0) * scale
# Zeros are a result of limited floating point precision. Replace them
# with the smallest possible positive floating point number to
# improve convergence.
likelihoods = np.clip(likelihoods, a_min=FIRST_POSITIVE_NUMBER, a_max=None)
# calculate the total negative log likelihood
negative_log_likelihood = -(
np.sum(np.log(likelihoods)) # from samples
+ censored_count * np.log(cen_likelihood)
) # censoring influence
# print(theta[0], params, NLL)
# normalize the NLL with the sample count
return negative_log_likelihood / samples.size
def fit_distribution_to_sample( # noqa: C901
raw_sample: np.ndarray,
distribution: str | list[str],
truncation_limits: tuple[float, float] = (np.nan, np.nan),
censored_count: int = 0,
detection_limits: tuple[float, float] = (np.nan, np.nan),
*,
multi_fit: bool = False,
logger_object: Logger | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
Fit a distribution to sample using maximum likelihood estimation.
The number of dimensions of the distribution are inferred from the
shape of the sample data. Censoring is automatically considered if the
number of censored samples and the corresponding detection limits are
provided. Infinite or unspecified truncation limits lead to fitting a
non-truncated distribution in that dimension.
Parameters
----------
raw_sample: float ndarray
Raw data that serves as the basis of estimation. The number of samples
equals the number of columns and each row introduces a new feature. In
other words: a list of sample lists is expected where each sample list
is a collection of samples of one variable.
distribution: {'normal', 'lognormal'}
Defines the target probability distribution type. Different types of
distributions can be mixed by providing a list rather than a single
value. Each element of the list corresponds to one of the features in
the raw_sample.
truncation_limits: float ndarray, optional, default: (np.nan, np.nan)
Lower and/or upper truncation limits for the specified distributions.
A two-element vector can be used for a univariate case, while two lists
of limits are expected in multivariate cases. If the distribution is
non-truncated from one side in a subset of the dimensions, use either
`np.nan` or assign an infinite value (i.e. numpy.inf) to those dimensions.
censored_count: int, optional, default: 0
The number of censored samples that are beyond the detection limits.
All samples outside the detection limits are aggregated into one set.
This works the same way in one and in multiple dimensions. Prescription
of specific censored sample counts for sub-regions of the input space
outside the detection limits is not supported.
detection_limits: float ndarray, optional, default: (np.nan, np.nan)
Lower and/or upper detection limits for the provided samples. A
two-element vector can be used for a univariate case, while two lists
of limits are expected in multivariate cases. If the data is not
censored from one side in a subset of the dimensions, use either
`np.nan` or assign an infinite value (i.e. numpy.inf) to those dimensions.
multi_fit: bool, optional, default: False
If True, we attempt to fit a multivariate distribution to the samples.
Otherwise, we fit each marginal univariate distribution independently
and estimate the correlation matrix in the end based on the fitted
marginals. Using multi_fit can be advantageous with censored data and
if the correlation in the data is not Gaussian. It leads to
substantially longer calculation time and does not always produce
better results, especially when the number of dimensions is large.
logger_object:
Logging object to be used. If no object is specified, no
logging is performed.
Returns
-------
tuple
theta: float ndarray
Estimates of the parameters of the fitted probability
distribution in each dimension. The following parameters
are returned for the supported distributions: normal,
normal_cov - mean, coefficient of variation; normal_std -
mean, standard deviation; lognormal - median, log standard
deviation;
Rho: float 2D ndarray, optional
In the multivariate case, returns the estimate of the
correlation matrix.
Raises
------
ValueError
If NaN values are produced during standard normal space transformation.
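Examples
--------
An illustrative sketch: with no truncation, detection limits, or
censoring, the fit reduces to the method of moments:

>>> import numpy as np
>>> sample = np.array([[1.0, 2.0, 3.0]])
>>> theta, rho = fit_distribution_to_sample(sample, 'normal_std')
>>> # theta -> approximately [[2.0, 0.816]] (mean, std)
>>> # rho -> [[1.0]] for a univariate sample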
"""
samples = np.atleast_2d(raw_sample)
tr_limits = np.atleast_2d(truncation_limits)
det_limits = np.atleast_2d(detection_limits)
dist_list = np.atleast_1d(distribution)
n_dims, n_samples = samples.shape
if (tr_limits.shape[0] == 1) and (n_dims != 1):
tr_limits = np.tile(tr_limits[0], (n_dims, 1))
if (det_limits.shape[0] == 1) and (n_dims != 1):
det_limits = np.tile(det_limits[0], (n_dims, 1))
if (dist_list.shape[0] == 1) and (n_dims != 1):
dist_list = np.tile(dist_list[0], n_dims)
# Convert samples and limits to log space if the distribution is lognormal
for d_i, distr in enumerate(dist_list):
if distr == 'lognormal':
samples[d_i] = np.log(samples[d_i])
for lim in range(2):
if not np.isnan(tr_limits[d_i][lim]):
tr_limits[d_i][lim] = np.log(tr_limits[d_i][lim])
for lim in range(2):
if not np.isnan(det_limits[d_i][lim]):
det_limits[d_i][lim] = np.log(det_limits[d_i][lim])
# Define initial values of distribution parameters
# Initialize arrays
mu_init = np.ones(n_dims) * np.nan
sig_init = np.ones_like(mu_init) * np.nan
for d_i, distr in enumerate(dist_list):
if distr in {'normal', 'normal_cov', 'normal_std', 'lognormal'}:
# use the first two moments
mu_init[d_i] = np.mean(samples[d_i])
if n_samples == 1:
sig_init[d_i] = 0.0
else:
sig_init[d_i] = np.std(samples[d_i])
# replace zero standard dev with negligible standard dev
sig_zero_id = np.where(sig_init == 0.0)[0]
sig_init[sig_zero_id] = (
1e-6 * np.abs(mu_init[sig_zero_id]) + FIRST_POSITIVE_NUMBER
)
# prepare a vector of initial values
# Note: The actual optimization uses zeros as initial parameters to
# avoid bias from different scales. These initial values are sent to
# the likelihood function and considered in there.
inits = np.transpose([mu_init, sig_init])
# Define the bounds for each input (assuming standardized initials)
# These bounds help avoid unrealistic results and improve the
# convergence rate
# Note that the standard deviation (2nd parameter) is fit in log space to
# facilitate fitting it within reasonable bounds.
bnd_lower = np.array([[-10.0, -5.0] for _ in range(n_dims)])
bnd_upper = np.array([[10.0, 5.0] for _ in range(n_dims)])
bnd_lower = bnd_lower.flatten()
bnd_upper = bnd_upper.flatten()
# There is nothing to gain from a time-consuming optimization if...
# the number of samples is too small
min_sample_size_for_optimization = 3
if (n_samples < min_sample_size_for_optimization) or (
# there are no truncation or detection limits involved
np.all(np.isnan(tr_limits)) and np.all(np.isnan(det_limits))
):
# In this case, it is typically hard to improve on the method of
# moments estimates for the parameters of the marginal distributions
theta = inits
# Otherwise, we run the optimization that aims to find the parameters that
# maximize the likelihood of observing the samples
else:
# First, optimize for each marginal independently
for dim in range(n_dims):
inits_i = inits[dim : dim + 1]
# Censored samples are only considered in the following step, but
# we fit a truncated distribution if there are censored samples to
# make it easier to fit the censored distribution later.
tr_limits_i = [np.nan, np.nan]
for lim in range(2):
if (np.isnan(tr_limits[dim][lim])) and (
not np.isnan(det_limits[dim][lim])
):
tr_limits_i[lim] = det_limits[dim][lim]
elif not np.isnan(det_limits[dim][lim]):
if lim == 0:
tr_limits_i[lim] = np.min(
[tr_limits[dim][lim], det_limits[dim][lim]]
)
elif lim == 1:
tr_limits_i[lim] = np.max(
[tr_limits[dim][lim], det_limits[dim][lim]]
)
else:
tr_limits_i[lim] = tr_limits[dim][lim]
out_m_i = minimize(
_neg_log_likelihood,
np.zeros(inits[dim].size),
args=(
inits_i,
bnd_lower[dim],
bnd_upper[dim],
samples[dim : dim + 1],
[
dist_list[dim],
],
[
tr_limits_i,
],
[np.nan, np.nan],
0,
True,
),
method='BFGS',
options={'maxiter': 50},
)
out = out_m_i.x.reshape(inits_i.shape)
theta = _get_theta(
out,
inits_i,
np.array([dist_list[dim]]),
)
inits[dim] = theta[0]
# Second, if multi_fit is requested or there are censored samples,
# we attempt the multivariate fitting using the marginal results as
# initial parameters.
if multi_fit or (censored_count > 0):
bnd_lower = bnd_lower.flatten()
bnd_upper = bnd_upper.flatten()
out_m = minimize(
_neg_log_likelihood,
np.zeros(inits.size),
args=(
inits,
bnd_lower,
bnd_upper,
samples,
dist_list,
tr_limits,
det_limits,
censored_count,
True,
),
method='BFGS',
options={'maxiter': 50},
)
out = out_m.x.reshape(inits.shape)
theta = _get_theta(out, inits, dist_list)
else:
theta = inits
# Calculate rho in the standard normal space because we will generate new
# samples using that type of correlation (i.e., Gaussian copula)
std_samples = _get_std_samples(samples, theta, tr_limits, dist_list)
if np.any(np.isnan(std_samples)) or np.any(np.isinf(std_samples)):
msg = (
'Conversion to standard normal space was unsuccessful. '
'The given samples might deviate substantially '
'from the specified distribution.'
)
raise ValueError(msg)
rho_hat = _get_std_corr_matrix(std_samples)
if rho_hat is None:
# If there is not enough data to produce a valid correlation matrix
# estimate, we assume uncorrelated demands
rho_hat = np.zeros((n_dims, n_dims))
np.fill_diagonal(rho_hat, 1.0)
if logger_object:
logger_object.warning(
'Demand sample size too small to reliably estimate '
'the correlation matrix. Assuming uncorrelated demands.'
)
else:
print( # noqa: T201
'\nWARNING: Demand sample size '
'too small to reliably estimate '
'the correlation matrix. Assuming '
'uncorrelated demands.'
)
for d_i, distr in enumerate(dist_list):
# Convert mean back to linear space if the distribution is lognormal
if distr == 'lognormal':
theta[d_i][0] = np.exp(theta[d_i][0])
# theta_mod = theta.T.copy()
# theta_mod[0] = np.exp(theta_mod[0])
# theta = theta_mod.T
# Convert the std to cov if the distribution is normal_cov
elif distr in {'normal', 'normal_cov'}:
# replace standard deviation with coefficient of variation
# note: this results in cov=inf if the mean is zero.
almost_zero = 1.0e-40
if np.abs(theta[d_i][0]) < almost_zero:
theta[d_i][1] = np.inf
else:
theta[d_i][1] /= np.abs(theta[d_i][0])
return theta, rho_hat
def _OLS_percentiles( # noqa: N802
params: tuple[float, float], values: np.ndarray, perc: np.ndarray, family: str
) -> float:
"""
Estimate percentiles using ordinary least squares (OLS).
Parameters
----------
params: tuple of floats
The parameters of the selected distribution family.
values: float ndarray
The sample values for which the percentiles are requested.
perc: float ndarray
The requested percentile(s).
family: str
The distribution family to use for the percentile estimation.
Can be either 'normal' or 'lognormal'.
Returns
-------
float
The sum of the squared errors between the estimated and actual values.
Raises
------
ValueError
If `family` is not 'normal' or 'lognormal'.
"""
if family == 'normal':
theta_0 = params[0]
theta_1 = params[1]
if theta_1 <= 0:
return 1e10
val_hat = norm.ppf(perc, loc=theta_0, scale=theta_1)
elif family == 'lognormal':
theta_0 = params[0]
theta_1 = params[1]
if theta_0 <= 0:
return 1e10
if theta_1 <= 0:
return 1e10
val_hat = np.exp(norm.ppf(perc, loc=np.log(theta_0), scale=theta_1))
else:
msg = f'Distribution family not recognized: {family}'
raise ValueError(msg)
return np.sum((val_hat - values) ** 2.0)
def fit_distribution_to_percentiles(
values: list[float], percentiles: list[float], families: list[str]
) -> tuple[str, list[float]]:
"""
Fit distribution to pre-defined values at a finite number of percentiles.
Parameters
----------
values: list of float
Pre-defined values at the given percentiles. At least two values are
expected.
percentiles: list of float
Percentiles where values are defined. At least two percentiles are
expected.
families: list of strings {'normal', 'lognormal'}
Defines the distribution family candidates.
Returns
-------
tuple
family: string
The optimal choice of family among the provided list of
families
theta: array of float
Parameters of the fitted distribution.
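Examples
--------
An illustrative sketch with two percentile-value pairs; the better
fitting family among the candidates is returned:

>>> family, theta = fit_distribution_to_percentiles(
...     [1.0, 2.0], [0.5, 0.84], ['normal', 'lognormal']
... )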
"""
out_list = []
percentiles_np = np.array(percentiles)
median_id = np.argmin(np.abs(percentiles_np - 0.5))
extreme_id = np.argmax(percentiles_np - 0.5)
for family in families:
inits = [
values[median_id],
]
if family == 'normal':
inits.append(
np.abs(values[extreme_id] - inits[0])
/ np.abs(norm.ppf(percentiles_np[extreme_id], loc=0, scale=1))
)
elif family == 'lognormal':
inits.append(
np.abs(np.log(values[extreme_id] / inits[0]))
/ np.abs(norm.ppf(percentiles_np[extreme_id], loc=0, scale=1))
)
out_list.append(
minimize(
_OLS_percentiles,
inits,
args=(values, percentiles_np, family),
method='BFGS',
)
)
best_out_id = np.argmin([out.fun for out in out_list])
return families[best_out_id], out_list[best_out_id].x
class BaseRandomVariable(ABC): # noqa: B024
"""Base abstract class for different types of random variables."""
__slots__: list[str] = [
'RV_set',
'_sample',
'_sample_DF',
'_uni_sample',
'anchor',
'distribution',
'f_map',
'name',
]
def __init__(
self,
name: str,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a RandomVariable object.
Parameters
----------
name: string
A unique string that identifies the random variable.
f_map: function, optional
A user-defined function that is applied on the realizations before
returning a sample.
anchor: RandomVariable, optional
Anchors this to another variable. If the anchor is not None, this
variable will be perfectly correlated with its anchor. Note that
the attributes of this variable and its anchor do not have to be
identical.
"""
self.name = name
self.distribution: str | None = None
self.f_map = f_map
self._uni_sample: np.ndarray | None = None
self.RV_set: RandomVariableSet | None = None
self._sample_DF: pd.Series | None = None
self._sample: np.ndarray | None = None
if anchor is None:
self.anchor = self
else:
self.anchor = anchor
@property
def sample(self) -> np.ndarray | None:
"""
Return the empirical or generated sample.
Returns
-------
ndarray
The empirical or generated sample.
"""
if self.f_map is not None:
return self.f_map(self._sample)
return self._sample
@sample.setter
def sample(self, value: np.ndarray) -> None:
"""
Assign a sample to the random variable.
Parameters
----------
value: ndarray
Sample to assign
"""
self._sample = value
self._sample_DF = pd.Series(value)
@property
def sample_DF(self) -> pd.Series | None: # noqa: N802
"""
Return the empirical or generated sample in a pandas Series.
Returns
-------
ndarray
The empirical or generated sample in a pandas Series.
"""
if self.f_map is not None:
assert self._sample_DF is not None
return self._sample_DF.apply(self.f_map)
return self._sample_DF
@property
def uni_sample(self) -> np.ndarray | None:
"""
Return the sample from the controlling uniform distribution.
Returns
-------
ndarray
The sample from the controlling uniform distribution.
"""
if self.anchor is self:
return self._uni_sample
return self.anchor.uni_sample
@uni_sample.setter
def uni_sample(self, value: np.ndarray) -> None:
"""
Assign the controlling sample to the random variable.
Parameters
----------
value: float ndarray
An array of floating point values in the [0, 1] domain.
"""
self._uni_sample = value
class RandomVariable(BaseRandomVariable):
"""Random variable that needs `values` in `inverse_transform`."""
__slots__: list[str] = ['theta', 'truncation_limits']
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a RandomVariable object.
Parameters
----------
name: string
A unique string that identifies the random variable.
theta: float ndarray
Set of parameters that define the Cumulative Distribution
Function (CDF) of the variable: E.g., mean and coefficient
of variation. Actual parameters depend on the distribution.
A 1D `theta` array represents constant parameters and results
in realizations that are all from the same distribution.
A 2D `theta` array represents variable parameters, meaning
that each realization will be sampled from the distribution
family that the object represents, but with the parameters
set for that realization.
truncation_limits: float ndarray, optional
Defines the np.array((a, b)) truncation limits for the
distribution. Use np.nan to assign no limit in one direction,
like so: np.array((a, np.nan)), or np.array((np.nan, b)).
f_map: function, optional
A user-defined function that is applied on the realizations before
returning a sample.
anchor: RandomVariable, optional
Anchors this to another variable. If the anchor is not None, this
variable will be perfectly correlated with its anchor. Note that
the attributes of this variable and its anchor do not have to be
identical.
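Examples
--------
An illustrative sketch using the NormalRandomVariable subclass
defined further below. A 1D `theta` applies the same parameters to
every realization, while a 2D `theta` assigns one parameter row per
realization:

>>> import numpy as np
>>> rv = NormalRandomVariable('x', theta=np.array([[0.0, 1.0], [5.0, 2.0]]))
>>> rv.cdf(np.array([0.0, 5.0]))  # each value uses its own row of theta
array([0.5, 0.5])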
"""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
# For backwards compatibility, cast to a numpy array if
# given a tuple or list.
if isinstance(theta, (list, tuple)):
theta = np.array(theta)
if isinstance(truncation_limits, (list, tuple)):
truncation_limits = np.array(truncation_limits)
# Verify type
if theta is not None:
assert isinstance(
theta, np.ndarray
), 'Parameter `theta` should be a numpy array.'
assert theta.ndim in {
1,
2,
}, 'Parameter `theta` can only be a 1D or 2D array.'
theta = np.atleast_1d(theta)
assert isinstance(
truncation_limits, np.ndarray
), 'Parameter `truncation_limits` should be a numpy array.'
assert truncation_limits.ndim in {
1,
2,
}, 'Parameter `truncation_limits` can only be a 1D or 2D array.'
# 1D corresponds to constant parameters.
# 2D corresponds to variable parameters (different in each
# realization).
self.theta = theta
self.truncation_limits = truncation_limits
super().__init__(
name=name,
f_map=f_map,
anchor=anchor,
)
def constant_parameters(self) -> bool:
"""
Check whether the RV has constant or variable parameters.
Constant parameters are the same in each realization.
Returns
-------
bool
True if the parameters are constant, False otherwise.
"""
if self.theta is None:
return True
assert self.theta.ndim in {1, 2}
return self.theta.ndim == 1
def _prepare_theta_and_truncation_limit_arrays(
self, values: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
"""
Prepare the `theta` and `truncation_limits` arrays.
Prepare the `theta` and `truncation_limits` arrays for use in
calculations. This method adjusts the shape and size of the
`theta` and `truncation_limits` attributes to ensure
compatibility with the provided `values` array. The
adjustments enable support for two approaches:
* Constant parameters: The parameters remain the same
across all realizations.
* Variable parameters: The parameters vary across
realizations.
Depending on whether the random variable uses constant or
variable parameters, the method ensures that the arrays are
correctly sized and broadcasted as needed.
Parameters
----------
values : np.ndarray
Array of values for which the `theta` and
`truncation_limits` need to be prepared. The size of
`values` determines how the attributes are adjusted.
Returns
-------
tuple
A tuple containing:
* `theta` (np.ndarray): Adjusted array of parameters.
* `truncation_limits` (np.ndarray): Adjusted array of
truncation limits.
Raises
------
ValueError
If the number of elements in `values` does not match the
number of rows of the `theta` attribute or if the
`truncation_limits` array is incompatible with the `theta`
array.
Notes
-----
The method ensures that `truncation_limits` are broadcasted to
match the shape of `theta` if needed. For constant parameters,
a single-row `theta` is expanded to a 2D array. For variable
parameters, the number of rows in `theta` must match the size
of `values`.
"""
theta = self.theta
truncation_limits = self.truncation_limits
assert truncation_limits is not None
if self.constant_parameters():
theta = np.atleast_2d(theta)
elif len(values) != theta.shape[0]:
msg = (
'Number of elements in `values` variable should '
'match the number of rows of the parameter '
'attribute `theta`.'
)
raise ValueError(msg)
# Broadcast truncation limits to match shape
truncation_limits = np.atleast_2d(truncation_limits)
assert truncation_limits is not None
if truncation_limits.shape != theta.shape:
# Number of rows should match
if truncation_limits.shape[1] != theta.shape[1]:
msg = 'Incompatible `truncation_limits` value.'
raise ValueError(msg)
truncation_limits = np.tile(truncation_limits, (theta.shape[0], 1))
return theta, truncation_limits
@staticmethod
def _ensure_positive_probability_difference(
p_b: np.ndarray, p_a: np.ndarray
) -> None:
"""
Ensure there is probability mass between the truncation limits.
Parameters
----------
p_b: float ndarray
The probability of not exceeding the upper truncation limit
based on the CDF of the random variable.
p_a: float ndarray
The probability of not exceeding the lower truncation limit
based on the CDF of the random variable.
Raises
------
ValueError
If a negative probability difference is found.
"""
if np.any((p_b - p_a) < FIRST_POSITIVE_NUMBER):
msg = (
'The probability mass within the truncation limits is '
'too small and the truncated distribution cannot be '
'sampled with sufficiently high accuracy. This is most '
'probably due to incorrect truncation limits set for '
'the distribution.'
)
raise ValueError(msg)
class UtilityRandomVariable(BaseRandomVariable):
"""Random variable that needs `sample_size` in `inverse_transform`."""
__slots__: list[str] = []
@abstractmethod
def __init__(
self,
name: str,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a UtilityRandomVariable object.
Parameters
----------
name: string
A unique string that identifies the random variable.
f_map: function, optional
A user-defined function that is applied on the realizations before
returning a sample.
anchor: RandomVariable, optional
Anchors this to another variable. If the anchor is not None, this
variable will be perfectly correlated with its anchor. Note that
the attributes of this variable and its anchor do not have to be
identical.
"""
super().__init__(
name=name,
f_map=f_map,
anchor=anchor,
)
class NormalRandomVariable(RandomVariable):
"""Normal random variable."""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""Instantiate a Normal random variable."""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
super().__init__(
name=name,
theta=theta,
truncation_limits=truncation_limits,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'normal'
def cdf(self, values: np.ndarray) -> np.ndarray:
"""
Return the CDF at the given values.
Parameters
----------
values: 1D float ndarray
Values for which to evaluate the CDF
Returns
-------
ndarray
1D float ndarray containing CDF values
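Examples
--------
An illustrative sketch (standard normal, no truncation):

>>> import numpy as np
>>> rv = NormalRandomVariable('x', theta=np.array([0.0, 1.0]))
>>> rv.cdf(np.array([0.0]))
array([0.5])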
"""
theta, truncation_limits = self._prepare_theta_and_truncation_limit_arrays(
values
)
mu, sig = theta.T
if np.any(~np.isnan(self.truncation_limits)):
a, b = truncation_limits.T
# Replace NaN values
a = np.nan_to_num(a, nan=-np.inf)
b = np.nan_to_num(b, nan=np.inf)
p_a, p_b = (norm.cdf((lim - mu) / sig) for lim in (a, b))
self._ensure_positive_probability_difference(p_b, p_a)
# cap the values at the truncation limits
values = np.minimum(np.maximum(values, a), b)
# get the cdf from a non-truncated normal
p_vals = norm.cdf(values, loc=mu, scale=sig)
# adjust for truncation
result = (p_vals - p_a) / (p_b - p_a)
else:
result = norm.cdf(values, loc=mu, scale=sig)
return result
class Normal_STD(NormalRandomVariable):
"""
Normal random variable with standard deviation.
This class represents a normal random variable defined by mean and
standard deviation.
"""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""Instantiate a Normal_STD random variable."""
mean, std = theta[:2]
theta = np.array([mean, std])
super().__init__(name, theta, truncation_limits, f_map, anchor)
class Normal_COV(NormalRandomVariable):
"""
Normal random variable with coefficient of variation.
This class represents a normal random variable defined by mean and
coefficient of variation.
"""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a Normal_COV random variable.
Raises
------
ValueError
If the specified mean is zero.
"""
mean, cov = theta[:2]
almost_zero = 1e-40
if np.abs(mean) < almost_zero:
msg = 'The mean of Normal_COV RVs cannot be zero.'
raise ValueError(msg)
std = mean * cov
theta = np.array([mean, std])
super().__init__(name, theta, truncation_limits, f_map, anchor)
class LogNormalRandomVariable(RandomVariable):
"""Lognormal random variable."""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""Instantiate a LogNormal random variable."""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
super().__init__(
name=name,
theta=theta,
truncation_limits=truncation_limits,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'lognormal'
def cdf(self, values: np.ndarray) -> np.ndarray:
"""
Return the CDF at the given values.
Parameters
----------
values: 1D float ndarray
Values for which to evaluate the CDF
Returns
-------
ndarray
1D float ndarray containing CDF values
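Examples
--------
An illustrative sketch; `theta` holds the median and log standard
deviation, so the CDF evaluated at the median is 0.5:

>>> import numpy as np
>>> rv = LogNormalRandomVariable('x', theta=np.array([1.0, 0.5]))
>>> rv.cdf(np.array([1.0]))
array([0.5])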
"""
theta, truncation_limits = self._prepare_theta_and_truncation_limit_arrays(
values
)
theta, beta = theta.T
if np.any(~np.isnan(self.truncation_limits)):
a, b = truncation_limits.T
# Replace NaN values
a = np.nan_to_num(a, nan=FIRST_POSITIVE_NUMBER)
b = np.nan_to_num(b, nan=np.inf)
p_a, p_b = (
norm.cdf((np.log(lim) - np.log(theta)) / beta) for lim in (a, b)
)
self._ensure_positive_probability_difference(p_b, p_a)
# cap the values at the truncation limits
values = np.minimum(np.maximum(values, a), b)
# get the cdf from a non-truncated lognormal
p_vals = norm.cdf(np.log(values), loc=np.log(theta), scale=beta)
# adjust for truncation
result = (p_vals - p_a) / (p_b - p_a)
else:
values = np.maximum(values, FIRST_POSITIVE_NUMBER)
result = norm.cdf(np.log(values), loc=np.log(theta), scale=beta)
return result
class WeibullRandomVariable(RandomVariable):
"""Weibull random variable."""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a Weibull random variable.
Raises
------
ValueError
If variable parameters are specified.
"""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
super().__init__(
name=name,
theta=theta,
truncation_limits=truncation_limits,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'weibull'
if self.theta.ndim != 1:
msg = (
'Variable parameters are currently not supported for '
'Weibull random variables.'
)
raise ValueError(msg)
def cdf(self, values: np.ndarray) -> np.ndarray:
"""
Return the CDF at the given values.
Parameters
----------
values: 1D float ndarray
Values for which to evaluate the CDF
Returns
-------
ndarray
1D float ndarray containing CDF values
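Examples
--------
An illustrative sketch; with kappa = 1 the Weibull reduces to an
exponential distribution:

>>> import numpy as np
>>> rv = WeibullRandomVariable('x', theta=np.array([1.0, 1.0]))
>>> # rv.cdf(np.array([1.0])) -> approximately array([0.632]), i.e. 1 - exp(-1)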
"""
lambda_, kappa = self.theta[:2]
if np.any(~np.isnan(self.truncation_limits)):
a, b = self.truncation_limits
if np.isnan(a):
# Weibull is not defined for negative values
a = 0.0
if np.isnan(b):
b = np.inf
p_a, p_b = (weibull_min.cdf(lim, kappa, scale=lambda_) for lim in (a, b))
self._ensure_positive_probability_difference(p_b, p_a)
# cap the values at the truncation limits
values = np.minimum(np.maximum(values, a), b)
# get the cdf from a non-truncated weibull
p_vals = weibull_min.cdf(values, kappa, scale=lambda_)
# adjust for truncation
result = (p_vals - p_a) / (p_b - p_a)
else:
values = np.maximum(
values, 0.0
) # Weibull is not defined for negative values
result = weibull_min.cdf(values, kappa, scale=lambda_)
return result
class MultilinearCDFRandomVariable(RandomVariable):
"""
Multilinear CDF random variable.
This RV is defined by specifying the points that define its
Cumulative Density Function (CDF), and linear interpolation
between them.
"""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a MultilinearCDF random variable.
Raises
------
ValueError
In case of incompatible input parameters.
NotImplementedError
If truncation limits are specified.
"""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
super().__init__(
name=name,
theta=theta,
truncation_limits=truncation_limits,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'multilinear_CDF'
if not np.all(np.isnan(truncation_limits)):
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)
y_1 = theta[0, 1]
if y_1 != 0.00:
msg = 'For multilinear CDF random variables, y_1 should be set to 0.00'
raise ValueError(msg)
y_n = theta[-1, 1]
if y_n != 1.00:
msg = 'For multilinear CDF random variables, y_n should be set to 1.00'
raise ValueError(msg)
x_s = theta[:, 0]
if not np.array_equal(np.sort(x_s), x_s):
msg = (
'For multilinear CDF random variables, '
'Xs should be specified in ascending order'
)
raise ValueError(msg)
if np.any(np.isclose(np.diff(x_s), 0.00)):
msg = (
'For multilinear CDF random variables, '
'Xs should be specified in strictly ascending order'
)
raise ValueError(msg)
y_s = theta[:, 1]
if not np.array_equal(np.sort(y_s), y_s):
msg = (
'For multilinear CDF random variables, '
'Ys should be specified in ascending order'
)
raise ValueError(msg)
if np.any(np.isclose(np.diff(y_s), 0.00)):
msg = (
'For multilinear CDF random variables, '
'Ys should be specified in strictly ascending order'
)
raise ValueError(msg)
required_ndim = 2
assert self.theta.ndim == required_ndim, 'Invalid `theta` dimensions.'
def cdf(self, values: np.ndarray) -> np.ndarray:
"""
Return the CDF at the given values.
Parameters
----------
values: 1D float ndarray
Values for which to evaluate the CDF
Returns
-------
ndarray
1D float ndarray containing CDF values
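Examples
--------
An illustrative sketch; the CDF interpolates linearly between the
specified (x, y) points:

>>> import numpy as np
>>> theta = np.array([[0.0, 0.0], [1.0, 0.5], [2.0, 1.0]])
>>> rv = MultilinearCDFRandomVariable('x', theta=theta)
>>> rv.cdf(np.array([0.5, 1.5]))
array([0.25, 0.75])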
"""
x_i = [-np.inf] + [x[0] for x in self.theta] + [np.inf]
y_i = [0.00] + [x[1] for x in self.theta] + [1.00]
# Using Numpy's interp for linear interpolation
return np.interp(values, x_i, y_i, left=0.00, right=1.00)
class EmpiricalRandomVariable(RandomVariable):
"""Empirical random variable."""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""Instantiate an Empirical random variable."""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
theta = np.atleast_1d(theta)
super().__init__(
name=name,
theta=theta,
truncation_limits=truncation_limits,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'empirical'
if not np.all(np.isnan(truncation_limits)):
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)
class CoupledEmpiricalRandomVariable(UtilityRandomVariable):
"""Coupled empirical random variable."""
__slots__: list[str] = ['theta']
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a coupled empirical random variable.
Parameters
----------
name: string
A unique string that identifies the random variable.
theta: 1D float ndarray
Samples from which to draw empirical realizations.
truncation_limits: float ndarray, optional
Not supported for CoupledEmpirical RVs.
Must be left as None.
f_map: function, optional
A user-defined function that is applied on the realizations before
returning a sample.
anchor: RandomVariable, optional
Anchors this to another variable. If the anchor is not None, this
variable will be perfectly correlated with its anchor. Note that
the attributes of this variable and its anchor do not have to be
identical.
Raises
------
NotImplementedError
When truncation limits are provided
"""
super().__init__(
name=name,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'coupled_empirical'
if truncation_limits is not None:
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)
self.theta = np.atleast_1d(theta)
class DeterministicRandomVariable(UtilityRandomVariable):
"""Deterministic random variable."""
__slots__: list[str] = ['theta']
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a deterministic random variable.
This behaves like a RandomVariable object but represents a
specific, deterministic value.
Parameters
----------
name: string
A unique string that identifies the random variable.
theta: 1-element float ndarray
The value.
truncation_limits: float ndarray, optional
Not supported for Deterministic RVs.
Should be np.array((np.nan, np.nan))
f_map: function, optional
A user-defined function that is applied on the realizations before
returning a sample.
anchor: RandomVariable, optional
Anchors this to another variable. If the anchor is not None, this
variable will be perfectly correlated with its anchor. Note that
the attributes of this variable and its anchor do not have to be
identical.
Raises
------
NotImplementedError
When truncation limits are provided
"""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
super().__init__(
name=name,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'deterministic'
if not np.all(np.isnan(truncation_limits)):
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)
self.theta = np.atleast_1d(theta)
class MultinomialRandomVariable(RandomVariable):
"""Multinomial random variable."""
__slots__: list[str] = []
def __init__(
self,
name: str,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
) -> None:
"""
Instantiate a Multinomial random variable.
Raises
------
ValueError
In case of incompatible input parameters.
NotImplementedError
If truncation limits are specified.
"""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
super().__init__(
name=name,
theta=theta,
truncation_limits=truncation_limits,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'multinomial'
if not np.all(np.isnan(truncation_limits)):
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)
if np.sum(theta) > 1.00:
msg = (
f'The set of p values provided for a multinomial '
f'distribution shall sum up to less than or equal to 1.0. '
f'The provided values sum up to {np.sum(theta)}. p = '
f'{theta} .'
)
raise ValueError(msg)
class RandomVariableSet:
"""
Random variable set.
Represents a set of random variables, each of which is described
by its own probability distribution. The set allows the user to
define correlations between the random variables, and provides
methods to sample from the correlated variables and estimate
various statistical properties of the set, such as the probability
density within a specified range or orthotope.
Parameters
----------
name: string
A unique string that identifies the set of random variables.
RV_list: list of RandomVariable
Defines the random variables in the set
Rho: float 2D ndarray
Defines the correlation matrix that describes the correlation between
the random variables in the set. Currently, only the Gaussian copula
is supported.
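Examples
--------
An illustrative sketch of a set of two correlated standard normal
variables:

>>> import numpy as np
>>> rv_1 = NormalRandomVariable('x1', theta=np.array([0.0, 1.0]))
>>> rv_2 = NormalRandomVariable('x2', theta=np.array([0.0, 1.0]))
>>> rv_set = RandomVariableSet(
...     'my_set', [rv_1, rv_2], np.array([[1.0, 0.8], [0.8, 1.0]])
... )
>>> rv_set.size
2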
"""
__slots__: list[str] = ['_Rho', '_variables', 'name']
def __init__(
self, name: str, rv_list: list[BaseRandomVariable], rho: np.ndarray
) -> None:
"""Instantiate a random variable set."""
self.name = name
if len(rv_list) > 1:
# put the RVs in a dictionary for more efficient access
reorder = np.argsort([RV.name for RV in rv_list])
self._variables = {rv_list[i].name: rv_list[i] for i in reorder}
# reorder the entries in the correlation matrix to correspond to the
# sorted list of RVs
self._Rho = np.asarray(rho[(reorder)].T[(reorder)].T)
else: # if there is only one variable (for testing, probably)
self._variables = {rv.name: rv for rv in rv_list}
self._Rho = np.asarray(rho)
# assign this RV_set to the variables
for var in self._variables.values():
var.RV_set = self
@property
def RV(self) -> dict[str, RandomVariable]: # noqa: N802
"""
Returns the random variable(s) assigned to the set.
Returns
-------
dict
A dictionary that maps variable names to the random variables in the set.
"""
return self._variables
@property
def size(self) -> int:
"""
Return the number of variables in the RV set.
Returns
-------
int
The number of variables in the RV set.
"""
return len(self._variables)
@property
def sample(self) -> dict[str, np.ndarray | None]:
"""
Returns the sample of the variables in the set.
Returns
-------
dict
A dictionary that maps variable names to the samples of the
variables in the set.
"""
return {name: rv.sample for name, rv in self._variables.items()}
def Rho(self, var_subset: list[str] | None = None) -> np.ndarray: # noqa: N802
"""
Return the (subset of the) correlation matrix.
Returns
-------
ndarray
The (subset of the) correlation matrix.
"""
if var_subset is None:
return self._Rho
var_ids = [list(self._variables.keys()).index(var_i) for var_i in var_subset]
return (self._Rho[var_ids]).T[var_ids]
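
    # For example, with variables 'A', 'B', and 'C' stored in sorted order:
    #
    #     rv_set.Rho(['A', 'C'])  # rows/columns 0 and 2 of the full matrix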
def apply_correlation(self) -> None:
"""
        Apply correlation to n-dimensional uniform samples.

        Currently, correlation is applied using a Gaussian copula.
        First, we try the Cholesky transformation. If the correlation
        matrix is not positive definite and the Cholesky factorization
        fails, we fall back to an SVD-based approach that preserves as
        much of the correlation structure as possible.
"""
        u_rv = np.array([rv.uni_sample for rv in self.RV.values()])
        # map the uniform samples to standard normal space
        n_rv = norm.ppf(u_rv)
        # First, try the Cholesky transformation
        try:
            l_mat = cholesky(self._Rho, lower=True)
            nc_rv = l_mat @ n_rv
        except np.linalg.LinAlgError:
            # if Cholesky fails, use the more time-consuming but more
            # robust approach based on SVD
            u_mat, s_mat, _ = svd(self._Rho)
            s_diag = np.diagflat(np.sqrt(s_mat))
            nc_rv = (n_rv.T @ s_diag @ u_mat.T).T
        # map the correlated samples back to uniform space
        uc_rv = norm.cdf(nc_rv)
for rv, ucrv in zip(self.RV.values(), uc_rv):
rv.uni_sample = ucrv
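
    # A quick numerical check (illustrative only, using the cholesky
    # import at the top of this module): for a positive definite rho the
    # lower-triangular factor reproduces the correlation matrix exactly,
    # which is what lets the copula step preserve the requested dependence.
    #
    #     rho = np.array([[1.0, 0.9],
    #                     [0.9, 1.0]])
    #     l_mat = cholesky(rho, lower=True)
    #     np.allclose(l_mat @ l_mat.T, rho)  # True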
def orthotope_density(
self,
lower: np.ndarray | float = np.nan,
upper: np.ndarray | float = np.nan,
var_subset: list[str] | None = None,
) -> np.ndarray:
"""
        Estimate the probability density within an orthotope for the RV set.

        Use the mvn_orthotope_density function in this module for the
        calculation. The distribution of individual RVs is not limited
        to the normal family. The provided limits are converted to the
        standard normal space that is the basis of all RVs in pelicun.
        Truncation limits and correlation (using the Gaussian copula)
        are automatically taken into consideration.

        Parameters
        ----------
        lower: float ndarray, optional, default: np.nan
            Lower bound(s) of the orthotope. A scalar value can be used
            for a univariate RV; a list of bounds is expected in
            multivariate cases. If the orthotope is not bounded from
            below in a dimension, use 'np.nan' for that dimension.
        upper: float ndarray, optional, default: np.nan
            Upper bound(s) of the orthotope. A scalar value can be used
            for a univariate RV; a list of bounds is expected in
            multivariate cases. If the orthotope is not bounded from
            above in a dimension, use 'np.nan' for that dimension.
        var_subset: list of strings, optional, default: None
            If provided, allows for selecting only a subset of the
            variables in the RV_set for the density calculation.

        Returns
        -------
        ndarray
            Estimate(s) of the probability density within the
            orthotope, one value per set of provided bounds.
"""
if isinstance(lower, float):
lower = np.array([lower])
if isinstance(upper, float):
upper = np.array([upper])
if np.any(~np.isnan(lower)):
target_shape = lower.shape
elif np.any(~np.isnan(upper)):
target_shape = upper.shape
else:
return np.array([1.0])
lower_std = np.full(target_shape, np.nan)
upper_std = np.full(target_shape, np.nan)
# collect the variables involved
if var_subset is None:
variables = list(self._variables.keys())
else:
variables = var_subset
# first, convert limits to standard normal values
for var_i, var_name in enumerate(variables):
var = self._variables[var_name]
if (np.any(~np.isnan(lower))) and (~np.isnan(lower[var_i])):
lower_std[var_i] = norm.ppf(var.cdf(lower[var_i]), loc=0, scale=1)
if (np.any(~np.isnan(upper))) and (~np.isnan(upper[var_i])):
upper_std[var_i] = norm.ppf(var.cdf(upper[var_i]), loc=0, scale=1)
# then calculate the orthotope results in std normal space
lower_std = lower_std.T
upper_std = upper_std.T
od = [
mvn_orthotope_density(
mu=np.zeros(len(variables)),
cov=self.Rho(var_subset),
lower=l_i,
upper=u_i,
)[0]
for l_i, u_i in zip(lower_std, upper_std)
]
return np.asarray(od)
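
    # Usage sketch (hedged): for a set holding a single standard normal
    # variable, the probability mass between -1 and 1 is the familiar
    # one-sigma value:
    #
    #     rv_set.orthotope_density(lower=-1.0, upper=1.0)  # ~0.6827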
class RandomVariableRegistry:
"""Random variable registry."""
__slots__: list[str] = ['_rng', '_sets', '_variables']
def __init__(self, rng: np.random.Generator) -> None:
"""
        Instantiate a random variable registry.

        Parameters
        ----------
        rng: numpy.random._generator.Generator
            Random number generator object,
            e.g. np.random.default_rng(seed).
"""
self._rng = rng
self._variables: dict[str, BaseRandomVariable] = {}
self._sets: dict[str, RandomVariableSet] = {}
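
    # Instantiation sketch (hedged):
    #
    #     rng = np.random.default_rng(42)
    #     registry = RandomVariableRegistry(rng)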
@property
def RV(self) -> dict[str, BaseRandomVariable]: # noqa: N802
"""
        Returns all random variable(s) in the registry.

        Returns
        -------
        dict
            All random variable(s) in the registry.
"""
return self._variables
def RVs(self, keys: list[str]) -> dict[str, BaseRandomVariable]: # noqa: N802
"""
        Return a subset of the random variables in the registry.

        Parameters
        ----------
        keys: list of str
            Keys that define the subset.

        Returns
        -------
        dict
            The subset of random variables corresponding to the given keys.
"""
return {name: self._variables[name] for name in keys}
def add_RV(self, rv: BaseRandomVariable) -> None: # noqa: N802
"""
        Add a new random variable to the registry.

        Raises
        ------
        ValueError
            When the RV already exists in the registry.
"""
if rv.name in self._variables:
msg = f'RV {rv.name} already exists in the registry.'
raise ValueError(msg)
self._variables.update({rv.name: rv})
@property
def RV_set(self) -> dict[str, RandomVariableSet]: # noqa: N802
"""
        Return the random variable set(s) in the registry.

        Returns
        -------
        dict
            The random variable set(s) in the registry.
"""
return self._sets
def add_RV_set(self, rv_set: RandomVariableSet) -> None: # noqa: N802
"""Add a new set of random variables to the registry."""
self._sets.update({rv_set.name: rv_set})
@property
def RV_sample(self) -> dict[str, np.ndarray | None]: # noqa: N802
"""
        Return the sample for every random variable in the registry.

        Returns
        -------
        dict
            The sample for every random variable in the registry.
"""
return {name: rv.sample for name, rv in self.RV.items()}
def generate_sample(self, sample_size: int, method: str) -> None:
"""
        Generate samples for all variables in the registry.

        Parameters
        ----------
        sample_size: int
            The number of samples requested per variable.
        method: str
            Can be any of: 'MonteCarlo', 'LHS', 'LHS_midpoint'.
            The sample generation method to use. 'MonteCarlo' stands
            for conventional random sampling; 'LHS' is Latin Hypercube
            Sampling with a random sample location within each bin of
            the hypercube; 'LHS_midpoint' is like LHS, but the samples
            are assigned to the midpoints of the hypercube bins.

        Raises
        ------
        ValueError
            When the requested sample generation method is unknown.
        NotImplementedError
            When the RV parent class is unknown.
        """
        # Collect the names of the variables that need their own
        # controlling sample: variables that are their own anchor, plus
        # deterministic and coupled-empirical variables.
rv_list = [
RV_name
for RV_name, RV in self.RV.items()
if (
(RV.anchor == RV)
or (RV.distribution in {'deterministic', 'coupled_empirical'})
)
]
rv_id = {RV_name: ID for ID, RV_name in enumerate(rv_list)}
rv_count = len(rv_id)
# Generate controlling samples from a uniform distribution for free RVs
        if method == 'LHS_midpoint':
            bin_low = np.array(
                [self._rng.permutation(sample_size) for _ in range(rv_count)]
            )
            # assign samples to the midpoints of the hypercube bins
            u_rv = (bin_low + 0.5) / sample_size
        elif method == 'LHS':
            bin_low = np.array(
                [self._rng.permutation(sample_size) for _ in range(rv_count)]
            )
            # sample a random location within each hypercube bin
            u_rv = bin_low + self._rng.random(size=[rv_count, sample_size])
            u_rv /= sample_size
        elif method == 'MonteCarlo':
            u_rv = self._rng.random(size=[rv_count, sample_size])
        else:
            msg = f'Unknown sample generation method: {method}'
            raise ValueError(msg)
# Assign the controlling samples to the RVs
for rv_name, rvid in rv_id.items():
self.RV[rv_name].uni_sample = u_rv[rvid]
# Apply correlations for the pre-defined RV sets
for rv_set in self.RV_set.values():
# prepare the correlated uniform distribution for the set
rv_set.apply_correlation()
# Convert from uniform to the target distribution for every RV
for rv in self.RV.values():
if isinstance(rv, UtilityRandomVariable):
rv.inverse_transform_sampling(sample_size)
elif isinstance(rv, RandomVariable):
rv.inverse_transform_sampling()
else:
msg = 'Unknown RV parent class.'
raise NotImplementedError(msg)
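
# End-to-end usage sketch (hedged; Normal_COV is the class mapped in
# rv_class_map below, and its constructor signature is assumed to mirror
# the other RandomVariable subclasses in this module; theta values are
# illustrative):
#
#     rng = np.random.default_rng(42)
#     registry = RandomVariableRegistry(rng)
#     registry.add_RV(Normal_COV('X', theta=np.array([1.0, 0.3])))
#     registry.generate_sample(sample_size=1000, method='LHS')
#     x_sample = registry.RV_sample['X']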
def rv_class_map(
distribution_name: str,
) -> type[RandomVariable | UtilityRandomVariable]:
"""
    Map a distribution name to its corresponding random variable class.

    Parameters
    ----------
    distribution_name: str
        The name of a distribution.

    Returns
    -------
    type[RandomVariable | UtilityRandomVariable]
        The class of the corresponding random variable.

    Raises
    ------
    ValueError
        If the given distribution name does not correspond to a
        distribution class.
    """
if pd.isna(distribution_name):
distribution_name = 'deterministic'
# Mapping for RandomVariable subclasses
random_variable_map: dict[str, type[RandomVariable]] = {
'normal': Normal_COV,
'normal_std': Normal_STD,
'normal_cov': Normal_COV,
'lognormal': LogNormalRandomVariable,
'uniform': UniformRandomVariable,
'weibull': WeibullRandomVariable,
'multilinear_CDF': MultilinearCDFRandomVariable,
'empirical': EmpiricalRandomVariable,
'multinomial': MultinomialRandomVariable,
}
# Mapping for UtilityRandomVariable subclasses
utility_random_variable_map: dict[str, type[UtilityRandomVariable]] = {
'coupled_empirical': CoupledEmpiricalRandomVariable,
'deterministic': DeterministicRandomVariable,
}
if distribution_name in random_variable_map:
return random_variable_map[distribution_name]
if distribution_name in utility_random_variable_map:
return utility_random_variable_map[distribution_name]
msg = f'Unsupported distribution: {distribution_name}'
raise ValueError(msg)
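
# Usage sketch (hedged): the mapping is typically exercised when building
# RVs from tabulated model parameters, e.g.:
#
#     rv_class_map('lognormal')  # -> LogNormalRandomVariable
#     rv_class_map(np.nan)       # -> DeterministicRandomVariable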