# Source code for pelicun.tests.test_uq

# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Leland Stanford Junior University
# Copyright (c) 2018 The Regents of the University of California
#
# This file is part of pelicun.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# You should have received a copy of the BSD 3-Clause License along with
# pelicun. If not, see <http://www.opensource.org/licenses/>.
#
# Contributors:
# Adam Zsarnóczay

"""
These are unit and integration tests on the uq module of pelicun.

"""
import pytest
import numpy as np
import pandas as pd  # used throughout via pd.DataFrame; was missing before
from scipy.stats import norm
from numpy.testing import assert_allclose
from copy import deepcopy

# one import per line (PEP 8); needed for the sys.path setup below
import os
import sys
import inspect

# make the pelicun package root importable when the tests run from source
current_dir = os.path.dirname(
    os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, os.path.dirname(parent_dir))

from pelicun.tests.test_pelicun import assert_normal_distribution
from pelicun.uq import *

# ------------------------------------------------------------------------------
# ORTHOTOPE DENSITY
# ------------------------------------------------------------------------------
def test_MVN_CDF_univariate():
    """
    Verify the MVN CDF function against scipy's norm for the special
    univariate case.
    """
    ref_mean = 0.5
    ref_std = 0.25
    ref_var = ref_std ** 2.

    # a sweep of non-symmetric interval bounds
    lo_vals = np.arange(13) / 5. - 1.5
    hi_vals = np.arange(13) / 3.

    for lo, hi in zip(lo_vals, hi_vals):
        # reference probability mass inside [lo, hi] from scipy
        cdf_lo, cdf_hi = norm.cdf([lo, hi], loc=ref_mean, scale=ref_std)
        ref_res = cdf_hi - cdf_lo

        test_res, __ = mvn_orthotope_density(ref_mean, ref_var,
                                             lower=lo, upper=hi)

        assert ref_res == pytest.approx(test_res)

    # with no bounds given, the full density must integrate to 1.0
    test_res, __ = mvn_orthotope_density(ref_mean, ref_var)
    assert test_res == 1.
def test_MVN_CDF_multivariate():
    """
    Verify the MVN CDF function for multivariate cases with special
    correlation structures.
    """
    # Both special structures below reduce to the univariate result:
    # with perfect correlation the orthotope probability equals the
    # univariate probability; with independence it equals that probability
    # raised to the number of dimensions.
    cdf_lo, cdf_hi = norm.cdf([-2.5, 1.5], loc=0., scale=1.)
    uni_prob = cdf_hi - cdf_lo

    for rho, expected in ((1.0, lambda p, d: p),
                          (0.0, lambda p, d: p ** d)):
        for dims in range(2, 6):
            mean = np.arange(dims, dtype=np.float64)
            std = np.arange(1, dims + 1, dtype=np.float64)
            COV = np.outer(std, std) * (np.ones((dims, dims)) * rho)
            np.fill_diagonal(COV, std ** 2.)

            # the same standardized bounds in every dimension
            test_res, __ = mvn_orthotope_density(mean, COV,
                                                 lower=mean - std * 2.5,
                                                 upper=mean + std * 1.5)

            assert expected(uni_prob, dims) == pytest.approx(test_res)
# ------------------------------------------------------------------------------ # Random_Variable objects # ------------------------------------------------------------------------------
def test_RandomVariable_incorrect_none_defined():
    """
    Test if the random variable object raises an error when the distribution
    is not empirical and no parameters are provided.
    """
    # a parametric distribution without theta parameters is invalid
    # (the exception object itself is not inspected, so it is not bound)
    with pytest.raises(ValueError):
        RandomVariable(name='A', distribution='normal')
def test_RandomVariable_incorrect_multinomial_definition():
    """
    Test if the random variable object raises an error when a multinomial
    distribution is defined with incorrect parameters, and test
    that it does not raise an error when the right parameters are provided.
    """
    # correct parameters - should not raise
    RandomVariable(name='A', distribution='multinomial',
                   theta=[0.5, 0.2, 0.1, 0.2])

    # sum(p) less than 1.0 -> should be automatically corrected
    RandomVariable(name='A', distribution='multinomial',
                   theta=[0.5, 0.2, 0.1, 0.1])

    # sum(p) more than 1.0 -> should raise an error
    with pytest.raises(ValueError):
        RandomVariable(name='A', distribution='multinomial',
                       theta=[0.5, 0.2, 0.1, 0.3])
