Source code for pelicun.db

# -*- coding: utf-8 -*-
# Copyright (c) 2018 Leland Stanford Junior University
# Copyright (c) 2018 The Regents of the University of California
# This file is part of pelicun.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
# You should have received a copy of the BSD 3-Clause License along with
# pelicun. If not, see <>.
# Contributors:
# Adam Zsarnóczay

This module has classes and methods to manage databases used by pelicun.

.. rubric:: Contents

.. autosummary::




from .base import *
from pathlib import Path
import json
import xml.etree.ElementTree as ET
import shutil

[docs]def dict_generator(indict, pre=None): """ Lists all branches of a tree defined by a dictionary. The dictionary can have nested dictionaries and lists. When encountering a list, its elements are returned as separate branches with each element's id created as a combination of the parent key and #i where i stands for the element number in the list. This method can process a json file and break it up into independent branches. """ pre = pre[:] if pre else [] if isinstance(indict, dict): for key, value in indict.items(): if isinstance(value, dict): for d in dict_generator(value, pre + [key]): yield d elif isinstance(value, list) or isinstance(value, tuple): for v_id, v in enumerate(value): for d in dict_generator(v, pre + [key + f'#{v_id}']): yield d else: yield pre + [key, value] else: yield pre + [indict]
[docs]def get_val_from_dict(indict, col): """ Gets the value from a branch of a dictionary. The dictionary can have nested dictionaries and lists. When walking through lists, #i in the branch data identifies the ith element of the list. This method can be used to travel branches of a dictionary previously defined by the dict_generator method above. """ val = indict for col_i in col: if col_i != ' ': if '#' in col_i: col_name, col_id = col_i.split('#') col_id = int(col_id) if (col_name in val.keys()) and (col_id < len(val[col_name])): val = val[col_name][int(col_id)] else: return None elif col_i in val.keys(): val = val[col_i] else: return None return val
def convert_jsons_to_table(json_id_list, json_list, json_template): # Define the header for the data table based on the template structure header = np.array( [[col[:-1], len(col[:-1])] for col in dict_generator(json_template)]) lvls = max(np.transpose(header)[1]) header = [col + (lvls - size) * [' ', ] for col, size in header] # Use the header to initialize the DataFrame that will hold the data MI = pd.MultiIndex.from_tuples(header) json_DF = pd.DataFrame(columns=MI, index=json_id_list) = 'ID' # Load the data into the DF for json_id, json_data in zip(json_id_list, json_list): for col in json_DF.columns: val = get_val_from_dict(json_data, col) if val is not None:[json_id, col] = val # Remove empty rows and columns json_DF = json_DF.dropna(axis=0, how='all') json_DF = json_DF.dropna(axis=1, how='all') # Set the dtypes for the columns based on the template for col in json_DF.columns: dtype = get_val_from_dict(json_template, col) if dtype != 'string': try: json_DF[col] = json_DF[col].astype(dtype) except: print(col, dtype) else: json_DF[col] = json_DF[col].apply(str) return json_DF
[docs]def save_to_standard_HDF(df, name, target_path, mode='w'): """ Saves a DataFrame in a standard HDF format using h5py. """ import h5py # import here to avoid an issue on Stampede2 df = df.T hf = h5py.File(target_path, mode) #try: if True: # save each row (i.e., column of FG_df) as a separate dataset in the file for row_id, row in df.iterrows(): row = row.convert_dtypes() # create a tree that identifies the column in the hierarchy row_name = name for label in row_id: if label != " ": row_name += f'/{label}' # perform the appropriate type conversion before saving if row.dtype == np.float64: values = row.values.astype(float) hf.create_dataset(row_name, data=values) elif row.dtype == pd.StringDtype(): # Strings are saved as ASCII strings so that the files can be # opened with any tool on any platform. Non-ASCII characters # are replaced by a backslash-escaped UTF8 identifier. values = row.values.astype(str) values = np.char.encode(values, encoding='ASCII', errors='backslashreplace') hf.create_dataset(row_name, data=values) elif row.dtype in [pd.BooleanDtype(), pd.Int64Dtype()]: row.fillna(-1, inplace=True) values = row.values.astype(int) hf.create_dataset(row_name, data=values) else: print("unknown dtype: ", row.dtype) # finally, save the index values = df.columns.values.astype(str) values = np.char.encode(values, encoding='ASCII', errors='backslashreplace') hf.create_dataset(f'{name}/index', data=values) #except: # show_warning("Error while trying to save standard HDF5 file.") hf.close()
[docs]def convert_json_files_to_HDF(data_source_dir, DL_dir, db_name): """ Converts data from json files to a single HDF5 file """ # Start with the fragility and consequence data - we'll call it data DL_dir = Path(DL_dir).resolve() data_source_dir = Path(data_source_dir).resolve() # get a list of json files to convert FG_ID_list = [filename[:-5] for filename in os.listdir(DL_dir / 'DL json')] # We will use a template.json to define the schema for the jsons and the # header for the data table. with open(data_source_dir / 'DL_template.json', 'r') as f: FG_template = json.load(f) FG_list = [] DL_json_dir = DL_dir / 'DL json' for FG_i in FG_ID_list: with open(DL_json_dir / f'{FG_i}.json', 'r') as f: FG_list.append(json.load(f)) FG_df = convert_jsons_to_table(FG_ID_list, FG_list, FG_template) # start with saving the data in standard HDF5 format save_to_standard_HDF(FG_df, name='data_standard', target_path=DL_dir / f'{db_name}.hdf') # then also save it using PyTables for quick access and slicing FG_df.to_hdf(DL_dir / f'{db_name}.hdf', 'data', mode='a', format='table', complevel=1, complib='blosc:snappy') # Now add the population distribution data - we'll call it pop # Only do this if there is population data try: with open(DL_dir / 'population.json', 'r') as f: pop = json.load(f) pop_ID_list = list(pop.keys()) pop_data = [pop[key] for key in pop.keys()] with open(data_source_dir / 'pop_template.json', 'r') as f: pop_template = json.load(f) pop_df = convert_jsons_to_table(pop_ID_list, pop_data, pop_template) save_to_standard_HDF(pop_df, name='pop_standard', target_path=DL_dir / f'{db_name}.hdf', mode='a') pop_df.to_hdf(DL_dir / f'{db_name}.hdf', 'pop', mode='a', format='table', complevel=1, complib='blosc:snappy') except: pass
[docs]def convert_Series_to_dict(comp_Series): """ Converts data from a table to a json file """ comp_Series = comp_Series.dropna(how='all') comp_dict = {} for branch in comp_Series.index: nested_dict = comp_dict parent_dict = None parent_val = None parent_id = None for val in branch: if val != ' ': if '#' in val: val, list_id = val.split('#') list_id = int(list_id) else: list_id = None if val not in nested_dict.keys(): if list_id is not None: nested_dict.update({val: []}) else: nested_dict.update({val: {}}) if list_id is not None: if list_id > len(nested_dict[val]) - 1: nested_dict[val].append({}) parent_dict = nested_dict nested_dict = nested_dict[val][list_id] parent_id = list_id else: parent_dict = nested_dict nested_dict = nested_dict[val] parent_val = val if isinstance(parent_dict[parent_val], dict): parent_dict[parent_val] = comp_Series[branch] else: parent_dict[parent_val][parent_id] = comp_Series[branch] return comp_dict
[docs]def convert_P58_data_to_json(data_dir, target_dir): """ Create JSON data files from publicly available P58 data. FEMA P58 damage and loss information is publicly available in an Excel spreadsheet and also in a series of XML files as part of the PACT tool. Those files are copied to the resources folder in the pelicun repo. Here we collect the available information on Fragility Groups from those files and save the damage and loss data in the common SimCenter JSON format. A large part of the Fragility Groups in FEMA P58 do not have complete damage and loss information available. These FGs are clearly marked with an incomplete flag in the JSON file and the 'Undefined' value highlights the missing pieces of information. Parameters ---------- data_dir: string Path to the folder with the FEMA P58 Excel file and a 'DL xml' subfolder in it that contains the XML files. target_dir: string Path to the folder where the JSON files shall be saved. """ data_dir = Path(data_dir).resolve() target_dir = Path(target_dir).resolve() DL_dir = None convert_unit = { 'Unit less': 'ea', 'Radians' : 'rad', 'g' : 'g', 'meter/sec': 'mps' } convert_DSG_type = { 'MutEx': 'MutuallyExclusive', 'Simul': 'Simultaneous' } def decode_DS_Hierarchy(DSH): if 'Seq' == DSH[:3]: DSH = DSH[4:-1] DS_setup = [] while len(DSH) > 0: if DSH[:2] == 'DS': DS_setup.append(DSH[:3]) DSH = DSH[4:] elif DSH[:5] in ['MutEx', 'Simul']: closing_pos = DSH.find(')') subDSH = DSH[:closing_pos + 1] DSH = DSH[closing_pos + 2:] DS_setup.append([subDSH[:5]] + subDSH[6:-1].split(',')) return DS_setup def parse_DS_xml(DS_xml): CFG = DS_xml.find('ConsequenceGroup') CFG_C = CFG.find('CostConsequence') CFG_T = CFG.find('TimeConsequence') repair_cost = dict( Amount=[float(CFG_C.find('MaxAmount').text), float(CFG_C.find('MinAmount').text)], Quantity=[float(CFG_C.find('LowerQuantity').text), float(CFG_C.find('UpperQuantity').text)], CurveType=CFG_C.find('CurveType').text, Beta=float(CFG_C.find('Uncertainty').text), Bounds=[0., 'None'] ) if repair_cost['Amount'] == [0.0, 0.0]: repair_cost['Amount'] = 'Undefined' repair_time = dict( Amount=[float(CFG_T.find('MaxAmount').text), float(CFG_T.find('MinAmount').text)], Quantity=[float(CFG_T.find('LowerQuantity').text), float(CFG_T.find('UpperQuantity').text)], CurveType=CFG_T.find('CurveType').text, Beta=float(CFG_T.find('Uncertainty').text), Bounds=[0., 'None'] ) if repair_time['Amount'] == [0.0, 0.0]: repair_time['Amount'] = 'Undefined' return repair_cost, repair_time def is_float(s): try: if type(s) == str and s[-1] == '%': s_f = float(s[:-1]) / 100. else: s_f = float(s) if np.isnan(s_f): return False else: return True except ValueError: return False src_df = pd.read_excel(data_dir / 'PACT_fragility_data.xlsx') ID_list = src_df['NISTIR Classification'] XML_list = [f for f in os.listdir(data_dir / 'DL xml') if f.endswith('.xml')] incomplete_count = 0 for filename in XML_list: comp_ID = filename[:-4] #try: if True: tree = ET.parse((data_dir / 'DL xml') / f'{comp_ID}.xml') root = tree.getroot() # correct for the error in the numbering of RC beams if (comp_ID[:5] == 'B1051') and ( comp_ID[-1] not in str([1, 2, 3, 4])): comp_ID = 'B1042' + comp_ID[5:] row = src_df.loc[np.where(ID_list == comp_ID)[0][0], :] json_output = {} incomplete = False json_output.update({'Name': row['Component Name']}) QU = row['Fragility Unit of Measure'] QU = QU.split(' ') if is_float(QU[1]): if QU[0] in ['TN', 'AP', 'CF', 'KV']: QU[0] = 'ea' QU[1] = 1 json_output.update({'QuantityUnit': [int(QU[1]), QU[0]]}) else: json_output.update({'QuantityUnit': [0., 'Undefined']}) incomplete = True json_output.update({'Directional': row['Directional?'] in ['YES']}) json_output.update({'Correlated': row['Correlated?'] in ['YES']}) json_output.update({ 'EDP': { 'Type' : row['Demand Parameter (value):'], 'Unit' : [1, convert_unit[row['Demand Parameter (unit):']]], 'Offset': int( row['Demand Location (use floor above? Yes/No)'] in [ 'Yes']) } }) json_output.update({ 'GeneralInformation': { 'ID' : row['NISTIR Classification'], 'Description': row['Component Description'], 'Author' : row['Author'], 'Official' : root.find('Official').text in ['True', 'true'], 'DateCreated': root.find('DateCreated').text, 'Approved' : root.find('Approved').text in ['True', 'true'], 'Incomplete' : root.find('Incomplete').text in ['True', 'true'], 'Notes' : row['Comments / Notes'] } }) for key in json_output['GeneralInformation'].keys(): if json_output['GeneralInformation'][key] is np.nan: json_output['GeneralInformation'][key] = 'Undefined' json_output.update({ 'Ratings': { 'DataQuality' : row['Data Quality'], 'DataRelevance': row['Data Relevance'], 'Documentation': row['Documentation Quality'], 'Rationality' : row['Rationality'], } }) for key in json_output['Ratings'].keys(): if json_output['Ratings'][key] is np.nan: json_output['Ratings'][key] = 'Undefined' DSH = decode_DS_Hierarchy(row['DS Hierarchy']) json_output.update({'DSGroups': []}) for DSG in DSH: if DSG[0] in ['MutEx', 'Simul']: mu = row['DS {}, Median Demand'.format(DSG[1][-1])] beta = row[ 'DS {}, Total Dispersion (Beta)'.format(DSG[1][-1])] if is_float(mu) and is_float(beta): json_output['DSGroups'].append({ 'MedianEDP' : float(mu), 'Beta' : float(beta), 'CurveType' : 'LogNormal', 'DSGroupType' : convert_DSG_type[DSG[0]], 'DamageStates': DSG[1:] }) else: json_output['DSGroups'].append({ 'MedianEDP' : float(mu) if is_float( mu) else 'Undefined', 'Beta' : float(beta) if is_float( beta) else 'Undefined', 'CurveType' : 'LogNormal', 'DSGroupType' : convert_DSG_type[DSG[0]], 'DamageStates': DSG[1:] }) incomplete = True else: mu = row['DS {}, Median Demand'.format(DSG[-1])] beta = row['DS {}, Total Dispersion (Beta)'.format(DSG[-1])] if is_float(mu) and is_float(beta): json_output['DSGroups'].append({ 'MedianEDP' : float(mu), 'Beta' : float(beta), 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [DSG], }) else: json_output['DSGroups'].append({ 'MedianEDP' : float(mu) if is_float( mu) else 'Undefined', 'Beta' : float(beta) if is_float( beta) else 'Undefined', 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [DSG], }) incomplete = True need_INJ = False need_RT = False for DSG in json_output['DSGroups']: DS_list = DSG['DamageStates'] DSG['DamageStates'] = [] for DS in DS_list: # avoid having NaN as repair measures repair_measures = row['DS {}, Repair Description'.format(DS[-1])] if not isinstance(repair_measures, str): repair_measures = "" DSG['DamageStates'].append({ 'Weight' : float(row['DS {}, Probability'.format(DS[-1])]), 'LongLeadTime' : int(row['DS {}, Long Lead Time'.format(DS[-1])] in [ 'YES']), 'Consequences' : {}, 'Description' : row['DS {}, Description'.format(DS[-1])], 'RepairMeasures': repair_measures }) IMG = row['DS{}, Illustrations'.format(DS[-1])] if IMG not in ['none', np.nan]: DSG['DamageStates'][-1].update({'DamageImageName': IMG}) AA = row['DS {} - Casualty Affected Area'.format(DS[-1])] if (isinstance(AA, str) and (is_float(AA.split(' ')[0]))): AA = AA.split(' ') DSG['DamageStates'][-1].update( {'AffectedArea': [int(AA[0]), AA[1]]}) need_INJ = True else: DSG['DamageStates'][-1].update( {'AffectedArea': [0, 'SF']}) DSG['DamageStates'][-1]['Consequences'].update( {'Injuries': [{}, {}]}) INJ0 = DSG[ 'DamageStates'][-1]['Consequences']['Injuries'][0] INJ_mu = row[ 'DS {} Serious Injury Rate - Median'.format(DS[-1])] INJ_beta = row[ 'DS {} Serious Injury Rate - Dispersion'.format(DS[-1])] if is_float(INJ_mu) and is_float(INJ_beta): INJ0.update({ 'Amount' : float(INJ_mu), 'Beta' : float(INJ_beta), 'CurveType': 'Normal', 'Bounds' : [0., 1.] }) if INJ_mu != 0.0: need_INJ = True if DSG['DamageStates'][-1]['AffectedArea'][0] == 0: incomplete = True else: INJ0.update({'Amount' : float(INJ_mu) if is_float(INJ_mu) else 'Undefined', 'Beta' : float(INJ_beta) if is_float(INJ_beta) else 'Undefined', 'CurveType': 'Normal'}) if ((INJ0['Amount'] == 'Undefined') or (INJ0['Beta'] == 'Undefined')): incomplete = True INJ1 = DSG[ 'DamageStates'][-1]['Consequences']['Injuries'][1] INJ_mu = row['DS {} Loss of Life Rate - Median'.format(DS[-1])] INJ_beta = row['DS {} Loss of Life Rate - Dispersion'.format(DS[-1])] if is_float(INJ_mu) and is_float(INJ_beta): INJ1.update({ 'Amount' : float(INJ_mu), 'Beta' : float(INJ_beta), 'CurveType': 'Normal', 'Bounds' : [0., 1.] }) if INJ_mu != 0.0: need_INJ = True if DSG['DamageStates'][-1]['AffectedArea'][0] == 0: incomplete = True else: INJ1.update({'Amount' : float(INJ_mu) if is_float(INJ_mu) else 'Undefined', 'Beta' : float(INJ_beta) if is_float(INJ_beta) else 'Undefined', 'CurveType': 'Normal', 'Bounds': [0., 1.]}) if ((INJ1['Amount'] == 'Undefined') or (INJ1['Beta'] == 'Undefined')): incomplete = True DSG['DamageStates'][-1]['Consequences'].update({'RedTag': {}}) RT = DSG['DamageStates'][-1]['Consequences']['RedTag'] RT_mu = row['DS {}, Unsafe Placard Damage Median'.format(DS[-1])] RT_beta = row['DS {}, Unsafe Placard Damage Dispersion'.format(DS[-1])] if is_float(RT_mu) and is_float(RT_beta): RT.update({ 'Amount' : float(RT_mu), 'Beta' : float(RT_beta), 'CurveType': 'Normal', 'Bounds' : [0., 1.] }) if RT['Amount'] != 0.0: need_RT = True else: RT.update({'Amount' : float(RT_mu[:-1]) if is_float(RT_mu) else 'Undefined', 'Beta' : float(RT_beta[:-1]) if is_float(RT_beta) else 'Undefined', 'CurveType': 'Normal', 'Bounds': [0., 1.]}) if ((RT['Amount'] == 'Undefined') or (RT['Beta'] == 'Undefined')): incomplete = True # remove the unused fields if not need_INJ: for DSG in json_output['DSGroups']: for DS in DSG['DamageStates']: del DS['AffectedArea'] del DS['Consequences']['Injuries'] if not need_RT: for DSG in json_output['DSGroups']: for DS in DSG['DamageStates']: del DS['Consequences']['RedTag'] # collect the repair cost and time consequences from the XML file DSG_list = root.find('DamageStates').findall('DamageState') for DSG_i, DSG_xml in enumerate(DSG_list): if DSG_xml.find('DamageStates') is not None: DS_list = (DSG_xml.find('DamageStates')).findall('DamageState') for DS_i, DS_xml in enumerate(DS_list): r_cost, r_time = parse_DS_xml(DS_xml) CONSEQ = json_output['DSGroups'][DSG_i][ 'DamageStates'][DS_i]['Consequences'] CONSEQ.update({ 'ReconstructionCost': r_cost, 'ReconstructionTime': r_time }) if ((r_cost['Amount'] == 'Undefined') or (r_time['Amount'] == 'Undefined')): incomplete = True else: r_cost, r_time = parse_DS_xml(DSG_xml) CONSEQ = json_output['DSGroups'][DSG_i][ 'DamageStates'][0]['Consequences'] CONSEQ.update({ 'ReconstructionCost': r_cost, 'ReconstructionTime': r_time }) if ((r_cost['Amount'] == 'Undefined') or (r_time['Amount'] == 'Undefined')): incomplete = True if incomplete: json_output['GeneralInformation']['Incomplete'] = True incomplete_count += 1 if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{comp_ID}.json', 'w') as f: json.dump(json_output, f, indent=2) #except: # warnings.warn(UserWarning( # 'Error converting data for component {}'.format(comp_ID))) # finally, copy the population file shutil.copy( data_dir / 'population.json', target_dir / 'population.json' )
[docs]def create_HAZUS_EQ_json_files(data_dir, target_dir): """ Create JSON data files from publicly available HAZUS data. HAZUS damage and loss information is publicly available in the technical manuals. The relevant tables have been converted into a JSON input file (hazus_data_eq.json) that is stored in the 'resources/HAZUS MH 2.1' folder in the pelicun repo. Here we read that file (or a file of similar format) and produce damage and loss data for Fragility Groups in the common SimCenter JSON format. HAZUS handles damage and losses at the assembly level differentiating only structural and two types of non-structural component assemblies. In this implementation we consider each of those assemblies a Fragility Group and describe their damage and its consequences in a FEMA P58-like framework but using the data from the HAZUS Technical Manual. Parameters ---------- data_dir: string Path to the folder with the hazus_data_eq JSON file. target_dir: string Path to the folder where the results shall be saved. The population distribution file will be saved here, the DL JSON files will be saved to a 'DL json' subfolder. """ data_dir = Path(data_dir).resolve() target_dir = Path(target_dir).resolve() DL_dir = None convert_design_level = { 'High_code' : 'HC', 'Moderate_code': 'MC', 'Low_code' : 'LC', 'Pre_code' : 'PC' } convert_DS_description = { 'DS1': 'Slight', 'DS2': 'Moderate', 'DS3': 'Extensive', 'DS4': 'Complete', 'DS5': 'Collapse', } # open the raw HAZUS data with open(data_dir / 'hazus_data_eq.json', 'r') as f: raw_data = json.load(f) design_levels = list( raw_data['Structural_Fragility_Groups']['EDP_limits'].keys()) building_types = list( raw_data['Structural_Fragility_Groups']['P_collapse'].keys()) occupancy_types = list(raw_data['Structural_Fragility_Groups'][ 'Reconstruction_cost'].keys()) S_data = raw_data['Structural_Fragility_Groups'] NSA_data = raw_data[ 'NonStructural_Acceleration_Sensitive_Fragility_Groups'] NSD_data = raw_data['NonStructural_Drift_Sensitive_Fragility_Groups'] for ot in occupancy_types: # first, structural fragility groups for dl in design_levels: for bt in building_types: if bt in S_data['EDP_limits'][dl].keys(): json_output = {} dl_id = 'S-{}-{}-{}'.format(bt, convert_design_level[dl], ot) # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': False, 'GeneralInformation': { 'ID' : dl_id, 'Description': dl_id, # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Roof Drift Ratio', 'Unit': [1, 'rad'] } }) # Damage State info json_output.update({'DSGroups': []}) EDP_lim = S_data['EDP_limits'][dl][bt] for dsg_i in range(4): json_output['DSGroups'].append({ 'MedianEDP' : EDP_lim[dsg_i], 'Beta' : S_data['Fragility_beta'][dl], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': {}, 'Description' : 'DS{}'.format(dsg_i + 1), }] }) # the last DSG is different if dsg_i == 3: json_output['DSGroups'][-1][ 'DSGroupType'] = 'MutuallyExclusive' DS5_w = S_data['P_collapse'][bt] json_output['DSGroups'][-1][ 'DamageStates'].append({ 'Weight' : DS5_w, 'Consequences': {}, 'Description' : 'DS5' }) json_output['DSGroups'][-1]['DamageStates'][0][ 'Weight'] = 1.0 - DS5_w for dsg_i, DSG in enumerate(json_output['DSGroups']): base_cost = S_data['Reconstruction_cost'][ot][dsg_i] / 100. base_time = S_data['Reconstruction_time'][ot][dsg_i] for DS in DSG['DamageStates']: DS_id = DS['Description'] DS['Consequences'] = { # injury rates are provided in percentages of the population 'Injuries' : [ {'Amount': val / 100.} for val in S_data['Injury_rates'][DS_id][bt]], # reconstruction cost is provided in percentages of replacement cost 'ReconstructionCost': { "Amount": base_cost, "CurveType": "N/A", }, 'ReconstructionTime': { "Amount": base_time, "CurveType": "N/A", } } DS['Description'] = convert_DS_description[ DS['Description']] # create the DL json directory (if it does not exist) if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # second, nonstructural acceleration sensitive fragility groups json_output = {} dl_id = 'NSA-{}-{}'.format(convert_design_level[dl], ot) # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': False, 'GeneralInformation': { 'ID' : dl_id, 'Description': dl_id, # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Peak Floor Acceleration', 'Unit': [1, 'g'], 'Offset': NSA_data.get('Offset', 0) } }) # Damage State info json_output.update({'DSGroups': []}) for dsg_i in range(4): base_cost = NSA_data['Reconstruction_cost'][ot][dsg_i] / 100. json_output['DSGroups'].append({ 'MedianEDP' : NSA_data['EDP_limits'][dl][dsg_i], 'Beta' : NSA_data['Fragility_beta'], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': { # reconstruction cost is provided in percentages of replacement cost 'ReconstructionCost': { "Amount": base_cost, "CurveType": "N/A", }, }, 'Description' : convert_DS_description[ 'DS{}'.format(dsg_i + 1)] }] }) # create the DL json directory (if it does not exist) if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # third, nonstructural drift sensitive fragility groups json_output = {} dl_id = 'NSD-{}'.format(ot) # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': False, 'GeneralInformation': { 'ID' : dl_id, 'Description': dl_id, # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Roof Drift Ratio', 'Unit': [1, 'rad'] } }) # Damage State info json_output.update({'DSGroups': []}) for dsg_i in range(4): base_cost = NSD_data['Reconstruction_cost'][ot][dsg_i] / 100. json_output['DSGroups'].append({ 'MedianEDP' : NSD_data['EDP_limits'][dsg_i], 'Beta' : NSD_data['Fragility_beta'], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': { # reconstruction cost is provided in percentages of replacement cost 'ReconstructionCost': { "Amount": base_cost, "CurveType": "N/A", }, }, 'Description' : convert_DS_description[ 'DS{}'.format(dsg_i + 1)] }] }) if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # finally, prepare the population distribution data PD_data = raw_data['Population_Distribution'] pop_output = {} for ot in PD_data.keys(): night_ids = raw_data['Parts_of_day']['Nighttime'] day_ids = raw_data['Parts_of_day']['Daytime'] commute_ids = raw_data['Parts_of_day']['Commute'] daily_pop = np.ones(24) daily_pop[night_ids] = PD_data[ot][0] daily_pop[day_ids] = PD_data[ot][1] daily_pop[commute_ids] = PD_data[ot][2] daily_pop = list(daily_pop) # HAZUS does not introduce monthly and weekend/weekday variation pop_output.update({ot: { "weekday": { "daily" : daily_pop, "monthly": list(np.ones(12)) }, "weekend": { "daily" : daily_pop, "monthly": list(np.ones(12)) } }}) with open(target_dir / 'population.json', 'w') as f: json.dump(pop_output, f, indent=2)
[docs]def create_HAZUS_EQ_story_json_files(data_dir, target_dir): """ Create JSON data files from publicly available HAZUS data. HAZUS damage and loss information is publicly available in the technical manuals. The relevant tables have been converted into a JSON input file (hazus_data_eq.json) that is stored in the 'resources/HAZUS MH 2.1' folder in the pelicun repo. Here we read that file (or a file of similar format) and produce damage and loss data for Fragility Groups in the common SimCenter JSON format. HAZUS handles damage and losses at the assembly level differentiating only structural and two types of non-structural component assemblies. In this implementation we consider each of those assemblies a Fragility Group and describe their damage and its consequences in a FEMA P58-like framework but using the data from the HAZUS Technical Manual. Parameters ---------- data_dir: string Path to the folder with the hazus_data_eq JSON file. target_dir: string Path to the folder where the results shall be saved. The population distribution file will be saved here, the DL JSON files will be saved to a 'DL json' subfolder. """ data_dir = Path(data_dir).resolve() target_dir = Path(target_dir).resolve() DL_dir = None convert_design_level = { 'High_code' : 'HC', 'Moderate_code': 'MC', 'Low_code' : 'LC', 'Pre_code' : 'PC' } convert_DS_description = { 'DS1': 'Slight', 'DS2': 'Moderate', 'DS3': 'Extensive', 'DS4': 'Complete', 'DS5': 'Collapse', } # open the raw HAZUS data with open(data_dir / 'hazus_data_eq.json', 'r') as f: raw_data = json.load(f) design_levels = list( raw_data['Structural_Fragility_Groups']['EDP_limits'].keys()) building_types = list( raw_data['Structural_Fragility_Groups']['P_collapse'].keys()) occupancy_types = list(raw_data['Structural_Fragility_Groups'][ 'Reconstruction_cost'].keys()) S_data = raw_data['Structural_Fragility_Groups'] NSA_data = raw_data[ 'NonStructural_Acceleration_Sensitive_Fragility_Groups'] NSD_data = raw_data['NonStructural_Drift_Sensitive_Fragility_Groups'] for ot in occupancy_types: # first, structural fragility groups for dl in design_levels: for bt in building_types: if bt in S_data['EDP_limits'][dl].keys(): json_output = {} dl_id = 'S-{}-{}-{}'.format(bt, convert_design_level[dl], ot) # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': True, 'GeneralInformation': { 'ID' : dl_id, 'Description': dl_id, # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Story Drift Ratio', 'Unit': [1, 'rad'] } }) # Damage State info json_output.update({'DSGroups': []}) EDP_lim = S_data['EDP_limits'][dl][bt] for dsg_i in range(4): json_output['DSGroups'].append({ 'MedianEDP' : EDP_lim[dsg_i], 'Beta' : S_data['Fragility_beta'][dl], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': {}, 'Description' : 'DS{}'.format(dsg_i + 1), }] }) # the last DSG is different if dsg_i == 3: json_output['DSGroups'][-1][ 'DSGroupType'] = 'MutuallyExclusive' DS5_w = S_data['P_collapse'][bt] json_output['DSGroups'][-1][ 'DamageStates'].append({ 'Weight' : DS5_w, 'Consequences': {}, 'Description' : 'DS5' }) json_output['DSGroups'][-1]['DamageStates'][0][ 'Weight'] = 1.0 - DS5_w for dsg_i, DSG in enumerate(json_output['DSGroups']): base_cost = S_data['Reconstruction_cost'][ot][dsg_i] / 100. base_time = S_data['Reconstruction_time'][ot][dsg_i] for DS in DSG['DamageStates']: DS_id = DS['Description'] DS['Consequences'] = { # injury rates are provided in percentages of the population 'Injuries' : [ {'Amount': val / 100.} for val in S_data['Injury_rates'][DS_id][bt]], # reconstruction cost is provided in percentages of replacement cost 'ReconstructionCost': { "Amount": base_cost, "CurveType": "N/A", }, 'ReconstructionTime': { "Amount": base_time, "CurveType": "N/A", } } DS['Description'] = convert_DS_description[ DS['Description']] # create the DL json directory (if it does not exist) if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # second, nonstructural acceleration sensitive fragility groups json_output = {} dl_id = 'NSA-{}-{}'.format(convert_design_level[dl], ot) # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': False, 'GeneralInformation': { 'ID' : dl_id, 'Description': dl_id, # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Peak Floor Acceleration', 'Unit': [1, 'g'], 'Offset': NSA_data.get('Offset', 0) } }) # Damage State info json_output.update({'DSGroups': []}) for dsg_i in range(4): base_cost = NSA_data['Reconstruction_cost'][ot][dsg_i] / 100. json_output['DSGroups'].append({ 'MedianEDP' : NSA_data['EDP_limits'][dl][dsg_i], 'Beta' : NSA_data['Fragility_beta'], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': { # reconstruction cost is provided in percentages of replacement cost 'ReconstructionCost': { "Amount": base_cost, "CurveType": "N/A", }, }, 'Description' : convert_DS_description[ 'DS{}'.format(dsg_i + 1)] }] }) # create the DL json directory (if it does not exist) if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # third, nonstructural drift sensitive fragility groups json_output = {} dl_id = 'NSD-{}'.format(ot) # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': True, 'GeneralInformation': { 'ID' : dl_id, 'Description': dl_id, # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Story Drift Ratio', 'Unit': [1, 'rad'] } }) # Damage State info json_output.update({'DSGroups': []}) for dsg_i in range(4): base_cost = NSD_data['Reconstruction_cost'][ot][dsg_i] / 100. json_output['DSGroups'].append({ 'MedianEDP' : NSD_data['EDP_limits'][dsg_i], 'Beta' : NSD_data['Fragility_beta'], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': { # reconstruction cost is provided in percentages of replacement cost 'ReconstructionCost': { "Amount": base_cost, "CurveType": "N/A", }, }, 'Description' : convert_DS_description[ 'DS{}'.format(dsg_i + 1)] }] }) if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # finally, prepare the population distribution data PD_data = raw_data['Population_Distribution'] pop_output = {} for ot in PD_data.keys(): night_ids = raw_data['Parts_of_day']['Nighttime'] day_ids = raw_data['Parts_of_day']['Daytime'] commute_ids = raw_data['Parts_of_day']['Commute'] daily_pop = np.ones(24) daily_pop[night_ids] = PD_data[ot][0] daily_pop[day_ids] = PD_data[ot][1] daily_pop[commute_ids] = PD_data[ot][2] daily_pop = list(daily_pop) # HAZUS does not introduce monthly and weekend/weekday variation pop_output.update({ot: { "weekday": { "daily" : daily_pop, "monthly": list(np.ones(12)) }, "weekend": { "daily" : daily_pop, "monthly": list(np.ones(12)) } }}) with open(target_dir / 'population.json', 'w') as f: json.dump(pop_output, f, indent=2)
[docs]def create_HAZUS_EQ_PGA_json_files(data_dir, target_dir): """ Create JSON data files from publicly available HAZUS data. HAZUS damage and loss information is publicly available in the technical manuals. The relevant tables have been converted into a JSON input file (hazus_data_eq.json) that is stored in the 'resources/HAZUS MH 2.1 earthquake PGA' folder in the pelicun repo. Here we read that file (or a file of similar format) and produce damage and loss data for Fragility Groups in the common SimCenter JSON format. HAZUS handles damage and losses at the assembly level. This method assumes that we use the fragility curves controlled by equivalent PGA from HAZUS. Those curves are only available for structural component assemblies. A Fragility Group is created for each of the structural configurations that describes their damage and its consequences in a FEMA P58-like framework but using the data from the HAZUS Technical Manual. Parameters ---------- data_dir: string Path to the folder with the hazus_data_eq JSON file. target_dir: string Path to the folder where the results shall be saved. The population distribution file will be saved here, the DL JSON files will be saved to a 'DL json' subfolder. """ data_dir = Path(data_dir).resolve() target_dir = Path(target_dir).resolve() DL_dir = None convert_design_level = { 'High_code' : 'HC', 'Moderate_code': 'MC', 'Low_code' : 'LC', 'Pre_code' : 'PC' } convert_DS_description = { 'DS1': 'Slight', 'DS2': 'Moderate', 'DS3': 'Extensive', 'DS4': 'Complete', 'DS5': 'Collapse', } convert_GF_description = { 'H_S': 'Lateral spreading, shallow foundation', 'H_D': 'Lateral spreading, deep foundation', 'V_S': 'Ground settlement, shallow foundation', 'V_D': 'Ground settlement, deep foundation', } # open the raw HAZUS data with open(data_dir / 'hazus_data_eq.json', 'r') as f: raw_data = json.load(f) # First, the ground shaking fragilities S_data = raw_data['Structural_Fragility_Groups'] design_levels = list(S_data['EDP_limits'].keys()) building_types = list(S_data['P_collapse'].keys()) occupancy_types = list(S_data['Reconstruction_cost'].keys()) for ot in occupancy_types: for dl in design_levels: for bt in building_types: if bt in S_data['EDP_limits'][dl].keys(): json_output = {} dl_id = 'S-{}-{}-{}'.format(bt, convert_design_level[dl], ot) # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': True, 'GeneralInformation': { 'ID' : dl_id, 'Description': dl_id, # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Peak Ground Acceleration', 'Unit': [1, 'g'] } }) # Damage State info json_output.update({'DSGroups': []}) EDP_lim = S_data['EDP_limits'][dl][bt] for dsg_i in range(4): json_output['DSGroups'].append({ 'MedianEDP' : EDP_lim[dsg_i], 'Beta' : S_data['Fragility_beta'][dl], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': {}, 'Description' : 'DS{}'.format(dsg_i + 1), }] }) # the last DSG is different if dsg_i == 3: json_output['DSGroups'][-1][ 'DSGroupType'] = 'MutuallyExclusive' DS5_w = S_data['P_collapse'][bt] json_output['DSGroups'][-1][ 'DamageStates'].append({ 'Weight' : DS5_w, 'Consequences': {}, 'Description' : 'DS5' }) json_output['DSGroups'][-1]['DamageStates'][0][ 'Weight'] = 1.0 - DS5_w for dsg_i, DSG in enumerate(json_output['DSGroups']): base_cost = S_data['Reconstruction_cost'][ot][dsg_i] / 100. base_time = S_data['Reconstruction_time'][ot][dsg_i] for DS in DSG['DamageStates']: DS_id = DS['Description'] DS['Consequences'] = { # injury rates are provided in percentages of the population 'Injuries' : [ {'Amount': val / 100.} for val in S_data['Injury_rates'][DS_id][bt]], # reconstruction cost is provided in percentages of replacement cost 'ReconstructionCost': { "Amount": base_cost, "CurveType": "N/A", }, 'ReconstructionTime': { "Amount": base_time, "CurveType": "N/A", } } DS['Description'] = convert_DS_description[ DS['Description']] if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # Second, the ground failure fragilities L_data = raw_data['Ground_Failure'] ground_failure_types = list(L_data['EDP_limits'].keys()) for gf in ground_failure_types: for bt in building_types: if bt in L_data['P_collapse'].keys(): json_output = {} dl_id = f'GF-{gf}-{bt}' # this might get replaced by a more descriptive name in the future json_output.update({'Name': dl_id}) # General info json_output.update({ 'Directional': True, 'GeneralInformation': { 'ID' : dl_id, 'Description': convert_GF_description[gf], # this might get replaced by more details in the future # other fields can be added here if needed } }) # EDP info json_output.update({ 'EDP': { 'Type': 'Permanent Ground Deformation', 'Unit': [1, 'in'] } }) # Damage State info json_output.update({'DSGroups': []}) EDP_lim = L_data['EDP_limits'][gf] beta_vals = L_data['Fragility_beta'][gf] for dsg_i in range(2): json_output['DSGroups'].append({ 'MedianEDP' : EDP_lim[dsg_i], 'Beta' : beta_vals[dsg_i], 'CurveType' : 'LogNormal', 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': {}, 'Description' : 'DS{}'.format(dsg_i + 1), }] }) # the last DSG is different if dsg_i == 1: json_output['DSGroups'][-1][ 'DSGroupType'] = 'MutuallyExclusive' DS5_w = L_data['P_collapse'][bt] json_output['DSGroups'][-1]['DamageStates'].append({ 'Weight' : DS5_w, 'Consequences': {}, 'Description' : 'DS5' }) json_output['DSGroups'][-1]['DamageStates'][0]['Weight'] = 1.0 - DS5_w if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) # consequences are handled through propagating damage to other components with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2) # prepare the population distribution data PD_data = raw_data['Population_Distribution'] pop_output = {} for ot in PD_data.keys(): night_ids = raw_data['Parts_of_day']['Nighttime'] day_ids = raw_data['Parts_of_day']['Daytime'] commute_ids = raw_data['Parts_of_day']['Commute'] daily_pop = np.ones(24) daily_pop[night_ids] = PD_data[ot][0] daily_pop[day_ids] = PD_data[ot][1] daily_pop[commute_ids] = PD_data[ot][2] daily_pop = list(daily_pop) # HAZUS does not introduce monthly and weekend/weekday variation pop_output.update({ot: { "weekday": { "daily" : daily_pop, "monthly": list(np.ones(12)) }, "weekend": { "daily" : daily_pop, "monthly": list(np.ones(12)) } }}) with open(target_dir / 'population.json', 'w') as f: json.dump(pop_output, f, indent=2)
[docs]def create_HAZUS_HU_json_files(data_dir, target_dir): """ Create JSON data files from publicly available HAZUS data. HAZUS damage and loss information is publicly available in the technical manuals and the HAZUS software tool. The relevant data have been collected in a series of Excel files (e.g., hu_Wood.xlsx) that are stored in the 'resources/HAZUS MH 2.1 hurricane' folder in the pelicun repo. Here we read that file (or a file of similar format) and produce damage and loss data for Fragility Groups in the common SimCenter JSON format. The HAZUS hurricane methodology handles damage and losses at the assembly level. In this implementation each building is represented by one Fragility Group that describes the damage states and their consequences in a FEMA P58-like framework but using the data from the HAZUS Technical Manual. Note: HAZUS calculates lossess independently of damage using peak wind gust speed as a controlling variable. We fitted a model to the curves in HAZUS that assigns losses to each damage state and determines losses as a function of building damage. Results shall be in good agreement with those of HAZUS for the majority of building configurations. Exceptions and more details are provided in the ... section of the documentation. Parameters ---------- data_dir: string Path to the folder with the hazus_data_eq JSON file. target_dir: string Path to the folder where the results shall be saved. The population distribution file will be saved here, the DL JSON files will be saved to a 'DL json' subfolder. """ data_dir = Path(data_dir).resolve() target_dir = Path(target_dir).resolve() DL_dir = None # open the raw HAZUS data df_wood = pd.read_excel(data_dir / 'hu_Wood.xlsx') # some general formatting to make file name generation easier df_wood['shutters'] = df_wood['shutters'].astype(int) df_wood['terr_rough'] = (df_wood['terr_rough'] * 100.).astype(int) convert_building_type = { 'WSF1' : 'Wood Single-Family Homes 1 story', 'WSF2' : 'Wood Single-Family Homes 2+ stories', 'WMUH1': 'Wood Multi-Unit or Hotel or Motel 1 story', 'WMUH2': 'Wood Multi-Unit or Hotel or Motel 2 stories', 'WMUH3': 'Wood Multi-Unit or Hotel or Motel 3+ stories', } convert_bldg_char_names = { 'roof_shape' : 'Roof Shape', 'sec_water_res' : 'Secondary Water Resistance', 'roof_deck_attch': 'Roof Deck Attachment', 'roof_wall_conn' : 'Roof-Wall Connection', 'garage' : 'Garage', 'shutters' : 'Shutters', 'roof_cover' : 'Roof Cover Type', 'roof_quality' : 'Roof Cover Quality', 'terr_rough' : 'Terrain', } convert_bldg_chars = { 1 : True, 0 : False, 'gab' : 'gable', 'hip' : 'hip', 'flt' : 'flat', '6d' : '6d @ 6"/12"', '8d' : '8d @ 6"/12"', '6s' : '6d/8d mix @ 6"/6"', '8s' : '8D @ 6"/6"', 'tnail': 'Toe-nail', 'strap': 'Strap', 'no' : 'None', 'wkd' : 'Weak', 'std' : 'Standard', 'sup' : 'SFBC 1994', 'bur' : 'BUR', 'spm' : 'SPM', 'god' : 'Good', 'por' : 'Poor', 3 : 'Open', 15 : 'Light Suburban', 35 : 'Suburban', 70 : 'Light Trees', 100 : 'Trees', } convert_dist = { 'normal' : 'Normal', 'lognormal': 'LogNormal', } convert_ds = { 1: 'Minor', 2: 'Moderate', 3: 'Severe', 4: 'Destruction', } for index, row in df_wood.iterrows(): #print(index, end=' ') json_output = {} # define the name of the building damage and loss configuration bldg_type = row["bldg_type"] if bldg_type[:3] == "WSF": cols_of_interest = ["bldg_type", "roof_shape", "sec_water_res", "roof_deck_attch", "roof_wall_conn", "garage", "shutters", "terr_rough"] elif bldg_type[:4] == "WMUH": cols_of_interest = ["bldg_type", "roof_shape", "roof_cover", "roof_quality", "sec_water_res", "roof_deck_attch", "roof_wall_conn", "shutters", "terr_rough"] bldg_chars = row[cols_of_interest] if np.isnan(bldg_chars["sec_water_res"]): bldg_chars["sec_water_res"] = 'null' else: bldg_chars["sec_water_res"] = int(bldg_chars["sec_water_res"]) if bldg_type[:4] == "WMUH": if (not isinstance(bldg_chars["roof_cover"],str) and np.isnan(bldg_chars["roof_cover"])): bldg_chars["roof_cover"] = 'null' if (not isinstance(bldg_chars["roof_quality"], str) and np.isnan(bldg_chars["roof_quality"])): bldg_chars["roof_quality"] = 'null' dl_id = "_".join(bldg_chars.astype(str)) json_output.update({'Name': dl_id}) # general information json_output.update({ 'GeneralInformation': { 'ID' : str(index), 'Description' : dl_id, 'Building type': convert_building_type[bldg_type], } }) for col in cols_of_interest: if (col != 'bldg_type') and (bldg_chars[col] != 'null'): json_output['GeneralInformation'].update({ convert_bldg_char_names[col]: convert_bldg_chars[ bldg_chars[col]] }) # EDP info json_output.update({ 'EDP': { 'Type': 'Peak Gust Wind Speed', 'Unit': [1, 'mph'] } }) # Damage States json_output.update({'DSGroups': []}) for dsg_i in range(1, 5): json_output['DSGroups'].append({ 'MedianEDP' : row['DS{}_mu'.format(dsg_i)], 'Beta' : row['DS{}_sig'.format(dsg_i)], 'CurveType' : convert_dist[row['DS{}_dist'.format(dsg_i)]], 'DSGroupType' : 'Single', 'DamageStates': [{ 'Weight' : 1.0, 'Consequences': { 'ReconstructionCost': { 'Amount': row[ 'L{}'.format(dsg_i)] if dsg_i < 4 else 1.0 } }, 'Description' : convert_ds[dsg_i] }] }) if DL_dir is None: DL_dir = target_dir / "DL json" DL_dir.mkdir(exist_ok=True) with open(DL_dir / f'{dl_id}.json', 'w') as f: json.dump(json_output, f, indent=2)