# v2 training and prediction class infrastructure

import numpy as np
import sys
import copy
import logging
import warnings
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import Callback, EarlyStopping, TerminateOnNaN
# from sklearn.metrics import mean_squared_error
from tensorflow.keras.layers import LSTM, SimpleRNN, Input, Dropout, Dense
import reproducibility
# from utils import print_dict_summary
from abc import ABC, abstractmethod
from utils import hash2, all_items_exist, hash_ndarray, hash_weights
from data_funcs import rmse, plot_data, compare_dicts
from sklearn.preprocessing import MinMaxScaler, StandardScaler
#*************************************************************************************
# Data Formatting Functions
def staircase(x, y, timesteps, datapoints, return_sequences=False, verbose=False):
    # x [datapoints, features]  all inputs
    # y [datapoints, outputs]
    # timesteps: split x and y into samples of length timesteps, shifted by 1
    # datapoints: number of timesteps to use for training, no more than y.shape[0]
    if verbose:
        print('staircase: shape x = ', x.shape)
        print('staircase: shape y = ', y.shape)
        print('staircase: timesteps=', timesteps)
        print('staircase: datapoints=', datapoints)
        print('staircase: return_sequences=', return_sequences)
    features = x.shape[1]
    outputs = y.shape[1]
    samples = datapoints - timesteps + 1
    if verbose:
        print('staircase: samples=', samples, 'timesteps=', timesteps, 'features=', features)
    x_train = np.empty([samples, timesteps, features])
    if return_sequences:
        if verbose:
            print('returning all timesteps in a sample')
        y_train = np.empty([samples, timesteps, outputs])  # all
        for i in range(samples):
            for k in range(timesteps):
                x_train[i, k, :] = x[i + k, :]
                y_train[i, k, :] = y[i + k, :]
    else:
        if verbose:
            print('returning only the last timestep in a sample')
        y_train = np.empty([samples, outputs])
        for i in range(samples):
            for k in range(timesteps):
                x_train[i, k, :] = x[i + k, :]
            y_train[i, :] = y[i + timesteps - 1, :]
    return x_train, y_train
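
# A minimal sketch of staircase() on synthetic data, showing the sliding-window
# shapes it produces. The array sizes here are illustrative assumptions.
def _demo_staircase():
    x = np.arange(20, dtype=float).reshape(10, 2)   # 10 datapoints, 2 features
    y = np.arange(10, dtype=float).reshape(10, 1)   # 10 datapoints, 1 output
    x_train, y_train = staircase(x, y, timesteps=3, datapoints=10)
    # 10 - 3 + 1 = 8 samples, each a window of 3 consecutive timesteps
    print(x_train.shape)  # (8, 3, 2)
    print(y_train.shape)  # (8, 1): only the last timestep of each window
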
def staircase_2(x, y, timesteps, batch_size=None, trainsteps=np.inf,
                return_sequences=False, verbose=False):
    # create RNN training data in multiple batches
    # timesteps: split x and y into sequences of length timesteps,
    #            a.k.a. lookback or sequence_length

    # print params if verbose
    if batch_size is None:
        raise ValueError('staircase_2 requires batch_size')
    if verbose:
        print('staircase_2: shape x = ', x.shape)
        print('staircase_2: shape y = ', y.shape)
        print('staircase_2: timesteps=', timesteps)
        print('staircase_2: batch_size=', batch_size)
        print('staircase_2: return_sequences=', return_sequences)

    nx, features = x.shape
    ny, outputs = y.shape
    datapoints = min(nx, ny, trainsteps)
    if verbose:
        print('staircase_2: datapoints=', datapoints)

    # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
    # https://www.tensorflow.org/guide/keras/working_with_rnns  Cross-batch statefulness

    # example with timesteps=3 batch_size=3 datapoints=15
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]
    #     batch 3: [9 10 11]    [10 11 12]   [11 12 13]
    #     batch 4: [12 13 14]   [13 14 15]   when the data runs out, this last batch can be shorter

    # TODO: implement for multiple locations, same starting time for each batch
    #     batch 0: [0 1 2]      [0 1 2]      [0 1 2]
    #     batch 1: [3 4 5]      [3 4 5]      [3 4 5]
    #     batch 2: [6 7 8]      [6 7 8]      [6 7 8]
    # TODO: second epoch, shift starting time at batch 0 in time

    # TODO: implement for multiple locations, different starting times for each batch
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]

    # the first sample in batch j starts at timesteps*j and ends at timesteps*(j+1)-1
    # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
    # becomes the starting hidden state of the rnn for the sequence of steps [3 4 5] in batch 1, etc.

    # sample [0 1 2] means the rnn is used twice, to map state 0 -> 1 -> 2
    # the state at time 0 is fixed, but the state is a variable at times 1 and 2
    # the loss is computed from the output at time 2, and the chain rule for the gradient of the
    # loss ends at time 0, because the state there is a constant -> its derivative is zero
    # sample [3 4 5] means the rnn is used twice, to map state 3 -> 4 -> 5;
    # the state at time 3 is fixed to the output of the first sequence [0 1 2]
    # the loss is computed from the output at time 5, and the chain rule ends at time 3,
    # because the state there is considered constant -> its derivative is zero
    # how is the gradient computed? presumably keras adds the gradients wrt the weights
    # at 2 5 8 ..., 3 6 9 ..., 4 7 ... and uses the sum to update the weights
    # there is only one set of weights: h(2) = f(h(1),w), h(1) = f(h(0),w), but w is always the same
    # each column is one successive evaluation of h(n+1) = f(h(n),w) for n = n_start, n_start+1, ...
    # a single column cannot be evaluated efficiently on a gpu, because a gpu is a parallel processor
    # think of each column as served by one thread; the threads are independent because they execute
    # in parallel, and there needs to be a large number of threads (32 is a good number)
    # each batch consists of independent calculations
    # but it can depend on the result of the previous batch (that's the recurrent part)

    max_batches = datapoints // timesteps
    max_sequences = max_batches * batch_size
    if verbose:
        print('staircase_2: max_batches=', max_batches)
        print('staircase_2: max_sequences=', max_sequences)

    x_train = np.zeros((max_sequences, timesteps, features))
    if return_sequences:
        y_train = np.empty((max_sequences, timesteps, outputs))
    else:
        y_train = np.empty((max_sequences, outputs))

    # build the sequences
    k = 0
    for i in range(max_batches):
        for j in range(batch_size):
            begin = i * timesteps + j
            next = begin + timesteps
            if next > datapoints:
                break
            if verbose:
                print('sequence', k, 'batch', i, 'sample', j, 'data', begin, 'to', next - 1)
            x_train[k, :, :] = x[begin:next, :]
            if return_sequences:
                y_train[k, :, :] = y[begin:next, :]
            else:
                y_train[k, :] = y[next - 1, :]
            k += 1

    if verbose:
        print('staircase_2: shape x_train = ', x_train.shape)
        print('staircase_2: shape y_train = ', y_train.shape)
        print('staircase_2: sequences generated', k)
        print('staircase_2: batch_size=', batch_size)
    k = (k // batch_size) * batch_size
    if verbose:
        print('staircase_2: removing partial and empty batches at the end, keeping', k)
    x_train = x_train[:k, :, :]
    if return_sequences:
        y_train = y_train[:k, :, :]
    else:
        y_train = y_train[:k, :]

    if verbose:
        print('staircase_2: shape x_train = ', x_train.shape)
        print('staircase_2: shape y_train = ', y_train.shape)

    return x_train, y_train
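
# A small sketch reproducing the batch layout from the comment block above
# (timesteps=3, batch_size=3, datapoints=15); the data values are illustrative.
def _demo_staircase_2():
    t = np.arange(15, dtype=float)
    x = t.reshape(15, 1)
    y = t.reshape(15, 1)
    x_train, y_train = staircase_2(x, y, timesteps=3, batch_size=3, verbose=True)
    # max_batches = 15 // 3 = 5, so up to 15 sequences; partial batches at the
    # end are trimmed so the sequence count is a multiple of batch_size
    print(x_train.shape, y_train.shape)  # (12, 3, 1) (12, 1)
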
# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler()
}
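
# Usage sketch of the scalers lookup: select by name instead of if/else
# branching. Note the dict holds shared instances, so fitting one mutates the
# module-level object. The sample matrix below is an illustrative assumption.
def _demo_scaler_lookup():
    X_sample = np.array([[0.0, 10.0], [5.0, 20.0], [10.0, 30.0]])
    sc = scalers['minmax']
    return sc.fit_transform(X_sample)   # each column mapped to [0, 1]
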
## DEPRECATED, use RNNData class instead
def create_rnn_data2(dict1, params, atm_dict="HRRR", verbose=False, train_ind=None, test_ind=None):
    # Given fmda data and hyperparameters, return formatted dictionary to be used in RNN
    # Inputs:
    # d: (dict) fmda dictionary
    # params: (dict) hyperparameters
    # atm_dict: (str) string specifying name of subdictionary for atmospheric vars
    # train_frac: (float) fraction of data to use for training (starting from time 0)
    # val_frac: (float) fraction of data to use for validation data (starting from end of train)
    # Returns: (dict) formatted data used in RNN
    logging.info('create_rnn_data start')
    # Copy dictionary to avoid changing the input to this function
    d = copy.deepcopy(dict1)
    scale = params['scale']
    scaler = params['scaler']
    # Features list given by params dict to be used in training
    features_list = params["features_list"]
    # All available features list, corresponds to shape of X
    features_all = d["features_list"]
    # Indices to subset all features with, based on params features
    indices = []
    for item in features_list:
        if item in features_all:
            indices.append(features_all.index(item))
        else:
            print(f"Warning: feature name '{item}' not found in list of all features from input data")

    # Extract desired features based on params, combine into matrix
    # Extract response vector
    y = d['y']
    y = np.reshape(y, (-1, 1))
    # Extract features matrix, subset to desired features
    X_raw = d['X'][:, indices].copy()  # save untransformed features matrix
    X = X_raw.copy()

    # Check total observed hours
    hours = d['hours']
    assert hours == y.shape[0]  # check that it matches response

    logging.info('create_rnn_data: total_hours=%s', hours)
    logging.info('feature matrix X shape %s', np.shape(X))
    logging.info('target matrix Y shape %s', np.shape(y))
    logging.info('features_list: %s', features_list)

    logging.info('splitting train/val/test')
    if train_ind is None:
        train_ind = round(hours * params['train_frac'])  # index of last training observation
    test_ind = train_ind + round(hours * params['val_frac'])  # index of first test observation; if no validation data it is equal to train_ind
    logging.info('Final index of training data=%s', train_ind)
    logging.info('First index of test data=%s', test_ind)
    # Training data from 0 to train_ind
    X_train = X[:train_ind]
    y_train = y[:train_ind].reshape(-1, 1)
    # Validation data from train_ind to test_ind
    X_val = X[train_ind:test_ind]
    y_val = y[train_ind:test_ind].reshape(-1, 1)
    # Test data from test_ind to end
    X_test = X[test_ind:]
    y_test = y[test_ind:].reshape(-1, 1)

    # Scale data if required
    # Remove need for "scale_fm" param
    # Reset reproducibility with this scaling
    scale_fm = scale_rain = 1.0  # identity defaults (assumed); overwritten by the reproducibility scaler below
    if scale:
        logging.info('Scaling feature data with scaler: %s', scaler)
        if scaler == "reproducibility":
            scale_fm = 17.076346687085564
        # Fit scaler to training data
        scalers[scaler].fit(X_train)
        # Apply scaling to all data using in-place operations
        X_train[:] = scalers[scaler].transform(X_train)
        if X_val.shape[0] > 0:
            X_val[:] = scalers[scaler].transform(X_val)
        X_test[:] = scalers[scaler].transform(X_test)
    else:
        print("Not scaling data")

    logging.info('x_train shape=%s', X_train.shape)
    logging.info('y_train shape=%s', y_train.shape)
    if test_ind == train_ind:
        logging.info('No validation data')
    elif X_val.shape[0] != 0:
        logging.info('X_val shape=%s', X_val.shape)
        logging.info('y_val shape=%s', y_val.shape)
    logging.info('X_test shape=%s', X_test.shape)
    logging.info('y_test shape=%s', y_test.shape)

    # Set up return dictionary
    rnn_dat = {
        'case': d['case'],
        'hours': hours,
        'features_list': features_list,
        'n_features': len(features_list),
        'scaler': scaler,
        'train_ind': train_ind,
        'test_ind': test_ind,
        'X_train': X_train,
        'y_train': y_train,
        'X_test': X_test,
        'y_test': y_test
    }
    if X_val.shape[0] > 0:
        rnn_dat.update({'X_val': X_val, 'y_val': y_val})

    # Update RNN params using data attributes
    logging.info('Updating model params based on data')
    timesteps = params['timesteps']
    batch_size = params['batch_size']
    logging.info('batch_size=%s', batch_size)
    logging.info('timesteps=%s', timesteps)
    features = len(features_list)
    # params.update({
    #     'n_features': features,
    #     'batch_shape': (params["batch_size"], params["timesteps"], features),
    #     'pred_input_shape': (None, features),
    #     'scale_fm': scale_fm,
    #     'scale_rain': scale_rain
    # })
    rnn_dat.update({
        'scale_fm': scale_fm,
        'scale_rain': scale_rain
    })

    logging.info('create_rnn_data2 done')
    return rnn_dat
#***********************************************************************************************
### RNN Class Functionality

class RNNParams(dict):
    """
    A custom dictionary class for handling RNN parameters. Automatically
    calculates certain params based on others. Overwrites the update method to
    protect from incompatible parameter choices. Inherits from dict.
    """
    def __init__(self, input_dict):
        """
        Initializes the RNNParams instance and runs checks and shape calculations.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing RNN parameters.
        """
        super().__init__(input_dict)
        # Automatically run checks on initialization
        self.run_checks()
        # Automatically calculate shapes on initialization
        self.calc_param_shapes()

    def run_checks(self, verbose=True):
        """
        Validates that required keys exist and are of the correct type.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Checking params...")
        # Keys must exist and be integers
        int_keys = [
            'batch_size', 'timesteps', 'rnn_layers',
            'rnn_units', 'dense_layers', 'dense_units', 'epochs'
        ]
        for key in int_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], int), f"Key '{key}' must be an integer"

        # Keys must exist and be lists
        list_keys = ['activation', 'features_list', 'dropout']
        for key in list_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], list), f"Key '{key}' must be a list"

        # Keys must exist and be floats
        float_keys = ['learning_rate', 'train_frac', 'val_frac']
        for key in float_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], float), f"Key '{key}' must be a float"

        if verbose:
            print("Input dictionary passed all checks.")

    def calc_param_shapes(self, verbose=True):
        """
        Calculates and updates the shapes of certain parameters based on input data.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Calculating shape params based on features list, timesteps, and batch size")
            print(f"Input Feature List: {self['features_list']}")
            print(f"Input Timesteps: {self['timesteps']}")
            print(f"Input Batch Size: {self['batch_size']}")

        n_features = len(self['features_list'])
        batch_shape = (self["batch_size"], self["timesteps"], n_features)
        if verbose:
            print("Calculated params:")
            print(f"Number of features: {n_features}")
            print(f"Batch Shape: {batch_shape}")

        # Update the dictionary (parent method bypasses the restricted-key check)
        super().update({
            'n_features': n_features,
            'batch_shape': batch_shape
        })

    def update(self, *args, verbose=True, **kwargs):
        """
        Updates the dictionary, with restrictions on certain keys, and recalculates shapes if necessary.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Prevent updating n_features and batch_shape directly
        restricted_keys = {'n_features', 'batch_shape'}
        keys_to_check = {'features_list', 'timesteps', 'batch_size'}

        # Check for restricted keys in args
        if args:
            if isinstance(args[0], dict):
                if restricted_keys & args[0].keys():
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                if restricted_keys & {k for k, v in args[0]}:
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")

        # Check for restricted keys in kwargs
        if restricted_keys & kwargs.keys():
            raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")

        # Track if shape-determining keys are updated
        keys_updated = set()
        if args:
            if isinstance(args[0], dict):
                keys_updated.update(args[0].keys() & keys_to_check)
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                keys_updated.update(k for k, v in args[0] if k in keys_to_check)
        keys_updated.update(kwargs.keys() & keys_to_check)

        # Call the parent update method
        super().update(*args, **kwargs)

        # Recalculate shapes if necessary
        if keys_updated:
            self.calc_param_shapes(verbose=verbose)
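
# A minimal sketch of RNNParams usage: construct with the required keys, then
# update 'timesteps' and observe 'batch_shape' being recalculated. The
# hyperparameter values below are illustrative assumptions, not project defaults.
def _demo_rnn_params():
    params = RNNParams({
        'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
        'dense_layers': 1, 'dense_units': 5, 'epochs': 10,
        'activation': ['tanh', 'linear'], 'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.2, 0.2], 'learning_rate': 0.001,
        'train_frac': 0.8, 'val_frac': 0.1
    })
    print(params['batch_shape'])        # (32, 5, 3), derived automatically
    params.update({'timesteps': 10})    # allowed; batch_shape is recalculated
    print(params['batch_shape'])        # (32, 10, 3)
    # params.update({'batch_shape': None}) would raise KeyError (restricted key)
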
## Class for handling input data
class RNNData(dict):
    """
    A custom dictionary class for managing RNN data, with validation, scaling,
    and train-test splitting functionality.
    """
    required_keys = {"loc", "time", "X", "y", "features_list"}

    def __init__(self, input_dict, scaler=None, features_list=None):
        """
        Initializes the RNNData instance, performs checks, and prepares data.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing the initial data.
        scaler : str, optional
            The name of the scaler to be used (e.g., 'minmax', 'standard'). Default is None.
        features_list : list, optional
            A subset of features to be used. Default is None, which means all features.
        """
        input_data = input_dict.copy()
        super().__init__(input_data)
        self.scaler = None
        if scaler is not None:
            self.set_scaler(scaler)
        # Rename and define other stuff
        self['hours'] = len(self['y'])
        self['all_features_list'] = self.pop('features_list')
        if features_list is None:
            print("Using all input features.")
            self.features_list = self.all_features_list
        else:
            self.features_list = features_list
        self.run_checks()
        self.__dict__.update(self)

    def run_checks(self, verbose=True):
        """
        Validates that required keys are present and checks the integrity of data shapes.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        missing_keys = self.required_keys - self.keys()
        if missing_keys:
            raise KeyError(f"Missing required keys: {missing_keys}")

        y_shape = np.shape(self.y)
        if not (len(y_shape) == 1 or (len(y_shape) == 2 and y_shape[1] == 1)):
            raise ValueError(f"'y' must be one-dimensional, with shape (N,) or (N, 1). Current shape is {y_shape}.")

        # Check if 'hours' is provided and matches len(y)
        if 'hours' in self:
            if self.hours != len(self.y):
                raise ValueError(f"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")
        # Check desired subset of features is in all input features
        if not all_items_exist(self.features_list, self.all_features_list):
            raise ValueError(f"Provided 'features_list' {self.features_list} has elements not in input features.")

    def set_scaler(self, scaler):
        """
        Sets the scaler to be used for data normalization.

        Parameters:
        -----------
        scaler : str
            The name of the scaler (e.g., 'minmax', 'standard').
        """
        recognized_scalers = ['minmax', 'standard']
        if scaler in recognized_scalers:
            self.scaler = scalers[scaler]
        else:
            raise ValueError(f"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")

    def train_test_split(self, train_frac, val_frac=0.0, subset_features=True, features_list=None, split_time=True, split_space=False, verbose=True):
        """
        Splits the data into training, validation, and test sets.

        Parameters:
        -----------
        train_frac : float
            The fraction of data to be used for training.
        val_frac : float, optional
            The fraction of data to be used for validation. Default is 0.0.
        subset_features : bool, optional
            If True, subsets the data to the specified features list. Default is True.
        features_list : list, optional
            A list of features to use for subsetting. Default is None.
        split_time : bool, optional
            Whether to split the data based on time. Default is True.
        split_space : bool, optional
            Whether to split the data based on space. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Extract data to desired features
        X = self.X
        y = self.y
        if subset_features:
            if verbose and self.features_list != self.all_features_list:
                print(f"Subsetting input data to features_list: {self.features_list}")
            # Indices to subset all features with, based on params features
            indices = []
            for item in self.features_list:
                if item in self.all_features_list:
                    indices.append(self.all_features_list.index(item))
                else:
                    print(f"Warning: feature name '{item}' not found in list of all features from input data")
            X = X[:, indices]

        # Setup train/test in time
        train_ind = int(np.floor(self.hours * train_frac)); self.train_ind = train_ind
        test_ind = int(train_ind + round(self.hours * val_frac)); self.test_ind = test_ind

        # Check for any potential issues with indices
        if test_ind > self.hours:
            print(f"Setting test index to {self.hours}")
            test_ind = self.hours
        if train_ind >= test_ind:
            raise ValueError("Train index must be less than test index.")

        # Training data from 0 to train_ind
        self.X_train = X[:train_ind]
        self.y_train = y[:train_ind].reshape(-1, 1)  # assumes y 1-d, change this if vector output
        # Validation data from train_ind to test_ind
        self.X_val = X[train_ind:test_ind]
        self.y_val = y[train_ind:test_ind].reshape(-1, 1)  # assumes y 1-d, change this if vector output
        # Test data from test_ind to end
        self.X_test = X[test_ind:]
        self.y_test = y[test_ind:].reshape(-1, 1)  # assumes y 1-d, change this if vector output

        # Print statements if verbose
        if verbose:
            print(f"Train index: 0 to {train_ind}")
            print(f"Validation index: {train_ind} to {test_ind}")
            print(f"Test index: {test_ind} to {self.hours}")
            print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
            print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
            print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")

    def scale_data(self, verbose=True):
        """
        Scales the data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling data with scaler {self.scaler}, fitting on X_train")
        # Fit the scaler on the training data
        self.scaler.fit(self.X_train)
        # Transform the data using the fitted scaler
        self.X_train = self.scaler.transform(self.X_train)
        if hasattr(self, 'X_val'):
            self.X_val = self.scaler.transform(self.X_val)
        self.X_test = self.scaler.transform(self.X_test)

    def inverse_scale(self, return_X='all_hours', save_changes=False, verbose=True):
        """
        Inversely scales the data to its original form.

        Parameters:
        -----------
        return_X : str, optional
            Specifies what data to return after inverse scaling. Default is 'all_hours'.
        save_changes : bool, optional
            If True, updates the internal data with the inversely scaled values. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        X_val = self.scaler.inverse_transform(self.X_val)
        X_test = self.scaler.inverse_transform(self.X_test)

        if save_changes:
            print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
        if verbose:
            print(f"Attempting to return {return_X}")
        if return_X == "all_hours":
            return np.concatenate((X_train, X_val, X_test), axis=0)
        else:
            print(f"Unrecognized or unimplemented return value {return_X}")

    def print_hashes(self, attrs_to_check=['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
        """
        Prints the hash of specified data attributes.

        Parameters:
        -----------
        attrs_to_check : list, optional
            A list of attribute names to hash and print. Default includes 'X', 'y', and split data.
        """
        for attr in attrs_to_check:
            if hasattr(self, attr):
                value = getattr(self, attr)
                print(f"Hash of {attr}: {hash_ndarray(value)}")

    def __getattr__(self, key):
        """
        Allows attribute-style access to dictionary keys, a.k.a. enables the "." operator for get elements.
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'rnn_data' object has no attribute '{key}'")

    def __setitem__(self, key, value):
        """
        Ensures dictionary and attribute updates stay in sync for required keys.
        """
        super().__setitem__(key, value)  # Update the dictionary
        if key in self.required_keys:
            super().__setattr__(key, value)  # Ensure the attribute is updated as well

    def __setattr__(self, key, value):
        """
        Ensures dictionary keys are updated when setting attributes.
        """
        # Body reconstructed (assumed): mirror __setitem__ so dict and attributes stay in sync
        super().__setattr__(key, value)
        super().__setitem__(key, value)
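
# A minimal sketch of the RNNData workflow on synthetic data: construct with
# the required keys, split in time, then scale. All values are illustrative.
def _demo_rnn_data():
    hours = 100
    dat = RNNData({
        'loc': 'demo_location', 'time': np.arange(hours),
        'X': np.random.rand(hours, 3), 'y': np.random.rand(hours),
        'features_list': ['Ed', 'Ew', 'rain']
    }, scaler='minmax')
    dat.train_test_split(train_frac=0.8, val_frac=0.1)
    dat.scale_data()      # fit on X_train only, then applied to all splits
    dat.print_hashes()    # reproducibility fingerprints of each array
    return dat
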
# Function to check reproducibility hashes, environment info, and model parameters
def check_reproducibility(dict0, params, m_hash, w_hash):
    """
    Performs reproducibility checks on a model by comparing current settings and
    outputs with stored reproducibility information.

    Parameters:
    -----------
    dict0 : dict
        The data dictionary that should contain reproducibility information under the 'repro_info' attribute.
    params : dict
        The current model parameters to be checked against the reproducibility information.
    m_hash : str
        The hash of the current model predictions.
    w_hash : str
        The hash of the current fitted model weights.

    Returns:
    --------
    None
        The function returns None. It issues warnings if any reproducibility checks fail.

    Notes:
    ------
    - Checks are only performed if `dict0` contains the 'repro_info' attribute.
    - Issues warnings for mismatches in model weights, predictions, Python version, TensorFlow version, and model parameters.
    - Skips checks if physics-based initialization is used (not implemented).
    """
    if not hasattr(dict0, "repro_info"):
        warnings.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproducibility checks.")
        return

    repro_info = dict0.repro_info
    # Check weights and predictions
    if params['phys_initialize']:
        hashes = repro_info['phys_initialize']
        warnings.warn("Physics initialization not implemented yet. Not running reproducibility checks.")
    else:
        hashes = repro_info['rand_initialize']
        print(f"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
        print(f"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
        if (w_hash != hashes['fitted_weights_hash']) or (m_hash != hashes['preds_hash']):
            if w_hash != hashes['fitted_weights_hash']:
                warnings.warn("The fitted weights hash does not match the reproducibility weights hash.")
            if m_hash != hashes['preds_hash']:
                warnings.warn("The predictions hash does not match the reproducibility predictions hash.")
        else:
            print("***Reproducibility Checks passed - model weights and model predictions match expected.***")

    # Check environment
    current_py_version = sys.version[0:6]
    current_tf_version = tf.__version__
    if current_py_version != repro_info['env_info']['py_version']:
        warnings.warn(f"Python version mismatch: Current Python version is {current_py_version}, "
                      f"expected {repro_info['env_info']['py_version']}.")
    if current_tf_version != repro_info['env_info']['tf_version']:
        warnings.warn(f"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
                      f"expected {repro_info['env_info']['tf_version']}.")

    # Check params
    repro_params = repro_info.get('params', {})
    for key, repro_value in repro_params.items():
        if key in params:
            if params[key] != repro_value:
                warnings.warn(f"Parameter mismatch for '{key}': Current value is {params[key]}, "
                              f"repro value is {repro_value}.")
        else:
            warnings.warn(f"Parameter '{key}' is missing in the current params.")

    return
class RNNModel(ABC):
    """
    Abstract base class for RNN models, providing structure for training,
    predicting, and running reproducibility checks.
    """
    def __init__(self, params: dict):
        """
        Initializes the RNNModel with the given parameters.

        Parameters:
        -----------
        params : dict
            A dictionary containing model parameters.
        """
        self.params = copy.deepcopy(params)
        self.params['n_features'] = len(self.params['features_list'])
        if type(self) is RNNModel:
            raise TypeError("RNNModel is an abstract class and cannot be instantiated directly")
        super().__init__()

    @abstractmethod
    def _build_model_train(self):
        """Abstract method to build the training model."""
        pass

    @abstractmethod
    def _build_model_predict(self, return_sequences=True):
        """Abstract method to build the prediction model. This model copies weights from the train model, but with an input structure that allows for easier prediction of arbitrary-length timeseries. This model is not to be used for training; don't use it with .fit calls."""
        pass

    def fit(self, X_train, y_train, plot=True, plot_title='',
            weights=None, callbacks=[], validation_data=None, *args, **kwargs):
        """
        Trains the model on the provided training data.

        Parameters:
        -----------
        X_train : np.ndarray
            The input matrix data for training.
        y_train : np.ndarray
            The target vector data for training.
        plot : bool, optional
            If True, plots the training history. Default is True.
        plot_title : str, optional
            The title for the training plot. Default is an empty string.
        weights : optional
            Initial weights for the model. Default is None.
        callbacks : list, optional
            A list of callback functions to use during training. Default is an empty list.
        validation_data : tuple, optional
            Validation data to use during training, expected format (X_val, y_val). Default is None.
        """
        # The verbose_fit argument is for printing an update after each epoch, which gets very long.
        # These print statements at the top could be turned off with a verbose argument, but then
        # there would be a bunch of different verbose params.
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            print(f"Training simple RNN with params: {self.params}")
        X_train, y_train = self.format_train_data(X_train, y_train)
        if validation_data is not None:
            X_val, y_val = self.format_train_data(validation_data[0], validation_data[1])
        if verbose_weights:
            print(f"Formatted X_train hash: {hash_ndarray(X_train)}")
            print(f"Formatted y_train hash: {hash_ndarray(y_train)}")
            if validation_data is not None:
                print(f"Formatted X_val hash: {hash_ndarray(X_val)}")
                print(f"Formatted y_val hash: {hash_ndarray(y_val)}")
            print(f"Initial weights before training hash: {hash_weights(self.model_train)}")

        # Set up callbacks
        if self.params["reset_states"]:
            callbacks = callbacks + [ResetStatesCallback(batch_reset=self.params['batch_reset']), TerminateOnNaN()]
        if validation_data is not None:
            print("Using early stopping callback.")
            callbacks = callbacks + [EarlyStoppingCallback(patience=self.params['early_stopping_patience'])]

        # Note: we overload the params here so that verbose_fit can be easily turned on/off at the .fit call

        # Evaluate the model once to set a nonzero initial state
        if self.params["batch_size"] >= X_train.shape[0]:
            self.model_train(X_train)
        if validation_data is not None:
            history = self.model_train.fit(
                X_train, y_train + self.params['centering'][1],
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                validation_data=(X_val, y_val),
                *args, **kwargs
            )
        else:
            history = self.model_train.fit(
                X_train, y_train + self.params['centering'][1],
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                *args, **kwargs
            )

        if plot:
            self.plot_history(history, plot_title)
        if self.params["verbose_weights"]:
            print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")

        # Update weights for the prediction model
        w_fitted = self.model_train.get_weights()
        self.model_predict.set_weights(w_fitted)

    def predict(self, X_test):
        """
        Generates predictions on the provided test data using the internal prediction model.

        Parameters:
        -----------
        X_test : np.ndarray
            The input data for generating predictions.

        Returns:
        --------
        preds : np.ndarray
            The predicted values.
        """
        X_test = self.format_pred_data(X_test)
        preds = self.model_predict.predict(X_test).flatten()
        return preds

    def format_train_data(self, X, y, verbose=False):
        """
        Formats the training data for RNN input.

        Parameters:
        -----------
        X : np.ndarray
            The input data.
        y : np.ndarray
            The target data.
        verbose : bool, optional
            If True, prints status messages. Default is False.

        Returns:
        --------
        X, y : tuple of np.ndarray
            Formatted input and target data.
        """
        X, y = staircase_2(X, y, timesteps=self.params["timesteps"], batch_size=self.params["batch_size"], verbose=verbose)
        return X, y

    def format_pred_data(self, X):
        """
        Formats the prediction data for RNN input.

        Parameters:
        -----------
        X : np.ndarray
            The input data.

        Returns:
        --------
        np.ndarray
            The formatted input data.
        """
        return np.reshape(X, (1, X.shape[0], self.params['n_features']))

    def plot_history(self, history, plot_title):
        """
        Plots the training history.

        Parameters:
        -----------
        history : History object
            The training history object from model fitting. Output of keras' .fit command.
        plot_title : str
            The title for the plot.
        """
        plt.figure()
        plt.semilogy(history.history['loss'], label='Training loss')
        if 'val_loss' in history.history:
            plt.semilogy(history.history['val_loss'], label='Validation loss')
        plt.title(f'{plot_title} Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(loc='upper left')
        plt.show()

    def run_model(self, dict0, reproducibility_run=False, plot_period='all'):
        """
        Runs the RNN model, including training, prediction, and reproducibility checks.

        Parameters:
        -----------
        dict0 : RNNData (dict)
            The dictionary containing the input data and configuration.
        reproducibility_run : bool, optional
            If True, performs reproducibility checks after running the model. Default is False.

        Returns:
        --------
        m, errs : tuple
            Model predictions and a dictionary of RMSE errors.
        """
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        # Make a copy to prevent changing the input in place
        dict1 = copy.deepcopy(dict0)
        if verbose_weights:
            print("Input data hashes, NOT formatted for rnn sequence/batches yet")
            dict1.print_hashes()
        # Extract fields
        scale_fm = dict1['scale_fm']
        X_train, y_train, X_test, y_test = dict1['X_train'].copy(), dict1['y_train'].copy(), dict1["X_test"].copy(), dict1['y_test'].copy()
        if 'X_val' in dict1:
            X_val, y_val = dict1['X_val'].copy(), dict1['y_val'].copy()
        else:
            X_val = None
        case_id = dict1['case']

        # Fit model
        if X_val is None:
            self.fit(X_train, y_train, plot_title=case_id)
        else:
            self.fit(X_train, y_train, validation_data=(X_val, y_val), plot_title=case_id)

        # Generate predictions;
        # run through training to get the hidden state set properly for the forecast period
        if X_val is None:
            X = np.concatenate((X_train, X_test))
            y = np.concatenate((y_train, y_test)).flatten()
        else:
            X = np.concatenate((X_train, X_val, X_test))
            y = np.concatenate((y_train, y_val, y_test)).flatten()

        if verbose_weights:
            print("Predicting training through test")
            print(f"All X hash: {hash_ndarray(X)}")

        m = self.predict(X).flatten()
        if verbose_weights:
            print(f"Predictions Hash: {hash_ndarray(m)}")

        dict0['m'] = m  # add to the outside env dictionary; should be the only place this happens

        if reproducibility_run:
            print("Checking Reproducibility")
            check_reproducibility(dict0, self.params, hash_ndarray(m), hash_weights(self.model_predict))

        # print(dict1.keys())
        # Plot final fit and data
        plot_data(dict1, title="RNN", title2=dict1['case'], plot_period=plot_period)

        # Calculate errors
        train_ind = dict1["train_ind"]  # index of final training set value
        test_ind = dict1["test_ind"]    # index of first test set value
        err_train = rmse(m[:train_ind], y_train.flatten())
        err_pred = rmse(m[test_ind:], y_test.flatten())
        errs = {
            'training': err_train,
            'prediction': err_pred
        }
        return m, errs
class ResetStatesCallback(Callback):
    """
    Custom callback to reset the states of RNN layers at the end of each epoch
    and optionally after a specified number of batches.

    Parameters:
    -----------
    batch_reset : int, optional
        If provided, resets the states of RNN layers after every `batch_reset` batches. Default is None.
    """
    def __init__(self, batch_reset=None):
        """
        Initializes the ResetStatesCallback with an optional batch reset interval.

        Parameters:
        -----------
        batch_reset : int, optional
            The interval of batches after which to reset the states of RNN layers. Default is None.
        """
        super(ResetStatesCallback, self).__init__()
        self.batch_reset = batch_reset

    def on_epoch_end(self, epoch, logs=None):
        """
        Resets the states of RNN layers at the end of each epoch.

        Parameters:
        -----------
        epoch : int
            The index of the current epoch.
        logs : dict, optional
            A dictionary containing metrics from the epoch. Default is None.
        """
        # Iterate over each layer in the model
        for layer in self.model.layers:
            # Check if the layer has a reset_states method
            if hasattr(layer, 'reset_states'):
                layer.reset_states()

    def on_train_batch_end(self, batch, logs=None):
        """
        Resets the states of RNN layers after a specified number of batches, if `batch_reset` is provided.

        Parameters:
        -----------
        batch : int
            The index of the current batch.
        logs : dict, optional
            A dictionary containing metrics from the batch. Default is None.
        """
        batch_reset = self.batch_reset
        if batch_reset is not None and batch % batch_reset == 0:
            # print(f"Resetting states after batch {batch + 1}")
            # Iterate over each layer in the model
            for layer in self.model.layers:
                # Check if the layer has a reset_states method
                if hasattr(layer, 'reset_states'):
                    layer.reset_states()
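
# Usage sketch: pass ResetStatesCallback to a keras .fit call so stateful RNN
# layers are reset at epoch boundaries (and, optionally, every `batch_reset`
# batches). The tiny stateful model and random data below are illustrative
# assumptions.
def _demo_reset_states_callback():
    inputs = tf.keras.Input(batch_shape=(2, 3, 1))
    x = SimpleRNN(4, stateful=True)(inputs)
    outputs = Dense(1)(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(loss='mean_squared_error', optimizer='adam')
    X = np.random.rand(4, 3, 1)
    y = np.random.rand(4, 1)
    model.fit(X, y, epochs=2, batch_size=2, verbose=0,
              callbacks=[ResetStatesCallback(batch_reset=1)])
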
## Learning Schedules

lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=0.001,
    decay_steps=200,  # required by CosineDecay; the value here is an assumed placeholder
    # warmup_target=None,
)
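
# Usage sketch: a schedule object can be passed to an optimizer in place of a
# fixed learning rate, as in the commented-out optimizer line inside
# RNN._build_model_train below.
def _demo_scheduled_optimizer():
    return tf.keras.optimizers.Adam(learning_rate=lr_schedule)
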
def EarlyStoppingCallback(patience=5):
    """
    Creates an EarlyStopping callback with the specified patience.

    Args:
        patience (int): Number of epochs with no improvement after which training will be stopped.

    Returns:
        EarlyStopping: Configured EarlyStopping callback.
    """
    return EarlyStopping(
        monitor='val_loss',
        patience=patience,
        verbose=1,
        mode='min',
        restore_best_weights=True
    )

# early_stopping = EarlyStopping(
#     monitor='val_loss',           # Metric to monitor, e.g., 'val_loss', 'val_accuracy'
#     patience=5,                   # Number of epochs with no improvement after which training will be stopped
#     verbose=1,                    # Print information about early stopping
#     mode='min',                   # 'min' means training will stop when the quantity monitored has stopped decreasing
#     restore_best_weights=True     # Restore model weights from the epoch with the best value of the monitored quantity
# )
# with open("params.yaml") as file:
#     phys_params = yaml.safe_load(file)["physics_initializer"]

phys_params = {
    'DeltaE': [0, -1],        # bias correction
    'T1': 0.1,                # 1/fuel class (10)
    'fm_raise_vs_rain': 0.2   # fm increase per mm rain
}
def get_initial_weights(model_fit, params, scale_fm):
    # Given a RNN architecture and hyperparameter dictionary, return array of physics-initiated weights
    # Inputs:
    # model_fit: output of create_RNN_2 with no training
    # params: (dict) dictionary of hyperparameters
    # rnn_dat: (dict) data dictionary, output of create_rnn_dat
    # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
    DeltaE = phys_params['DeltaE']
    T1 = phys_params['T1']
    fmr = phys_params['fm_raise_vs_rain']
    centering = params['centering']  # shift activation down

    w0_initial = {'Ed': (1. - np.exp(-T1)) / 2,
                  'Ew': (1. - np.exp(-T1)) / 2,
                  'rain': fmr * scale_fm}  # wx - input feature
    # wh wb wd bd = bias -1

    w_initial = np.array([np.nan, np.exp(-0.1), DeltaE[0] / scale_fm,  # layer 0
                          1.0, -centering[0] + DeltaE[1] / scale_fm])  # layer 1
    if params['verbose_weights']:
        print('Equilibrium moisture correction bias', DeltaE[0],
              'in the hidden layer and', DeltaE[1], ' in the output layer')

    w_name = ['wx', 'wh', 'bh', 'wd', 'bd']

    w = model_fit.get_weights()
    for j in range(w[0].shape[0]):
        feature = params['features_list'][j]
        for k in range(w[0].shape[1]):
            w[0][j][k] = w0_initial[feature]
    for i in range(1, len(w)):          # number of the weight
        for j in range(w[i].shape[0]):  # number of the inputs
            if w[i].ndim == 2:
                # initialize all entries of the weight matrix to the same number
                for k in range(w[i].shape[1]):
                    w[i][j][k] = w_initial[i] / w[i].shape[0]
            elif w[i].ndim == 1:
                w[i][j] = w_initial[i]
            else:
                print('weight', i, 'shape', w[i].shape)
                raise ValueError("Only 1 or 2 dimensions supported")
        if params['verbose_weights']:
            print('weight', i, w_name[i], 'shape', w[i].shape, 'ndim', w[i].ndim,
                  'initial: sum', np.sum(w[i], axis=0), '\nentries', w[i])

    return w, w_name
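
# A worked sketch of the w0_initial values implied by phys_params above:
# with T1 = 0.1, the Ed/Ew input weights are (1 - exp(-0.1))/2 ≈ 0.0476, and
# with fm_raise_vs_rain = 0.2 and, say, scale_fm ≈ 17.08 (the reproducibility
# value used elsewhere in this file), the rain weight is 0.2 * 17.08 ≈ 3.42.
def _demo_w0_initial(scale_fm=17.076346687085564):
    T1 = phys_params['T1']
    fmr = phys_params['fm_raise_vs_rain']
    w_ed = (1. - np.exp(-T1)) / 2
    return {'Ed': w_ed, 'Ew': w_ed, 'rain': fmr * scale_fm}
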
class RNN(RNNModel):
    """
    A concrete implementation of the RNNModel abstract base class, using simple
    recurrent cells for hidden recurrent layers.

    Parameters:
    -----------
    params : dict
        A dictionary of model parameters.
    loss : str, optional
        The loss function to use during model training. Default is 'mean_squared_error'.
    """
    def __init__(self, params, loss='mean_squared_error'):
        """
        Initializes the RNN model by building the training and prediction models.

        Parameters:
        -----------
        params : dict or RNNParams
            A dictionary containing the model's parameters.
        loss : str, optional
            The loss function to use during model training. Default is 'mean_squared_error'.
        """
        super().__init__(params)
        self.model_train = self._build_model_train()
        self.model_predict = self._build_model_predict()

    def _build_model_train(self):
        """
        Builds and compiles the training model, with batch & sequence shape specifications for input.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for training.
        """
        inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
        x = inputs
        for i in range(self.params['rnn_layers']):
            return_sequences = True if i < self.params['rnn_layers'] - 1 else False
            x = SimpleRNN(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                dropout=self.params["dropout"][0],
                recurrent_dropout=self.params["recurrent_dropout"],
                stateful=self.params['stateful'],
                return_sequences=return_sequences)(x)
        if self.params["dropout"][1] > 0:
            x = Dropout(self.params["dropout"][1])(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        # Add final output layer; must be 1 dense cell with linear activation for continuous scalar output
        x = Dense(units=1, activation='linear')(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        # optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
        # optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        if self.params["verbose_weights"]:
            print(f"Initial Weights Hash: {hash_weights(model)}")
            # print(model.get_weights())

        if self.params['phys_initialize']:
            assert self.params['scaler'] == 'reproducibility', f"Not implemented yet to do physics initialize with given data scaling {self.params['scaler']}"
            assert self.params['features_list'] == ['Ed', 'Ew', 'rain'], f"Physics initiation can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
            print("Initializing Model with Physics based weights")
            scale_fm = self.params['scale_fm']  # assumes 'scale_fm' was stored in params (see create_rnn_data2)
            w, w_name = get_initial_weights(model, self.params, scale_fm=scale_fm)
            model.set_weights(w)
            print('initial weights hash =', hash_weights(model))
        return model

    def _build_model_predict(self, return_sequences=True):
        """
        Builds and compiles the prediction model; doesn't use batch shape or sequence length.

        Parameters:
        -----------
        return_sequences : bool, optional
            Whether to return the full sequence of outputs. Default is True.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for prediction.
        """
        inputs = tf.keras.Input(shape=(None, self.params['n_features']))
        x = inputs
        for i in range(self.params['rnn_layers']):
            x = SimpleRNN(self.params['rnn_units'], activation=self.params['activation'][0],
                          stateful=False, return_sequences=return_sequences)(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        # Add final output layer; must be 1 dense cell with linear activation for continuous scalar output
        x = Dense(units=1, activation='linear')(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        # Set weights from model_train
        w_fitted = self.model_train.get_weights()
        model.set_weights(w_fitted)

        return model
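
# A minimal end-to-end sketch: RNNParams -> RNNData -> RNN, fit and predict on
# synthetic data. Every hyperparameter value below is an illustrative
# assumption, not a tuned default of this project.
def _demo_rnn_workflow():
    hours = 400
    params = RNNParams({
        'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 6,
        'dense_layers': 1, 'dense_units': 4, 'epochs': 2,
        'activation': ['tanh', 'linear'], 'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.0, 0.0], 'recurrent_dropout': 0.0,
        'learning_rate': 0.001, 'train_frac': 0.8, 'val_frac': 0.1,
        'stateful': True, 'phys_initialize': False, 'scaler': 'minmax',
        'verbose_fit': False, 'verbose_weights': False,
        'reset_states': True, 'batch_reset': None, 'centering': [0.0, 0.0]
    })
    dat = RNNData({
        'loc': 'demo', 'time': np.arange(hours),
        'X': np.random.rand(hours, 3), 'y': np.random.rand(hours),
        'features_list': ['Ed', 'Ew', 'rain']
    }, scaler='minmax')
    dat.train_test_split(train_frac=0.8, val_frac=0.1)
    dat.scale_data()
    model = RNN(params)
    model.fit(dat.X_train, dat.y_train, plot=False)
    return model.predict(dat.X_test)
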
class RNN_LSTM(RNNModel):
    """
    A concrete implementation of the RNNModel abstract base class, using LSTM
    cells for hidden recurrent layers.

    Parameters:
    -----------
    params : dict
        A dictionary of model parameters.
    loss : str, optional
        The loss function to use during model training. Default is 'mean_squared_error'.
    """
    def __init__(self, params, loss='mean_squared_error'):
        """
        Initializes the RNN model by building the training and prediction models.

        Parameters:
        -----------
        params : dict or RNNParams
            A dictionary containing the model's parameters.
        loss : str, optional
            The loss function to use during model training. Default is 'mean_squared_error'.
        """
        super().__init__(params)
        self.model_train = self._build_model_train()
        self.model_predict = self._build_model_predict()

    def _build_model_train(self):
        """
        Builds and compiles the training model, with batch & sequence shape specifications for input.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for training.
        """
        inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
        x = inputs
        for i in range(self.params['rnn_layers']):
            return_sequences = True if i < self.params['rnn_layers'] - 1 else False
            x = LSTM(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                dropout=self.params["dropout"][0],
                recurrent_dropout=self.params["recurrent_dropout"],
                recurrent_activation=self.params["recurrent_activation"],
                stateful=self.params['stateful'],
                return_sequences=return_sequences)(x)
        if self.params["dropout"][1] > 0:
            x = Dropout(self.params["dropout"][1])(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        # optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        if self.params["verbose_weights"]:
            print(f"Initial Weights Hash: {hash_weights(model)}")
        return model

    def _build_model_predict(self, return_sequences=True):
        """
        Builds and compiles the prediction model; doesn't use batch shape or sequence length.

        Parameters:
        -----------
        return_sequences : bool, optional
            Whether to return the full sequence of outputs. Default is True.

        Returns:
        --------
        model : tf.keras.Model
            The compiled Keras model for prediction.
        """
        inputs = tf.keras.Input(shape=(None, self.params['n_features']))
        x = inputs
        for i in range(self.params['rnn_layers']):
            x = LSTM(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                stateful=False, return_sequences=return_sequences)(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        # Set weights from model_train
        w_fitted = self.model_train.get_weights()
        model.set_weights(w_fitted)

        return model
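
# RNN_LSTM is intended as a drop-in alternative to RNN; a sketch assuming an
# RNNParams like the one in _demo_rnn_workflow above, plus the LSTM-specific
# 'recurrent_activation' key read by _build_model_train.
def _demo_lstm(params):
    params['recurrent_activation'] = 'sigmoid'  # assumed standard LSTM gate activation
    return RNN_LSTM(params)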