# ------------------------------------------------------------------------------ # SAMPLING # ------------------------------------------------------------------------------
def test_sampling_tr_alpha_error():
    """
    Test if the function raises an error when the probability density that
    the truncation limits define is not sufficiently accurate for further
    use.
    """
    RV_reg = RandomVariableRegistry()

    # truncation limits far in the lower tail leave essentially zero
    # probability mass -> sampling from it must fail
    RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                 theta=[0.5, 0.25],
                                 truncation_limits=[-10.0, -9.9]))

    with pytest.raises(ValueError):
        RV_reg.generate_samples(sample_size=10, seed=1)
def test_sampling_non_truncated():
    """
    Check that sampling reproduces a non-truncated univariate and
    multivariate normal distribution.
    """
    # univariate case
    ref_mean = 0.5
    ref_std = 0.25

    RV_reg = RandomVariableRegistry()
    RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                 theta=[ref_mean, ref_std]))

    def sampling_function(sample_size):
        RV_reg.generate_samples(sample_size=sample_size)
        return RV_reg.RV_samples['A']

    assert assert_normal_distribution(sampling_function, ref_mean,
                                      ref_std ** 2.)

    # bivariate case, sweeping the correlation coefficient
    for rho in (-1., -0.5, 0., 0.5, 1.):
        ref_mean = [0.5, 1.5]
        ref_std = [0.25, 1.0]
        ref_rho = np.asarray([[1.0, rho], [rho, 1.0]])
        ref_COV = np.outer(ref_std, ref_std) * ref_rho

        RV_reg = RandomVariableRegistry()
        for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
            RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                         theta=[mu, std]))
        RV_reg.add_RV_set(
            RandomVariableSet('A',
                              [RV_reg.RV[rv]
                               for rv in range(len(ref_mean))],
                              ref_rho))

        def sampling_function(sample_size):
            RV_reg.generate_samples(sample_size=sample_size)
            return pd.DataFrame(RV_reg.RV_samples).values

        assert_normal_distribution(sampling_function, ref_mean, ref_COV)

    # multi-dimensional case
    dims = 5
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.arange(1, dims + 1, dtype=np.float64)
    ref_rho = np.ones((dims, dims)) * 0.3
    np.fill_diagonal(ref_rho, 1.0)
    ref_COV = np.outer(ref_std, ref_std) * ref_rho

    RV_reg = RandomVariableRegistry()
    for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                     theta=[mu, std]))
    RV_reg.add_RV_set(
        RandomVariableSet('A',
                          [RV_reg.RV[rv] for rv in range(len(ref_mean))],
                          ref_rho))

    def sampling_function(sample_size):
        RV_reg.generate_samples(sample_size=sample_size)
        return pd.DataFrame(RV_reg.RV_samples).values

    assert_normal_distribution(sampling_function, ref_mean, ref_COV)
