From 05796cbde895c959bacda3392c4ebb8111d150c9 Mon Sep 17 00:00:00 2001
From: jh-206
Date: Mon, 7 Oct 2024 17:30:33 -0600
Subject: [PATCH] Update data_funcs.py

Initial commit of cleaner funcs to process fmda dicts
---
 fmda/data_funcs.py | 200 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 195 insertions(+), 5 deletions(-)

diff --git a/fmda/data_funcs.py b/fmda/data_funcs.py
index 87a33b5..ca355f9 100644
--- a/fmda/data_funcs.py
+++ b/fmda/data_funcs.py
@@ -14,9 +14,192 @@ import json
 import copy
 import subprocess
 import os.path as osp
-from utils import Dict, str2time, check_increment, time_intp
+from utils import Dict, str2time, check_increment, time_intp, read_pkl
 import warnings
 
+
+# New Dict Functions as of 2024-10-02, need more testing
+# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+feature_types = {
+    # Static features are based on physical location, e.g. location of RAWS site
+    'static': ['elev', 'lon', 'lat'],
+    # Atmospheric weather features come from either the RAWS subdict or HRRR
+    'atm': ['temp', 'rh', 'wind', 'solar', 'soilm', 'canopyw', 'groundflux', 'Ed', 'Ew'],
+    # Features that require calculation. NOTE: rain is only calculated from HRRR, not RAWS
+    'engineered': ['doy', 'hod', 'rain']
+}
+
+def check_feat(feat, d):
+    # Raise an error if a required feature is missing from the given subdict
+    if feat not in d:
+        raise ValueError(f"Feature {feat} not found")
+
+def get_time(d, atm="HRRR"):
+    check_feat('time', d[atm])
+    time = str2time(d[atm]['time'])
+    return time
+
+def get_static(d, hours):
+    cols = []
+    # Use all static vars; don't allow for missing
+    names = feature_types['static']
+    for feat in names:
+        check_feat(feat, d['loc'])
+        # Repeat the static value for every hour of the timeseries
+        cols.append(np.full(hours, d['loc'][feat]))
+    return cols, names
+
+def get_hrrr_atm(d, fstep):
+    cols = []
+    # Use all names; don't allow for missing.
+    # Copy the list since the caller appends 'rain' to it
+    names = feature_types['atm'].copy()
+    for feat in names:
+        check_feat(feat, d["HRRR"][fstep])
+        v = d["HRRR"][fstep][feat]
+        cols.append(v)
+    return cols, names
+
+def calc_time_features(time):
+    names = ['doy', 'hod']
+    doy = np.array([dt.timetuple().tm_yday - 1 for dt in time])
+    hod = time.astype('datetime64[h]').astype(int) % 24
+    cols = [doy, hod]
+    return cols, names
+
+def calc_hrrr_rain(d, fstep, fprev):
+    # Rain is the difference of accumulated precipitation between consecutive forecast steps
+    rain = d["HRRR"][fstep]['precip_accum'] - d["HRRR"][fprev]['precip_accum']
+    return rain
+
+def get_raws_atm(d, time):
+    # RAWS times may not match the requested time vector, so interpolate to the input time
+    time_raws = str2time(d['RAWS']['time_raws'])
+
+    cols = []
+    names = []
+
+    # Loop through all features, including rain
+    for feat in feature_types['atm'] + ['rain']:
+        if feat in d['RAWS']:
+            v = d['RAWS'][feat]
+            v = time_intp(time_raws, v, time)
+            assert len(v) == len(time), f"RAWS feature {feat} not equal length to input time: {len(v)} vs {len(time)}"
+            cols.append(v)
+            names.append(feat)
+    return cols, names
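+
+# NOTE: a sketch of the expected case-subdict layout, inferred from the
+# accessors above; the exact keys present vary by case, so treat this as an
+# illustration rather than a guaranteed schema:
+#   d = {
+#       'loc':  {'elev': ..., 'lon': ..., 'lat': ...},                       # static site info
+#       'HRRR': {'time': [...], 'f01': {'temp': ..., 'precip_accum': ...}},  # per forecast step
+#       'RAWS': {'time_raws': [...], 'fm': [...], 'temp': [...]}             # station observations
+#   }
+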
+def build_features_single(subdict, atm="HRRR", fstep=None, fprev=None):
+    # Get time variable
+    time = get_time(subdict)
+    # Calculate derived time variables
+    tvars, tnames = calc_time_features(time)
+    # Get static features, extended to the number of hours
+    static_vars, static_names = get_static(subdict, hours=len(time))
+    # Get atmospheric variables based on data source; HRRR requires rain calculation
+    if atm == "HRRR":
+        atm_vars, atm_names = get_hrrr_atm(subdict, fstep)
+        rain = calc_hrrr_rain(subdict, fstep, fprev)
+        atm_vars.append(rain)
+        atm_names.append("rain")
+    elif atm == "RAWS":
+        atm_vars, atm_names = get_raws_atm(subdict, time)
+    else:
+        raise ValueError(f"Unrecognized atmospheric data source: {atm}")
+    # Put everything together and stack
+    cols = tvars + static_vars + atm_vars
+    X = np.column_stack(cols)
+    names = tnames + static_names + atm_names
+
+    return X, names
+
+def get_fm(d, time):
+    # Interpolate RAWS fuel moisture observations to the input time vector
+    fm = d['RAWS']['fm']
+    time_raws = str2time(d['RAWS']['time_raws'])
+    return time_intp(time_raws, fm, time)
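+
+# Example usage for a single case (a minimal sketch: the pickle path and case
+# key are hypothetical, and 'f01'/'f00' assume forecast_step=1):
+#   d = read_pkl('data/fmda_cases.pkl')          # hypothetical input file
+#   X, names = build_features_single(d['case1'], atm="HRRR", fstep='f01', fprev='f00')
+#   time = get_time(d['case1'])
+#   y = get_fm(d['case1'], time)                 # FM observations aligned to rows of X
+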
+def build_train_dict2(input_file_paths, params_data, spatial=True, atm_source="HRRR", forecast_step=1, verbose=True):
+    # TODO: process involves multiple loops over dictionary keys. Inefficient, but the
+    # functionality is isolated in separate functions that each run their own loops, so it will be tedious to fix
+
+    # Define forecast step. NOTE: if atm_source == "RAWS", this should be zero,
+    # and fstep/fprev stay None since no HRRR rain calculation is needed
+    fstep = fprev = None
+    if atm_source == "RAWS":
+        print("Atmospheric data source is RAWS, so forecast_step set to zero")
+        forecast_step = 0
+    elif forecast_step > 0 and forecast_step < 100 and forecast_step == int(forecast_step):
+        fstep = 'f' + str(forecast_step).zfill(2)
+        fprev = 'f' + str(forecast_step - 1).zfill(2)
+        # logging.info('Using data from step %s', fstep)
+        # logging.info('Using rain as the difference of accumulated precipitation between %s and %s', fstep, fprev)
+    else:
+        # logging.critical('forecast_step must be integer between 1 and 99')
+        raise ValueError('forecast_step must be an integer between 1 and 99')
+
+    # Loop over input dictionary cases, extract and calculate features, then run data filters
+    new_dict = {}
+    for input_file_path in input_file_paths:
+        if verbose:
+            print("~" * 75)
+            print(f"Extracting data from input file {input_file_path}")
+        dict0 = read_pkl(input_file_path)
+        for key in dict0:
+            X, names = build_features_single(dict0[key], atm=atm_source, fstep=fstep, fprev=fprev)
+            time = str2time(dict0[key]['HRRR']['time'])
+            hrrr_increment = check_increment(time, id=key + ' HRRR.time')
+            if hrrr_increment < 1:
+                # logging.critical('HRRR increment is %s h must be at least 1 h', hrrr_increment)
+                raise ValueError(f'HRRR increment is {hrrr_increment} h, must be at least 1 h')
+            new_dict[key] = {
+                'id': key,
+                'case': key,
+                'filename': input_file_path,
+                'loc': dict0[key]['loc'],
+                'time': time,
+                'X': X,
+                'y': get_fm(dict0[key], time),
+                'features_list': names,
+                'atm_source': atm_source
+            }
+
+    # Run data filters
+    # Subset timeseries into shorter stretches, discard short ones
+    if verbose:
+        print("~" * 75)
+        print(f"Splitting timeseries into smaller portions to aid with data filtering. Input data param for max timeseries hours: {params_data['hours']}")
+    new_dict = split_timeseries(new_dict, hours=params_data['hours'], verbose=verbose)
+    new_dict = discard_keys_with_short_y(new_dict, hours=params_data['hours'], verbose=False)
+
+    # Check for suspect data
+    flags = flag_dict_keys(new_dict, params_data['zero_lag_threshold'], params_data['max_intp_time'], max_y=params_data['max_fm'], min_y=params_data['min_fm'], verbose=verbose)
+
+    # Remove flagged cases
+    cases = list(new_dict.keys())
+    flagged_cases = [element for element, flag in zip(cases, flags) if flag == 1]
+    remove_key_list(new_dict, flagged_cases, verbose=verbose)
+
+    if spatial:
+        new_dict = combine_nested(new_dict)
+
+    return Dict(new_dict)
+
+
 # Wrapper Functions to Put it all together
 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -25,7 +208,7 @@ import warnings
 # doy = np.array([dt.timetuple().tm_yday - 1 for dt in rnn_dat.time])
 
 def create_spatial_train(input_file_paths, params_data, atm_dict = "HRRR", verbose=False):
-    train = process_train_dict(file_paths, params_data = params_data, verbose=verbose)
+    train = process_train_dict(input_file_paths, params_data = params_data, verbose=verbose)
     train_sp = Dict(combine_nested(train))
     return train_sp
 
@@ -179,9 +362,9 @@ def build_train_dict(input_file_path,
         elif feat in feature_types['static']:
             columns.append(np.full(hours,loc[feat]))
     # Add Engineered Time features, doy and hod
-    hod = time_hrrr.astype('datetime64[h]').astype(int) % 24
-    doy = np.array([dt.timetuple().tm_yday - 1 for dt in time_hrrr])
-    columns.extend([doy, hod])
+    # hod = time_hrrr.astype('datetime64[h]').astype(int) % 24
+    # doy = np.array([dt.timetuple().tm_yday - 1 for dt in time_hrrr])
+    # columns.extend([doy, hod])
 
     # compute rain as difference of accumulated precipitation
     if 'rain' in features_list:
@@ -244,6 +427,7 @@ def build_train_dict(input_file_path,
 
     return train
 
+
 def remove_key_list(d, ls, verbose=False):
     for key in ls:
         if key in d:
@@ -356,6 +540,11 @@ def discard_keys_with_short_y(input_dict, hours, verbose=False):
     """
     Remove keys from a dictionary where the subkey `y` is less than given hours. Used to remove partial sequences at the end of timeseries after the longer timeseries has been subdivided.
     """
+
+    max_y = max(case['y'].shape[0] for case in input_dict.values())
+    if max_y < hours:
+        raise ValueError(f"Given input hours of {hours}, all cases in input dictionary would be filtered out as too short. Max timeseries y length: {max_y}. Try using a larger value for `hours` in data params")
+
     discarded_keys = [key for key, value in input_dict.items() if len(value['y']) < hours]
 
     if verbose:
@@ -386,6 +575,7 @@ def combine_nested(nested_input_dict, verbose=True):
     d['time'] = _combine_key(nested_input_dict, 'time')
     d['X'] = _combine_key(nested_input_dict, 'X')
     d['y'] = _combine_key(nested_input_dict, 'y')
+    d['atm_source'] = _combine_key(nested_input_dict, 'atm_source')
 
     # Build the loc subdictionary using _combine_key for each loc key
     d['loc'] = {
-- 
2.11.4.GIT