# v2 training and prediction class infrastructure

import numpy as np
import sys
import warnings
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import Callback, EarlyStopping, TerminateOnNaN
# from sklearn.metrics import mean_squared_error
from tensorflow.keras.layers import LSTM, SimpleRNN, Input, Dropout, Dense
import reproducibility
# from utils import print_dict_summary
from abc import ABC, abstractmethod
from utils import hash2, all_items_exist, hash_ndarray, hash_weights
from data_funcs import rmse, plot_data, compare_dicts
from sklearn.preprocessing import MinMaxScaler, StandardScaler

#*************************************************************************************
# Data Formatting Functions

def staircase(x, y, timesteps, datapoints, return_sequences=False, verbose=False):
    # x [datapoints,features] all inputs
    # y [datapoints,outputs]
    # timesteps: split x and y into samples length timesteps, shifted by 1
    # datapoints: number of timesteps to use for training, no more than y.shape[0]
    if verbose:
        print('staircase: shape x = ', x.shape)
        print('staircase: shape y = ', y.shape)
        print('staircase: timesteps=', timesteps)
        print('staircase: datapoints=', datapoints)
        print('staircase: return_sequences=', return_sequences)
    outputs = y.shape[1]
    features = x.shape[1]
    samples = datapoints - timesteps + 1
    if verbose:
        print('staircase: samples=', samples, 'timesteps=', timesteps, 'features=', features)
    x_train = np.empty([samples, timesteps, features])
    if return_sequences:
        if verbose:
            print('returning all timesteps in a sample')
        y_train = np.empty([samples, timesteps, outputs])  # all
        for i in range(samples):
            for k in range(timesteps):
                x_train[i, k, :] = x[i+k, :]
                y_train[i, k, :] = y[i+k, :]
    else:
        if verbose:
            print('returning only the last timestep in a sample')
        y_train = np.empty([samples, outputs])
        for i in range(samples):
            for k in range(timesteps):
                x_train[i, k, :] = x[i+k, :]
            y_train[i, :] = y[i+timesteps-1, :]
    return x_train, y_train
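

# Usage sketch (illustrative only): shapes produced by staircase on synthetic data.
# The array sizes and feature counts below are assumptions for demonstration, not
# values used elsewhere in this project.
def _example_staircase():
    rng = np.random.default_rng(0)
    x = rng.random((100, 3))    # 100 datapoints, 3 features
    y = rng.random((100, 1))    # 100 datapoints, 1 output
    x_train, y_train = staircase(x, y, timesteps=5, datapoints=100)
    # samples = datapoints - timesteps + 1 = 96
    print(x_train.shape)        # (96, 5, 3)
    print(y_train.shape)        # (96, 1), target is the last timestep of each sample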
def staircase_2(x, y, timesteps, batch_size=None, trainsteps=np.inf, return_sequences=False, verbose=False):
    # create RNN training data in multiple batches
    # x [datapoints,features] all inputs
    # y [datapoints,outputs]
    # timesteps: split x and y into sequences length timesteps
    #            a.k.a. lookback or sequence_length

    # print params if verbose
    if batch_size is None:
        raise ValueError('staircase_2 requires batch_size')
    if verbose:
        print('staircase_2: shape x = ', x.shape)
        print('staircase_2: shape y = ', y.shape)
        print('staircase_2: timesteps=', timesteps)
        print('staircase_2: batch_size=', batch_size)
        print('staircase_2: return_sequences=', return_sequences)

    nx, features = x.shape
    ny, outputs = y.shape
    datapoints = min(nx, ny, trainsteps)
    if verbose:
        print('staircase_2: datapoints=', datapoints)

    # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
    # https://www.tensorflow.org/guide/keras/working_with_rnns Cross-batch statefulness

    # example with timesteps=3 batch_size=3 datapoints=15
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]
    #     batch 3: [9 10 11]    [10 11 12]   [11 12 13]
    #     batch 4: [12 13 14]   [13 14 15]   when runs out this is the last batch, can be shorter

    # TODO: implement for multiple locations, same starting time for each batch
    #     batch 0: [0 1 2]      [0 1 2]      [0 1 2]
    #     batch 1: [3 4 5]      [3 4 5]      [3 4 5]
    #     batch 2: [6 7 8]      [6 7 8]      [6 7 8]
    # TODO: second epoch shift starting time at batch 0 in time

    # TODO: implement for multiple locations, different starting times for each batch
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]

    # the first sample in batch j starts from timesteps*j and ends with timesteps*(j+1)-1
    # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
    # becomes the starting hidden state of the rnn in the sequence of steps [3 4 5] in batch 1, etc.

    # sample [0 1 2] means the rnn is used twice to map state 0 -> 1 -> 2
    # the state at time 0 is fixed but the state is considered a variable at times 1 and 2
    # the loss is computed from the output at time 2 and the gradient of the loss function by chain rule which ends at time 0 because the state there is a constant -> derivative is zero
    # sample [3 4 5] means the rnn is used twice to map state 3 -> 4 -> 5 # the state at time 3 is fixed to the output of the first sequence [0 1 2]
    # the loss is computed from the output at time 5 and the gradient of the loss function by chain rule which ends at time 3 because the state there is considered constant -> derivative is zero
    # how is the gradient computed? I suppose keras adds gradient wrt the weights at 2 5 8 ... 3 6 9 ... 4 7 ... and uses that to update the weights
    # there is only one set of weights: h(2) = f(h(1),w), h(1) = f(h(0),w), but w is always the same
    # each column is one successive evaluation of h(n+1) = f(h(n),w) for n = n_start, n_start+1, ...
    # a single column cannot be evaluated efficiently on a gpu because a gpu is a parallel processor
    # think of it as each column served by one thread; the threads are independent because they execute in parallel, and there needs to be a large number of threads (32 is a good number)
    # each batch consists of independent calculations
    # but it can depend on the result of the previous batch (that's the recurrent part)

    max_batches = datapoints // timesteps
    max_sequences = max_batches * batch_size
    if verbose:
        print('staircase_2: max_batches=', max_batches)
        print('staircase_2: max_sequences=', max_sequences)

    x_train = np.zeros((max_sequences, timesteps, features))
    if return_sequences:
        y_train = np.empty((max_sequences, timesteps, outputs))
    else:
        y_train = np.empty((max_sequences, outputs))

    # build the sequences
    k = 0
    for i in range(max_batches):
        for j in range(batch_size):
            begin = i*timesteps + j
            next = begin + timesteps
            if next > datapoints:
                break
            if verbose:
                print('sequence', k, 'batch', i, 'sample', j, 'data', begin, 'to', next-1)
            x_train[k, :, :] = x[begin:next, :]
            if return_sequences:
                y_train[k, :, :] = y[begin:next, :]
            else:
                y_train[k, :] = y[next-1, :]
            k += 1
    if verbose:
        print('staircase_2: shape x_train = ', x_train.shape)
        print('staircase_2: shape y_train = ', y_train.shape)
        print('staircase_2: sequences generated', k)
        print('staircase_2: batch_size=', batch_size)
    k = (k // batch_size) * batch_size
    if verbose:
        print('staircase_2: removing partial and empty batches at the end, keeping', k)
    x_train = x_train[:k, :, :]
    if return_sequences:
        y_train = y_train[:k, :, :]
    else:
        y_train = y_train[:k, :]

    if verbose:
        print('staircase_2: shape x_train = ', x_train.shape)
        print('staircase_2: shape y_train = ', y_train.shape)

    return x_train, y_train
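

# Usage sketch (illustrative only): reproduces the indexing example from the comments in
# staircase_2, using timesteps=3, batch_size=3, and 15 datapoints of a single feature equal
# to the time index.
def _example_staircase_2():
    t = np.arange(15, dtype=float).reshape(-1, 1)
    x_train, y_train = staircase_2(t, t, timesteps=3, batch_size=3)
    # Sequence j of batch b covers steps [3*b + j, 3*b + j + 2], so batch 0 holds
    # [0 1 2], [1 2 3], [2 3 4], and sequence j of batch b+1 continues sequence j of batch b.
    print(x_train.shape)      # (12, 3, 1): 4 full batches of 3 sequences are kept
    print(x_train[0, :, 0])   # [0. 1. 2.]
    print(y_train[0])         # [2.]: the target is the last timestep of the sequence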
# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler()
}

def batch_setup(ids, batch_size):
    """
    Sets up stateful batched training data scheme for RNN training.

    This function takes a list or array of identifiers (`ids`) and divides them into batches of a specified size (`batch_size`). If the last batch does not have enough elements to meet the `batch_size`, the function will loop back to the start of the identifiers and continue filling the batch until it reaches the required size.

    Parameters:
    -----------
    ids : list or numpy array
        A list or numpy array containing the ids to be batched.
    batch_size : int
        The desired size of each batch.

    Returns:
    --------
    batches : list of lists
        A list where each element is a batch (itself a list) of identifiers. Each batch will contain exactly `batch_size` elements.

    Example:
    --------
    >>> ids = [1, 2, 3, 4, 5]
    >>> batch_size = 3
    >>> batch_setup(ids, batch_size)
    [[1, 2, 3], [4, 5, 1]]

    Notes:
    ------
    - If `ids` is shorter than `batch_size`, the returned list will contain a single batch where identifiers are repeated from the start of `ids` until the batch is filled.
    """
    # Ensure ids is a numpy array
    x = np.array(ids)

    # Initialize the list to hold the batches
    batches = []

    # Use a loop to slice the list/array into batches
    for i in range(0, len(x), batch_size):
        batch = list(x[i:i + batch_size])

        # If the batch is not full, continue from the start
        while len(batch) < batch_size:
            # Calculate the remaining number of items needed
            remaining = batch_size - len(batch)
            # Append the needed number of items from the start of the array
            batch.extend(x[:remaining])

        batches.append(batch)

    return batches

def staircase_spatial(X, y, batch_size, timesteps, hours=None, start_times=None, verbose=True):
    """
    Prepares spatially formatted time series data for RNN training by creating batches of sequences across different locations, stacked to be compatible with stateful models.

    This function processes multi-location time series data by slicing it into batches and formatting it to fit into a recurrent neural network (RNN) model. It utilizes a staircase-like approach to prepare sequences for each location and then interlaces them to align with stateful RNN structures.

    Parameters:
    -----------
    X : list of numpy arrays
        A list where each element is a numpy array containing features for a specific location. The shape of each array is `(total_time_steps, features)`.
    y : list of numpy arrays
        A list where each element is a numpy array containing the target values for a specific location. The shape of each array is `(total_time_steps,)`.
    batch_size : int
        The number of sequences to include in each batch.
    timesteps : int
        The number of time steps to include in each sequence for the RNN.
    hours : int, optional
        The length of each time series to consider for each location. If `None`, it defaults to the minimum length of `y` across all locations.
    start_times : numpy array, optional
        The initial time step for each location. If `None`, it defaults to an array starting from 0 and incrementing by 1 for each location.
    verbose : bool, optional
        If `True`, prints additional information during processing. Default is `True`.

    Returns:
    --------
    XX : numpy array
        A 3D numpy array with shape `(total_sequences, timesteps, features)` containing the prepared feature sequences for all locations.
    yy : numpy array
        A 2D numpy array with shape `(total_sequences, 1)` containing the corresponding target values for all locations.
    n_seqs : int
        Number of sequences per location. Used to reset states when location changes. Hidden state of RNN will be reset after n_seqs number of batches.

    Notes:
    ------
    - The function handles spatially distributed time series data by batching and formatting it for stateful RNNs.
    - `hours` determines how much of the time series is used for each location. If not provided, it defaults to the shortest series in `y`.
    - If `start_times` is not provided, it assumes each location starts its series at progressively later time steps.
    - The `batch_setup` function is used internally to manage the creation of location and time step batches.
    - The returned feature sequences `XX` and target sequences `yy` are interlaced to align with the expected input format of stateful RNNs.
    """
    # Generate ids based on number of distinct timeseries provided
    n_loc = len(y)  # assuming each list entry for y is a separate location
    loc_ids = np.arange(n_loc)

    # Generate hours and start_times if None
    if hours is None:
        print("Setting total hours to minimum length of y in provided dictionary")
        hours = min(len(yi) for yi in y)
    if start_times is None:
        print("Setting Start times to offset by 1 hour by location")
        start_times = np.arange(n_loc)
    # Set up batches of location ids and start times
    loc_batch, t_batch = batch_setup(loc_ids, batch_size), batch_setup(start_times, batch_size)
    if verbose:
        print(f"Location ID Batches: {loc_batch}")
        print(f"Start Times for Batches: {t_batch}")

    # Loop over batches and construct with staircase_2
    Xs = []
    ys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        ts = t_batch[i]
        for j in range(0, len(locs_i)):
            t0 = int(ts[j])
            tend = t0 + hours
            # Create RNNData Dict
            # Subset data to given location and time from t0 to t0+hours
            k = locs_i[j]  # Used to account for fewer locations than batch size
            X_temp = X[k][t0:tend, :]
            y_temp = y[k][t0:tend].reshape(-1, 1)

            # Format sequences
            Xi, yi = staircase_2(
                X_temp, y_temp,
                timesteps=timesteps,
                batch_size=1,  # note: using 1 here to format sequences for a single location, not same as target batch size for training data
                verbose=verbose)
            Xs.append(Xi)
            ys.append(yi)

    # Drop incomplete batches
    lens = [yi.shape[0] for yi in ys]
    n_seqs = min(lens)
    if verbose:
        print(f"Minimum number of sequences by location: {n_seqs}")
        print(f"Applying minimum length to other arrays.")
    Xs = [Xi[:n_seqs] for Xi in Xs]
    ys = [yi[:n_seqs] for yi in ys]

    # Interlace arrays to match stateful structure
    n_features = Xi.shape[2]
    XXs = []
    yys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        XXi = np.empty((Xs[0].shape[0]*batch_size, timesteps, n_features))
        yyi = np.empty((Xs[0].shape[0]*batch_size, 1))
        for j in range(0, len(locs_i)):
            XXi[j::(batch_size)] = Xs[locs_i[j]]
            yyi[j::(batch_size)] = ys[locs_i[j]]
        XXs.append(XXi)
        yys.append(yyi)
    yy = np.concatenate(yys, axis=0)
    XX = np.concatenate(XXs, axis=0)

    if verbose:
        print(f"Spatially Formatted X Shape: {XX.shape}")
        print(f"Spatially Formatted y Shape: {yy.shape}")

    return XX, yy, n_seqs
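

# Usage sketch (illustrative only): two synthetic locations formatted for a stateful RNN.
# Sizes, feature counts, and the random data are assumptions for demonstration.
def _example_staircase_spatial():
    rng = np.random.default_rng(0)
    X = [rng.random((48, 2)) for _ in range(2)]   # 2 locations, 48 hours, 2 features
    y = [rng.random(48) for _ in range(2)]
    XX, yy, n_seqs = staircase_spatial(X, y, batch_size=2, timesteps=3, verbose=False)
    # Sequences from the two locations are interlaced: rows 0, 2, 4, ... come from
    # location 0 and rows 1, 3, 5, ... from location 1, so a stateful model sees each
    # location in a fixed slot of every batch.
    print(XX.shape, yy.shape, n_seqs)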
#***********************************************************************************************
### RNN Class Functionality

class RNNParams(dict):
    """
    A custom dictionary class for handling RNN parameters. Automatically calculates certain params based on others. Overwrites the update method to protect from incompatible parameter choices. Inherits from dict.
    """
    def __init__(self, input_dict):
        """
        Initializes the RNNParams instance and runs checks and shape calculations.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing RNN parameters.
        """
        super().__init__(input_dict)
        # Automatically run checks on initialization
        self.run_checks()
        # Automatically calculate shapes on initialization
        self.calc_param_shapes()

    def run_checks(self, verbose=True):
        """
        Validates that required keys exist and are of the correct type.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        print("Checking params...")
        # Keys must exist and be integers
        int_keys = [
            'batch_size', 'timesteps', 'rnn_layers',
            'rnn_units', 'dense_layers', 'dense_units', 'epochs'
        ]
        for key in int_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], int), f"Key '{key}' must be an integer"

        # Keys must exist and be lists
        list_keys = ['activation', 'features_list', 'dropout', 'time_fracs']
        for key in list_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], list), f"Key '{key}' must be a list"

        # Keys must exist and be floats
        float_keys = ['learning_rate']
        for key in float_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], float), f"Key '{key}' must be a float"

        print("Input dictionary passed all checks.")

    def calc_param_shapes(self, verbose=True):
        """
        Calculates and updates the shapes of certain parameters based on input data.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Calculating shape params based on features list, timesteps, and batch size")
            print(f"Input Feature List: {self['features_list']}")
            print(f"Input Timesteps: {self['timesteps']}")
            print(f"Input Batch Size: {self['batch_size']}")

        n_features = len(self['features_list'])
        batch_shape = (self["batch_size"], self["timesteps"], n_features)
        if verbose:
            print("Calculated params:")
            print(f"Number of features: {n_features}")
            print(f"Batch Shape: {batch_shape}")

        # Update the dictionary
        super().update({
            'n_features': n_features,
            'batch_shape': batch_shape
        })

    def update(self, *args, verbose=True, **kwargs):
        """
        Overwrites the standard update function from dict. This is to prevent certain keys from being modified directly and to automatically update keys to be compatible with each other. The keys handled relate to the shape of the input data to the RNN.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Prevent updating n_features and batch_shape
        restricted_keys = {'n_features', 'batch_shape'}
        keys_to_check = {'features_list', 'timesteps', 'batch_size'}

        # Check for restricted keys in args
        if args:
            if isinstance(args[0], dict):
                if restricted_keys & args[0].keys():
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                if restricted_keys & {k for k, v in args[0]}:
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")

        # Check for restricted keys in kwargs
        if restricted_keys & kwargs.keys():
            raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")

        # Track if specific keys are updated
        keys_updated = set()

        # Update using the standard dict update method
        if args:
            if isinstance(args[0], dict):
                keys_updated.update(args[0].keys() & keys_to_check)
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                keys_updated.update(k for k, v in args[0] if k in keys_to_check)

        keys_updated.update(kwargs.keys() & keys_to_check)

        # Call the parent update method
        super().update(*args, **kwargs)

        # Recalculate shapes if necessary
        if keys_updated:
            self.calc_param_shapes(verbose=verbose)

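
# Usage sketch (illustrative only): the parameter values below are placeholders chosen to
# satisfy run_checks, not recommended settings for this project.
def _example_rnn_params():
    params = RNNParams({
        'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
        'dense_layers': 1, 'dense_units': 5, 'epochs': 20,
        'activation': ['tanh', 'tanh'], 'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.2, 0.2], 'time_fracs': [0.8, 0.1, 0.1],
        'learning_rate': 0.001
    })
    print(params['n_features'])    # 3, derived from features_list
    print(params['batch_shape'])   # (32, 5, 3), derived from batch_size, timesteps, n_features
    params.update({'timesteps': 10})   # allowed; batch_shape is recalculated to (32, 10, 3)
    # params.update({'batch_shape': (1, 1, 1)})  # would raise KeyError: restricted key
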
## Class for handling input data
class RNNData(dict):
    """
    A custom dictionary class for managing RNN data, with validation, scaling, and train-test splitting functionality.
    """
    required_keys = {"loc", "time", "X", "y", "features_list"}

    def __init__(self, input_dict, scaler=None, features_list=None):
        """
        Initializes the RNNData instance, performs checks, and prepares data.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing the initial data.
        scaler : str, optional
            The name of the scaler to be used (e.g., 'minmax', 'standard'). Default is None.
        features_list : list, optional
            A subset of features to be used. Default is None, which means all features.
        """
        # Copy to avoid changing external input
        input_data = input_dict.copy()
        # Initialize inherited dict class
        super().__init__(input_data)

        # Check if input data is one timeseries dataset or multiple
        if type(self.loc['STID']) == str:
            self.spatial = False
            print("Input data is single timeseries.")
        elif type(self.loc['STID']) == list:
            self.spatial = True
            print("Input data from multiple timeseries.")
        else:
            raise KeyError(f"Input locations not list or single string")

        # Set up Data Scaling
        self.scaler = None
        if scaler is not None:
            self.set_scaler(scaler)

        # Rename and define other stuff.
        if self.spatial:
            self['hours'] = min(arr.shape[0] for arr in self.y)
        else:
            self['hours'] = len(self['y'])

        self['all_features_list'] = self.pop('features_list')
        if features_list is None:
            print("Using all input features.")
            self.features_list = self.all_features_list
        else:
            self.features_list = features_list
        self.run_checks()
        self.__dict__.update(self)

    # TODO: Fix checks for multilocation
    def run_checks(self, verbose=True):
        """
        Validates that required keys are present and checks the integrity of data shapes.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        missing_keys = self.required_keys - self.keys()
        if missing_keys:
            raise KeyError(f"Missing required keys: {missing_keys}")

        # y_shape = np.shape(self.y)
        # if not (len(y_shape) == 1 or (len(y_shape) == 2 and y_shape[1] == 1)):
        #     raise ValueError(f"'y' must be one-dimensional, with shape (N,) or (N, 1). Current shape is {y_shape}.")

        # # Check if 'hours' is provided and matches len(y)
        # if 'hours' in self:
        #     if self.hours != len(self.y):
        #         raise ValueError(f"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")

        # Check desired subset of features is in all input features
        if not all_items_exist(self.features_list, self.all_features_list):
            raise ValueError(f"Provided 'features_list' {self.features_list} has elements not in input features.")

    def set_scaler(self, scaler):
        """
        Sets the scaler to be used for data normalization.

        Parameters:
        -----------
        scaler : str
            The name of the scaler (e.g., 'minmax', 'standard').
        """
        recognized_scalers = ['minmax', 'standard']
        if scaler in recognized_scalers:
            print(f"Setting data scaler: {scaler}")
            self.scaler = scalers[scaler]
        else:
            raise ValueError(f"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")

    def train_test_split(self, time_fracs=[1., 0., 0.], space_fracs=[1., 0., 0.], subset_features=True, features_list=None, verbose=True):
        """
        Splits the data into training, validation, and test sets.

        Parameters:
        -----------
        time_fracs : list, optional
            Fractions of the time series to use for train, validation, and test, in that order. Must sum to 1. Default is [1., 0., 0.].
        space_fracs : list, optional
            Fractions of the locations to use for train, validation, and test, in that order. Must sum to 1. Only used for multi-location (spatial) data. Default is [1., 0., 0.].
        subset_features : bool, optional
            If True, subsets the data to the specified features list. Default is True.
        features_list : list, optional
            A list of features to use for subsetting. Default is None.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial

        # Check cross-validation parameters
        assert np.sum(time_fracs) == np.sum(space_fracs) == 1., f"Provided cross validation params don't sum to 1"
        if (len(time_fracs) != 3) or (len(space_fracs) != 3):
            raise ValueError("Cross-validation params `time_fracs` and `space_fracs` must be lists of length 3, representing (train/validation/test)")

        train_frac = time_fracs[0]
        val_frac = time_fracs[1]
        test_frac = time_fracs[2]

        # Setup train/val/test in time
        train_ind = int(np.floor(self.hours * train_frac)); self.train_ind = train_ind
        test_ind = int(train_ind + round(self.hours * val_frac)); self.test_ind = test_ind
        # Check for any potential issues with indices
        if test_ind > self.hours:
            print(f"Setting test index to {self.hours}")
            test_ind = self.hours
        if train_ind > test_ind:
            raise ValueError("Train index must be less than test index.")

        # Setup train/val/test in space
        if spatial:
            train_frac_sp = space_fracs[0]
            val_frac_sp = space_fracs[1]
            locs = np.arange(len(self.loc['STID']))  # indices of locations
            train_size = int(len(locs) * train_frac_sp)
            val_size = int(len(locs) * val_frac_sp)
            train_locs = locs[:train_size]
            val_locs = locs[train_size:train_size + val_size]
            test_locs = locs[train_size + val_size:]
            # Store Lists of IDs in loc subdirectory
            self.loc['train_locs'] = [self.case[i] for i in train_locs]
            self.loc['val_locs'] = [self.case[i] for i in val_locs]
            self.loc['test_locs'] = [self.case[i] for i in test_locs]

        # Extract data to desired features, copy to avoid changing input objects
        X = self.X.copy()
        y = self.y.copy()
        if subset_features:
            if verbose and self.features_list != self.all_features_list:
                print(f"Subsetting input data to features_list: {self.features_list}")
            # Indices to subset all features with based on params features
            indices = []
            for item in self.features_list:
                if item in self.all_features_list:
                    indices.append(self.all_features_list.index(item))
                else:
                    print(f"Warning: feature name '{item}' not found in list of all features from input data")
            if spatial:
                X = [Xi[:, indices] for Xi in X]
            else:
                X = X[:, indices]

        # Training data from 0 to train_ind
        # Validation data from train_ind to test_ind
        # Test data from test_ind to end
        if spatial:
            X_train = [X[i] for i in train_locs]
            X_val = [X[i] for i in val_locs]
            X_test = [X[i] for i in test_locs]
            y_train = [y[i] for i in train_locs]
            y_val = [y[i] for i in val_locs]
            y_test = [y[i] for i in test_locs]

            self.X_train = [Xi[:train_ind] for Xi in X_train]
            self.y_train = [yi[:train_ind].reshape(-1,1) for yi in y_train]
            if (val_frac > 0) and (val_frac_sp) > 0:
                self.X_val = [Xi[train_ind:test_ind] for Xi in X_val]
                self.y_val = [yi[train_ind:test_ind].reshape(-1,1) for yi in y_val]
            self.X_test = [Xi[test_ind:] for Xi in X_test]
            self.y_test = [yi[test_ind:].reshape(-1,1) for yi in y_test]
        else:
            self.X_train = X[:train_ind]
            self.y_train = y[:train_ind].reshape(-1,1)  # assumes y 1-d, change this if vector output
            if val_frac > 0:
                self.X_val = X[train_ind:test_ind]
                self.y_val = y[train_ind:test_ind].reshape(-1,1)  # assumes y 1-d, change this if vector output
            self.X_test = X[test_ind:]
            self.y_test = y[test_ind:].reshape(-1,1)  # assumes y 1-d, change this if vector output

        # Print statements if verbose
        if verbose:
            print(f"Train index: 0 to {train_ind}")
            print(f"Validation index: {train_ind} to {test_ind}")
            print(f"Test index: {test_ind} to {self.hours}")
            if spatial:
                print("Subsetting locations into train/val/test")
                print(f"Total Locations: {len(locs)}")
                print(f"Train Locations: {len(train_locs)}")
                print(f"Val. Locations: {len(val_locs)}")
                print(f"Test Locations: {len(test_locs)}")
                print(f"X_train[0] shape: {self.X_train[0].shape}, y_train[0] shape: {self.y_train[0].shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val[0] shape: {self.X_val[0].shape}, y_val[0] shape: {self.y_val[0].shape}")
                print(f"X_test[0] shape: {self.X_test[0].shape}, y_test[0] shape: {self.y_test[0].shape}")
            else:
                print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
                print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")

    def scale_data(self, verbose=True):
        """
        Scales the training data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial
        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        # if hasattr(self.scaler, 'n_features_in_'):
        #     warnings.warn("Scale_data has already been called. Exiting to prevent issues.")
        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling training data with scaler {self.scaler}, fitting on X_train")

        if spatial:
            # Fit scaler on row-joined training data
            self.scaler.fit(np.vstack(self.X_train))
            # Transform data using fitted scaler
            self.X_train = [self.scaler.transform(Xi) for Xi in self.X_train]
            if hasattr(self, 'X_val'):
                self.X_val = [self.scaler.transform(Xi) for Xi in self.X_val]
            self.X_test = [self.scaler.transform(Xi) for Xi in self.X_test]
        else:
            # Fit the scaler on the training data
            self.scaler.fit(self.X_train)
            # Transform the data using the fitted scaler
            self.X_train = self.scaler.transform(self.X_train)
            if hasattr(self, 'X_val'):
                self.X_val = self.scaler.transform(self.X_val)
            self.X_test = self.scaler.transform(self.X_test)

    # NOTE: only works for non spatial
    def scale_all_X(self, verbose=True):
        """
        Scales all of the X data using the set scaler, subsetted to features_list.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.

        Returns:
        --------
        ndarray
            Scaled X matrix, subsetted to features_list.
        """
        if self.spatial:
            raise ValueError("Not implemented for spatial data")

        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        if verbose:
            print(f"Scaling all X data with scaler {self.scaler}, fitted on X_train")
        # Indices to subset all features with based on params features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        X = self.scaler.transform(X)
        return X

    def inverse_scale(self, return_X='all_hours', save_changes=False, verbose=True):
        """
        Inversely scales the data to its original form.

        Parameters:
        -----------
        return_X : str, optional
            Specifies what data to return after inverse scaling. Default is 'all_hours'.
        save_changes : bool, optional
            If True, updates the internal data with the inversely scaled values. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        X_val = self.scaler.inverse_transform(self.X_val)
        X_test = self.scaler.inverse_transform(self.X_test)

        if save_changes:
            print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
        if verbose:
            print(f"Attempting to return {return_X}")
        if return_X == "all_hours":
            return np.concatenate((X_train, X_val, X_test), axis=0)
        else:
            print(f"Unrecognized or unimplemented return value {return_X}")

    def batch_reshape(self, timesteps, batch_size, hours=None, verbose=False, start_times=None):
        """
        Restructures input data to RNN using batches and sequences.

        Parameters:
        -----------
        timesteps : int
            Number of timesteps or sequence length used for a single sequence in RNN training. Constitutes a single sample to the model.
        batch_size : int
            Number of sequences used within a batch of training. The size of each training batch to reshape the data.

        Returns:
        --------
        None
            This method reshapes the data in place.

        Raises:
        -------
        AttributeError
            If either 'X_train' or 'y_train' attributes do not exist within the instance.

        Notes:
        ------
        The reshaping method depends on self param "spatial".
        - spatial == False: Reshapes data assuming no spatial dimensions.
        - spatial == True: Reshapes data considering spatial dimensions.
        """
        if not hasattr(self, 'X_train') or not hasattr(self, 'y_train'):
            raise AttributeError("Both 'X_train' and 'y_train' must be set before reshaping batches.")

        # Indicator of spatial training scheme or not
        spatial = self.spatial

        if spatial:
            print(f"Reshaping spatial training data using batch size: {batch_size} and timesteps: {timesteps}")
            self.X_train, self.y_train, self.n_seqs = staircase_spatial(self.X_train, self.y_train, timesteps=timesteps, batch_size=batch_size, hours=hours, verbose=verbose, start_times=start_times)
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val, _ = staircase_spatial(self.X_val, self.y_val, timesteps=timesteps, batch_size=batch_size, hours=None, verbose=verbose, start_times=start_times)
        else:
            print(f"Reshaping training data using batch size: {batch_size} and timesteps: {timesteps}")
            self.X_train, self.y_train = staircase_2(self.X_train, self.y_train, timesteps=timesteps, batch_size=batch_size, verbose=verbose)
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val = staircase_2(self.X_val, self.y_val, timesteps=timesteps, batch_size=batch_size, verbose=verbose)

    def print_hashes(self, attrs_to_check=['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
        """
        Prints the hash of specified data attributes.

        Parameters:
        -----------
        attrs_to_check : list, optional
            A list of attribute names to hash and print. Default includes 'X', 'y', and split data.
        """
        for attr in attrs_to_check:
            if hasattr(self, attr):
                value = getattr(self, attr)
                print(f"Hash of {attr}: {hash_ndarray(value)}")

    def __getattr__(self, key):
        """
        Allows attribute-style access to dictionary keys, a.k.a. enables the "." operator for getting elements.
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'rnn_data' object has no attribute '{key}'")

    def __setitem__(self, key, value):
        """
        Ensures dictionary and attribute updates stay in sync for required keys.
        """
        super().__setitem__(key, value)  # Update the dictionary
        if key in self.required_keys:
            super().__setattr__(key, value)  # Ensure the attribute is updated as well

    def __setattr__(self, key, value):
        """
        Ensures dictionary keys are updated when setting attributes.
        """
        self[key] = value

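
# Usage sketch (illustrative only): building an RNNData object for a single synthetic
# timeseries and preparing it for training. The key layout ('loc'/'STID', 'case', 'time')
# mirrors how these fields are accessed in this module; the values and sizes are made up.
def _example_rnn_data():
    rng = np.random.default_rng(0)
    hours = 500
    input_dict = {
        'case': 'example_case',
        'loc': {'STID': 'EXAMPLE1'},
        'time': np.arange(hours),
        'X': rng.random((hours, 3)),
        'y': rng.random(hours),
        'features_list': ['Ed', 'Ew', 'rain']
    }
    dat = RNNData(input_dict, scaler='minmax')
    dat.train_test_split(time_fracs=[0.8, 0.1, 0.1])   # 400 train, 50 val, 50 test hours
    dat.scale_data()                                    # fit scaler on X_train only
    dat.batch_reshape(timesteps=5, batch_size=32)       # staircase_2 formatting in place
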
# Function to check reproducibility hashes, environment info, and model parameters
def check_reproducibility(dict0, params, m_hash, w_hash):
    """
    Performs reproducibility checks on a model by comparing current settings and outputs with stored reproducibility information.

    Parameters:
    -----------
    dict0 : dict
        The data dictionary that should contain reproducibility information under the 'repro_info' attribute.
    params : dict
        The current model parameters to be checked against the reproducibility information.
    m_hash : str
        The hash of the current model predictions.
    w_hash : str
        The hash of the current fitted model weights.

    Returns:
    --------
    None
        The function returns None. It issues warnings if any reproducibility checks fail.

    Notes:
    ------
    - Checks are only performed if the `dict0` contains the 'repro_info' attribute.
    - Issues warnings for mismatches in model weights, predictions, Python version, TensorFlow version, and model parameters.
    - Skips checks if physics-based initialization is used (not implemented).
    """
    if not hasattr(dict0, "repro_info"):
        warnings.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproducibility checks.")
        return

    repro_info = dict0.repro_info
    # Check hashes of model weights and predictions
    if params['phys_initialize']:
        hashes = repro_info['phys_initialize']
        warnings.warn("Physics Initialization not implemented yet. Not running reproducibility checks.")
    else:
        hashes = repro_info['rand_initialize']
        print(f"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
        print(f"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
        if (w_hash != hashes['fitted_weights_hash']) or (m_hash != hashes['preds_hash']):
            if w_hash != hashes['fitted_weights_hash']:
                warnings.warn("The fitted weights hash does not match the reproducibility weights hash.")
            if m_hash != hashes['preds_hash']:
                warnings.warn("The predictions hash does not match the reproducibility predictions hash.")
        else:
            print("***Reproducibility Checks passed - model weights and model predictions match expected.***")

    # Check environment info
    current_py_version = sys.version[0:6]
    current_tf_version = tf.__version__
    if current_py_version != repro_info['env_info']['py_version']:
        warnings.warn(f"Python version mismatch: Current Python version is {current_py_version}, "
                      f"expected {repro_info['env_info']['py_version']}.")

    if current_tf_version != repro_info['env_info']['tf_version']:
        warnings.warn(f"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
                      f"expected {repro_info['env_info']['tf_version']}.")

    # Check model parameters
    repro_params = repro_info.get('params', {})

    for key, repro_value in repro_params.items():
        if key in params:
            if params[key] != repro_value:
                warnings.warn(f"Parameter mismatch for '{key}': Current value is {params[key]}, "
                              f"repro value is {repro_value}.")
        else:
            warnings.warn(f"Parameter '{key}' is missing in the current params.")

class RNNModel(ABC):
    """
    Abstract base class for RNN models, providing structure for training, predicting, and running reproducibility checks.
    """
    def __init__(self, params: dict):
        """
        Initializes the RNNModel with the given parameters.

        Parameters:
        -----------
        params : dict
            A dictionary containing model parameters.
        """
        self.params = params
        if type(self) is RNNModel:
            raise TypeError("RNNModel is an abstract class and cannot be instantiated directly")
        super().__init__()

    @abstractmethod
    def _build_model_train(self):
        """Abstract method to build the training model."""
        pass

    @abstractmethod
    def _build_model_predict(self, return_sequences=True):
        """Abstract method to build the prediction model. This model copies weights from the train model but with input structure that allows for easier prediction of arbitrary length timeseries. This model is not to be used for training, or don't use with .fit calls"""
        pass

    def is_stateful(self):
        """
        Checks whether any of the layers in the internal model (self.model_train) are stateful.

        Returns:
        --------
        bool: True if at least one layer in the model is stateful, False otherwise.

        Notes:
        ------
        This method iterates over all the layers in the model and checks if any of them
        have the 'stateful' attribute set to True. This is useful for determining if
        the model is designed to maintain state across batches during training.
        """
        for layer in self.model_train.layers:
            if hasattr(layer, 'stateful') and layer.stateful:
                return True
        return False

    def fit(self, X_train, y_train, plot_history=True, plot_title='',
            weights=None, callbacks=[], validation_data=None, return_epochs=False, *args, **kwargs):
        """
        Trains the model on the provided training data. Uses the fit method of the training model and then copies the weights over to the prediction model, which has a less restrictive input shape. Formats a list of callbacks to use within the fit method based on params input.

        Parameters:
        -----------
        X_train : np.ndarray
            The input matrix data for training.
        y_train : np.ndarray
            The target vector data for training.
        plot_history : bool, optional
            If True, plots the training history. Default is True.
        plot_title : str, optional
            The title for the training plot. Default is an empty string.
        weights : optional
            Initial weights for the model. Default is None.
        callbacks : list, optional
            A list of callback functions to use during training. Default is an empty list.
        validation_data : tuple, optional
            Validation data to use during training, expected format (X_val, y_val). Default is None.
        return_epochs : bool
            If True, return the number of epochs that training took. Used to test and optimize early stopping.
        """
        # verbose_fit argument is for printing out update after each epoch, which gets very long
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            print(f"Training simple RNN with params: {self.params}")

        # Setup callbacks
        if self.params["reset_states"]:
            callbacks = callbacks + [ResetStatesCallback(self.params), TerminateOnNaN()]

        # Early stopping callback requires validation data
        if validation_data is not None:
            X_val, y_val = validation_data[0], validation_data[1]
            print("Using early stopping callback.")
            early_stop = EarlyStoppingCallback(patience=self.params['early_stopping_patience'])
            callbacks = callbacks + [early_stop]
        if verbose_weights:
            print(f"Formatted X_train hash: {hash_ndarray(X_train)}")
            print(f"Formatted y_train hash: {hash_ndarray(y_train)}")
            if validation_data is not None:
                print(f"Formatted X_val hash: {hash_ndarray(X_val)}")
                print(f"Formatted y_val hash: {hash_ndarray(y_val)}")
            print(f"Initial weights before training hash: {hash_weights(self.model_train)}")

        ## TODO: Hidden State Initialization
        # Evaluate Model once to set nonzero initial state
        # self.model_train(X_train[0:self.params['batch_size'],:,:])

        if validation_data is not None:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                validation_data=(X_val, y_val),
                *args, **kwargs
            )
        else:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                *args, **kwargs
            )

        if plot_history:
            self.plot_history(history, plot_title)

        if self.params["verbose_weights"]:
            print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")

        # Update Weights for Prediction Model
        w_fitted = self.model_train.get_weights()
        self.model_predict.set_weights(w_fitted)

        if return_epochs:
            # Epoch counting starts at 0, adding 1 for the count
            return early_stop.best_epoch + 1

    def predict(self, X_test):
        """
        Generates predictions on the provided test data using the internal prediction model.

        Parameters:
        -----------
        X_test : np.ndarray
            The input data for generating predictions.

        Returns:
        --------
        np.ndarray
            The predicted values.
        """
        print("Predicting test data")
        X_test = self._format_pred_data(X_test)
        preds = self.model_predict.predict(X_test).flatten()
        return preds

    def _format_pred_data(self, X):
        """
        Formats the prediction data for RNN input.

        Parameters:
        -----------
        X : np.ndarray
            The input data.

        Returns:
        --------
        np.ndarray
            The formatted input data.
        """
        return np.reshape(X, (1, X.shape[0], self.params['n_features']))

    def plot_history(self, history, plot_title, create_figure=True):
        """
        Plots the training history. Uses log scale on y axis for readability.

        Parameters:
        -----------
        history : History object
            The training history object from model fitting. Output of keras' .fit command.
        plot_title : str
            The title for the plot.
        """
        if create_figure:
            plt.figure(figsize=(10, 6))
        plt.semilogy(history.history['loss'], label='Training loss')
        if 'val_loss' in history.history:
            plt.semilogy(history.history['val_loss'], label='Validation loss')
        plt.title(f'{plot_title} Model loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend(loc='upper left')
        plt.show()

    def run_model(self, dict0, reproducibility_run=False, plot_period='all', save_outputs=True, return_epochs=False):
        """
        Runs the RNN model on input data dictionary, including training, prediction, and reproducibility checks.

        Parameters:
        -----------
        dict0 : RNNData (dict)
            The dictionary containing the input data and configuration.
        reproducibility_run : bool, optional
            If True, performs reproducibility checks after running the model. Default is False.
        save_outputs : bool
            If True, writes model outputs into input dictionary.
        return_epochs : bool
            If True, returns how many epochs of training happened. Used to optimize params related to early stopping.

        Returns:
        --------
        tuple
            Model predictions and a dictionary of RMSE errors broken up by time period.
        """
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            dict0.print_hashes()
        # Extract Datasets
        X_train, y_train, X_test, y_test = dict0.X_train, dict0.y_train, dict0.X_test, dict0.y_test
        if 'X_val' in dict0:
            X_val, y_val = dict0.X_val, dict0.y_val
        if dict0.spatial:
            case_id = "Spatial Training Set"
        else:
            case_id = dict0.case

        # Fit model
        if 'X_val' not in dict0:
            eps = self.fit(X_train, y_train, plot_title=case_id, return_epochs=return_epochs)
        else:
            eps = self.fit(X_train, y_train, validation_data=(X_val, y_val), plot_title=case_id, return_epochs=return_epochs)

        # Generate Predictions and Evaluate Test Error
        if dict0.spatial:
            m, errs = self._eval_multi(dict0)
        else:
            m, errs = self._eval_single(dict0, verbose_weights, reproducibility_run)
            plot_data(dict0, title="RNN", title2=dict0.case, plot_period=plot_period)

        if return_epochs:
            return m, errs, eps
        else:
            return m, errs

    def _eval_single(self, dict0, verbose_weights, reproducibility_run):
        # Generate Predictions,
        # run through training to get hidden state set properly for forecast period
        print(f"Running prediction on all input data, Training through Test")
        X = dict0.scale_all_X()
        y = dict0.y.flatten()
        # Predict
        if verbose_weights:
            print(f"All X hash: {hash_ndarray(X)}")
        m = self.predict(X).flatten()
        if verbose_weights:
            print(f"Predictions Hash: {hash_ndarray(m)}")

        if reproducibility_run:
            print("Checking Reproducibility")
            check_reproducibility(dict0, self.params, hash_ndarray(m), hash_weights(self.model_predict))

        # print(dict0.keys())
        # Plot final fit and data
        # plot_data(dict0, title="RNN", title2=dict0['case'], plot_period=plot_period)

        # Calculate RMSE for the training and prediction periods
        train_ind = dict0.train_ind  # index of final training set value
        test_ind = dict0.test_ind  # index of first test set value

        err_train = rmse(m[:train_ind], y[:train_ind].flatten())
        err_pred = rmse(m[test_ind:], y[test_ind:].flatten())
        rmse_dict = {
            'training': err_train,
            'prediction': err_pred
        }
        return m, rmse_dict

    def _eval_multi(self, dict0):
        # Train Error: NOT DOING YET. DECIDE WHETHER THIS IS NEEDED

        # Test Error
        new_data = np.stack(dict0.X_test, axis=0)
        y_array = np.stack(dict0.y_test, axis=0)
        preds = self.model_predict.predict(new_data)

        ## Note: not using util rmse function since this approach is for 3d arrays
        # Compute the squared differences
        squared_diff = np.square(preds - y_array)

        # Mean squared error along the timesteps and dimensions (axis 1 and 2)
        mse = np.mean(squared_diff, axis=(1, 2))

        # Root mean squared error (RMSE) for each timeseries
        rmses = np.sqrt(mse)

        return preds, rmses

# Helper functions for batch reset schedules
def calc_exp_intervals(bmin, bmax, n_epochs, force_bmax=True):
    # Calculate the exponential intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = epochs / n_epochs
    intervals = bmin * (bmax / bmin) ** factors
    if force_bmax:
        intervals[-1] = bmax  # Ensure the last value is exactly bmax
    return intervals.astype(int)

def calc_log_intervals(bmin, bmax, n_epochs, force_bmax=True):
    # Calculate the logarithmic intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = np.log(1 + epochs) / np.log(1 + n_epochs)
    intervals = bmin + (bmax - bmin) * factors
    if force_bmax:
        intervals[-1] = bmax  # Ensure the last value is exactly bmax
    return intervals.astype(int)
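

# Usage sketch (illustrative only): compare the exponential and logarithmic reset schedules
# for an assumed range of bmin=1 to bmax=20 over 10 epochs.
def _example_reset_intervals():
    print(calc_exp_intervals(1, 20, 10))   # slow growth early, fast growth late
    print(calc_log_intervals(1, 20, 10))   # fast growth early, slow growth late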
class ResetStatesCallback(Callback):
    """
    Custom callback to reset the states of RNN layers at the end of each epoch and optionally after a specified number of batches.

    Parameters:
    -----------
    batch_reset : int, optional
        If provided, resets the states of RNN layers after every `batch_reset` batches. Default is None.
    """
    # def __init__(self, bmin=None, bmax=None, epochs=None, loc_batch_reset = None, batch_schedule_type='linear', verbose=True):
    def __init__(self, params=None, verbose=True):
        """
        Initializes the ResetStatesCallback with an optional batch reset interval.

        Parameters:
        -----------
        params : dict, optional
            Dictionary of parameters. If None provided, only on_epoch_end will trigger reset of hidden states.
            - bmin : int
                Minimum for batch reset schedule
            - bmax : int
                Maximum for batch reset schedule
            - epochs : int
                Number of training epochs.
            - loc_batch_reset : int
                Interval of batches after which to reset the states of RNN layers for location changes. Triggers reset for training AND validation phases
            - batch_schedule_type : str
                Type of batch scheduling to be used. Recognized methods are the following:
                - 'constant' : Uses a fixed batch reset interval throughout training
                - 'linear' : Increases the batch reset interval linearly over epochs from bmin to bmax.
                - 'exp' : Increases the batch reset interval exponentially over epochs from bmin to bmax.
                - 'log' : Increases the batch reset interval logarithmically over epochs from bmin to bmax.

        Returns:
        --------
        None
            Only in-place reset of hidden states of the RNN that uses this callback.
        """
        super(ResetStatesCallback, self).__init__()

        # Check for optional arguments, set None if missing in input params
        arg_list = ['bmin', 'bmax', 'epochs', 'loc_batch_reset', 'batch_schedule_type']
        for arg in arg_list:
            setattr(self, arg, params.get(arg, None) if params is not None else None)

        self.verbose = verbose
        if self.verbose:
            print(f"Using ResetStatesCallback with Batch Reset Schedule: {self.batch_schedule_type}")
        # Calculate the reset intervals for each epoch during initialization
        if self.batch_schedule_type is not None:
            if self.epochs is None:
                raise ValueError(f"Argument `epochs` cannot be None with self.batch_schedule_type: {self.batch_schedule_type}")
            self.batch_reset_intervals = self._calc_reset_intervals(self.batch_schedule_type)
            if self.verbose:
                print(f"batch_reset_intervals: {self.batch_reset_intervals}")
        else:
            self.batch_reset_intervals = None

    def on_epoch_end(self, epoch, logs=None):
        """
        Resets the states of RNN layers at the end of each epoch.

        Parameters:
        -----------
        epoch : int
            The index of the current epoch.
        logs : dict, optional
            A dictionary containing metrics from the epoch. Default is None.
        """
        # print(f" Resetting hidden state after epoch: {epoch+1}", flush=True)
        # Iterate over each layer in the model
        for layer in self.model.layers:
            # Check if the layer has a reset_states method
            if hasattr(layer, 'reset_states'):
                layer.reset_states()

    def _calc_reset_intervals(self, batch_schedule_type):
        methods = ['constant', 'linear', 'exp', 'log']
        if batch_schedule_type not in methods:
            raise ValueError(f"Batch schedule method {batch_schedule_type} not recognized. \n Available methods: {methods}")
        if batch_schedule_type == "constant":
            return np.repeat(self.bmin, self.epochs).astype(int)
        elif batch_schedule_type == "linear":
            return np.linspace(self.bmin, self.bmax, self.epochs).astype(int)
        elif batch_schedule_type == "exp":
            return calc_exp_intervals(self.bmin, self.bmax, self.epochs)
        elif batch_schedule_type == "log":
            return calc_log_intervals(self.bmin, self.bmax, self.epochs)

    def on_epoch_begin(self, epoch, logs=None):
        # Set the reset interval for the current epoch
        if self.batch_reset_intervals is not None:
            self.current_batch_reset = self.batch_reset_intervals[epoch]
        else:
            self.current_batch_reset = None

    def on_train_batch_end(self, batch, logs=None):
        """
        Resets the states of RNN layers during training after a specified number of batches, if `batch_reset` or `loc_batch_reset` are provided. The `batch_reset` is used for stability and to avoid exploding gradients at the beginning of training when a hidden state is being passed with weights that haven't learned yet. The `loc_batch_reset` is used to reset the states when a particular batch is from a new location and thus the hidden state should not be passed.

        Parameters:
        -----------
        batch : int
            The index of the current batch.
        logs : dict, optional
            A dictionary containing metrics from the batch. Default is None.
        """
        batch_reset = self.current_batch_reset
        if (batch_reset is not None and batch % batch_reset == 0):
            # print(f" Resetting states after batch {batch + 1}")
            # Iterate over each layer in the model
            for layer in self.model.layers:
                # Check if the layer has a reset_states method
                if hasattr(layer, 'reset_states'):
                    layer.reset_states()

    def on_test_batch_end(self, batch, logs=None):
        """
        Resets the states of RNN layers during validation if `loc_batch_reset` is provided to demarcate a new location and thus avoid passing a hidden state to a wrong location.

        Parameters:
        -----------
        batch : int
            The index of the current batch.
        logs : dict, optional
            A dictionary containing metrics from the batch. Default is None.
        """
        loc_batch_reset = self.loc_batch_reset
        if (loc_batch_reset is not None and batch % loc_batch_reset == 0):
            # print(f"Resetting states in Validation mode after batch {batch + 1}")
            # Iterate over each layer in the model
            for layer in self.model.layers:
                # Check if the layer has a reset_states method
                if hasattr(layer, 'reset_states'):
                    layer.reset_states()
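

# Usage sketch (illustrative only): a ResetStatesCallback with a linear batch reset schedule.
# The parameter values are placeholders for demonstration.
def _example_reset_states_callback():
    cb = ResetStatesCallback({
        'bmin': 10, 'bmax': 200, 'epochs': 20,
        'loc_batch_reset': None, 'batch_schedule_type': 'linear'
    })
    # During epoch e, hidden states are reset every batch_reset_intervals[e] training batches,
    # and always at the end of each epoch.
    print(cb.batch_reset_intervals)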

## Learning Schedules

lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=0.01,
    # warmup_target=None,
)

def EarlyStoppingCallback(patience=5):
    """
    Creates an EarlyStopping callback with the specified patience.

    Args:
        patience (int): Number of epochs with no improvement after which training will be stopped.

    Returns:
        EarlyStopping: Configured EarlyStopping callback.
    """
    return EarlyStopping(
        monitor='val_loss',
        patience=patience,
        restore_best_weights=True
    )

phys_params = {
    'DeltaE': [0, -1],           # bias correction
    'T1': 0.1,                   # 1/fuel class (10)
    'fm_raise_vs_rain': 0.2      # fm increase per mm rain
}

def get_initial_weights(model_fit, params, scale_fm=1):
    # Given a RNN architecture and hyperparameter dictionary, return array of physics-initiated weights
    # Inputs:
    # model_fit: output of create_RNN_2 with no training
    # params: (dict) dictionary of hyperparameters
    # rnn_dat: (dict) data dictionary, output of create_rnn_dat
    # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
    DeltaE = phys_params['DeltaE']
    T1 = phys_params['T1']
    fmr = phys_params['fm_raise_vs_rain']
    centering = params['centering']  # shift activation down

    w0_initial = {'Ed': (1. - np.exp(-T1))/2,
                  'Ew': (1. - np.exp(-T1))/2,
                  'rain': fmr * scale_fm}   # wx - input feature
    #                                         wh    wb    wd    bd = bias -1

    w_initial = np.array([np.nan, np.exp(-0.1), DeltaE[0]/scale_fm,          # layer 0
                          1.0, -centering[0] + DeltaE[1]/scale_fm])          # layer 1
    if params['verbose_weights']:
        print('Equilibrium moisture correction bias', DeltaE[0],
              'in the hidden layer and', DeltaE[1], ' in the output layer')

    w_name = ['wx', 'wh', 'bh', 'wd', 'bd']

    w = model_fit.get_weights()
    for j in range(w[0].shape[0]):
        feature = params['features_list'][j]
        for k in range(w[0].shape[1]):
            w[0][j][k] = w0_initial[feature]
    for i in range(1, len(w)):             # number of the weight
        for j in range(w[i].shape[0]):     # number of the inputs
            if w[i].ndim == 2:
                # initialize all entries of the weight matrix to the same number
                for k in range(w[i].shape[1]):
                    w[i][j][k] = w_initial[i]/w[i].shape[0]
            elif w[i].ndim == 1:
                w[i][j] = w_initial[i]
            else:
                print('weight', i, 'shape', w[i].shape)
                raise ValueError("Only 1 or 2 dimensions supported")
        if params['verbose_weights']:
            print('weight', i, w_name[i], 'shape', w[i].shape, 'ndim', w[i].ndim,
                  'initial: sum', np.sum(w[i], axis=0), '\nentries', w[i])

    return w, w_name

class RNN(RNNModel):
    """
    A concrete implementation of the RNNModel abstract base class, using simple recurrent cells for hidden recurrent layers.

    Parameters:
    -----------
    params : dict
        A dictionary of model parameters.
    loss : str, optional
        The loss function to use during model training. Default is 'mean_squared_error'.
    """
    def __init__(self, params, loss='mean_squared_error'):
        """
        Initializes the RNN model by building the training and prediction models.

        Parameters:
        -----------
        params : dict or RNNParams
            A dictionary containing the model's parameters.
        loss : str, optional
            The loss function to use during model training. Default is 'mean_squared_error'.
        """
        super().__init__(params)
        self.model_train = self._build_model_train()
        self.model_predict = self._build_model_predict()

    def _build_model_train(self):
        """
        Builds and compiles the training model, with batch & sequence shape specifications for input.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for training.
        """
        inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
        x = inputs
        for i in range(self.params['rnn_layers']):
            # Return sequences True if recurrent layer feeds into another recurrent layer.
            # False if feeds into dense layer
            return_sequences = True if i < self.params['rnn_layers'] - 1 else False
            x = SimpleRNN(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                dropout=self.params["dropout"][0],
                recurrent_dropout=self.params["recurrent_dropout"],
                stateful=self.params['stateful'],
                return_sequences=return_sequences)(x)
        if self.params["dropout"][1] > 0:
            x = Dropout(self.params["dropout"][1])(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
        x = Dense(units=1, activation='linear')(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        # optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        if self.params["verbose_weights"]:
            print(f"Initial Weights Hash: {hash_weights(model)}")
            # print(model.get_weights())

        if self.params['phys_initialize']:
            assert self.params['scaler'] == 'reproducibility', f"Not implemented yet to do physics initialize with given data scaling {self.params['scaler']}"
            assert self.params['features_list'] == ['Ed', 'Ew', 'rain'], f"Physics initiation can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
            print("Initializing Model with Physics based weights")
            w, w_name = get_initial_weights(model, self.params)
            model.set_weights(w)
            print('initial weights hash =', hash_weights(model))

        return model

    def _build_model_predict(self, return_sequences=True):
        """
        Builds and compiles the prediction model. It doesn't use batch shape nor sequence length, to make it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.

        Parameters:
        -----------
        return_sequences : bool, optional
            Whether to return the full sequence of outputs. Default is True.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for prediction.
        """
        inputs = tf.keras.Input(shape=(None, self.params['n_features']))
        x = inputs
        for i in range(self.params['rnn_layers']):
            x = SimpleRNN(self.params['rnn_units'], activation=self.params['activation'][0],
                          stateful=False, return_sequences=return_sequences)(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
        x = Dense(units=1, activation='linear')(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        # Set Weights to model_train
        w_fitted = self.model_train.get_weights()
        model.set_weights(w_fitted)

        return model
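

# Usage sketch (illustrative only): building the training and prediction models from an
# RNNParams dictionary. The extra keys below (recurrent_dropout, stateful, verbose flags,
# phys_initialize, reset_states, early_stopping_patience) are assumed to be supplied by the
# project's configuration; the values here are placeholders, not recommended settings.
def _example_rnn_model():
    params = RNNParams({
        'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
        'dense_layers': 1, 'dense_units': 5, 'epochs': 20,
        'activation': ['tanh', 'tanh'], 'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.2, 0.2], 'time_fracs': [0.8, 0.1, 0.1],
        'learning_rate': 0.001,
        'recurrent_dropout': 0.2, 'stateful': True, 'phys_initialize': False,
        'verbose_fit': False, 'verbose_weights': False, 'reset_states': True,
        'early_stopping_patience': 5
    })
    rnn = RNN(params)
    # model_train expects fixed (batch_size, timesteps, n_features) input;
    # model_predict accepts a single timeseries of arbitrary length.
    rnn.model_train.summary()
    rnn.model_predict.summary()
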
class RNN_LSTM(RNNModel):
    """
    A concrete implementation of the RNNModel abstract base class, using LSTM cells for hidden recurrent layers.

    Parameters:
    -----------
    params : dict
        A dictionary of model parameters.
    loss : str, optional
        The loss function to use during model training. Default is 'mean_squared_error'.
    """
    def __init__(self, params, loss='mean_squared_error'):
        """
        Initializes the RNN model by building the training and prediction models.

        Parameters:
        -----------
        params : dict or RNNParams
            A dictionary containing the model's parameters.
        loss : str, optional
            The loss function to use during model training. Default is 'mean_squared_error'.
        """
        super().__init__(params)
        self.model_train = self._build_model_train()
        self.model_predict = self._build_model_predict()

    def _build_model_train(self):
        """
        Builds and compiles the training model, with batch & sequence shape specifications for input.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for training.
        """
        inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
        x = inputs
        for i in range(self.params['rnn_layers']):
            return_sequences = True if i < self.params['rnn_layers'] - 1 else False
            x = LSTM(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                dropout=self.params["dropout"][0],
                recurrent_dropout=self.params["recurrent_dropout"],
                recurrent_activation=self.params["recurrent_activation"],
                stateful=self.params['stateful'],
                return_sequences=return_sequences)(x)
        if self.params["dropout"][1] > 0:
            x = Dropout(self.params["dropout"][1])(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        # optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        if self.params["verbose_weights"]:
            print(f"Initial Weights Hash: {hash_weights(model)}")
        return model

    def _build_model_predict(self, return_sequences=True):
        """
        Builds and compiles the prediction model. It doesn't use batch shape nor sequence length, to make it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.

        Parameters:
        -----------
        return_sequences : bool, optional
            Whether to return the full sequence of outputs. Default is True.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for prediction.
        """
        inputs = tf.keras.Input(shape=(None, self.params['n_features']))
        x = inputs
        for i in range(self.params['rnn_layers']):
            x = LSTM(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                stateful=False, return_sequences=return_sequences)(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        # Set Weights to model_train
        w_fitted = self.model_train.get_weights()
        model.set_weights(w_fitted)

        return model