def test_sampling_truncated_wide_limits():
    """
    Test if the sampling method returns appropriate samples for a truncated
    univariate and multivariate normal distribution when the truncation
    limits are sufficiently wide to consider the result a normal distribution.
    """
    # assign a non-symmetric, but very wide set of limits for all cases

    # univariate case
    ref_mean = 0.5
    ref_std = 0.25
    lower = -1e10
    upper = 1e9

    def check_univariate(truncation_limits):
        # with such wide limits the samples must still look normal
        RV_reg = RandomVariableRegistry()
        RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                     theta=[ref_mean, ref_std],
                                     truncation_limits=truncation_limits))

        def sampling_function(sample_size):
            RV_reg.generate_samples(sample_size=sample_size)
            return RV_reg.RV_samples['A']

        assert assert_normal_distribution(sampling_function, ref_mean,
                                          ref_std ** 2.)

    # truncation both ways
    check_univariate([lower, upper])
    # upper truncation only
    check_univariate([None, upper])
    # lower truncation only
    check_univariate([lower, None])

    # bivariate case, various correlation coefficients
    rho_list = [-1., -0.5, 0., 0.5, 1.]

    lower = np.ones(2) * lower
    upper = np.ones(2) * upper

    for rho in rho_list:
        ref_mean = [0.5, 1.5]
        ref_std = [0.25, 1.0]
        ref_rho = np.asarray([[1.0, rho], [rho, 1.0]])
        ref_COV = np.outer(ref_std, ref_std) * ref_rho

        RV_reg = RandomVariableRegistry()
        for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
            RV_reg.add_RV(
                RandomVariable(name=i, distribution='normal',
                               theta=[mu, std],
                               truncation_limits=[lower[i], upper[i]]))
        RV_reg.add_RV_set(
            RandomVariableSet('A',
                              [RV_reg.RV[rv]
                               for rv in range(len(ref_mean))],
                              ref_rho))

        def sampling_function(sample_size):
            RV_reg.generate_samples(sample_size=sample_size)
            return pd.DataFrame(RV_reg.RV_samples).values

        assert_normal_distribution(sampling_function, ref_mean, ref_COV)

    # multi-dimensional case
    dims = 3
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.ones(dims) * 0.25
    ref_rho = np.ones((dims, dims)) * 0.3
    np.fill_diagonal(ref_rho, 1.0)
    ref_COV = np.outer(ref_std, ref_std) * ref_rho

    lower = np.ones(dims) * lower[0]
    upper = np.ones(dims) * upper[0]

    RV_reg = RandomVariableRegistry()
    for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                     theta=[mu, std],
                                     truncation_limits=[lower[i], upper[i]]))
    RV_reg.add_RV_set(
        RandomVariableSet('A',
                          [RV_reg.RV[rv] for rv in range(len(ref_mean))],
                          ref_rho))

    def sampling_function(sample_size):
        RV_reg.generate_samples(sample_size=sample_size)
        return pd.DataFrame(RV_reg.RV_samples).values

    assert_normal_distribution(sampling_function, ref_mean, ref_COV)
def test_sampling_truncated_narrow_limits():
    """
    Test if the sampling method returns appropriate samples for a truncated
    univariate and multivariate normal distribution when the truncation
    limits are narrow.
    """
    # here we focus on testing if the returned samples are between the
    # pre-defined limits in all dimensions

    # univariate case
    ref_mean = 0.5
    ref_std = 0.25
    lower = 0.1
    upper = 0.6

    RV_reg = RandomVariableRegistry()
    RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                 theta=[ref_mean, ref_std],
                                 truncation_limits=[lower, upper]))

    RV_reg.generate_samples(sample_size=1000)

    samples = RV_reg.RV_samples['A']

    sample_min = np.min(samples)
    sample_max = np.max(samples)

    # all samples inside the limits, and the limits nearly attained
    assert sample_min > lower
    assert sample_max < upper

    assert sample_min == pytest.approx(lower, abs=0.01)
    assert sample_max == pytest.approx(upper, abs=0.01)

    # multi-dimensional case
    dims = 2
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.ones(dims) * 0.25
    ref_rho = np.ones((dims, dims)) * 0.3
    np.fill_diagonal(ref_rho, 1.0)

    lower = ref_mean - ref_std * 2.5
    upper = ref_mean + ref_std * 1.5

    RV_reg = RandomVariableRegistry()
    for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                     theta=[mu, std],
                                     truncation_limits=[lower[i], upper[i]]))
    RV_reg.add_RV_set(
        RandomVariableSet('A',
                          [RV_reg.RV[rv] for rv in range(len(ref_mean))],
                          ref_rho))

    RV_reg.generate_samples(sample_size=1000)

    samples = pd.DataFrame(RV_reg.RV_samples).values

    sample_min = np.amin(samples, axis=0)
    sample_max = np.amax(samples, axis=0)

    assert np.all(sample_min > lower)
    assert np.all(sample_max < upper)

    assert_allclose(sample_min, lower, atol=0.1)
    assert_allclose(sample_max, upper, atol=0.1)
