# v2 training and prediction class infrastructure

# Environment
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import sys
from tensorflow.keras.callbacks import Callback, EarlyStopping
# from sklearn.metrics import mean_squared_error
import logging
from tensorflow.keras.layers import LSTM, SimpleRNN, Input, Dropout, Dense
# Local modules
import reproducibility
# from utils import print_dict_summary
from abc import ABC, abstractmethod
from utils import hash2, all_items_exist, hash_ndarray, hash_weights
from data_funcs import rmse, plot_data, compare_dicts
import copy
# import yaml
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings

#*************************************************************************************
# Data Formatting Functions
def staircase(x,y,timesteps,datapoints,return_sequences=False, verbose = False):
    # x [datapoints,features]  all inputs
    # y [datapoints,outputs]
    # timesteps: split x and y into samples of length timesteps, shifted by 1
    # datapoints: number of timesteps to use for training, no more than y.shape[0]
    if verbose:
        print('staircase: shape x = ',x.shape)
        print('staircase: shape y = ',y.shape)
        print('staircase: timesteps=',timesteps)
        print('staircase: datapoints=',datapoints)
        print('staircase: return_sequences=',return_sequences)
    outputs = y.shape[1]
    features = x.shape[1]
    samples = datapoints-timesteps+1
    if verbose:
        print('staircase: samples=',samples,'timesteps=',timesteps,'features=',features)
    x_train = np.empty([samples, timesteps, features])
    if return_sequences:
        if verbose:
            print('returning all timesteps in a sample')
        y_train = np.empty([samples, timesteps, outputs])  # all
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
                y_train[i,k,:] = y[i+k,:]
    else:
        if verbose:
            print('returning only the last timestep in a sample')
        y_train = np.empty([samples, outputs])
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
            y_train[i,:] = y[i+timesteps-1,:]

    return x_train, y_train
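
# Usage sketch (a minimal, hypothetical example; not called anywhere in this
# module): shows the shapes staircase() produces for a small synthetic input.
def _staircase_example():
    x = np.arange(30, dtype=float).reshape(10, 3)  # 10 datapoints, 3 features
    y = np.arange(10, dtype=float).reshape(10, 1)  # 10 datapoints, 1 output
    x_train, y_train = staircase(x, y, timesteps=3, datapoints=10)
    # samples = 10 - 3 + 1 = 8, so x_train is (8, 3, 3) and y_train is (8, 1)
    print('x_train shape:', x_train.shape)
    print('y_train shape:', y_train.shape)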
def staircase_2(x,y,timesteps,batch_size=None,trainsteps=np.inf,return_sequences=False, verbose = False):
    # create RNN training data in multiple batches
    # input:
    #     x (,features)
    #     y (,outputs)
    #     timesteps: split x and y into sequences of length timesteps
    #                a.k.a. lookback or sequence_length

    # print params if verbose
    if batch_size is None:
        raise ValueError('staircase_2 requires batch_size')
    if verbose:
        print('staircase_2: shape x = ',x.shape)
        print('staircase_2: shape y = ',y.shape)
        print('staircase_2: timesteps=',timesteps)
        print('staircase_2: batch_size=',batch_size)
        print('staircase_2: return_sequences=',return_sequences)

    nx,features = x.shape
    ny,outputs = y.shape
    datapoints = min(nx,ny,trainsteps)
    if verbose:
        print('staircase_2: datapoints=',datapoints)

    # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
    # https://www.tensorflow.org/guide/keras/working_with_rnns Cross-batch statefulness

    # example with timesteps=3 batch_size=3 datapoints=15
    #   batch 0: [0 1 2]    [1 2 3]    [2 3 4]
    #   batch 1: [3 4 5]    [4 5 6]    [5 6 7]
    #   batch 2: [6 7 8]    [7 8 9]    [8 9 10]
    #   batch 3: [9 10 11]  [10 11 12] [11 12 13]
    #   batch 4: [12 13 14] [13 14 15]  when the data runs out the last batch can be shorter

    # TODO: implement for multiple locations, same starting time for each batch
    #            Loc 1     Loc 2     Loc 3
    #   batch 0: [0 1 2]   [0 1 2]   [0 1 2]
    #   batch 1: [3 4 5]   [3 4 5]   [3 4 5]
    #   batch 2: [6 7 8]   [6 7 8]   [6 7 8]
    # TODO: second epoch shift starting time at batch 0 in time

    # TODO: implement for multiple locations, different starting times for each batch
    #            Loc 1     Loc 2     Loc 3
    #   batch 0: [0 1 2]   [1 2 3]   [2 3 4]
    #   batch 1: [3 4 5]   [4 5 6]   [5 6 7]
    #   batch 2: [6 7 8]   [7 8 9]   [8 9 10]

    # the first sample in batch j starts from timesteps*j and ends with timesteps*(j+1)-1
    # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
    # becomes the starting hidden state of the rnn in the sequence of steps [3 4 5] in batch 1, etc.

    # sample [0 1 2] means the rnn is used twice to map state 0 -> 1 -> 2
    # the state at time 0 is fixed, but the state is considered a variable at times 1 and 2
    # the loss is computed from the output at time 2, and the gradient of the loss function by the
    # chain rule ends at time 0 because the state there is a constant -> its derivative is zero
    # sample [3 4 5] means the rnn is used twice to map state 3 -> 4 -> 5
    # the state at time 3 is fixed to the output of the first sequence [0 1 2]
    # the loss is computed from the output at time 5, and the gradient of the loss function by the
    # chain rule ends at time 3 because the state there is considered constant -> its derivative is zero
    # how is the gradient computed? I suppose keras adds the gradients wrt the weights at 2 5 8 ...,
    # 3 6 9 ..., 4 7 ... and uses that to update the weights
    # there is only one set of weights: h(2) = f(h(1),w), h(1) = f(h(0),w), but w is always the same
    # each column is one successive evaluation of h(n+1) = f(h(n),w) for n = n_start, n_start+1, ...
    # this cannot be evaluated efficiently on a gpu because a gpu is a parallel processor;
    # think of it as each column served by one thread, and the threads are independent because they
    # execute in parallel; there needs to be a large number of threads (32 is a good number)
    # each batch consists of independent calculations
    # but it can depend on the result of the previous batch (that's the recurrent part)

    max_batches = datapoints // timesteps
    max_sequences = max_batches * batch_size

    if verbose:
        print('staircase_2: max_batches=',max_batches)
        print('staircase_2: max_sequences=',max_sequences)

    x_train = np.zeros((max_sequences, timesteps, features))
    if return_sequences:
        y_train = np.empty((max_sequences, timesteps, outputs))
    else:
        y_train = np.empty((max_sequences, outputs))

    # build the sequences
    k = 0  # sequence counter
    for i in range(max_batches):
        for j in range(batch_size):
            begin = i*timesteps + j
            next = begin + timesteps
            if next > datapoints:
                break
            if verbose:
                print('sequence',k,'batch',i,'sample',j,'data',begin,'to',next-1)
            x_train[k,:,:] = x[begin:next,:]
            if return_sequences:
                y_train[k,:,:] = y[begin:next,:]
            else:
                y_train[k,:] = y[next-1,:]
            k += 1
    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)
        print('staircase_2: sequences generated',k)
        print('staircase_2: batch_size=',batch_size)
    k = (k // batch_size) * batch_size  # keep only full batches
    if verbose:
        print('staircase_2: removing partial and empty batches at the end, keeping',k)
    x_train = x_train[:k,:,:]
    if return_sequences:
        y_train = y_train[:k,:,:]
    else:
        y_train = y_train[:k,:]

    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)

    return x_train, y_train
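
# Usage sketch (a minimal, hypothetical example): with timesteps=3, batch_size=2
# and 12 datapoints, 7 sequences fit before the data runs out, and trimming to
# full batches keeps (7 // 2) * 2 = 6 of them.
def _staircase_2_example():
    x = np.arange(36, dtype=float).reshape(12, 3)  # 12 datapoints, 3 features
    y = np.arange(12, dtype=float).reshape(12, 1)  # 12 datapoints, 1 output
    x_train, y_train = staircase_2(x, y, timesteps=3, batch_size=2)
    print('x_train shape:', x_train.shape)  # (6, 3, 3)
    print('y_train shape:', y_train.shape)  # (6, 1)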

# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler()
}

# def scale_transform(X, method='minmax'):
#     # Function to scale data in place
#     # Inputs:
#     #     X: (ndarray) data to be scaled
#     #     method: (str) one of keys in scalers dictionary above
#     scaler = scalers[method]
#     scaler.fit(X)
#     # Modify X in-place
#     X[:] = scaler.transform(X)
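
# Minimal usage sketch of the shared scaler objects (the array here is
# hypothetical). Note the objects in `scalers` are shared module-wide, so
# refitting one discards any previously fitted state.
def _scalers_example():
    X = np.random.rand(10, 3)
    sc = scalers['minmax']
    sc.fit(X)
    return sc.transform(X)  # each column mapped into [0, 1]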

def create_rnn_data2(dict1, params, atm_dict="HRRR", verbose=False, train_ind=None, test_ind=None):
    # Given fmda data and hyperparameters, return formatted dictionary to be used in RNN
    # Inputs:
    #     dict1: (dict) fmda dictionary
    #     params: (dict) hyperparameters, including 'train_frac' (fraction of data used for
    #         training, starting from time 0) and 'val_frac' (fraction used for validation,
    #         starting from the end of train)
    #     atm_dict: (str) string specifying name of subdictionary for atmospheric vars
    #     train_ind: (int) optional index of last training observation
    #     test_ind: (int) optional index of first test observation
    # Returns: (dict) formatted data used in RNN
    logging.info('create_rnn_data start')
    # Copy dictionary to avoid changing the input to this function
    d = copy.deepcopy(dict1)
    scale = params['scale']
    scaler = params['scaler']
    # Features list given by params dict to be used in training
    features_list = params["features_list"]
    # All available features list, corresponds to shape of X
    features_all = d["features_list"]
    # Indices to subset all features with based on params features
    indices = []
    for item in features_list:
        if item in features_all:
            indices.append(features_all.index(item))
        else:
            print(f"Warning: feature name '{item}' not found in list of all features from input data")

    # Extract desired features based on params, combine into matrix
    # Extract response vector
    y = d['y']
    y = np.reshape(y,(-1,1))
    # Extract features matrix, subset to desired features
    X_raw = d['X'][:, indices].copy()  # save untransformed features matrix
    X = d['X']
    X = X[:, indices]

    # Check total observed hours
    hours = d['hours']
    assert hours == y.shape[0]  # Check that it matches response

    logging.info('create_rnn_data: total_hours=%s',hours)
    logging.info('feature matrix X shape %s',np.shape(X))
    logging.info('target matrix Y shape %s',np.shape(y))
    logging.info('features_list: %s',features_list)

    logging.info('splitting train/val/test')
    if train_ind is None:
        train_ind = round(hours * params['train_frac'])  # index of last training observation
        test_ind = train_ind + round(hours * params['val_frac'])  # index of first test observation; if no validation data it is equal to train_ind
    logging.info('Final index of training data=%s',train_ind)
    logging.info('First index of Test data=%s',test_ind)
    # Training data from 0 to train_ind
    X_train = X[:train_ind]
    y_train = y[:train_ind].reshape(-1,1)
    # Validation data from train_ind to test_ind
    X_val = X[train_ind:test_ind]
    y_val = y[train_ind:test_ind].reshape(-1,1)
    # Test data from test_ind to end
    X_test = X[test_ind:]
    y_test = y[test_ind:].reshape(-1,1)

    # Scale data if required
    # TODO:
    #     Remove need for "scale_fm" param
    #     Reset reproducibility with this scaling
    if scale:
        logging.info('Scaling feature data with scaler: %s',scaler)
        # scale=1
        if scaler == "reproducibility":
            scale_fm = 17.076346687085564
            scale_rain = 0.01
        else:
            scale_fm = 1.0
            scale_rain = 1.0
        # Fit scaler to training data
        scalers[scaler].fit(X_train)
        # Apply scaling to all data using in-place operations
        X_train[:] = scalers[scaler].transform(X_train)
        if X_val.shape[0] > 0:
            X_val[:] = scalers[scaler].transform(X_val)
        X_test[:] = scalers[scaler].transform(X_test)
    else:
        print("Not scaling data")
        scale_fm = 1.0
        scale_rain = 1.0
        scaler = None

    logging.info('x_train shape=%s',X_train.shape)
    logging.info('y_train shape=%s',y_train.shape)
    if test_ind == train_ind:
        logging.info('No validation data')
    elif X_val.shape[0] != 0:
        logging.info('X_val shape=%s',X_val.shape)
        logging.info('y_val shape=%s',y_val.shape)
    logging.info('X_test shape=%s',X_test.shape)
    logging.info('y_test shape=%s',y_test.shape)

    # Set up return dictionary
    rnn_dat = {
        'case': d['case'],
        'hours': hours,
        'features_list': features_list,
        'n_features': len(features_list),
        'scaler': scaler,
        'train_ind': train_ind,
        'test_ind': test_ind,
        'X_raw': X_raw,
        'X': X,
        'y': y,
        'X_train': X_train,
        'y_train': y_train,
        'X_test': X_test,
        'y_test': y_test
    }

    if X_val.shape[0] > 0:
        rnn_dat.update({
            'X_val': X_val,
            'y_val': y_val
        })

    # Update RNN params using data attributes
    logging.info('Updating model params based on data')
    timesteps = params['timesteps']
    batch_size = params['batch_size']
    logging.info('batch_size=%s',batch_size)
    logging.info('timesteps=%s',timesteps)
    features = len(features_list)
    # params.update({
    #     'n_features': features,
    #     'batch_shape': (params["batch_size"],params["timesteps"],features),
    #     'pred_input_shape': (None, features),
    #     'scaler': scaler,
    #     'scale_fm': scale_fm,
    #     'scale_rain': scale_rain
    # })
    rnn_dat.update({
        'scaler': scaler,
        'scale_fm': scale_fm,
        'scale_rain': scale_rain
    })

    logging.info('create_rnn_data2 done')
    return rnn_dat
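
# Usage sketch for create_rnn_data2 with a minimal, hypothetical fmda
# dictionary (a real one carries more fields; only the keys read above are
# supplied here).
def _create_rnn_data2_example():
    dict1 = {'case': 'demo', 'hours': 100, 'features_list': ['Ed', 'Ew', 'rain'],
             'X': np.random.rand(100, 3), 'y': np.random.rand(100)}
    params = {'scale': False, 'scaler': None, 'features_list': ['Ed', 'Ew', 'rain'],
              'train_frac': 0.8, 'val_frac': 0.1, 'timesteps': 5, 'batch_size': 32}
    rnn_dat = create_rnn_data2(dict1, params)
    print(rnn_dat['X_train'].shape)  # (80, 3)
    print(rnn_dat['X_test'].shape)   # (10, 3)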

#***********************************************************************************************
### RNN Class Functionality

# Custom class for parameters dictionary. Inherits from dict, but adds checks and safeguards
class RNNParams(dict):
    def __init__(self, input_dict=None):
        super().__init__(input_dict)
        # Automatically run checks on initialization
        self.run_checks()
        # Automatically calculate shapes on initialization
        self.calc_param_shapes()
    def run_checks(self, verbose=True):
        print("Checking params...")
        # Keys must exist and be integers
        int_keys = [
            'batch_size', 'timesteps', 'rnn_layers',
            'rnn_units', 'dense_layers', 'dense_units', 'epochs'
        ]

        for key in int_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], int), f"Key '{key}' must be an integer"

        # Keys must exist and be lists
        list_keys = ['activation', 'features_list', 'dropout']
        for key in list_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], list), f"Key '{key}' must be a list"

        # Keys must exist and be floats
        float_keys = ['learning_rate', 'train_frac', 'val_frac']
        for key in float_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], float), f"Key '{key}' must be a float"

        print("Input dictionary passed all checks.")
    def calc_param_shapes(self, verbose=True):
        if verbose:
            print("Calculating shape params based on features list, timesteps, and batch size")
            print(f"Input Feature List: {self['features_list']}")
            print(f"Input Timesteps: {self['timesteps']}")
            print(f"Input Batch Size: {self['batch_size']}")

        n_features = len(self['features_list'])
        batch_shape = (self["batch_size"], self["timesteps"], n_features)
        if verbose:
            print("Calculated params:")
            print(f"Number of features: {n_features}")
            print(f"Batch Shape: {batch_shape}")

        # Update the dictionary
        super().update({
            'n_features': n_features,
            'batch_shape': batch_shape
        })
        if verbose:
            print(self)

    def update(self, *args, verbose=True, **kwargs):
        # Prevent updating n_features and batch_shape
        restricted_keys = {'n_features', 'batch_shape'}
        keys_to_check = {'features_list', 'timesteps', 'batch_size'}

        # Check for restricted keys in args
        if args:
            if isinstance(args[0], dict):
                if restricted_keys & args[0].keys():
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                if restricted_keys & {k for k, v in args[0]}:
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")

        # Check for restricted keys in kwargs
        if restricted_keys & kwargs.keys():
            raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")

        # Track if specific keys are updated
        keys_updated = set()

        # Update using the standard dict update method
        if args:
            if isinstance(args[0], dict):
                keys_updated.update(args[0].keys() & keys_to_check)
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                keys_updated.update(k for k, v in args[0] if k in keys_to_check)

        if kwargs:
            keys_updated.update(kwargs.keys() & keys_to_check)

        # Call the parent update method
        super().update(*args, **kwargs)

        # Recalculate shapes if necessary
        if keys_updated:
            self.calc_param_shapes(verbose=verbose)
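
# Usage sketch for RNNParams (hypothetical hyperparameter values): the checks
# and shape calculations run automatically, and updating one of the tracked
# keys recalculates 'batch_shape'.
def _rnnparams_example():
    params = RNNParams({
        'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
        'dense_layers': 1, 'dense_units': 5, 'epochs': 10,
        'activation': ['linear', 'linear'], 'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.2, 0.2], 'learning_rate': 0.001,
        'train_frac': 0.8, 'val_frac': 0.1})
    print(params['batch_shape'])      # (32, 5, 3)
    params.update({'timesteps': 10})  # triggers recalculation
    print(params['batch_shape'])      # (32, 10, 3)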

## Class for handling input data
class RNNData(dict):
    required_keys = {"loc", "time", "X", "y", "features_list"}
    def __init__(self, input_dict, scaler=None, features_list=None):
        # Copy to avoid changing the input dictionary
        input_data = input_dict.copy()
        super().__init__(input_data)
        self.scaler = None
        if scaler is not None:
            self.set_scaler(scaler)
        # Rename and define other attributes
        self['hours'] = len(self['y'])
        self['all_features_list'] = self.pop('features_list')
        if features_list is None:
            print("Using all input features.")
            self.features_list = self.all_features_list
        else:
            self.features_list = features_list
        self.run_checks()
        self.__dict__.update(self)
    def run_checks(self, verbose=True):
        missing_keys = self.required_keys - self.keys()
        if missing_keys:
            raise KeyError(f"Missing required keys: {missing_keys}")
        # Check y 1-d
        y_shape = np.shape(self.y)
        if not (len(y_shape) == 1 or (len(y_shape) == 2 and y_shape[1] == 1)):
            raise ValueError(f"'y' must be one-dimensional, with shape (N,) or (N, 1). Current shape is {y_shape}.")

        # Check if 'hours' is provided and matches len(y)
        if 'hours' in self:
            if self.hours != len(self.y):
                raise ValueError(f"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")
        # Check desired subset of features is in all input features
        if not all_items_exist(self.features_list, self.all_features_list):
            raise ValueError(f"Provided 'features_list' {self.features_list} has elements not in input features.")
    def set_scaler(self, scaler):
        recognized_scalers = ['minmax', 'standard']
        if scaler in recognized_scalers:
            self.scaler = scalers[scaler]
        else:
            raise ValueError(f"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")
    def train_test_split(self, train_frac, val_frac=0.0, subset_features=True, features_list=None, split_time=True, split_space=False, verbose=True):
        # Extract data to desired features
        X = self.X.copy()
        y = self.y.copy()
        if subset_features:
            if verbose and self.features_list != self.all_features_list:
                print(f"Subsetting input data to features_list: {self.features_list}")
            # Indices to subset all features with based on params features
            indices = []
            for item in self.features_list:
                if item in self.all_features_list:
                    indices.append(self.all_features_list.index(item))
                else:
                    print(f"Warning: feature name '{item}' not found in list of all features from input data")
            X = X[:, indices]
        # Setup train/test in time
        train_ind = int(np.floor(self.hours * train_frac))
        self.train_ind = train_ind
        test_ind = int(train_ind + round(self.hours * val_frac))
        self.test_ind = test_ind

        # Check for any potential issues with indices
        if test_ind > self.hours:
            print(f"Setting test index to {self.hours}")
            test_ind = self.hours
        if train_ind > test_ind:
            raise ValueError("Train index must not exceed test index.")

        # Training data from 0 to train_ind
        self.X_train = X[:train_ind]
        self.y_train = y[:train_ind].reshape(-1,1)  # assumes y 1-d, change this if vector output
        # Validation data from train_ind to test_ind
        if val_frac > 0:
            self.X_val = X[train_ind:test_ind]
            self.y_val = y[train_ind:test_ind].reshape(-1,1)  # assumes y 1-d, change this if vector output
        # Test data from test_ind to end
        self.X_test = X[test_ind:]
        self.y_test = y[test_ind:].reshape(-1,1)  # assumes y 1-d, change this if vector output

        # Print statements if verbose
        if verbose:
            print(f"Train index: 0 to {train_ind}")
            print(f"Validation index: {train_ind} to {test_ind}")
            print(f"Test index: {test_ind} to {self.hours}")
            print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
            if hasattr(self, 'X_val'):
                print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
            print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")
    def scale_data(self, verbose=True):
        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling data with scaler {self.scaler}, fitting on X_train")
        # Fit the scaler on the training data
        self.scaler.fit(self.X_train)
        # Transform the data using the fitted scaler
        self.X_train = self.scaler.transform(self.X_train)
        if hasattr(self, 'X_val'):
            self.X_val = self.scaler.transform(self.X_val)
        self.X_test = self.scaler.transform(self.X_test)
    def inverse_scale(self, return_X='all_hours', save_changes=False, verbose=True):
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        X_val = self.scaler.inverse_transform(self.X_val)
        X_test = self.scaler.inverse_transform(self.X_test)

        if save_changes:
            print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
        if verbose:
            print(f"Attempting to return {return_X}")
        if return_X == "all_hours":
            return np.concatenate((X_train, X_val, X_test), axis=0)
        else:
            print(f"Unrecognized or unimplemented return value {return_X}")
    def print_hashes(self, attrs_to_check=['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
        for attr in attrs_to_check:
            if hasattr(self, attr):
                value = getattr(self, attr)
                print(f"Hash of {attr}: {hash_ndarray(value)}")
    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'RNNData' object has no attribute '{key}'")

    def __setitem__(self, key, value):
        super().__setitem__(key, value)  # Update the dictionary
        if key in self.required_keys:
            super().__setattr__(key, value)  # Ensure the attribute is updated as well

    def __setattr__(self, key, value):
        self[key] = value
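
# Usage sketch for RNNData with a minimal, hypothetical input dictionary.
def _rnndata_example():
    input_dict = {'loc': 'demo', 'time': np.arange(100),
                  'X': np.random.rand(100, 3), 'y': np.random.rand(100),
                  'features_list': ['Ed', 'Ew', 'rain']}
    dat = RNNData(input_dict, scaler='minmax')
    dat.train_test_split(train_frac=0.8, val_frac=0.1)
    dat.scale_data()
    dat.print_hashes()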

# Function to check reproducibility hashes, environment info, and model parameters
def check_reproducibility(dict0, params, m_hash, w_hash):
    if not hasattr(dict0, "repro_info"):
        warnings.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproducibility checks.")
        return

    repro_info = dict0.repro_info
    # Check hashes
    if params['phys_initialize']:
        hashes = repro_info['phys_initialize']
        warnings.warn("Physics Initialization not implemented yet. Not running reproducibility checks.")
    else:
        hashes = repro_info['rand_initialize']
    print(f"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
    print(f"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
    if (w_hash != hashes['fitted_weights_hash']) or (m_hash != hashes['preds_hash']):
        if w_hash != hashes['fitted_weights_hash']:
            warnings.warn("The fitted weights hash does not match the reproducibility weights hash.")
        if m_hash != hashes['preds_hash']:
            warnings.warn("The predictions hash does not match the reproducibility predictions hash.")
    else:
        print("***Reproducibility Checks passed - model weights and model predictions match expected.***")

    # Check environment
    current_py_version = sys.version[0:6]
    current_tf_version = tf.__version__
    if current_py_version != repro_info['env_info']['py_version']:
        warnings.warn(f"Python version mismatch: Current Python version is {current_py_version}, "
                      f"expected {repro_info['env_info']['py_version']}.")

    if current_tf_version != repro_info['env_info']['tf_version']:
        warnings.warn(f"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
                      f"expected {repro_info['env_info']['tf_version']}.")

    # Check params
    repro_params = repro_info.get('params', {})

    for key, repro_value in repro_params.items():
        if key in params:
            if params[key] != repro_value:
                warnings.warn(f"Parameter mismatch for '{key}': Current value is {params[key]}, "
                              f"repro value is {repro_value}.")
        else:
            warnings.warn(f"Parameter '{key}' is missing in the current params.")

    return

class RNNModel(ABC):
    def __init__(self, params: dict):
        self.params = copy.deepcopy(params)
        self.params['n_features'] = len(self.params['features_list'])
        if type(self) is RNNModel:
            raise TypeError("RNNModel is an abstract class and cannot be instantiated directly")
        super().__init__()

    @abstractmethod
    def _build_model_train(self):
        pass

    @abstractmethod
    def _build_model_predict(self, return_sequences=True):
        pass

    def fit(self, X_train, y_train, plot=True, plot_title='',
            weights=None, callbacks=[], validation_data=None, *args, **kwargs):
        # verbose_fit argument is for printing out an update after each epoch, which gets very long
        # These print statements at the top could be turned off with a verbose argument, but then
        # there would be a bunch of different verbose params
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            print(f"Training simple RNN with params: {self.params}")
        X_train, y_train = self.format_train_data(X_train, y_train)
        if validation_data is not None:
            X_val, y_val = self.format_train_data(validation_data[0], validation_data[1])
        if verbose_weights:
            print(f"Formatted X_train hash: {hash_ndarray(X_train)}")
            print(f"Formatted y_train hash: {hash_ndarray(y_train)}")
            if validation_data is not None:
                print(f"Formatted X_val hash: {hash_ndarray(X_val)}")
                print(f"Formatted y_val hash: {hash_ndarray(y_val)}")
            print(f"Initial weights before training hash: {hash_weights(self.model_train)}")
        # Setup callbacks
        if self.params["reset_states"]:
            callbacks = callbacks + [ResetStatesCallback()]
        # if validation_data is not None:
        #     callbacks = callbacks + [early_stopping]

        # Note: we overload the params here so that verbose_fit can be easily turned on/off at the .fit call

        # Evaluate model once to set nonzero initial state
        if self.params["batch_size"] >= X_train.shape[0]:
            self.model_train(X_train)
        if validation_data is not None:
            history = self.model_train.fit(
                X_train, y_train + self.params['centering'][1],
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                validation_data=(X_val, y_val),
                *args, **kwargs
            )
        else:
            history = self.model_train.fit(
                X_train, y_train + self.params['centering'][1],
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                *args, **kwargs
            )

        if plot:
            self.plot_history(history, plot_title)
        if self.params["verbose_weights"]:
            print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")

        # Update weights for prediction model
        w_fitted = self.model_train.get_weights()
        self.model_predict.set_weights(w_fitted)

    def predict(self, X_test):
        print("Predicting")
        X_test = self.format_pred_data(X_test)
        preds = self.model_predict.predict(X_test).flatten()
        return preds

    def format_train_data(self, X, y, verbose=False):
        X, y = staircase_2(X, y, timesteps=self.params["timesteps"], batch_size=self.params["batch_size"], verbose=verbose)
        return X, y
    def format_pred_data(self, X):
        return np.reshape(X, (1, X.shape[0], self.params['n_features']))

    def plot_history(self, history, plot_title):
        plt.figure()
        plt.semilogy(history.history['loss'], label='Training loss')
        if 'val_loss' in history.history:
            plt.semilogy(history.history['val_loss'], label='Validation loss')
        plt.title(f'{plot_title} Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(loc='upper left')
        plt.show()

    def run_model(self, dict0, reproducibility_run=False):
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        # Make copy to prevent changing in place
        dict1 = copy.deepcopy(dict0)
        if verbose_weights:
            print("Input data hashes, NOT formatted for rnn sequence/batches yet")
            dict1.print_hashes()
        # Extract fields
        scale_fm = dict1['scale_fm']
        X_train, y_train, X_test, y_test = dict1['X_train'].copy(), dict1['y_train'].copy(), dict1["X_test"].copy(), dict1['y_test'].copy()
        if 'X_val' in dict1:
            X_val, y_val = dict1['X_val'].copy(), dict1['y_val'].copy()
        else:
            X_val = None
        case_id = dict1['case']

        # Fit model
        if X_val is None:
            self.fit(X_train, y_train, plot_title=case_id)
        else:
            self.fit(X_train, y_train, validation_data=(X_val, y_val), plot_title=case_id)
        # Generate predictions,
        # run through training to get hidden state set properly for forecast period
        if X_val is None:
            X = np.concatenate((X_train, X_test))
            y = np.concatenate((y_train, y_test)).flatten()
        else:
            X = np.concatenate((X_train, X_val, X_test))
            y = np.concatenate((y_train, y_val, y_test)).flatten()
        # Predict
        if verbose_weights:
            print("Predicting Training through Test")
            print(f"All X hash: {hash_ndarray(X)}")

        m = self.predict(X).flatten()
        if verbose_weights:
            print(f"Predictions Hash: {hash_ndarray(m)}")
        dict1['m'] = m
        dict0['m'] = m  # add to outside env dictionary, should be the only place this happens

        # if self.params['scale']:
        #     print(f"Rescaling data using {self.params['scaler']}")
        #     if self.params['scaler'] == "reproducibility":
        #         m *= scale_fm
        #         y *= scale_fm
        #         y_train *= scale_fm
        #         y_test *= scale_fm
        # Check reproducibility. TODO: old dict calls it hidden_units not rnn_units, so this doesn't check that
        # if (case_id == "reproducibility") and compare_dicts(self.params, repro_hashes['params'], ['epochs', 'batch_size', 'scale', 'activation', 'learning_rate']):
        #     print("Checking Reproducibility")
        #     checkm = m[350]
        #     hv = hash2(self.model_predict.get_weights())
        #     if self.params['phys_initialize']:
        #         hv5 = repro_hashes['phys_initialize']['fitted_weight_hash']
        #         mv = repro_hashes['phys_initialize']['predictions_hash']
        #     else:
        #         hv5 = repro_hashes['rand_initialize']['fitted_weight_hash']
        #         mv = repro_hashes['rand_initialize']['predictions_hash']

        #     print(f"Fitted weights hash (check 5): {hv} \n Reproducibility weights hash: {hv5} \n Error: {hv5-hv}")
        #     print(f"Model predictions hash: {checkm} \n Reproducibility preds hash: {mv} \n Error: {mv-checkm}")
        if reproducibility_run:
            print("Checking Reproducibility")
            check_reproducibility(dict0, self.params, hash_ndarray(m), hash_weights(self.model_predict))

        # print(dict1.keys())
        # Plot final fit and data
        dict1['y'] = y
        plot_data(dict1, title="RNN", title2=dict1['case'])

        # Calculate errors
        err = rmse(m, y)
        train_ind = dict1["train_ind"]  # index of final training set value
        test_ind = dict1["test_ind"]  # index of first test set value
        err_train = rmse(m[:train_ind], y_train.flatten())
        err_pred = rmse(m[test_ind:], y_test.flatten())
        rmse_dict = {
            'all': err,
            'training': err_train,
            'prediction': err_pred
        }
        return m, rmse_dict

## Callbacks

class ResetStatesCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        # Iterate over each layer in the model
        for layer in self.model.layers:
            # Check if the layer has a reset_states method
            if hasattr(layer, 'reset_states'):
                layer.reset_states()

## Learning Schedules

lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=0.001,
    decay_steps=1000,
    alpha=0.0,
    name='CosineDecay',
    # warmup_target=None,
    # warmup_steps=100
)

early_stopping = EarlyStopping(
    monitor='val_loss',  # Metric to monitor, e.g., 'val_loss', 'val_accuracy'
    patience=5,  # Number of epochs with no improvement after which training will be stopped
    verbose=1,  # Print information about early stopping
    mode='min',  # 'min' means training will stop when the quantity monitored has stopped decreasing
    restore_best_weights=True  # Restore model weights from the epoch with the best value of the monitored quantity
)
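
# Hedged sketch of wiring the schedule and callbacks above into a Keras
# compile/fit call; the model and training arrays here are hypothetical.
def _callbacks_example(model, X_train, y_train):
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
    model.compile(loss='mean_squared_error', optimizer=optimizer)
    # early_stopping monitors val_loss, so some validation data is required
    model.fit(X_train, y_train, epochs=50, validation_split=0.1,
              callbacks=[ResetStatesCallback(), early_stopping])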

# with open("params.yaml") as file:
#     phys_params = yaml.safe_load(file)["physics_initializer"]

phys_params = {
    'DeltaE': [0, -1],          # bias correction
    'T1': 0.1,                  # 1/fuel class (10)
    'fm_raise_vs_rain': 0.2     # fm increase per mm rain
}

def get_initial_weights(model_fit, params, scale_fm):
    # Given a RNN architecture and hyperparameter dictionary, return array of physics-initiated weights
    # Inputs:
    #     model_fit: output of create_RNN_2 with no training
    #     params: (dict) dictionary of hyperparameters
    #     scale_fm: (float) fuel moisture scale factor used by the data scaling
    # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
    DeltaE = phys_params['DeltaE']
    T1 = phys_params['T1']
    fmr = phys_params['fm_raise_vs_rain']
    centering = params['centering']  # shift activation down

    w0_initial = {'Ed': (1.-np.exp(-T1))/2,
                  'Ew': (1.-np.exp(-T1))/2,
                  'rain': fmr * scale_fm}  # wx - input feature
    # wh wb wd bd = bias -1

    w_initial = np.array([np.nan, np.exp(-0.1), DeltaE[0]/scale_fm,  # layer 0
                          1.0, -centering[0] + DeltaE[1]/scale_fm])  # layer 1
    if params['verbose_weights']:
        print('Equilibrium moisture correction bias', DeltaE[0],
              'in the hidden layer and', DeltaE[1], 'in the output layer')

    w_name = ['wx', 'wh', 'bh', 'wd', 'bd']

    w = model_fit.get_weights()
    for j in range(w[0].shape[0]):
        feature = params['features_list'][j]
        for k in range(w[0].shape[1]):
            w[0][j][k] = w0_initial[feature]
    for i in range(1, len(w)):  # number of the weight
        for j in range(w[i].shape[0]):  # number of the inputs
            if w[i].ndim == 2:
                # initialize all entries of the weight matrix to the same number
                for k in range(w[i].shape[1]):
                    w[i][j][k] = w_initial[i]/w[i].shape[0]
            elif w[i].ndim == 1:
                w[i][j] = w_initial[i]
            else:
                print('weight', i, 'shape', w[i].shape)
                raise ValueError("Only 1 or 2 dimensions supported")
        if params['verbose_weights']:
            print('weight', i, w_name[i], 'shape', w[i].shape, 'ndim', w[i].ndim,
                  'initial: sum', np.sum(w[i], axis=0), '\nentries', w[i])

    return w, w_name
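
# Worked example of the physics-based initial values above (assuming the
# defaults in phys_params and scale_fm = 1.0; numbers rounded):
#   (1 - exp(-0.1)) / 2 ~= 0.04758  -> w0_initial for 'Ed' and 'Ew'
#   0.2 * 1.0 = 0.2                 -> w0_initial for 'rain'
#   exp(-0.1) ~= 0.90484            -> wh, the hidden-state persistence weight
#   DeltaE = [0, -1] gives bh = 0.0 and bd = -centering[0] - 1.0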

class RNN(RNNModel):
    def __init__(self, params, loss='mean_squared_error'):
        super().__init__(params)
        self.model_train = self._build_model_train()
        self.model_predict = self._build_model_predict()

    def _build_model_train(self):
        inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
        x = inputs
        for i in range(self.params['rnn_layers']):
            return_sequences = True if i < self.params['rnn_layers'] - 1 else False
            x = SimpleRNN(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                dropout=self.params["dropout"][0],
                recurrent_dropout=self.params["recurrent_dropout"],
                stateful=self.params['stateful'],
                return_sequences=return_sequences)(x)
        if self.params["dropout"][1] > 0:
            x = Dropout(self.params["dropout"][1])(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
        x = Dense(units=1, activation='linear')(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        # optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        if self.params["verbose_weights"]:
            print(f"Initial Weights Hash: {hash_weights(model)}")
            # print(model.get_weights())

        if self.params['phys_initialize']:
            assert self.params['scaler'] == 'reproducibility', f"Not implemented yet to do physics initialize with given data scaling {self.params['scaler']}"
            assert self.params['features_list'] == ['Ed', 'Ew', 'rain'], f"Physics initiation can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
            print("Initializing Model with Physics based weights")
            scale_fm = self.params['scale_fm']  # assumes 'scale_fm' was added to params (see create_rnn_data2)
            w, w_name = get_initial_weights(model, self.params, scale_fm=scale_fm)
            model.set_weights(w)
            print('initial weights hash =', hash_weights(model))
        return model
    def _build_model_predict(self, return_sequences=True):
        inputs = tf.keras.Input(shape=(None, self.params['n_features']))
        x = inputs
        for i in range(self.params['rnn_layers']):
            x = SimpleRNN(self.params['rnn_units'], activation=self.params['activation'][0],
                          stateful=False, return_sequences=return_sequences)(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
        x = Dense(units=1, activation='linear')(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        # Set weights to model_train
        w_fitted = self.model_train.get_weights()
        model.set_weights(w_fitted)

        return model
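
# Hedged end-to-end sketch (hypothetical data and hyperparameters; not called
# anywhere in this module). Note run_model() also calls plot_data() from
# data_funcs, which may expect additional fields in a real fmda dictionary.
def _rnn_example():
    dict1 = {'case': 'demo', 'hours': 500, 'features_list': ['Ed', 'Ew', 'rain'],
             'X': np.random.rand(500, 3), 'y': np.random.rand(500)}
    data_params = {'scale': False, 'scaler': None, 'features_list': ['Ed', 'Ew', 'rain'],
                   'train_frac': 0.8, 'val_frac': 0.1, 'timesteps': 5, 'batch_size': 32}
    rnn_dat = create_rnn_data2(dict1, data_params)
    params = RNNParams({
        'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
        'dense_layers': 1, 'dense_units': 5, 'epochs': 3,
        'activation': ['linear', 'linear'], 'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.2, 0.2], 'recurrent_dropout': 0.2,
        'learning_rate': 0.001, 'train_frac': 0.8, 'val_frac': 0.1,
        'stateful': True, 'phys_initialize': False, 'reset_states': True,
        'centering': [0.0, 0.0], 'verbose_fit': False, 'verbose_weights': False})
    model = RNN(params)
    m, errs = model.run_model(rnn_dat)
    return m, errs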

class RNN_LSTM(RNNModel):
    def __init__(self, params, loss='mean_squared_error'):
        super().__init__(params)
        self.model_train = self._build_model_train()
        self.model_predict = self._build_model_predict()

    def _build_model_train(self):
        inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
        x = inputs
        for i in range(self.params['rnn_layers']):
            return_sequences = True if i < self.params['rnn_layers'] - 1 else False
            x = LSTM(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                dropout=self.params["dropout"][0],
                recurrent_dropout=self.params["recurrent_dropout"],
                recurrent_activation=self.params["recurrent_activation"],
                stateful=self.params['stateful'],
                return_sequences=return_sequences)(x)
        if self.params["dropout"][1] > 0:
            x = Dropout(self.params["dropout"][1])(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        if self.params["verbose_weights"]:
            print(f"Initial Weights Hash: {hash_weights(model)}")
        return model
    def _build_model_predict(self, return_sequences=True):
        inputs = tf.keras.Input(shape=(None, self.params['n_features']))
        x = inputs
        for i in range(self.params['rnn_layers']):
            x = LSTM(
                units=self.params['rnn_units'],
                activation=self.params['activation'][0],
                stateful=False, return_sequences=return_sequences)(x)
        for i in range(self.params['dense_layers']):
            x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
        model = tf.keras.Model(inputs=inputs, outputs=x)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(loss='mean_squared_error', optimizer=optimizer)

        # Set weights to model_train
        w_fitted = self.model_train.get_weights()
        model.set_weights(w_fitted)

        return model
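
# Hedged sketch: building the LSTM variant. Relative to the RNN class above it
# additionally requires 'recurrent_activation' and 'clipvalue' hyperparameters,
# and since its builder has no final Dense(1) layer, the last dense layer must
# serve as the scalar output ('dense_units': 1). The params argument is assumed
# to be an RNNParams instance like the one in _rnn_example above.
def _rnn_lstm_example(params):
    params.update({'recurrent_activation': 'sigmoid', 'clipvalue': 10.0, 'dense_units': 1})
    return RNN_LSTM(params)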