1 # v2 training and prediction class infrastructure
6 import tensorflow
as tf
7 import matplotlib
.pyplot
as plt
9 from tensorflow
.keras
.callbacks
import Callback
, EarlyStopping
10 # from sklearn.metrics import mean_squared_error
12 from tensorflow
.keras
.layers
import LSTM
, SimpleRNN
, Input
, Dropout
, Dense
14 import reproducibility
15 # from utils import print_dict_summary
16 from abc
import ABC
, abstractmethod
17 from utils
import hash2
, all_items_exist
, hash_ndarray
, hash_weights
18 from data_funcs
import rmse
, plot_data
, compare_dicts
21 from sklearn
.preprocessing
import MinMaxScaler
, StandardScaler
24 #*************************************************************************************
25 # Data Formatting Functions
27 def staircase(x
,y
,timesteps
,datapoints
,return_sequences
=False, verbose
= False):
28 # x [datapoints,features] all inputs
29 # y [datapoints,outputs]
30 # timesteps: split x and y into samples length timesteps, shifted by 1
31 # datapoints: number of timesteps to use for training, no more than y.shape[0]
33 print('staircase: shape x = ',x
.shape
)
34 print('staircase: shape y = ',y
.shape
)
35 print('staircase: timesteps=',timesteps
)
36 print('staircase: datapoints=',datapoints
)
37 print('staircase: return_sequences=',return_sequences
)
40 samples
= datapoints
-timesteps
+1
42 print('staircase: samples=',samples
,'timesteps=',timesteps
,'features=',features
)
43 x_train
= np
.empty([samples
, timesteps
, features
])
46 print('returning all timesteps in a sample')
47 y_train
= np
.empty([samples
, timesteps
, outputs
]) # all
48 for i
in range(samples
):
49 for k
in range(timesteps
):
50 x_train
[i
,k
,:] = x
[i
+k
,:]
51 y_train
[i
,k
,:] = y
[i
+k
,:]
54 print('returning only the last timestep in a sample')
55 y_train
= np
.empty([samples
, outputs
])
56 for i
in range(samples
):
57 for k
in range(timesteps
):
58 x_train
[i
,k
,:] = x
[i
+k
,:]
59 y_train
[i
,:] = y
[i
+timesteps
-1,:]
61 return x_train
, y_train
63 def staircase_2(x
,y
,timesteps
,batch_size
=None,trainsteps
=np
.inf
,return_sequences
=False, verbose
= False):
64 # create RNN training data in multiple batches
68 # timesteps: split x and y into sequences length timesteps
69 # a.k.a. lookback or sequence_length
71 # print params if verbose
73 if batch_size
is None:
74 raise ValueError('staircase_2 requires batch_size')
76 print('staircase_2: shape x = ',x
.shape
)
77 print('staircase_2: shape y = ',y
.shape
)
78 print('staircase_2: timesteps=',timesteps
)
79 print('staircase_2: batch_size=',batch_size
)
80 print('staircase_2: return_sequences=',return_sequences
)
84 datapoints
= min(nx
,ny
,trainsteps
)
86 print('staircase_2: datapoints=',datapoints
)
88 # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
89 # https://www.tensorflow.org/guide/keras/working_with_rnns Cross-batch statefulness
91 # example with timesteps=3 batch_size=3 datapoints=15
92 # batch 0: [0 1 2] [1 2 3] [2 3 4]
93 # batch 1: [3 4 5] [4 5 6] [5 6 7]
94 # batch 2: [6 7 8] [7 8 9] [8 9 10]
95 # batch 3: [9 10 11] [10 11 12] [11 12 13]
96 # batch 4: [12 13 14] [13 14 15] when runs out this is the last batch, can be shorter
98 # TODO: implement for multiple locations, same starting time for each batch
100 # batch 0: [0 1 2] [0 1 2] [0 1 2]
101 # batch 1: [3 4 5] [3 4 5] [3 4 5]
102 # batch 2: [6 7 8] [6 7 8] [6 7 8]
103 # TODO: second epoch shift starting time at batch 0 in time
105 # TODO: implement for multiple locations, different starting times for each batch
107 # batch 0: [0 1 2] [1 2 3] [2 3 4]
108 # batch 1: [3 4 5] [4 5 6] [5 6 57
109 # batch 2: [6 7 8] [7 8 9] [8 9 10]
112 # the first sample in batch j starts from timesteps*j and ends with timesteps*(j+1)-1
113 # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
114 # becomes the starting hidden state of the rnn in the sequence of steps [3 4 5] in batch 1, etc.
116 # sample [0 1 2] means the rnn is used twice to map state 0 -> 1 -> 2
117 # the state at time 0 is fixed but the state is considered a variable at times 1 and 2
118 # the loss is computed from the output at time 2 and the gradient of the loss function by chain rule which ends at time 0 because the state there is a constant -> derivative is zero
119 # sample [3 4 5] means the rnn is used twice to map state 3 -> 4 -> 5 # the state at time 3 is fixed to the output of the first sequence [0 1 2]
120 # the loss is computed from the output at time 5 and the gradient of the loss function by chain rule which ends at time 3 because the state there is considered constant -> derivative is zero
121 # how is the gradient computed? I suppose keras adds gradient wrt the weights at 2 5 8 ... 3 6 9... 4 7 ... and uses that to update the weights
122 # there is only one set of weights h(2) = f(h(1),w) h(1) = f(h(0),w) but w is always the same
123 # each column is a one successive evaluation of h(n+1) = f(h(n),w) for n = n_startn n_start+1,...
124 # the cannot be evaluated efficiently on gpu because gpu is a parallel processor
125 # this of it as each column served by one thread, and the threads are independent because they execute in parallel, there needs to be large number of threads (32 is a good number)\
126 # each batch consists of independent calculations
127 # but it can depend on the result of the previous batch (that's the recurrent parr)
131 max_batches
= datapoints
// timesteps
132 max_sequences
= max_batches
* batch_size
135 print('staircase_2: max_batches=',max_batches
)
136 print('staircase_2: max_sequences=',max_sequences
)
138 x_train
= np
.zeros((max_sequences
, timesteps
, features
))
140 y_train
= np
.empty((max_sequences
, timesteps
, outputs
))
142 y_train
= np
.empty((max_sequences
, outputs
))
144 # build the sequences
146 for i
in range(max_batches
):
147 for j
in range(batch_size
):
148 begin
= i
*timesteps
+ j
149 next
= begin
+ timesteps
150 if next
> datapoints
:
153 print('sequence',k
,'batch',i
,'sample',j
,'data',begin
,'to',next
-1)
154 x_train
[k
,:,:] = x
[begin
:next
,:]
156 y_train
[k
,:,:] = y
[begin
:next
,:]
158 y_train
[k
,:] = y
[next
-1,:]
161 print('staircase_2: shape x_train = ',x_train
.shape
)
162 print('staircase_2: shape y_train = ',y_train
.shape
)
163 print('staircase_2: sequences generated',k
)
164 print('staircase_2: batch_size=',batch_size
)
165 k
= (k
// batch_size
) * batch_size
167 print('staircase_2: removing partial and empty batches at the end, keeping',k
)
168 x_train
= x_train
[:k
,:,:]
170 y_train
= y_train
[:k
,:,:]
172 y_train
= y_train
[:k
,:]
175 print('staircase_2: shape x_train = ',x_train
.shape
)
176 print('staircase_2: shape y_train = ',y_train
.shape
)
178 return x_train
, y_train
181 # Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
183 'minmax': MinMaxScaler(),
184 'standard': StandardScaler()
187 # def scale_transform(X, method='minmax'):
188 # # Function to scale data in place
190 # # X: (ndarray) data to be scaled
191 # # method: (str) one of keys in scalers dictionary above
192 # scaler = scalers[method]
194 # # Modify X in-place
195 # X[:] = scaler.transform(X)
197 def create_rnn_data2(dict1
, params
, atm_dict
="HRRR", verbose
=False, train_ind
=None, test_ind
=None):
198 # Given fmda data and hyperparameters, return formatted dictionary to be used in RNN
200 # d: (dict) fmda dictionary
201 # params: (dict) hyperparameters
202 # atm_dict: (str) string specifying name of subdictionary for atmospheric vars
203 # train_frac: (float) fraction of data to use for training (starting from time 0)
204 # val_frac: (float) fraction of data to use for validation data (starting from end of train)
205 # Returns: (dict) formatted data used in RNN
206 logging
.info('create_rnn_data start')
207 # Copy Dictionary to avoid changing the input to this function
208 d
=copy
.deepcopy(dict1
)
209 scale
= params
['scale']
210 scaler
= params
['scaler']
211 # Features list given by params dict to be used in training
212 features_list
= params
["features_list"]
213 # All available features list, corresponds to shape of X
214 features_all
= d
["features_list"]
215 # Indices to subset all features with based on params features
217 for item
in features_list
:
218 if item
in features_all
:
219 indices
.append(features_all
.index(item
))
221 print(f
"Warning: feature name '{item}' not found in list of all features from input data")
223 # Extract desired features based on params, combine into matrix
224 # Extract response vector
226 y
= np
.reshape(y
,(-1,1))
227 # Extract Features matrix, subset to desired features
228 X_raw
= d
['X'][:, indices
].copy() # saw untransformed features matrix
232 # Check total observed hours
234 assert hours
== y
.shape
[0] # Check that it matches response
236 logging
.info('create_rnn_data: total_hours=%s',hours
)
237 logging
.info('feature matrix X shape %s',np
.shape(X
))
238 logging
.info('target matrix Y shape %s',np
.shape(y
))
239 logging
.info('features_list: %s',features_list
)
241 logging
.info('splitting train/val/test')
242 if train_ind
is None:
243 train_ind
= round(hours
* params
['train_frac']) # index of last training observation
244 test_ind
= train_ind
+ round(hours
* params
['val_frac'])# index of first test observation, if no validation data it is equal to train_ind
245 logging
.info('Final index of training data=%s',train_ind
)
246 logging
.info('First index of Test data=%s',test_ind
)
247 # Training data from 0 to train_ind
248 X_train
= X
[:train_ind
]
249 y_train
= y
[:train_ind
].reshape(-1,1)
250 # Validation data from train_ind to test_ind
251 X_val
= X
[train_ind
:test_ind
]
252 y_val
= y
[train_ind
:test_ind
].reshape(-1,1)
253 # Test data from test_ind to end
254 X_test
= X
[test_ind
:]
255 y_test
= y
[test_ind
:].reshape(-1,1)
257 # Scale Data if required
259 # Remove need for "scale_fm" param
260 # Reset reproducibility with this scaling
262 logging
.info('Scaling feature data with scaler: %s',scaler
)
264 if scaler
=="reproducibility":
265 scale_fm
= 17.076346687085564
270 # Fit scaler to training data
271 scalers
[scaler
].fit(X_train
)
272 # Apply scaling to all data using in-place operations
273 X_train
[:] = scalers
[scaler
].transform(X_train
)
274 if X_val
.shape
[0] > 0:
275 X_val
[:] = scalers
[scaler
].transform(X_val
)
276 X_test
[:] = scalers
[scaler
].transform(X_test
)
280 print("Not scaling data")
285 logging
.info('x_train shape=%s',X_train
.shape
)
286 logging
.info('y_train shape=%s',y_train
.shape
)
287 if test_ind
== train_ind
:
288 logging
.info('No validation data')
289 elif X_val
.shape
[0]!= 0:
290 logging
.info('X_val shape=%s',X_val
.shape
)
291 logging
.info('y_val shape=%s',y_val
.shape
)
292 logging
.info('X_test shape=%s',X_test
.shape
)
293 logging
.info('y_test shape=%s',y_test
.shape
)
295 # Set up return dictionary
299 'features_list':features_list
,
300 'n_features': len(features_list
),
302 'train_ind':train_ind
,
313 if X_val
.shape
[0] > 0:
319 # Update RNN params using data attributes
320 logging
.info('Updating model params based on data')
321 timesteps
= params
['timesteps']
322 batch_size
= params
['batch_size']
323 logging
.info('batch_size=%s',batch_size
)
324 logging
.info('timesteps=%s',timesteps
)
325 features
= len(features_list
)
327 # 'n_features': features,
328 # 'batch_shape': (params["batch_size"],params["timesteps"],features),
329 # 'pred_input_shape': (None, features),
331 # 'scale_fm': scale_fm,
332 # 'scale_rain': scale_rain
336 'scale_fm': scale_fm
,
337 'scale_rain': scale_rain
340 logging
.info('create_rnn_data2 done')
343 #***********************************************************************************************
344 ### RNN Class Functionality
346 # Custom class for parameters dictionary. Inherits from dict, but adds checks and safeguards
347 class RNNParams(dict):
348 def __init__(self
, input_dict
=None):
349 super().__init
__(input_dict
)
350 # Automatically run checks on initialization
352 # Automatically calculate shapes on initialization
353 self
.calc_param_shapes()
354 def run_checks(self
, verbose
=True):
355 print("Checking params...")
356 # Keys must exist and be integers
358 'batch_size', 'timesteps', 'rnn_layers',
359 'rnn_units', 'dense_layers', 'dense_units', 'epochs'
363 assert key
in self
, f
"Missing required key: {key}"
364 assert isinstance(self
[key
], int), f
"Key '{key}' must be an integer"
366 # Keys must exist and be lists
367 list_keys
= ['activation', 'features_list', 'dropout']
368 for key
in list_keys
:
369 assert key
in self
, f
"Missing required key: {key}"
370 assert isinstance(self
[key
], list), f
"Key '{key}' must be a list"
372 # Keys must exist and be floats
373 float_keys
= ['learning_rate', 'train_frac', 'val_frac']
374 for key
in float_keys
:
375 assert key
in self
, f
"Missing required key: {key}"
376 assert isinstance(self
[key
], float), f
"Key '{key}' must be a float"
378 print("Input dictionary passed all checks.")
379 def calc_param_shapes(self
, verbose
=True):
381 print("Calculating shape params based on features list, timesteps, and batch size")
382 print(f
"Input Feature List: {self['features_list']}")
383 print(f
"Input Timesteps: {self['timesteps']}")
384 print(f
"Input Batch Size: {self['batch_size']}")
386 n_features
= len(self
['features_list'])
387 batch_shape
= (self
["batch_size"], self
["timesteps"], n_features
)
389 print("Calculated params:")
390 print(f
"Number of features: {n_features}")
391 print(f
"Batch Shape: {batch_shape}")
393 # Update the dictionary
395 'n_features': n_features
,
396 'batch_shape': batch_shape
401 def update(self
, *args
, verbose
=True, **kwargs
):
402 # Prevent updating n_features and batch_shape
403 restricted_keys
= {'n_features', 'batch_shape'}
404 keys_to_check
= {'features_list', 'timesteps', 'batch_size'}
406 # Check for restricted keys in args
408 if isinstance(args
[0], dict):
409 if restricted_keys
& args
[0].keys():
410 raise KeyError(f
"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
411 elif isinstance(args
[0], (tuple, list)) and all(isinstance(i
, tuple) and len(i
) == 2 for i
in args
[0]):
412 if restricted_keys
& {k
for k
, v
in args
[0]}:
413 raise KeyError(f
"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")
415 # Check for restricted keys in kwargs
416 if restricted_keys
& kwargs
.keys():
417 raise KeyError(f
"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")
420 # Track if specific keys are updated
423 # Update using the standard dict update method
425 if isinstance(args
[0], dict):
426 keys_updated
.update(args
[0].keys() & keys_to_check
)
427 elif isinstance(args
[0], (tuple, list)) and all(isinstance(i
, tuple) and len(i
) == 2 for i
in args
[0]):
428 keys_updated
.update(k
for k
, v
in args
[0] if k
in keys_to_check
)
431 keys_updated
.update(kwargs
.keys() & keys_to_check
)
433 # Call the parent update method
434 super().update(*args
, **kwargs
)
436 # Recalculate shapes if necessary
438 self
.calc_param_shapes(verbose
=verbose
)
441 ## Class for handling input data
443 required_keys
= {"loc", "time", "X", "y", "features_list"}
444 def __init__(self
, input_dict
, scaler
=None, features_list
=None):
446 input_data
= input_dict
.copy()
447 super().__init
__(input_data
)
449 if scaler
is not None:
450 self
.set_scaler(scaler
)
451 # Rename and define other stuff
452 self
['hours'] = len(self
['y'])
453 self
['all_features_list'] = self
.pop('features_list')
454 if features_list
is None:
455 print("Using all input features.")
456 self
.features_list
= self
.all_features_list
458 self
.features_list
= features_list
460 self
.__dict
__.update(self
)
461 def run_checks(self
, verbose
=True):
462 missing_keys
= self
.required_keys
- self
.keys()
464 raise KeyError(f
"Missing required keys: {missing_keys}")
466 y_shape
= np
.shape(self
.y
)
467 if not (len(y_shape
) == 1 or (len(y_shape
) == 2 and y_shape
[1] == 1)):
468 raise ValueError(f
"'y' must be one-dimensional, with shape (N,) or (N, 1). Current shape is {y_shape}.")
470 # Check if 'hours' is provided and matches len(y)
472 if self
.hours
!= len(self
.y
):
473 raise ValueError(f
"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")
474 # Check desired subset of features is in all input features
475 if not all_items_exist(self
.features_list
, self
.all_features_list
):
476 raise ValueError(f
"Provided 'features_list' {self.features_list} has elements not in input features.")
477 def set_scaler(self
, scaler
):
478 recognized_scalers
= ['minmax', 'standard']
479 if scaler
in recognized_scalers
:
480 self
.scaler
= scalers
[scaler
]
482 raise ValueError(f
"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")
483 def train_test_split(self
, train_frac
, val_frac
=0.0, subset_features
=True, features_list
=None, split_time
=True, split_space
=False, verbose
=True):
484 # Extract data to desired features
488 if verbose
and self
.features_list
!= self
.all_features_list
:
489 print(f
"Subsetting input data to features_list: {self.features_list}")
490 # Indices to subset all features with based on params features
492 for item
in self
.features_list
:
493 if item
in self
.all_features_list
:
494 indices
.append(self
.all_features_list
.index(item
))
496 print(f
"Warning: feature name '{item}' not found in list of all features from input data")
498 # Setup train/test in time
499 train_ind
= int(np
.floor(self
.hours
* train_frac
)); self
.train_ind
= train_ind
500 test_ind
= int(train_ind
+ round(self
.hours
* val_frac
)); self
.test_ind
= test_ind
502 # Check for any potential issues with indices
503 if test_ind
> self
.hours
:
504 print(f
"Setting test index to {self.hours}")
505 test_ind
= self
.hours
506 if train_ind
>= test_ind
:
507 raise ValueError("Train index must be less than test index.")
509 # Training data from 0 to train_ind
510 self
.X_train
= X
[:train_ind
]
511 self
.y_train
= y
[:train_ind
].reshape(-1,1) # assumes y 1-d, change this if vector output
512 # Validation data from train_ind to test_ind
514 self
.X_val
= X
[train_ind
:test_ind
]
515 self
.y_val
= y
[train_ind
:test_ind
].reshape(-1,1) # assumes y 1-d, change this if vector output
516 # Test data from test_ind to end
517 self
.X_test
= X
[test_ind
:]
518 self
.y_test
= y
[test_ind
:].reshape(-1,1) # assumes y 1-d, change this if vector output
520 # Print statements if verbose
522 print(f
"Train index: 0 to {train_ind}")
523 print(f
"Validation index: {train_ind} to {test_ind}")
524 print(f
"Test index: {test_ind} to {self.hours}")
525 print(f
"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
526 print(f
"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
527 print(f
"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")
528 def scale_data(self
, verbose
=True):
529 if self
.scaler
is None:
530 raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
531 if not hasattr(self
, "X_train"):
532 raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
534 print(f
"Scaling data with scaler {self.scaler}, fitting on X_train")
535 # Fit the scaler on the training data
536 self
.scaler
.fit(self
.X_train
)
537 # Transform the data using the fitted scaler
538 self
.X_train
= self
.scaler
.transform(self
.X_train
)
539 if hasattr(self
, 'X_val'):
540 self
.X_val
= self
.scaler
.transform(self
.X_val
)
541 self
.X_test
= self
.scaler
.transform(self
.X_test
)
542 def inverse_scale(self
, return_X
= 'all_hours', save_changes
=False, verbose
=True):
544 print("Inverse scaling data...")
545 X_train
= self
.scaler
.inverse_transform(self
.X_train
)
546 X_val
= self
.scaler
.inverse_transform(self
.X_val
)
547 X_test
= self
.scaler
.inverse_transform(self
.X_test
)
550 print("Inverse transformed data saved")
551 self
.X_train
= X_train
556 print("Inverse scaled, but internal data not changed.")
558 print(f
"Attempting to return {return_X}")
559 if return_X
== "all_hours":
560 return np
.concatenate((X_train
, X_val
, X_test
), axis
=0)
562 print(f
"Unrecognized or unimplemented return value {return_X}")
563 def print_hashes(self
, attrs_to_check
= ['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
564 for attr
in attrs_to_check
:
565 if hasattr(self
, attr
):
566 value
= getattr(self
, attr
)
567 print(f
"Hash of {attr}: {hash_ndarray(value)}")
568 def __getattr__(self
, key
):
572 raise AttributeError(f
"'rnn_data' object has no attribute '{key}'")
574 def __setitem__(self
, key
, value
):
575 super().__setitem
__(key
, value
) # Update the dictionary
576 if key
in self
.required_keys
:
577 super().__setattr
__(key
, value
) # Ensure the attribute is updated as well
579 def __setattr__(self
, key
, value
):
583 # Function to check reproduciblity hashes, environment info, and model parameters
584 def check_reproducibility(dict0
, params
, m_hash
, w_hash
):
585 if not hasattr(dict0
, "repro_info"):
586 warnings
.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproduciblity checks.")
589 repro_info
= dict0
.repro_info
591 if params
['phys_initialize']:
592 hashes
= repro_info
['phys_initialize']
593 warnings
.warn("Physics Initialization not implemented yet. Not running reproduciblity checks.")
595 hashes
= repro_info
['rand_initialize']
596 print(f
"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
597 print(f
"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
598 if (w_hash
!= hashes
['fitted_weights_hash']) or (m_hash
!= hashes
['preds_hash']):
599 if w_hash
!= hashes
['fitted_weights_hash']:
600 warnings
.warn("The fitted weights hash does not match the reproducibility weights hash.")
601 if m_hash
!= hashes
['preds_hash']:
602 warnings
.warn("The predictions hash does not match the reproducibility predictions hash.")
604 print("***Reproducibility Checks passed - model weights and model predictions match expected.***")
607 current_py_version
= sys
.version
[0:6]
608 current_tf_version
= tf
.__version
__
609 if current_py_version
!= repro_info
['env_info']['py_version']:
610 warnings
.warn(f
"Python version mismatch: Current Python version is {current_py_version}, "
611 f
"expected {repro_info['env_info']['py_version']}.")
613 if current_tf_version
!= repro_info
['env_info']['tf_version']:
614 warnings
.warn(f
"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
615 f
"expected {repro_info['env_info']['tf_version']}.")
618 repro_params
= repro_info
.get('params', {})
620 for key
, repro_value
in repro_params
.items():
622 if params
[key
] != repro_value
:
623 warnings
.warn(f
"Parameter mismatch for '{key}': Current value is {params[key]}, "
624 f
"repro value is {repro_value}.")
626 warnings
.warn(f
"Parameter '{key}' is missing in the current params.")
631 def __init__(self
, params
: dict):
632 self
.params
= copy
.deepcopy(params
)
633 self
.params
['n_features'] = len(self
.params
['features_list'])
634 if type(self
) is RNNModel
:
635 raise TypeError("MLModel is an abstract class and cannot be instantiated directly")
639 def _build_model_train(self
):
643 def _build_model_predict(self
, return_sequences
=True):
646 def fit(self
, X_train
, y_train
, plot
=True, plot_title
= '',
647 weights
=None, callbacks
=[], validation_data
=None, *args
, **kwargs
):
648 # verbose_fit argument is for printing out update after each epoch, which gets very long
649 # These print statements at the top could be turned off with a verbose argument, but then
650 # there would be a bunch of different verbose params
651 verbose_fit
= self
.params
['verbose_fit']
652 verbose_weights
= self
.params
['verbose_weights']
654 print(f
"Training simple RNN with params: {self.params}")
655 X_train
, y_train
= self
.format_train_data(X_train
, y_train
)
656 if validation_data
is not None:
657 X_val
, y_val
= self
.format_train_data(validation_data
[0], validation_data
[1])
659 print(f
"Formatted X_train hash: {hash_ndarray(X_train)}")
660 print(f
"Formatted y_train hash: {hash_ndarray(y_train)}")
661 if validation_data
is not None:
662 print(f
"Formatted X_val hash: {hash_ndarray(X_val)}")
663 print(f
"Formatted y_val hash: {hash_ndarray(y_val)}")
664 print(f
"Initial weights before training hash: {hash_weights(self.model_train)}")
666 if self
.params
["reset_states"]:
667 callbacks
=callbacks
+[ResetStatesCallback()]
668 # if validation_data is not None:
669 # callbacks=callbacks+[early_stopping]
671 # Note: we overload the params here so that verbose_fit can be easily turned on/off at the .fit call
673 # Evaluate Model once to set nonzero initial state
674 if self
.params
["batch_size"]>= X_train
.shape
[0]:
675 self
.model_train(X_train
)
676 if validation_data
is not None:
677 history
= self
.model_train
.fit(
678 X_train
, y_train
+self
.params
['centering'][1],
679 epochs
=self
.params
['epochs'],
680 batch_size
=self
.params
['batch_size'],
681 callbacks
= callbacks
,
683 validation_data
= (X_val
, y_val
),
687 history
= self
.model_train
.fit(
688 X_train
, y_train
+self
.params
['centering'][1],
689 epochs
=self
.params
['epochs'],
690 batch_size
=self
.params
['batch_size'],
691 callbacks
= callbacks
,
697 self
.plot_history(history
,plot_title
)
698 if self
.params
["verbose_weights"]:
699 print(f
"Fitted Weights Hash: {hash_weights(self.model_train)}")
701 # Update Weights for Prediction Model
702 w_fitted
= self
.model_train
.get_weights()
703 self
.model_predict
.set_weights(w_fitted
)
705 def predict(self
, X_test
):
707 X_test
= self
.format_pred_data(X_test
)
708 preds
= self
.model_predict
.predict(X_test
).flatten()
711 def format_train_data(self
, X
, y
, verbose
=False):
712 X
, y
= staircase_2(X
, y
, timesteps
= self
.params
["timesteps"], batch_size
=self
.params
["batch_size"], verbose
=verbose
)
714 def format_pred_data(self
, X
):
715 return np
.reshape(X
,(1, X
.shape
[0], self
.params
['n_features']))
717 def plot_history(self
, history
, plot_title
):
719 plt
.semilogy(history
.history
['loss'], label
='Training loss')
720 if 'val_loss' in history
.history
:
721 plt
.semilogy(history
.history
['val_loss'], label
='Validation loss')
722 plt
.title(f
'{plot_title} Model loss')
725 plt
.legend(loc
='upper left')
728 def run_model(self
, dict0
, reproducibility_run
=False):
729 verbose_fit
= self
.params
['verbose_fit']
730 verbose_weights
= self
.params
['verbose_weights']
731 # Make copy to prevent changing in place
732 dict1
= copy
.deepcopy(dict0
)
734 print("Input data hashes, NOT formatted for rnn sequence/batches yet")
737 scale_fm
= dict1
['scale_fm']
738 X_train
, y_train
, X_test
, y_test
= dict1
['X_train'].copy(), dict1
['y_train'].copy(), dict1
["X_test"].copy(), dict1
['y_test'].copy()
740 X_val
, y_val
= dict1
['X_val'].copy(), dict1
['y_val'].copy()
743 case_id
= dict1
['case']
747 self
.fit(X_train
, y_train
, plot_title
=case_id
)
749 self
.fit(X_train
, y_train
, validation_data
= (X_val
, y_val
), plot_title
=case_id
)
750 # Generate Predictions,
751 # run through training to get hidden state set proporly for forecast period
753 X
= np
.concatenate((X_train
, X_test
))
754 y
= np
.concatenate((y_train
, y_test
)).flatten()
756 X
= np
.concatenate((X_train
, X_val
, X_test
))
757 y
= np
.concatenate((y_train
, y_val
, y_test
)).flatten()
760 print(f
"Predicting Training through Test")
761 print(f
"All X hash: {hash_ndarray(X)}")
763 m
= self
.predict(X
).flatten()
765 print(f
"Predictions Hash: {hash_ndarray(m)}")
767 dict0
['m']=m
# add to outside env dictionary, should be only place this happens
769 # if self.params['scale']:
770 # print(f"Rescaling data using {self.params['scaler']}")
771 # if self.params['scaler'] == "reproducibility":
774 # y_train *= scale_fm
776 # Check Reproducibility, TODO: old dict calls it hidden_units not rnn_units, so this doens't check that
777 # if (case_id == "reproducibility") and compare_dicts(self.params, repro_hashes['params'], ['epochs', 'batch_size', 'scale', 'activation', 'learning_rate']):
778 # print("Checking Reproducibility")
780 # hv = hash2(self.model_predict.get_weights())
781 # if self.params['phys_initialize']:
782 # hv5 = repro_hashes['phys_initialize']['fitted_weight_hash']
783 # mv = repro_hashes['phys_initialize']['predictions_hash']
785 # hv5 = repro_hashes['rand_initialize']['fitted_weight_hash']
786 # mv = repro_hashes['rand_initialize']['predictions_hash']
788 # print(f"Fitted weights hash (check 5): {hv} \n Reproducibility weights hash: {hv5} \n Error: {hv5-hv}")
789 # print(f"Model predictions hash: {checkm} \n Reproducibility preds hash: {mv} \n Error: {mv-checkm}")
790 if reproducibility_run
:
791 print("Checking Reproducibility")
792 check_reproducibility(dict0
, self
.params
, hash_ndarray(m
), hash_weights(self
.model_predict
))
794 # print(dict1.keys())
795 # Plot final fit and data
797 plot_data(dict1
, title
="RNN", title2
=dict1
['case'])
801 train_ind
= dict1
["train_ind"] # index of final training set value
802 test_ind
= dict1
["test_ind"] # index of first test set value
803 err_train
= rmse(m
[:train_ind
], y_train
.flatten())
804 err_pred
= rmse(m
[test_ind
:], y_test
.flatten())
807 'training': err_train
,
808 'prediction': err_pred
816 class ResetStatesCallback(Callback
):
817 def on_epoch_end(self
, epoch
, logs
=None):
818 # Iterate over each layer in the model
819 for layer
in self
.model
.layers
:
820 # Check if the layer has a reset_states method
821 if hasattr(layer
, 'reset_states'):
826 ## Learning Schedules
827 lr_schedule
= tf
.keras
.optimizers
.schedules
.CosineDecay(
828 initial_learning_rate
=0.001,
832 # warmup_target=None,
837 early_stopping
= EarlyStopping(
838 monitor
='val_loss', # Metric to monitor, e.g., 'val_loss', 'val_accuracy'
839 patience
=5, # Number of epochs with no improvement after which training will be stopped
840 verbose
=1, # Print information about early stopping
841 mode
='min', # 'min' means training will stop when the quantity monitored has stopped decreasing
842 restore_best_weights
=True # Restore model weights from the epoch with the best value of the monitored quantity
845 # with open("params.yaml") as file:
846 # phys_params = yaml.safe_load(file)["physics_initializer"]
849 'DeltaE': [0,-1], # bias correction
850 'T1': 0.1, # 1/fuel class (10)
851 'fm_raise_vs_rain': 0.2 # fm increase per mm rain
856 def get_initial_weights(model_fit
,params
,scale_fm
):
857 # Given a RNN architecture and hyperparameter dictionary, return array of physics-initiated weights
859 # model_fit: output of create_RNN_2 with no training
860 # params: (dict) dictionary of hyperparameters
861 # rnn_dat: (dict) data dictionary, output of create_rnn_dat
862 # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
863 DeltaE
= phys_params
['DeltaE']
864 T1
= phys_params
['T1']
865 fmr
= phys_params
['fm_raise_vs_rain']
866 centering
= params
['centering'] # shift activation down
868 w0_initial
={'Ed':(1.-np
.exp(-T1
))/2,
869 'Ew':(1.-np
.exp(-T1
))/2,
870 'rain':fmr
* scale_fm
} # wx - input feature
871 # wh wb wd bd = bias -1
873 w_initial
=np
.array([np
.nan
, np
.exp(-0.1), DeltaE
[0]/scale_fm
, # layer 0
874 1.0, -centering
[0] + DeltaE
[1]/scale_fm
]) # layer 1
875 if params
['verbose_weights']:
876 print('Equilibrium moisture correction bias',DeltaE
[0],
877 'in the hidden layer and',DeltaE
[1],' in the output layer')
879 w_name
= ['wx','wh','bh','wd','bd']
881 w
=model_fit
.get_weights()
882 for j
in range(w
[0].shape
[0]):
883 feature
= params
['features_list'][j
]
884 for k
in range(w
[0].shape
[1]):
885 w
[0][j
][k
]=w0_initial
[feature
]
886 for i
in range(1,len(w
)): # number of the weight
887 for j
in range(w
[i
].shape
[0]): # number of the inputs
889 # initialize all entries of the weight matrix to the same number
890 for k
in range(w
[i
].shape
[1]):
891 w
[i
][j
][k
]=w_initial
[i
]/w
[i
].shape
[0]
895 print('weight',i
,'shape',w
[i
].shape
)
896 raise ValueError("Only 1 or 2 dimensions supported")
897 if params
['verbose_weights']:
898 print('weight',i
,w_name
[i
],'shape',w
[i
].shape
,'ndim',w
[i
].ndim
,
899 'initial: sum',np
.sum(w
[i
],axis
=0),'\nentries',w
[i
])
904 def __init__(self
, params
, loss
='mean_squared_error'):
905 super().__init
__(params
)
906 self
.model_train
= self
._build
_model
_train
()
907 self
.model_predict
= self
._build
_model
_predict
()
909 def _build_model_train(self
):
910 inputs
= tf
.keras
.Input(batch_shape
=self
.params
['batch_shape'])
912 for i
in range(self
.params
['rnn_layers']):
913 return_sequences
= True if i
< self
.params
['rnn_layers'] - 1 else False
915 units
=self
.params
['rnn_units'],
916 activation
=self
.params
['activation'][0],
917 dropout
=self
.params
["dropout"][0],
918 recurrent_dropout
= self
.params
["recurrent_dropout"],
919 stateful
=self
.params
['stateful'],
920 return_sequences
=return_sequences
)(x
)
921 if self
.params
["dropout"][1] > 0:
922 x
= Dropout(self
.params
["dropout"][1])(x
)
923 for i
in range(self
.params
['dense_layers']):
924 x
= Dense(self
.params
['dense_units'], activation
=self
.params
['activation'][1])(x
)
925 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
926 x
= Dense(units
=1, activation
='linear')(x
)
927 model
= tf
.keras
.Model(inputs
=inputs
, outputs
=x
)
928 optimizer
=tf
.keras
.optimizers
.Adam(learning_rate
=self
.params
['learning_rate'])
929 # optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule)
930 model
.compile(loss
='mean_squared_error', optimizer
=optimizer
)
932 if self
.params
["verbose_weights"]:
933 print(f
"Initial Weights Hash: {hash_weights(model)}")
934 # print(model.get_weights())
936 if self
.params
['phys_initialize']:
937 assert self
.params
['scaler'] == 'reproducibility', f
"Not implemented yet to do physics initialize with given data scaling {self.params['scaler']}"
938 assert self
.params
['features_list'] == ['Ed', 'Ew', 'rain'], f
"Physics initiation can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
939 print("Initializing Model with Physics based weights")
940 w
, w_name
=get_initial_weights(model
, self
.params
, scale_fm
= scale_fm
)
942 print('initial weights hash =',hash_weights(model
))
944 def _build_model_predict(self
, return_sequences
=True):
946 inputs
= tf
.keras
.Input(shape
=(None,self
.params
['n_features']))
948 for i
in range(self
.params
['rnn_layers']):
949 x
= SimpleRNN(self
.params
['rnn_units'],activation
=self
.params
['activation'][0],
950 stateful
=False,return_sequences
=return_sequences
)(x
)
951 for i
in range(self
.params
['dense_layers']):
952 x
= Dense(self
.params
['dense_units'], activation
=self
.params
['activation'][1])(x
)
953 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
954 x
= Dense(units
=1, activation
='linear')(x
)
955 model
= tf
.keras
.Model(inputs
=inputs
, outputs
=x
)
956 optimizer
=tf
.keras
.optimizers
.Adam(learning_rate
=self
.params
['learning_rate'])
957 model
.compile(loss
='mean_squared_error', optimizer
=optimizer
)
959 # Set Weights to model_train
960 w_fitted
= self
.model_train
.get_weights()
961 model
.set_weights(w_fitted
)
966 class RNN_LSTM(RNNModel
):
967 def __init__(self
, params
, loss
='mean_squared_error'):
968 super().__init
__(params
)
969 self
.model_train
= self
._build
_model
_train
()
970 self
.model_predict
= self
._build
_model
_predict
()
972 def _build_model_train(self
):
973 inputs
= tf
.keras
.Input(batch_shape
=self
.params
['batch_shape'])
975 for i
in range(self
.params
['rnn_layers']):
976 return_sequences
= True if i
< self
.params
['rnn_layers'] - 1 else False
978 units
=self
.params
['rnn_units'],
979 activation
=self
.params
['activation'][0],
980 dropout
=self
.params
["dropout"][0],
981 recurrent_dropout
= self
.params
["recurrent_dropout"],
982 recurrent_activation
=self
.params
["recurrent_activation"],
983 stateful
=self
.params
['stateful'],
984 return_sequences
=return_sequences
)(x
)
985 if self
.params
["dropout"][1] > 0:
986 x
= Dropout(self
.params
["dropout"][1])(x
)
987 for i
in range(self
.params
['dense_layers']):
988 x
= Dense(self
.params
['dense_units'], activation
=self
.params
['activation'][1])(x
)
989 model
= tf
.keras
.Model(inputs
=inputs
, outputs
=x
)
990 optimizer
=tf
.keras
.optimizers
.Adam(learning_rate
=self
.params
['learning_rate'], clipvalue
=self
.params
['clipvalue'])
991 model
.compile(loss
='mean_squared_error', optimizer
=optimizer
)
993 if self
.params
["verbose_weights"]:
994 print(f
"Initial Weights Hash: {hash_weights(model)}")
996 def _build_model_predict(self
, return_sequences
=True):
998 inputs
= tf
.keras
.Input(shape
=(None,self
.params
['n_features']))
1000 for i
in range(self
.params
['rnn_layers']):
1002 units
=self
.params
['rnn_units'],
1003 activation
=self
.params
['activation'][0],
1004 stateful
=False,return_sequences
=return_sequences
)(x
)
1005 for i
in range(self
.params
['dense_layers']):
1006 x
= Dense(self
.params
['dense_units'], activation
=self
.params
['activation'][1])(x
)
1007 model
= tf
.keras
.Model(inputs
=inputs
, outputs
=x
)
1008 optimizer
=tf
.keras
.optimizers
.Adam(learning_rate
=self
.params
['learning_rate'])
1009 model
.compile(loss
='mean_squared_error', optimizer
=optimizer
)
1011 # Set Weights to model_train
1012 w_fitted
= self
.model_train
.get_weights()
1013 model
.set_weights(w_fitted
)