def test_RandomVariable_sample_distribution_mixed_normal():
    """
    Check sampling of a correlated mixture of normal and lognormal
    variables. The sampler itself is verified elsewhere, so this test only
    guards against input-handling mistakes that would distort the results.
    """
    dims = 3
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.ones(dims) * 1.00
    ref_rho = np.ones((dims, dims)) * 0.5
    np.fill_diagonal(ref_rho, 1.0)

    # wide truncation limits that barely affect the distribution
    lower = (ref_mean - ref_std * 10.).tolist()
    upper = ref_mean + ref_std * 10.

    # variable 1 is lognormal with no lower truncation; its parameters and
    # limits are converted from log space accordingly
    rv_mean = ref_mean.copy()
    rv_mean[1] = np.exp(rv_mean[1])
    lower[1] = None
    upper[1] = np.exp(upper[1])

    dist_list = ['normal', 'lognormal', 'normal']

    RV_reg = RandomVariableRegistry()
    for i, (mu, std, dist) in enumerate(zip(rv_mean, ref_std, dist_list)):
        RV_reg.add_RV(RandomVariable(name=i, distribution=dist,
                                     theta=[mu, std],
                                     truncation_limits=[lower[i], upper[i]]))
    RV_reg.add_RV_set(
        RandomVariableSet('A',
                          [RV_reg.RV[rv] for rv in range(len(ref_mean))],
                          ref_rho))

    RV_reg.generate_samples(sample_size=1000)

    samples = pd.DataFrame(RV_reg.RV_samples).values.T

    # retrieving samples of an individual RV must match the joint samples
    assert_allclose(samples[1], RV_reg.RV_samples[1])

    # map the lognormal samples back to log space for the checks below
    samples[1] = np.log(samples[1])

    assert_allclose(np.mean(samples, axis=1), ref_mean, atol=0.01)

    ref_COV = np.outer(ref_std, ref_std) * ref_rho
    assert_allclose(np.cov(samples), ref_COV, atol=0.10)
def test_RandomVariable_sample_distribution_multinomial():
    """
    Check sampling of a multinomial variable, including retrieving values
    for an individual RV that is part of a correlated RV_set.
    """
    dims = 3

    def build_registry(theta):
        # three correlated multinomial RVs sharing the same event probabilities
        reg = RandomVariableRegistry()
        for i in range(dims):
            reg.add_RV(RandomVariable(name=i, distribution='multinomial',
                                      theta=theta))
        corr = np.ones((dims, dims)) * 0.5
        np.fill_diagonal(corr, 1.0)
        reg.add_RV_set(
            RandomVariableSet('A', [reg.RV[rv] for rv in range(dims)], corr))
        return reg

    # first test with an incomplete p_ref
    p_ref = [0.1, 0.3, 0.5]

    RV_reg = build_registry(p_ref)
    RV_reg.generate_samples(sample_size=10000)

    samples = pd.DataFrame(RV_reg.RV_samples).values.T

    # the missing mass is expected to be assigned to the last outcome
    p_ref[-1] = 1. - np.sum(p_ref[:-1])

    h_bins = np.arange(len(p_ref) + 1) - 0.5
    p_test = np.histogram(samples[1], bins=h_bins, density=True)[0]

    assert_allclose(p_test, p_ref, atol=0.05)

    # individual RV samples are returned appropriately
    p_test_RV = np.histogram(RV_reg.RV_samples[1], bins=h_bins,
                             density=True)[0]
    assert_allclose(p_test_RV, p_ref, atol=0.05)

    assert_allclose(samples[1], RV_reg.RV_samples[1])

    # the prescribed correlation is applied - note that the correlation
    # between the samples is not going to be identical to the correlation
    # used to generate the underlying uniform distribution
    rho_target = [[1.0, 0.383, 0.383],
                  [0.383, 1.0, 0.383],
                  [0.383, 0.383, 1.0]]
    assert_allclose(np.corrcoef(samples), rho_target, atol=0.05)

    # finally, check the original sampling with the complete p_ref
    RV_reg = build_registry(p_ref)
    RV_reg.generate_samples(sample_size=10000)

    samples = pd.DataFrame(RV_reg.RV_samples).values.T

    p_test = np.histogram(samples[1], bins=h_bins, density=True)[0]
    assert_allclose(p_test, p_ref, atol=0.05)
