Source code for pelicun.uq

# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Leland Stanford Junior University
# Copyright (c) 2018 The Regents of the University of California
#
# This file is part of pelicun.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# You should have received a copy of the BSD 3-Clause License along with
# pelicun. If not, see <http://www.opensource.org/licenses/>.
#
# Contributors:
# Adam Zsarnóczay

"""
This module defines constants, classes and methods for uncertainty
quantification in pelicun.

.. rubric:: Contents

.. autosummary::

    mvn_orthotope_density
    fit_distribution
    RandomVariable
    RandomVariableSet
    RandomVariableRegistry


"""

from .base import *

from scipy.stats import uniform, norm
from scipy.stats import multivariate_normal as mvn
from scipy.stats.mvn import mvndst
from scipy.linalg import cholesky, svd
from scipy.optimize import minimize

import warnings


def mvn_orthotope_density(mu, COV, lower=None, upper=None):
    """
    Estimate the probability density within a hyperrectangle for an MVN distr.

    Use the method of Alan Genz (1992) to estimate the probability density
    of a multivariate normal distribution within an n-orthotope (i.e.,
    hyperrectangle) defined by its lower and upper bounds. Limits can be
    relaxed in any direction by assigning infinite bounds (i.e. numpy.inf).

    Parameters
    ----------
    mu: float scalar or ndarray
        Mean(s) of the non-truncated distribution.
    COV: float ndarray
        Covariance matrix of the non-truncated distribution
    lower: float vector, optional, default: None
        Lower bound(s) for the truncated distributions. A scalar value can
        be used for a univariate case, while a list of bounds is expected
        in multivariate cases. If the distribution is non-truncated from
        below in a subset of the dimensions, use either `None` or assign an
        infinite value (i.e. -numpy.inf) to those dimensions.
    upper: float vector, optional, default: None
        Upper bound(s) for the truncated distributions. A scalar value can
        be used for a univariate case, while a list of bounds is expected
        in multivariate cases. If the distribution is non-truncated from
        above in a subset of the dimensions, use either `None` or assign an
        infinite value (i.e. numpy.inf) to those dimensions.

    Returns
    -------
    alpha: float
        Estimate of the probability density within the hyperrectangle
    eps_alpha: float
        Estimate of the error in alpha.

    """

    # process the inputs and get the number of dimensions
    mu = np.atleast_1d(mu)
    COV = np.atleast_2d(COV)

    if mu.shape == ():
        mu = np.asarray([mu])
        COV = np.asarray([COV])
    else:
        COV = np.asarray(COV)

    sig = np.sqrt(np.diag(COV))
    corr = COV / np.outer(sig, sig)

    ndim = mu.size

    if lower is None:
        lower = -np.ones(ndim) * np.inf
    else:
        lower = np.atleast_1d(lower)

    if upper is None:
        upper = np.ones(ndim) * np.inf
    else:
        upper = np.atleast_1d(upper)

    # replace None with np.inf
    lower[np.where(lower == None)[0]] = -np.inf
    lower = lower.astype(np.float64)
    upper[np.where(upper == None)[0]] = np.inf
    upper = upper.astype(np.float64)

    # standardize the truncation limits
    lower = (lower - mu) / sig
    upper = (upper - mu) / sig

    # prepare the flags for infinite bounds (these are needed for the mvndst
    # function)
    lowinf = np.isneginf(lower)
    uppinf = np.isposinf(upper)
    infin = 2.0 * np.ones(ndim)

    np.putmask(infin, lowinf, 0)
    np.putmask(infin, uppinf, 1)
    np.putmask(infin, lowinf * uppinf, -1)

    # prepare the correlation coefficients
    if ndim == 1:
        correl = 0
    else:
        correl = corr[np.tril_indices(ndim, -1)]

    # estimate the density
    eps_alpha, alpha, __ = mvndst(lower, upper, infin, correl)

    return alpha, eps_alpha
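
# Illustrative usage (not part of the original module): estimating the
# probability mass of a standard bivariate normal with rho = 0.5 inside the
# unit square. Variable names below are chosen for this example only.
#
#   alpha, eps_alpha = mvn_orthotope_density(
#       mu=[0.0, 0.0],
#       COV=[[1.0, 0.5], [0.5, 1.0]],
#       lower=[0.0, 0.0],
#       upper=[1.0, 1.0])
#   # alpha is the estimated probability mass; eps_alpha is the error estimate
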
def get_theta(params, inits, distribution):

    theta = np.zeros(inits.shape)

    for i, (params_i, inits_i) in enumerate(zip(params, inits)):

        if distribution in ['normal', 'lognormal']:
            sig = np.exp(np.log(inits_i[1]) + params_i[1])
            mu = inits_i[0] + params_i[0] * sig

            theta[i, 0] = mu
            theta[i, 1] = sig

    return theta


def _get_limit_probs(limits, distribution, theta):

    if distribution in ['normal', 'lognormal']:

        a, b = limits
        mu = theta[0]
        sig = theta[1]

        if a is not None:
            p_a = norm.cdf((a - mu) / sig)
        else:
            p_a = 0.0

        if b is not None:
            p_b = norm.cdf((b - mu) / sig)
        else:
            p_b = 1.0

    return p_a, p_b


def _get_std_samples(samples, theta, tr_limits, distribution):

    ndims = samples.shape[0]

    std_samples = np.zeros(samples.shape)

    for i, (samples_i, theta_i, tr_lim_i) in enumerate(
            zip(samples, theta, tr_limits)):

        if distribution in ['normal', 'lognormal']:

            # consider truncation if needed
            p_a, p_b = _get_limit_probs(tr_lim_i, distribution, theta_i)

            uni_samples = (norm.cdf(samples_i, loc=theta_i[0],
                                    scale=theta_i[1]) - p_a) / (p_b - p_a)

            std_samples[i] = norm.ppf(uni_samples, loc=0., scale=1.)

    return std_samples


def _get_std_corr_matrix(std_samples):

    n_dims, n_samples = std_samples.shape

    rho_hat = np.zeros((n_dims, n_dims))
    np.fill_diagonal(rho_hat, 1.0)

    for dim_i in range(n_dims):
        for dim_j in np.arange(dim_i + 1, n_dims):
            rho_hat[dim_i, dim_j] = np.sum(
                std_samples[dim_i] * std_samples[dim_j]) / n_samples
            rho_hat[dim_j, dim_i] = rho_hat[dim_i, dim_j]

    # make sure rho_hat is positive semidefinite
    try:
        L = cholesky(rho_hat, lower=True)  # if this works, we're good

    except:
        # otherwise, we can try to fix the matrix using SVD
        try:
            U, s, V = svd(rho_hat, )

        except:
            return None

        S = np.diagflat(s)

        rho_hat = U @ S @ U.T
        np.fill_diagonal(rho_hat, 1.0)

        if ((np.max(rho_hat) > 1.0) or (np.min(rho_hat) < -1.0)):
            return None

    return rho_hat


def _mvn_scale(x, rho):

    x = np.atleast_2d(x)
    n_dims = x.shape[1]

    rho_0 = np.zeros((n_dims, n_dims))
    np.fill_diagonal(rho_0, 1)

    a = mvn.pdf(x, mean=np.zeros(n_dims), cov=rho_0)

    b = mvn.pdf(x, mean=np.zeros(n_dims), cov=rho)

    return b / a


def _neg_log_likelihood(params, inits, bnd_lower, bnd_upper, samples,
                        distribution, tr_limits, det_limits, censored_count,
                        enforce_bounds=False):

    # First, check if the parameters are within the pre-defined bounds
    if enforce_bounds:
        if ((params > bnd_lower) & (params < bnd_upper)).all(0) == False:
            # if they are not, then return a large value to discourage the
            # optimization algorithm from going in that direction
            return 1e10

    # If there is nan in params, return a large value
    if np.isnan(np.sum(params)):
        return 1e10

    params = np.reshape(params, inits.shape)
    n_dims, n_samples = samples.shape

    theta = get_theta(params, inits, distribution)

    likelihoods = np.zeros(samples.shape)

    # calculate the marginal likelihoods
    for i, (theta_i, samples_i, tr_lim_i) in enumerate(
            zip(theta, samples, tr_limits)):

        # consider truncation if needed
        p_a, p_b = _get_limit_probs(tr_lim_i, distribution, theta_i)
        # this is the probability mass within the truncation limits
        tr_alpha = p_b - p_a

        # calculate the likelihood for each available sample
        if distribution in ['normal', 'lognormal']:
            likelihoods[i] = norm.pdf(samples_i, loc=theta_i[0],
                                      scale=theta_i[1]) / tr_alpha

    # transform every sample into standard normal space and get the
    # correlation matrix
    std_samples = _get_std_samples(samples, theta, tr_limits, distribution)
    rho_hat = _get_std_corr_matrix(std_samples)
    if rho_hat is None:
        return 1e10

    # likelihoods related to censoring need to be handled together
    if censored_count > 0:

        det_lower = np.zeros(n_dims)
        det_upper = np.zeros(n_dims)

        for i, (theta_i, tr_lim_i, det_lim_i) in enumerate(
                zip(theta, tr_limits, det_limits)):

            # also prepare the standardized detection limits
            p_a, p_b = _get_limit_probs(tr_lim_i, distribution, theta_i)
            p_l, p_u = _get_limit_probs(det_lim_i, distribution, theta_i)

            # rescale to consider truncation
            p_l, p_u = [np.min([np.max([lim, p_a]), p_b])
                        for lim in [p_l, p_u]]
            p_l, p_u = [(lim - p_a) / (p_b - p_a) for lim in [p_l, p_u]]

            det_lower[i], det_upper[i] = norm.ppf([p_l, p_u],
                                                  loc=0., scale=1.)

        # get the likelihood of censoring a sample
        det_alpha, eps_alpha = mvn_orthotope_density(np.zeros(n_dims),
                                                     rho_hat,
                                                     det_lower, det_upper)

        # Make sure that det_alpha is estimated with sufficient accuracy
        if det_alpha <= 100. * eps_alpha:
            return 1e10

        # make sure that the likelihood is a positive number
        cen_likelihood = max(1.0 - det_alpha, np.nextafter(0, 1))

    else:
        # If the data is not censored, use 1.0 for cen_likelihood to get a
        # zero log-likelihood later. Note that although this is
        # theoretically not correct, it does not alter the solution and
        # it is numerically much more convenient than working around the
        # log of zero likelihood.
        cen_likelihood = 1.0

    # flatten the likelihoods calculated in each dimension
    try:
        scale = _mvn_scale(std_samples.T, rho_hat)
    except:
        return 1e10
    likelihoods = np.prod(likelihoods, axis=0) * scale

    # Zeros are a result of limited floating point precision. Replace them
    # with the smallest possible positive floating point number to
    # improve convergence.
    likelihoods = np.clip(likelihoods, a_min=np.nextafter(0, 1), a_max=None)

    # calculate the total negative log likelihood
    NLL = -(np.sum(np.log(likelihoods))                 # from samples
            + censored_count * np.log(cen_likelihood))  # censoring influence

    # normalize the NLL with the sample count
    NLL = NLL / samples.size

    # print(theta[0], NLL)

    return NLL

def fit_distribution(raw_samples, distribution,
                     truncation_limits=[None, None], censored_count=0,
                     detection_limits=[None, None], multi_fit=False,
                     alpha_lim=1e-4):
    """
    Fit a distribution to samples using maximum likelihood estimation.

    The number of dimensions of the distribution are inferred from the
    shape of the sample data. Censoring is automatically considered if the
    number of censored samples and the corresponding detection limits are
    provided. Infinite or unspecified truncation limits lead to fitting a
    non-truncated distribution in that dimension.

    Parameters
    ----------
    raw_samples: float ndarray
        Raw data that serves as the basis of estimation. The number of
        samples equals the number of columns and each row introduces a new
        feature. In other words: a list of sample lists is expected where
        each sample list is a collection of samples of one variable.
    distribution: {'normal', 'lognormal'}
        Defines the target probability distribution type. Different types
        of distributions can be mixed by providing a list rather than a
        single value. Each element of the list corresponds to one of the
        features in the raw_samples.
    truncation_limits: float ndarray, optional, default: [None, None]
        Lower and/or upper truncation limits for the specified
        distributions. A two-element vector can be used for a univariate
        case, while two lists of limits are expected in multivariate cases.
        If the distribution is non-truncated from one side in a subset of
        the dimensions, use either `None` or assign an infinite value
        (i.e. numpy.inf) to those dimensions.
    censored_count: int, optional, default: 0
        The number of censored samples that are beyond the detection
        limits. All samples outside the detection limits are aggregated
        into one set. This works the same way in one and in multiple
        dimensions. Prescription of specific censored sample counts for
        sub-regions of the input space outside the detection limits is not
        supported.
    detection_limits: float ndarray, optional, default: [None, None]
        Lower and/or upper detection limits for the provided samples. A
        two-element vector can be used for a univariate case, while two
        lists of limits are expected in multivariate cases. If the data is
        not censored from one side in a subset of the dimensions, use
        either `None` or assign an infinite value (i.e. numpy.inf) to those
        dimensions.
    multi_fit: bool, optional, default: False
        If True, we attempt to fit a multivariate distribution to the
        samples. Otherwise, we fit each marginal univariate distribution
        independently and estimate the correlation matrix in the end based
        on the fitted marginals. Using multi_fit can be advantageous with
        censored data and if the correlation in the data is not Gaussian.
        It leads to substantially longer calculation time and does not
        always produce better results, especially when the number of
        dimensions is large.
    alpha_lim: float, optional, default: 1e-4
        Introduces a lower limit to the probability density within the
        n-orthotope defined by the truncation limits. Assigning a
        reasonable minimum (such as 1e-4) can be useful when the mean of
        the distribution is several standard deviations from the truncation
        limits and the sample size is small. Such cases without a limit
        often converge to distant means with inflated variances. Besides
        being incorrect estimates, those solutions only offer negligible
        reduction in the negative log likelihood, while making subsequent
        sampling of the truncated normal distribution very challenging.

    Returns
    -------
    theta: float ndarray
        Estimates of the parameters of the fitted probability distribution
        in each dimension. The following parameters are returned for the
        supported distributions:
        normal - mean, standard deviation;
        lognormal - median, log standard deviation;
    Rho: float 2D ndarray, optional
        In the multivariate case, returns the estimate of the correlation
        matrix.

    """

    samples = np.atleast_2d(raw_samples)
    tr_limits = np.atleast_2d(truncation_limits)
    det_limits = np.atleast_2d(detection_limits)
    n_dims, n_samples = samples.shape

    if (tr_limits.shape[0] == 1) and (samples.shape[0] != 1):
        tr_limits = np.tile(tr_limits[0], samples.shape[0]
                            ).reshape([samples.shape[0], 2])

    if (det_limits.shape[0] == 1) and (samples.shape[0] != 1):
        det_limits = np.tile(det_limits[0], samples.shape[0]
                             ).reshape([samples.shape[0], 2])

    # Convert samples to log space if the distribution is lognormal
    if distribution == 'lognormal':
        samples = np.log(samples)
        for dim in range(tr_limits.shape[0]):
            for var in range(tr_limits.shape[1]):
                if tr_limits[dim][var] is not None:
                    tr_limits[dim][var] = np.log(tr_limits[dim][var])
        for dim in range(det_limits.shape[0]):
            for var in range(det_limits.shape[1]):
                if det_limits[dim][var] is not None:
                    det_limits[dim][var] = np.log(det_limits[dim][var])

    # Define initial values of distribution parameters
    if distribution in ['normal', 'lognormal']:
        # use the first two moments for normal distribution
        mu_init = np.mean(samples, axis=1)

        # replace zero standard dev with negligible standard dev
        sig_init = np.std(samples, axis=1)
        sig_zero_id = np.where(sig_init == 0.0)[0]
        sig_init[sig_zero_id] = 1e-6 * np.abs(mu_init[sig_zero_id])

        # prepare a vector of initial values
        # Note: The actual optimization uses zeros as initial parameters to
        # avoid bias from different scales. These initial values are sent to
        # the likelihood function and considered in there.
        inits = np.transpose([mu_init, sig_init])

    # Define the bounds for each input (assuming standardized initials)
    # These bounds help avoid unrealistic results and improve the
    # convergence rate
    bnd_lower = np.array([[-10.0, -5.0] for t in range(n_dims)])
    bnd_upper = np.array([[10.0, 5.0] for t in range(n_dims)])

    bnd_lower = bnd_lower.flatten()
    bnd_upper = bnd_upper.flatten()

    #inits_0 = np.copy(inits)

    # There is nothing to gain from a time-consuming optimization if..
    #     the number of samples is too small
    if ((n_samples < 3) or
            # there are no truncation or detection limits involved
            (np.all(tr_limits == None) and np.all(det_limits == None))):

        # In this case, it is typically hard to improve on the method of
        # moments estimates for the parameters of the marginal distributions
        theta = inits

    # Otherwise, we run the optimization that aims to find the parameters
    # that maximize the likelihood of observing the samples
    else:

        # First, optimize for each marginal independently
        for dim in range(n_dims):

            inits_i = inits[dim:dim + 1]

            tr_limits_i = [None, None]
            for lim in range(2):
                if ((tr_limits[dim][lim] is None) and
                        (det_limits[dim][lim] is not None)):
                    tr_limits_i[lim] = det_limits[dim][lim]
                elif det_limits[dim][lim] is not None:
                    tr_limits_i[lim] = np.max([tr_limits[dim][lim],
                                               det_limits[dim][lim]])
                else:
                    tr_limits_i[lim] = tr_limits[dim][lim]

            out_m_i = minimize(_neg_log_likelihood,
                               np.zeros(inits[dim].size),
                               args=(inits_i,
                                     bnd_lower[dim:dim + 1],
                                     bnd_upper[dim:dim + 1],
                                     samples[dim:dim + 1],
                                     distribution,
                                     [tr_limits_i, ],
                                     [None, None],
                                     0, True,),
                               method='BFGS',
                               options=dict(maxiter=50)
                               )

            out = out_m_i.x.reshape(inits_i.shape)
            theta = get_theta(out, inits_i, distribution)
            inits[dim] = theta[0]

        # Second, if requested, we attempt the multivariate fitting using
        # the marginal results as initial parameters.
        if multi_fit or (censored_count > 0):

            out_m = minimize(_neg_log_likelihood,
                             np.zeros(inits.size),
                             args=(inits, bnd_lower, bnd_upper, samples,
                                   distribution, tr_limits, det_limits,
                                   censored_count, True,),
                             method='BFGS',
                             options=dict(maxiter=50)
                             )

            out = out_m.x.reshape(inits.shape)
            theta = get_theta(out, inits, distribution)

        else:
            theta = inits

    # Calculate rho in the standard normal space because we will generate
    # new samples using that type of correlation (i.e., Gaussian copula)
    std_samples = _get_std_samples(samples, theta, tr_limits, distribution)
    rho_hat = _get_std_corr_matrix(std_samples)
    if rho_hat is None:
        # If there is not enough data to produce a valid correlation matrix
        # estimate, we assume independence
        # TODO: provide a warning for the user
        rho_hat = np.zeros((n_dims, n_dims))
        np.fill_diagonal(rho_hat, 1.0)

    # Convert mean back to linear space if the distribution is lognormal
    if distribution == 'lognormal':
        theta_mod = theta.T.copy()
        theta_mod[0] = np.exp(theta_mod[0])
        theta = theta_mod.T

    #for val in list(zip(inits_0, theta)):
    #    print(val)

    return theta, rho_hat
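
# Illustrative usage (not part of the original module): fitting a univariate
# lognormal distribution to a handful of samples. The sample values below are
# made up for this example only.
#
#   samples = [0.8, 1.1, 1.3, 0.9, 1.7, 1.2]
#   theta, rho = fit_distribution(samples, 'lognormal')
#   # theta[0] holds the fitted [median, log standard deviation];
#   # rho is a 1x1 identity matrix in this univariate case
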
class RandomVariable(object):
    """
    Describes a random variable and the parameters of its probability
    distribution.

    Parameters
    ----------
    name: string
        A unique string that identifies the random variable.
    distribution: {'normal', 'lognormal', 'multinomial', 'custom',
        'empirical', 'coupled_empirical', 'uniform'}, optional
        Defines the type of probability distribution for the random
        variable.
    theta: float scalar or ndarray, optional
        Set of parameters that define the cumulative distribution function
        of the variable given its distribution type. The following
        parameters are expected currently for the supported distribution
        types:
        normal - mean, standard deviation;
        lognormal - median, log standard deviation;
        uniform - a, b, the lower and upper bounds of the distribution;
        multinomial - likelihood of each unique event (the last event's
        likelihood is adjusted automatically to ensure the likelihoods sum
        up to one);
        custom - according to the custom expression provided;
        empirical and coupled_empirical - N/A.
    truncation_limits: float ndarray, optional
        Defines the [a,b] truncation limits for the distribution. Use None
        to assign no limit in one direction.
    bounds: float ndarray, optional
        Defines the [P_a, P_b] probability bounds for the distribution. Use
        None to assign no lower or upper bound.
    custom_expr: string, optional
        Provide an expression that is a Python syntax for a custom CDF. The
        controlling variable shall be "x" and the parameters shall be "p1",
        "p2", etc.
    anchor: RandomVariable, optional
        Anchors this to another variable. If the anchor is not None, this
        variable will be perfectly correlated with its anchor. Note that
        the attributes of this variable and its anchor do not have to be
        identical.
    """

    def __init__(self, name, distribution, theta=None,
                 truncation_limits=None, bounds=None, custom_expr=None,
                 raw_samples=None, anchor=None):

        self.name = name

        if ((distribution not in ['empirical', 'coupled_empirical']) and
                (theta is None)):
            raise ValueError(
                f"A random variable that follows a {distribution} distribution "
                f"is characterized by a set of parameters (theta). The "
                f"parameters need to be provided when the RV is created."
            )

        if distribution == 'multinomial':
            if np.sum(theta) > 1:
                raise ValueError(
                    f"The set of p values provided for a multinomial "
                    f"distribution shall sum up to less than or equal to 1.0. "
                    f"The provided values sum up to {np.sum(theta)}. p = "
                    f"{theta} ."
                )

        # save the other parameters internally
        self._distribution = distribution
        self._theta = theta
        self._truncation_limits = truncation_limits
        self._bounds = bounds
        self._custom_expr = custom_expr
        self._raw_samples = np.atleast_1d(raw_samples)
        self._uni_samples = None
        self._RV_set = None

        if anchor is None:
            self._anchor = self
        else:
            self._anchor = anchor

    @property
    def distribution(self):
        """
        Return the assigned probability distribution type.
        """
        return self._distribution

    @property
    def theta(self):
        """
        Return the assigned probability distribution parameters.
        """
        return self._theta

    @theta.setter
    def theta(self, value):
        """
        Assign new distribution parameters to the random variable
        """
        self._theta = value

    @property
    def truncation_limits(self):
        """
        Return the assigned truncation limits.
        """
        return self._truncation_limits

    @property
    def bounds(self):
        """
        Return the assigned probability bounds.
        """
        return self._bounds

    @property
    def custom_expr(self):
        """
        Return the assigned custom expression for CDF.
        """
        return self._custom_expr

    @property
    def RV_set(self):
        """
        Return the RV_set this RV is a member of
        """
        return self._RV_set

    @RV_set.setter
    def RV_set(self, value):
        """
        Assign an RV_set to this RV
        """
        self._RV_set = value

    @property
    def samples(self):
        """
        Return the empirical or generated samples.
        """
        return self._samples

    @property
    def samples_DF(self):
        """
        Return the empirical or generated samples in a pandas Series.
        """
        return self._samples_DF

    @samples.setter
    def samples(self, value):
        """
        Assign samples to the random variable
        """
        self._samples = value
        self._samples_DF = pd.Series(value)

    @property
    def uni_samples(self):
        """
        Return the samples from the controlling uniform distribution.
        """
        return self._anchor._uni_samples

    @uni_samples.setter
    def uni_samples(self, value):
        """
        Assign the controlling samples to the random variable

        Parameters
        ----------
        value: float ndarray
            An array of floating point values in the [0, 1] domain.
        """
        self._uni_samples = value

    @property
    def anchor(self):
        """
        Return the anchor of the variable (if any).
        """
        return self._anchor

    @anchor.setter
    def anchor(self, value):
        """
        Assign an anchor to the random variable
        """
        self._anchor = value

    def cdf(self, values):
        """
        Returns the cdf at the given values
        """
        result = None

        if self.distribution == 'normal':
            mu, sig = self.theta

            if self.truncation_limits is not None:
                a, b = self.truncation_limits

                if a is None:
                    a = -np.inf
                if b is None:
                    b = np.inf

                p_a, p_b = [norm.cdf((lim - mu) / sig) for lim in [a, b]]

                # cap the values at the truncation limits
                values = np.minimum(np.maximum(values, a), b)

                # get the cdf from a non-truncated normal
                p_vals = norm.cdf(values, loc=mu, scale=sig)

                # adjust for truncation
                result = (p_vals - p_a) / (p_b - p_a)

            else:
                result = norm.cdf(values, loc=mu, scale=sig)

        elif self.distribution == 'lognormal':
            theta, beta = self.theta

            if self.truncation_limits is not None:
                a, b = self.truncation_limits

                if a is None:
                    a = np.nextafter(0, 1)
                if b is None:
                    b = np.inf

                p_a, p_b = [norm.cdf((np.log(lim) - np.log(theta)) / beta)
                            for lim in [a, b]]

                # cap the values at the truncation limits
                values = np.minimum(np.maximum(values, a), b)

                # get the cdf from a non-truncated lognormal
                p_vals = norm.cdf(np.log(values), loc=np.log(theta),
                                  scale=beta)

                # adjust for truncation
                result = (p_vals - p_a) / (p_b - p_a)

            else:
                values = np.maximum(values, np.nextafter(0, 1))

                result = norm.cdf(np.log(values), loc=np.log(theta),
                                  scale=beta)

        elif self.distribution == 'uniform':
            a, b = self.theta

            if a is None:
                a = -np.inf
            if b is None:
                b = np.inf

            if self.truncation_limits is not None:
                a, b = self.truncation_limits

            result = uniform.cdf(values, loc=a, scale=b-a)

        return result

    def inverse_transform(self, values):
        """
        Uses inverse probability integral transformation on the provided
        values.
        """
        result = None

        if self.distribution == 'normal':
            mu, sig = self.theta

            if self.truncation_limits is not None:
                a, b = self.truncation_limits

                if a is None:
                    a = -np.inf
                if b is None:
                    b = np.inf

                p_a, p_b = [norm.cdf((lim-mu)/sig) for lim in [a, b]]

                if p_b - p_a == 0:
                    raise ValueError(
                        "The probability mass within the truncation limits is "
                        "too small and the truncated distribution cannot be "
                        "sampled with sufficiently high accuracy. This is most "
                        "probably due to incorrect truncation limits set for "
                        "the distribution."
                    )

                result = norm.ppf(values * (p_b - p_a) + p_a,
                                  loc=mu, scale=sig)

            else:
                result = norm.ppf(values, loc=mu, scale=sig)

        elif self.distribution == 'lognormal':
            theta, beta = self.theta

            if self.truncation_limits is not None:
                a, b = self.truncation_limits

                if a is None:
                    a = np.nextafter(0, 1)
                else:
                    a = np.maximum(np.nextafter(0, 1), a)

                if b is None:
                    b = np.inf

                p_a, p_b = [norm.cdf((np.log(lim) - np.log(theta)) / beta)
                            for lim in [a, b]]

                result = np.exp(
                    norm.ppf(values * (p_b - p_a) + p_a,
                             loc=np.log(theta), scale=beta))

            else:
                result = np.exp(
                    norm.ppf(values, loc=np.log(theta), scale=beta))

        elif self.distribution == 'uniform':
            a, b = self.theta

            if a is None:
                a = -np.inf
            if b is None:
                b = np.inf

            if self.truncation_limits is not None:
                a, b = self.truncation_limits

            result = uniform.ppf(values, loc=a, scale=b-a)

        elif self.distribution == 'empirical':

            s_ids = (values * len(self._raw_samples)).astype(int)
            result = self._raw_samples[s_ids]

        elif self.distribution == 'coupled_empirical':

            raw_sample_count = len(self._raw_samples)
            new_sample_count = len(values)
            new_samples = np.tile(self._raw_samples,
                                  int(new_sample_count/raw_sample_count)+1)
            result = new_samples[:new_sample_count]

        elif self.distribution == 'multinomial':

            p_cum = np.cumsum(self.theta)[:-1]

            samples = values

            for i, p_i in enumerate(p_cum):
                samples[samples < p_i] = 10 + i
            samples[samples <= 1.0] = 10 + len(p_cum)

            result = samples - 10

        return result

    def inverse_transform_sampling(self):
        """
        Creates samples using inverse probability integral transformation.
        """
        self.samples = self.inverse_transform(self.uni_samples)

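
# Illustrative usage (not part of the original module): a standard normal
# variable truncated to the positive domain. Variable names are chosen for
# this example only.
#
#   RV_demand = RandomVariable(name='demand', distribution='normal',
#                              theta=[0.0, 1.0],
#                              truncation_limits=[0.0, None])
#   RV_demand.uni_samples = np.array([0.1, 0.5, 0.9])
#   RV_demand.inverse_transform_sampling()
#   # RV_demand.samples now holds three truncated normal realizations
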
class RandomVariableSet(object):
    """
    Describes a set of correlated random variables.

    Parameters
    ----------
    name: string
        A unique string that identifies the set of random variables.
    RV_list: list of RandomVariable
        Defines the random variables in the set
    Rho: float 2D ndarray
        Defines the correlation matrix that describes the correlation
        between the random variables in the set. Currently, only the
        Gaussian copula is supported.
    """

    def __init__(self, name, RV_list, Rho):

        self.name = name

        if len(RV_list) > 1:

            # put the RVs in a dictionary for more efficient access
            reorder = np.argsort([RV.name for RV in RV_list])
            self._variables = dict([(RV_list[i].name, RV_list[i])
                                    for i in reorder])

            # reorder the entries in the correlation matrix to correspond
            # to the sorted list of RVs
            self._Rho = np.asarray(Rho[(reorder)].T[(reorder)].T)

        else:  # if there is only one variable (for testing, probably)
            self._variables = dict([(rv.name, rv) for rv in RV_list])
            self._Rho = np.asarray(Rho)

        # assign this RV_set to the variables
        for __, var in self._variables.items():
            var.RV_set = self

    @property
    def RV(self):
        """
        Return the random variable(s) assigned to the set
        """
        return self._variables

    @property
    def size(self):
        """
        Return the size (i.e., number of variables in the) RV set
        """
        return len(self._variables)

    @property
    def samples(self):
        """
        Return the samples of the variables in the set
        """
        return dict([(name, rv.samples)
                     for name, rv in self._variables.items()])

    def Rho(self, var_subset=None):
        """
        Return the (subset of the) correlation matrix.
        """
        if var_subset is None:
            return self._Rho
        else:
            var_ids = [list(self._variables.keys()).index(var_i)
                       for var_i in var_subset]
            return (self._Rho[var_ids]).T[var_ids]

    def apply_correlation(self):
        """
        Apply correlation to n dimensional uniform samples.

        Currently, correlation is applied using a Gaussian copula. First,
        we try using Cholesky transformation. If the correlation matrix is
        not positive semidefinite and Cholesky fails, use SVD to apply the
        correlations while preserving as much as possible from the
        correlation matrix.
        """

        U_RV = np.array([RV.uni_samples for RV_name, RV in self.RV.items()])

        # First try doing the Cholesky transformation
        try:
            N_RV = norm.ppf(U_RV)

            L = cholesky(self._Rho, lower=True)

            NC_RV = L @ N_RV

            UC_RV = norm.cdf(NC_RV)

        except:
            # if the Cholesky doesn't work, we need to use the more
            # time-consuming but more robust approach based on SVD
            N_RV = norm.ppf(U_RV)

            U, s, __ = svd(self._Rho, )
            S = np.diagflat(np.sqrt(s))

            NC_RV = (N_RV.T @ S @ U.T).T

            UC_RV = norm.cdf(NC_RV)

        for (RV_name, RV), uc_RV in zip(self.RV.items(), UC_RV):
            RV.uni_samples = uc_RV

    def orthotope_density(self, lower=None, upper=None, var_subset=None):
        """
        Estimate the probability density within an orthotope for the RV set.

        Use the mvn_orthotope_density function in this module for the
        calculation. The distribution of individual RVs is not limited to
        the normal family. The provided limits are converted to the
        standard normal space that is the basis of all RVs in pelicun.
        Truncation limits and correlation (using Gaussian copula) are
        automatically taken into consideration.

        Parameters
        ----------
        lower: float ndarray, optional, default: None
            Lower bound(s) of the orthotope. A scalar value can be used for
            a univariate RV; a list of bounds is expected in multivariate
            cases. If the orthotope is not bounded from below in a
            dimension, use 'None' for that dimension.
        upper: float ndarray, optional, default: None
            Upper bound(s) of the orthotope. A scalar value can be used for
            a univariate RV; a list of bounds is expected in multivariate
            cases. If the orthotope is not bounded from above in a
            dimension, use 'None' for that dimension.
        var_subset: list of strings, optional, default: None
            If provided, allows for selecting only a subset of the
            variables in the RV_set for the density calculation.

        Returns
        -------
        alpha: float
            Estimate of the probability density within the orthotope.
        eps_alpha: float
            Estimate of the error in alpha.

        """

        if lower is not None:
            target_shape = lower.shape
        elif upper is not None:
            target_shape = upper.shape
        else:
            return 1.0

        lower_std = np.full(target_shape, None)
        upper_std = np.full(target_shape, None)

        # collect the variables involved
        if var_subset is None:
            vars = list(self._variables.keys())
        else:
            vars = var_subset

        # first, convert limits to standard normal values
        for var_i, var_name in enumerate(vars):

            var = self._variables[var_name]

            if (lower is not None) and (lower[var_i] is not None):
                lower_std[var_i] = norm.ppf(var.cdf(lower[var_i]),
                                            loc=0, scale=1)

            if (upper is not None) and (upper[var_i] is not None):
                upper_std[var_i] = norm.ppf(var.cdf(upper[var_i]),
                                            loc=0, scale=1)

        # then calculate the orthotope results in std normal space
        lower_std = lower_std.T
        upper_std = upper_std.T

        OD = [mvn_orthotope_density(mu=np.zeros(len(vars)),
                                    COV=self.Rho(var_subset),
                                    lower=l_i, upper=u_i)[0]
              for l_i, u_i in zip(lower_std, upper_std)]

        return np.asarray(OD)

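
# Illustrative usage (not part of the original module): two standard normal
# variables coupled through a Gaussian copula with rho = 0.7. Names are
# chosen for this example only.
#
#   RV_1 = RandomVariable('V1', 'normal', theta=[0.0, 1.0])
#   RV_2 = RandomVariable('V2', 'normal', theta=[0.0, 1.0])
#   RV_set = RandomVariableSet('demands', [RV_1, RV_2],
#                              Rho=np.array([[1.0, 0.7], [0.7, 1.0]]))
#   RV_1.uni_samples = np.array([0.2, 0.5, 0.8])
#   RV_2.uni_samples = np.array([0.3, 0.6, 0.9])
#   RV_set.apply_correlation()  # correlates the underlying uniform samples
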
class RandomVariableRegistry(object):
    """
    A registry that manages the random variables and random variable sets
    used in an analysis.

    Parameters
    ----------

    """

    def __init__(self):

        self._variables = {}
        self._sets = {}

    @property
    def RV(self):
        """
        Return all random variable(s) in the registry
        """
        return self._variables

    def RVs(self, keys):
        """
        Return a subset of the random variables in the registry
        """
        return {name: self._variables[name] for name in keys}

    def add_RV(self, RV):
        """
        Add a new random variable to the registry.
        """
        self._variables.update({RV.name: RV})

    @property
    def RV_set(self):
        """
        Return the random variable set(s) in the registry.
        """
        return self._sets

    def add_RV_set(self, RV_set):
        """
        Add a new set of random variables to the registry
        """
        self._sets.update({RV_set.name: RV_set})

    @property
    def RV_samples(self):
        """
        Return the samples for every random variable in the registry
        """
        return dict([(name, rv.samples) for name, rv in self.RV.items()])

    def generate_samples(self, sample_size, method='LHS_midpoint',
                         seed=None):
        """
        Generates samples for all variables in the registry.

        Parameters
        ----------
        sample_size: int
            The number of samples requested per variable.
        method: {'random', 'LHS', 'LHS_midpoint'}, optional
            The sample generation method to use. 'random' stands for
            conventional random sampling; 'LHS' is Latin HyperCube Sampling
            with random sample location within each bin of the hypercube;
            'LHS_midpoint' is like LHS, but the samples are assigned to the
            midpoints of the hypercube bins.
        seed: int, optional
            Random seed used for sampling.
        """
        # Initialize the random number generator
        rng = np.random.default_rng(seed)

        # Generate a dictionary with IDs of the free (non-anchored) variables
        RV_list = [RV_name for RV_name, RV in self.RV.items()
                   if RV.anchor == RV]
        RV_ID = dict([(RV_name, ID) for ID, RV_name in enumerate(RV_list)])
        RV_count = len(RV_ID)

        # Generate controlling samples from a uniform distribution for
        # the free RVs
        if 'LHS' in method:
            bin_low = np.array([rng.permutation(sample_size)
                                for i in range(RV_count)])

            if method == 'LHS_midpoint':
                U_RV = np.ones([RV_count, sample_size]) * 0.5
                U_RV = (bin_low + U_RV) / sample_size

            elif method == 'LHS':
                U_RV = rng.random(size=[RV_count, sample_size])
                U_RV = (bin_low + U_RV) / sample_size

        elif method == 'random':
            U_RV = rng.random(size=[RV_count, sample_size])

        # Assign the controlling samples to the RVs
        for RV_name, RV_id in RV_ID.items():
            self.RV[RV_name].uni_samples = U_RV[RV_id]

        # Apply correlations for the pre-defined RV sets
        for RV_set_name, RV_set in self.RV_set.items():
            # prepare the correlated uniform distribution for the set
            RV_set.apply_correlation()

        # Convert from uniform to the target distribution for every RV
        for RV_name, RV in self.RV.items():
            RV.inverse_transform_sampling()
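
# Illustrative end-to-end usage (not part of the original module): register
# two correlated lognormal variables and generate Latin Hypercube samples.
# Names and parameter values below are chosen for this example only.
#
#   RV_reg = RandomVariableRegistry()
#   RV_reg.add_RV(RandomVariable('EDP_1', 'lognormal', theta=[1.2, 0.3]))
#   RV_reg.add_RV(RandomVariable('EDP_2', 'lognormal', theta=[0.8, 0.4]))
#   RV_reg.add_RV_set(
#       RandomVariableSet('EDP_set',
#                         list(RV_reg.RVs(['EDP_1', 'EDP_2']).values()),
#                         Rho=np.array([[1.0, 0.5], [0.5, 1.0]])))
#   RV_reg.generate_samples(sample_size=1000, method='LHS_midpoint', seed=42)
#   # RV_reg.RV_samples['EDP_1'] now holds 1000 correlated realizations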