# ------------------------------------------------------------------------------ # FITTING # ------------------------------------------------------------------------------
def test_fitting_baseline():
    """
    Check the accuracy of max. likelihood estimates for a (multivariate)
    normal distribution in the baseline case: no truncation, no censoring.
    """
    # univariate case
    ref_mean = 0.5
    ref_std = 0.25

    # generate samples
    RV_reg = RandomVariableRegistry()
    RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                 theta=[ref_mean, ref_std]))
    RV_reg.generate_samples(sample_size=100)

    samples = RV_reg.RV_samples['A']

    # fit and compare against the generating parameters
    mu, std = fit_distribution(samples, 'normal')[0][0]

    assert ref_mean == pytest.approx(mu, abs=0.01)
    assert ref_std == pytest.approx(std, rel=0.05)

    # multi-dimensional case
    dims = 3
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.ones(dims) * 0.5
    ref_rho = np.ones((dims, dims)) * 0.5
    np.fill_diagonal(ref_rho, 1.0)

    RV_reg = RandomVariableRegistry()
    for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                     theta=[mu, std]))
    RV_reg.add_RV_set(
        RandomVariableSet('A', [RV_reg.RV[rv] for rv in range(dims)],
                          ref_rho))
    RV_reg.generate_samples(sample_size=100)

    samples = pd.DataFrame(RV_reg.RV_samples).values.T

    # fit and compare against the generating parameters
    test_theta, test_rho = fit_distribution(samples, 'normal')
    test_mu, test_std = test_theta.T

    assert_allclose(test_mu, ref_mean, atol=0.01)
    assert_allclose(test_std, ref_std, rtol=0.1)
    assert_allclose(test_rho, ref_rho, atol=0.3)
def test_fitting_censored():
    """
    Check the accuracy of max. likelihood estimates for a multivariate
    normal distribution when the data are censored.
    """
    # univariate case
    ref_mean = 0.5
    ref_std = 0.25

    c_lower = 0.35
    c_upper = 1.25
    sample_count = 100

    # generate samples
    RV_reg = RandomVariableRegistry()
    RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                 theta=[ref_mean, ref_std]))
    RV_reg.generate_samples(sample_size=sample_count)

    samples = RV_reg.RV_samples['A']

    # keep only the samples inside the detection limits
    good_ones = np.all([samples > c_lower, samples < c_upper], axis=0)
    c_samples = samples[good_ones]
    c_count = sample_count - sum(good_ones)

    # fit with the censoring information provided
    test_theta, __ = fit_distribution(c_samples, 'normal',
                                      censored_count=c_count,
                                      detection_limits=[c_lower, c_upper])
    test_mu, test_std = test_theta[0]

    assert ref_mean == pytest.approx(test_mu, abs=0.05)
    assert ref_std == pytest.approx(test_std, rel=0.05)

    # multi-dimensional case
    dims = 3
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.ones(dims) * 0.25
    ref_rho = np.ones((dims, dims)) * 0.5
    np.fill_diagonal(ref_rho, 1.0)

    # one-sided detection limits in two of the dimensions
    c_lower = ref_mean - 1.0 * ref_std
    c_upper = ref_mean + 8.5 * ref_std
    c_lower[2] = -np.inf
    c_upper[0] = np.inf

    sample_count = 1000

    # generate samples
    RV_reg = RandomVariableRegistry()
    for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                     theta=[mu, std]))
    RV_reg.add_RV_set(
        RandomVariableSet('A', [RV_reg.RV[rv] for rv in range(dims)],
                          ref_rho))
    RV_reg.generate_samples(sample_size=sample_count)

    samples = pd.DataFrame(RV_reg.RV_samples).values

    # a sample is kept only if it is within the limits in every dimension
    good_ones = np.all([samples > c_lower, samples < c_upper], axis=0)
    good_ones = np.all(good_ones, axis=1)
    c_samples = samples[good_ones]
    c_count = sample_count - sum(good_ones)

    det_lims = np.array([c_lower, c_upper]).T

    test_theta, test_rho = fit_distribution(c_samples.T, 'normal',
                                            censored_count=c_count,
                                            detection_limits=det_lims)
    test_mu, test_std = test_theta.T

    assert_allclose(test_mu, ref_mean, atol=0.1)
    assert_allclose(test_std, ref_std, rtol=0.25)
    assert_allclose(test_rho, ref_rho, atol=0.3)
def test_fitting_truncated():
    """
    Check the accuracy of max. likelihood estimates for a multivariate
    normal distribution with truncation and uncensored data.
    """
    # univariate case
    ref_mean = 0.5
    ref_std = 0.25

    tr_lower = 0.35
    tr_upper = 1.25

    # generate samples of a TMVN distribution
    RV_reg = RandomVariableRegistry()
    RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                 theta=[ref_mean, ref_std],
                                 truncation_limits=[tr_lower, tr_upper]))
    RV_reg.generate_samples(sample_size=100)

    samples = RV_reg.RV_samples['A']

    # fit with the truncation limits provided
    test_theta, __ = fit_distribution(samples, 'normal',
                                      truncation_limits=[tr_lower, tr_upper])
    test_mu, test_std = test_theta[0]

    assert ref_mean == pytest.approx(test_mu, abs=0.05)
    assert ref_std == pytest.approx(test_std, rel=0.05)

    # multi-dimensional case
    dims = 3
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.ones(dims) * 0.25
    ref_rho = np.ones((dims, dims)) * 0.5
    np.fill_diagonal(ref_rho, 1.0)

    # one-sided truncation in two of the dimensions
    tr_lower = ref_mean - 0.25 * ref_std
    tr_upper = ref_mean + 2.0 * ref_std
    tr_lower[2] = -np.inf
    tr_upper[0] = np.inf

    # generate samples
    RV_reg = RandomVariableRegistry()
    for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                     theta=[mu, std],
                                     truncation_limits=[tr_lower[i],
                                                        tr_upper[i]]))
    RV_reg.add_RV_set(
        RandomVariableSet('A', [RV_reg.RV[rv] for rv in range(dims)],
                          ref_rho))
    RV_reg.generate_samples(sample_size=1000)

    samples = pd.DataFrame(RV_reg.RV_samples).values.T

    # fit with the truncation limits provided
    tr_lims = np.array([tr_lower, tr_upper]).T
    test_theta, test_rho = fit_distribution(samples, 'normal',
                                            truncation_limits=tr_lims)
    test_mu, test_std = test_theta.T

    assert_allclose(test_mu, ref_mean, atol=0.1)
    assert_allclose(test_std, ref_std, atol=0.05)
    assert_allclose(test_rho, ref_rho, atol=0.2)
def test_fitting_truncated_and_censored():
    """
    Check the accuracy of max. likelihood estimates for a multivariate
    normal distribution with both truncation and censored data.
    """
    # univariate case
    ref_mean = 0.5
    ref_std = 0.25

    tr_lower = 0.35
    tr_upper = 2.5
    det_upper = 1.25
    det_lower = tr_lower

    # generate samples of a TMVN distribution
    RV_reg = RandomVariableRegistry()
    RV_reg.add_RV(RandomVariable(name='A', distribution='normal',
                                 theta=[ref_mean, ref_std],
                                 truncation_limits=[tr_lower, tr_upper]))
    RV_reg.generate_samples(sample_size=100)

    samples = RV_reg.RV_samples['A']

    # censor the samples above the detection limit
    good_ones = samples < det_upper
    c_samples = samples[good_ones]
    c_count = 100 - sum(good_ones)

    # fit with both censoring and truncation information provided
    test_theta, __ = fit_distribution(c_samples, 'normal',
                                      censored_count=c_count,
                                      detection_limits=[det_lower,
                                                        det_upper],
                                      truncation_limits=[tr_lower,
                                                         tr_upper])
    test_mu, test_std = test_theta[0]

    assert ref_mean == pytest.approx(test_mu, abs=0.05)
    assert ref_std == pytest.approx(test_std, rel=0.05)

    # multi-dimensional case
    dims = 3
    ref_mean = np.arange(dims, dtype=np.float64)
    ref_std = np.ones(dims) * 0.25
    ref_rho = np.ones((dims, dims)) * 0.5
    np.fill_diagonal(ref_rho, 1.0)

    # one-sided truncation in two of the dimensions
    tr_lower = ref_mean - 4.5 * ref_std
    tr_upper = ref_mean + 2.5 * ref_std
    tr_lower[2] = -np.inf
    tr_upper[0] = np.inf

    # detection limits inside the truncated domain
    det_lower = ref_mean - 1. * ref_std
    det_lower[2] = -np.inf
    det_upper = tr_upper

    # generate samples
    RV_reg = RandomVariableRegistry()
    for i, (mu, std) in enumerate(zip(ref_mean, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='normal',
                                     theta=[mu, std],
                                     truncation_limits=[tr_lower[i],
                                                        tr_upper[i]]))
    RV_reg.add_RV_set(
        RandomVariableSet('A', [RV_reg.RV[rv] for rv in range(dims)],
                          ref_rho))
    RV_reg.generate_samples(sample_size=1000)

    samples = pd.DataFrame(RV_reg.RV_samples).values

    # a sample is kept only if it is within the limits in every dimension
    good_ones = np.all([samples > det_lower, samples < det_upper], axis=0)
    good_ones = np.all(good_ones, axis=1)
    c_samples = samples[good_ones]
    c_count = 1000 - sum(good_ones)

    det_lims = np.array([det_lower, det_upper]).T
    tr_lims = np.array([tr_lower, tr_upper]).T

    test_theta, test_rho = fit_distribution(
        c_samples.T, 'normal', censored_count=c_count,
        detection_limits=det_lims, truncation_limits=tr_lims)
    test_mu, test_std = test_theta.T

    assert_allclose(test_mu, ref_mean, atol=0.1)
    assert_allclose(test_std, ref_std, rtol=0.25)
    assert_allclose(test_rho, ref_rho, atol=0.3)
def test_fitting_lognormal():
    """
    Check the accuracy of max. likelihood estimates for a multivariate
    lognormal distribution.
    """
    # univariate case
    ref_median = 0.5
    ref_std = 0.25

    # generate samples
    RV_reg = RandomVariableRegistry()
    RV_reg.add_RV(RandomVariable(name='A', distribution='lognormal',
                                 theta=[ref_median, ref_std]))
    RV_reg.generate_samples(sample_size=100)

    samples = RV_reg.RV_samples['A']

    # fit and compare against the generating parameters
    median, std = fit_distribution(samples, 'lognormal')[0][0]

    assert ref_median == pytest.approx(median, abs=0.01)
    assert ref_std == pytest.approx(std, rel=0.05)

    # multivariate case
    dims = 6
    ref_median = np.exp(np.arange(dims, dtype=np.float64))
    ref_std = np.ones(dims) * 0.5
    ref_rho = np.ones((dims, dims)) * 0.5
    np.fill_diagonal(ref_rho, 1.0)

    RV_reg = RandomVariableRegistry()
    for i, (median, std) in enumerate(zip(ref_median, ref_std)):
        RV_reg.add_RV(RandomVariable(name=i, distribution='lognormal',
                                     theta=[median, std]))
    RV_reg.add_RV_set(
        RandomVariableSet('A', [RV_reg.RV[rv] for rv in range(dims)],
                          ref_rho))
    RV_reg.generate_samples(sample_size=100)

    samples = pd.DataFrame(RV_reg.RV_samples).values.T

    # fit and compare against the generating parameters; medians are
    # compared in log space
    test_theta, test_rho = fit_distribution(samples, 'lognormal')
    test_median, test_std = test_theta.T

    assert_allclose(np.log(test_median), np.log(ref_median), atol=0.01)
    assert_allclose(test_std, ref_std, rtol=0.2)
    assert_allclose(test_rho, ref_rho, atol=0.3)