# v2 training and prediction class infrastructure

# Environment
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import sys
from tensorflow.keras.callbacks import Callback, EarlyStopping, TerminateOnNaN
# from sklearn.metrics import mean_squared_error
import logging
from tensorflow.keras.layers import LSTM, SimpleRNN, Input, Dropout, Dense
# Local modules
import reproducibility
# from utils import print_dict_summary
from abc import ABC, abstractmethod
from utils import hash2, all_items_exist, hash_ndarray, hash_weights
from data_funcs import rmse, plot_data, compare_dicts
import copy
# import yaml
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings
#*************************************************************************************
# Data Formatting Functions

def staircase(x,y,timesteps,datapoints,return_sequences=False, verbose = False):
    # x [datapoints,features] all inputs
    # y [datapoints,outputs]
    # timesteps: split x and y into samples of length timesteps, shifted by 1
    # datapoints: number of timesteps to use for training, no more than y.shape[0]
    if verbose:
        print('staircase: shape x = ',x.shape)
        print('staircase: shape y = ',y.shape)
        print('staircase: timesteps=',timesteps)
        print('staircase: datapoints=',datapoints)
        print('staircase: return_sequences=',return_sequences)
    outputs = y.shape[1]
    features = x.shape[1]
    samples = datapoints-timesteps+1
    if verbose:
        print('staircase: samples=',samples,'timesteps=',timesteps,'features=',features)
    x_train = np.empty([samples, timesteps, features])
    if return_sequences:
        if verbose:
            print('returning all timesteps in a sample')
        y_train = np.empty([samples, timesteps, outputs])  # all
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
                y_train[i,k,:] = y[i+k,:]
    else:
        if verbose:
            print('returning only the last timestep in a sample')
        y_train = np.empty([samples, outputs])
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
            y_train[i,:] = y[i+timesteps-1,:]

    return x_train, y_train
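
# Usage sketch (hypothetical shapes, for illustration only): with 100 datapoints,
# 3 features, and timesteps=5, staircase yields 100-5+1=96 overlapping samples.
#   x = np.random.rand(100, 3)      # (datapoints, features)
#   y = np.random.rand(100, 1)      # (datapoints, outputs)
#   x_train, y_train = staircase(x, y, timesteps=5, datapoints=100)
#   # x_train.shape == (96, 5, 3), y_train.shape == (96, 1)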
def staircase_2(x,y,timesteps,batch_size=None,trainsteps=np.inf,return_sequences=False, verbose = False):
    # create RNN training data in multiple batches
    # input:
    #     x (,features)
    #     y (,outputs)
    #     timesteps: split x and y into sequences of length timesteps
    #                a.k.a. lookback or sequence_length

    # print params if verbose
    if batch_size is None:
        raise ValueError('staircase_2 requires batch_size')
    if verbose:
        print('staircase_2: shape x = ',x.shape)
        print('staircase_2: shape y = ',y.shape)
        print('staircase_2: timesteps=',timesteps)
        print('staircase_2: batch_size=',batch_size)
        print('staircase_2: return_sequences=',return_sequences)

    nx,features= x.shape
    ny,outputs = y.shape
    datapoints = min(nx,ny,trainsteps)
    if verbose:
        print('staircase_2: datapoints=',datapoints)

    # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
    # https://www.tensorflow.org/guide/keras/working_with_rnns  Cross-batch statefulness

    # example with timesteps=3 batch_size=3 datapoints=15
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]
    #     batch 3: [9 10 11]    [10 11 12]   [11 12 13]
    #     batch 4: [12 13 14]   [13 14 15]   when the data runs out, this last batch can be shorter

    # TODO: implement for multiple locations, same starting time for each batch
    #              Loc 1        Loc 2        Loc 3
    #     batch 0: [0 1 2]      [0 1 2]      [0 1 2]
    #     batch 1: [3 4 5]      [3 4 5]      [3 4 5]
    #     batch 2: [6 7 8]      [6 7 8]      [6 7 8]
    # TODO: second epoch, shift starting time at batch 0 in time

    # TODO: implement for multiple locations, different starting times for each batch
    #              Loc 1        Loc 2        Loc 3
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]

    # the first sample in batch j starts from timesteps*j and ends with timesteps*(j+1)-1
    # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
    # becomes the starting hidden state of the rnn in the sequence of steps [3 4 5] in batch 1, etc.

    # sample [0 1 2] means the rnn is used twice to map state 0 -> 1 -> 2
    # the state at time 0 is fixed but the state is considered a variable at times 1 and 2
    # the loss is computed from the output at time 2, and the gradient of the loss function by the chain rule ends at time 0 because the state there is a constant -> derivative is zero
    # sample [3 4 5] means the rnn is used twice to map state 3 -> 4 -> 5; the state at time 3 is fixed to the output of the first sequence [0 1 2]
    # the loss is computed from the output at time 5, and the gradient of the loss function by the chain rule ends at time 3 because the state there is considered constant -> derivative is zero
    # how is the gradient computed? presumably keras adds the gradients wrt the weights at 2 5 8 ..., 3 6 9 ..., 4 7 ... and uses that to update the weights
    # there is only one set of weights: h(2) = f(h(1),w), h(1) = f(h(0),w), but w is always the same
    # each column is one successive evaluation of h(n+1) = f(h(n),w) for n = n_start, n_start+1, ...
    # a single column cannot be evaluated efficiently on a gpu because the gpu is a parallel processor
    # think of it as each column served by one thread; the threads are independent because they execute in parallel, and there needs to be a large number of threads (32 is a good number)
    # each batch consists of independent calculations
    # but it can depend on the result of the previous batch (that's the recurrent part)

    max_batches = datapoints // timesteps
    max_sequences = max_batches * batch_size

    if verbose:
        print('staircase_2: max_batches=',max_batches)
        print('staircase_2: max_sequences=',max_sequences)

    x_train = np.zeros((max_sequences, timesteps, features))
    if return_sequences:
        y_train = np.empty((max_sequences, timesteps, outputs))
    else:
        y_train = np.empty((max_sequences, outputs ))

    # build the sequences
    k = 0
    for i in range(max_batches):
        for j in range(batch_size):
            begin = i*timesteps + j
            next = begin + timesteps
            if next > datapoints:
                break
            if verbose:
                print('sequence',k,'batch',i,'sample',j,'data',begin,'to',next-1)
            x_train[k,:,:] = x[begin:next,:]
            if return_sequences:
                y_train[k,:,:] = y[begin:next,:]
            else:
                y_train[k,:] = y[next-1,:]
            k += 1
    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)
        print('staircase_2: sequences generated',k)
        print('staircase_2: batch_size=',batch_size)
    k = (k // batch_size) * batch_size
    if verbose:
        print('staircase_2: removing partial and empty batches at the end, keeping',k)
    x_train = x_train[:k,:,:]
    if return_sequences:
        y_train = y_train[:k,:,:]
    else:
        y_train = y_train[:k,:]

    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)

    return x_train, y_train
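
# Usage sketch matching the comment example above (timesteps=3, batch_size=3,
# datapoints=15; array contents hypothetical):
#   x = np.random.rand(15, 2)
#   y = np.random.rand(15, 1)
#   x_train, y_train = staircase_2(x, y, timesteps=3, batch_size=3)
#   # 13 sequences are generated, then trimmed to a multiple of batch_size:
#   # x_train.shape == (12, 3, 2), y_train.shape == (12, 1)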
# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler()
}
def batch_setup(ids, batch_size):
    """
    Sets up stateful batched training data scheme for RNN training.

    This function takes a list or array of identifiers (`ids`) and divides them into batches of a specified size (`batch_size`). If the last batch does not have enough elements to meet the `batch_size`, the function will loop back to the start of the identifiers and continue filling the batch until it reaches the required size.

    Parameters:
    -----------
    ids : list or numpy array
        A list or numpy array containing the ids to be batched.
    batch_size : int
        The desired size of each batch.

    Returns:
    --------
    batches : list of lists
        A list where each element is a batch (itself a list) of identifiers. Each batch will contain exactly `batch_size` elements.

    Example:
    --------
    >>> ids = [1, 2, 3, 4, 5]
    >>> batch_size = 3
    >>> batch_setup(ids, batch_size)
    [[1, 2, 3], [4, 5, 1]]

    Notes:
    ------
    - If `ids` is shorter than `batch_size`, the returned list will contain a single batch where identifiers are repeated from the start of `ids` until the batch is filled.
    """
    # Ensure ids is a numpy array
    x = np.array(ids)

    # Initialize the list to hold the batches
    batches = []

    # Use a loop to slice the list/array into batches
    for i in range(0, len(x), batch_size):
        batch = list(x[i:i + batch_size])

        # If the batch is not full, continue from the start
        while len(batch) < batch_size:
            # Calculate the remaining number of items needed
            remaining = batch_size - len(batch)
            # Append the needed number of items from the start of the array
            batch.extend(x[:remaining])

        batches.append(batch)

    return batches
def staircase_spatial(X, y, batch_size, timesteps, hours=None, start_times = None, verbose = True):
    """
    Prepares spatially formatted time series data for RNN training by creating batches of sequences across different locations, stacked to be compatible with stateful models.

    This function processes multi-location time series data by slicing it into batches and formatting it to fit into a recurrent neural network (RNN) model. It utilizes a staircase-like approach to prepare sequences for each location and then interlaces them to align with stateful RNN structures.

    Parameters:
    -----------
    X : list of numpy arrays
        A list where each element is a numpy array containing features for a specific location. The shape of each array is `(total_time_steps, features)`.
    y : list of numpy arrays
        A list where each element is a numpy array containing the target values for a specific location. The shape of each array is `(total_time_steps,)`.
    batch_size : int
        The number of sequences to include in each batch.
    timesteps : int
        The number of time steps to include in each sequence for the RNN.
    hours : int, optional
        The length of each time series to consider for each location. If `None`, it defaults to the minimum length of `y` across all locations.
    start_times : numpy array, optional
        The initial time step for each location. If `None`, it defaults to an array starting from 0 and incrementing by 1 for each location.
    verbose : bool, optional
        If `True`, prints additional information during processing. Default is `True`.

    Returns:
    --------
    XX : numpy array
        A 3D numpy array with shape `(total_sequences, timesteps, features)` containing the prepared feature sequences for all locations.
    yy : numpy array
        A 2D numpy array with shape `(total_sequences, 1)` containing the corresponding target values for all locations.
    n_seqs : int
        Number of sequences per location. Used to reset states when the location changes. The hidden state of the RNN will be reset after `n_seqs` batches.

    Notes:
    ------
    - The function handles spatially distributed time series data by batching and formatting it for stateful RNNs.
    - `hours` determines how much of the time series is used for each location. If not provided, it defaults to the shortest series in `y`.
    - If `start_times` is not provided, it assumes each location starts its series at progressively later time steps.
    - The `batch_setup` function is used internally to manage the creation of location and time step batches.
    - The returned feature sequences `XX` and target sequences `yy` are interlaced to align with the expected input format of stateful RNNs.
    """
    # Generate ids based on number of distinct timeseries provided
    n_loc = len(y) # assuming each list entry for y is a separate location
    loc_ids = np.arange(n_loc)

    # Generate hours and start_times if None
    if hours is None:
        print("Setting total hours to minimum length of y in provided dictionary")
        hours = min(len(yi) for yi in y)
    if start_times is None:
        print("Setting start times to offset by 1 hour by location")
        start_times = np.arange(n_loc)
    # Set up batches
    loc_batch, t_batch = batch_setup(loc_ids, batch_size), batch_setup(start_times, batch_size)
    if verbose:
        print(f"Location ID Batches: {loc_batch}")
        print(f"Start Times for Batches: {t_batch}")

    # Loop over batches and construct with staircase_2
    Xs = []
    ys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        ts = t_batch[i]
        for j in range(0, len(locs_i)):
            t0 = ts[j]
            tend = t0 + hours
            # Create RNNData Dict
            # Subset data to given location and time from t0 to t0+hours
            k = locs_i[j] # Used to account for fewer locations than batch size
            X_temp = X[k][t0:tend,:]
            y_temp = y[k][t0:tend].reshape(-1,1)

            # Format sequences
            Xi, yi = staircase_2(
                X_temp,
                y_temp,
                timesteps = timesteps,
                batch_size = 1,  # note: using 1 here to format sequences for a single location, not the same as the target batch size for training data
                verbose=False)

            Xs.append(Xi)
            ys.append(yi)

    # Drop incomplete batches
    lens = [yi.shape[0] for yi in ys]
    n_seqs = min(lens)
    if verbose:
        print(f"Minimum number of sequences by location: {n_seqs}")
        print(f"Applying minimum length to other arrays.")
    Xs = [Xi[:n_seqs] for Xi in Xs]
    ys = [yi[:n_seqs] for yi in ys]

    # Interlace arrays to match stateful structure
    n_features = Xi.shape[2]
    XXs = []
    yys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        XXi = np.empty((Xs[0].shape[0]*batch_size, timesteps, n_features))
        yyi = np.empty((Xs[0].shape[0]*batch_size, 1))
        for j in range(0, len(locs_i)):
            XXi[j::(batch_size)] = Xs[locs_i[j]]
            yyi[j::(batch_size)] = ys[locs_i[j]]
        XXs.append(XXi)
        yys.append(yyi)
    yy = np.concatenate(yys, axis=0)
    XX = np.concatenate(XXs, axis=0)

    if verbose:
        print(f"Spatially Formatted X Shape: {XX.shape}")
        print(f"Spatially Formatted y Shape: {yy.shape}")

    return XX, yy, n_seqs
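
# Usage sketch (hypothetical data; three locations, hourly series of length 100):
#   X = [np.random.rand(100, 2) for _ in range(3)]
#   y = [np.random.rand(100) for _ in range(3)]
#   XX, yy, n_seqs = staircase_spatial(X, y, batch_size=3, timesteps=5)
#   # Default start_times [0 1 2] shorten the later locations, so each location
#   # contributes n_seqs=19 sequences; XX.shape == (57, 5, 2), yy.shape == (57, 1)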
#***********************************************************************************************
### RNN Class Functionality

class RNNParams(dict):
    """
    A custom dictionary class for handling RNN parameters. Automatically calculates certain params based on others. Overwrites the update method to protect from incompatible parameter choices. Inherits from dict.
    """
    def __init__(self, input_dict):
        """
        Initializes the RNNParams instance and runs checks and shape calculations.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing RNN parameters.
        """
        super().__init__(input_dict)
        # Automatically run checks on initialization
        self.run_checks()
        # Automatically calculate shapes on initialization
        self.calc_param_shapes()
    def run_checks(self, verbose=True):
        """
        Validates that required keys exist and are of the correct type.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        print("Checking params...")
        # Keys must exist and be integers
        int_keys = [
            'batch_size', 'timesteps', 'rnn_layers',
            'rnn_units', 'dense_layers', 'dense_units', 'epochs'
        ]

        for key in int_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], int), f"Key '{key}' must be an integer"

        # Keys must exist and be lists
        list_keys = ['activation', 'features_list', 'dropout', 'time_fracs']
        for key in list_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], list), f"Key '{key}' must be a list"

        # Keys must exist and be floats
        float_keys = ['learning_rate']
        for key in float_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], float), f"Key '{key}' must be a float"

        print("Input dictionary passed all checks.")
    def calc_param_shapes(self, verbose=True):
        """
        Calculates and updates the shapes of certain parameters based on input data.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Calculating shape params based on features list, timesteps, and batch size")
            print(f"Input Feature List: {self['features_list']}")
            print(f"Input Timesteps: {self['timesteps']}")
            print(f"Input Batch Size: {self['batch_size']}")

        n_features = len(self['features_list'])
        batch_shape = (self["batch_size"], self["timesteps"], n_features)
        if verbose:
            print("Calculated params:")
            print(f"Number of features: {n_features}")
            print(f"Batch Shape: {batch_shape}")

        # Update the dictionary
        super().update({
            'n_features': n_features,
            'batch_shape': batch_shape
        })
        if verbose:
            print(self)

    def update(self, *args, verbose=True, **kwargs):
        """
        Overwrites the standard update function from dict. This is to prevent certain keys from being modified directly and to automatically update keys to be compatible with each other. The keys handled relate to the shape of the input data to the RNN.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Prevent updating n_features and batch_shape
        restricted_keys = {'n_features', 'batch_shape'}
        keys_to_check = {'features_list', 'timesteps', 'batch_size'}

        # Check for restricted keys in args
        if args:
            if isinstance(args[0], dict):
                if restricted_keys & args[0].keys():
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                if restricted_keys & {k for k, v in args[0]}:
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")

        # Check for restricted keys in kwargs
        if restricted_keys & kwargs.keys():
            raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")

        # Track if specific keys are updated
        keys_updated = set()

        # Update using the standard dict update method
        if args:
            if isinstance(args[0], dict):
                keys_updated.update(args[0].keys() & keys_to_check)
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                keys_updated.update(k for k, v in args[0] if k in keys_to_check)

        if kwargs:
            keys_updated.update(kwargs.keys() & keys_to_check)

        # Call the parent update method
        super().update(*args, **kwargs)

        # Recalculate shapes if necessary
        if keys_updated:
            self.calc_param_shapes(verbose=verbose)
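
# Construction sketch (key values are hypothetical; required keys inferred from
# run_checks above):
#   params = RNNParams({
#       'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
#       'dense_layers': 1, 'dense_units': 5, 'epochs': 20,
#       'learning_rate': 0.001, 'activation': ['tanh', 'tanh'],
#       'features_list': ['Ed', 'Ew', 'rain'], 'dropout': [0.2, 0.2],
#       'time_fracs': [0.9, 0.05, 0.05]
#   })
#   params.update({'timesteps': 10})      # recalculates 'batch_shape' to (32, 10, 3)
#   params.update({'batch_shape': (1,)})  # raises KeyError, restricted key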
## Class for handling input data
class RNNData(dict):
    """
    A custom dictionary class for managing RNN data, with validation, scaling, and train-test splitting functionality.
    """
    required_keys = {"loc", "time", "X", "y", "features_list"}
    def __init__(self, input_dict, scaler=None, features_list=None):
        """
        Initializes the RNNData instance, performs checks, and prepares data.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing the initial data.
        scaler : str, optional
            The name of the scaler to be used (e.g., 'minmax', 'standard'). Default is None.
        features_list : list, optional
            A subset of features to be used. Default is None, which means all features.
        """
        # Copy to avoid changing external input
        input_data = input_dict.copy()
        # Initialize inherited dict class
        super().__init__(input_data)

        # Check if input data is one timeseries dataset or multiple
        if type(self.loc['STID']) == str:
            self.spatial = False
            print("Input data is single timeseries.")
        elif type(self.loc['STID']) == list:
            self.spatial = True
            print("Input data from multiple timeseries.")
        else:
            raise KeyError("Input locations not list or single string")

        # Set up Data Scaling
        self.scaler = None
        if scaler is not None:
            self.set_scaler(scaler)

        # Rename and define other stuff.
        if self.spatial:
            self['hours'] = min(arr.shape[0] for arr in self.y)
        else:
            self['hours'] = len(self['y'])

        self['all_features_list'] = self.pop('features_list')
        if features_list is None:
            print("Using all input features.")
            self.features_list = self.all_features_list
        else:
            self.features_list = features_list
        # self.run_checks()
        self.__dict__.update(self)

    # TODO: Fix checks for multilocation
    def run_checks(self, verbose=True):
        """
        Validates that required keys are present and checks the integrity of data shapes.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        missing_keys = self.required_keys - self.keys()
        if missing_keys:
            raise KeyError(f"Missing required keys: {missing_keys}")
        # # Check y 1-d
        # y_shape = np.shape(self.y)
        # if not (len(y_shape) == 1 or (len(y_shape) == 2 and y_shape[1] == 1)):
        #     raise ValueError(f"'y' must be one-dimensional, with shape (N,) or (N, 1). Current shape is {y_shape}.")

        # # Check if 'hours' is provided and matches len(y)
        # if 'hours' in self:
        #     if self.hours != len(self.y):
        #         raise ValueError(f"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")
        # Check desired subset of features is in all input features
        if not all_items_exist(self.features_list, self.all_features_list):
            raise ValueError(f"Provided 'features_list' {self.features_list} has elements not in input features.")
    def set_scaler(self, scaler):
        """
        Sets the scaler to be used for data normalization.

        Parameters:
        -----------
        scaler : str
            The name of the scaler (e.g., 'minmax', 'standard').
        """
        recognized_scalers = ['minmax', 'standard']
        if scaler in recognized_scalers:
            print(f"Setting data scaler: {scaler}")
            self.scaler = scalers[scaler]
        else:
            raise ValueError(f"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")
    def train_test_split(self, time_fracs=[1.,0.,0.], space_fracs=[1.,0.,0.], subset_features=True, features_list=None, verbose=True):
        """
        Splits the data into training, validation, and test sets.

        Parameters:
        -----------
        time_fracs : list of float, optional
            Fractions of the time dimension to use for train/validation/test, summing to 1. Default is [1., 0., 0.].
        space_fracs : list of float, optional
            Fractions of the locations to use for train/validation/test, summing to 1. Only used for spatial data. Default is [1., 0., 0.].
        subset_features : bool, optional
            If True, subsets the data to the specified features list. Default is True.
        features_list : list, optional
            A list of features to use for subsetting. Default is None.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial

        # Set up
        assert np.sum(time_fracs) == np.sum(space_fracs) == 1., "Provided cross validation params don't sum to 1"
        if (len(time_fracs) != 3) or (len(space_fracs) != 3):
            raise ValueError("Cross-validation params `time_fracs` and `space_fracs` must be lists of length 3, representing (train/validation/test)")

        train_frac = time_fracs[0]
        val_frac = time_fracs[1]
        test_frac = time_fracs[2]

        # Setup train/val/test in time
        train_ind = int(np.floor(self.hours * train_frac)); self.train_ind = train_ind
        test_ind = int(train_ind + round(self.hours * val_frac)); self.test_ind = test_ind
        # Check for any potential issues with indices
        if test_ind > self.hours:
            print(f"Setting test index to {self.hours}")
            test_ind = self.hours
        if train_ind > test_ind:
            raise ValueError("Train index cannot be greater than test index.")

        # Setup train/val/test in space
        if spatial:
            train_frac_sp = space_fracs[0]
            val_frac_sp = space_fracs[1]
            locs = np.arange(len(self.loc['STID'])) # indices of locations
            train_size = int(len(locs) * train_frac_sp)
            val_size = int(len(locs) * val_frac_sp)
            random.shuffle(locs)
            train_locs = locs[:train_size]
            val_locs = locs[train_size:train_size + val_size]
            test_locs = locs[train_size + val_size:]
            # Store lists of IDs in loc subdictionary
            self.loc['train_locs'] = [self.case[i] for i in train_locs]
            self.loc['val_locs'] = [self.case[i] for i in val_locs]
            self.loc['test_locs'] = [self.case[i] for i in test_locs]

        # Extract data to desired features, copy to avoid changing input objects
        X = self.X.copy()
        y = self.y.copy()
        if subset_features:
            if verbose and self.features_list != self.all_features_list:
                print(f"Subsetting input data to features_list: {self.features_list}")
            # Indices to subset all features with based on params features
            indices = []
            for item in self.features_list:
                if item in self.all_features_list:
                    indices.append(self.all_features_list.index(item))
                else:
                    print(f"Warning: feature name '{item}' not found in list of all features from input data")
            if spatial:
                X = [Xi[:, indices] for Xi in X]
            else:
                X = X[:, indices]

        # Training data from 0 to train_ind
        # Validation data from train_ind to test_ind
        # Test data from test_ind to end
        if spatial:
            X_train = [X[i] for i in train_locs]
            X_val = [X[i] for i in val_locs]
            X_test = [X[i] for i in test_locs]
            y_train = [y[i] for i in train_locs]
            y_val = [y[i] for i in val_locs]
            y_test = [y[i] for i in test_locs]

            self.X_train = [Xi[:train_ind] for Xi in X_train]
            self.y_train = [yi[:train_ind].reshape(-1,1) for yi in y_train]
            if (val_frac > 0) and (val_frac_sp > 0):
                self.X_val = [Xi[train_ind:test_ind] for Xi in X_val]
                self.y_val = [yi[train_ind:test_ind].reshape(-1,1) for yi in y_val]
            self.X_test = [Xi[test_ind:] for Xi in X_test]
            self.y_test = [yi[test_ind:].reshape(-1,1) for yi in y_test]
        else:
            self.X_train = X[:train_ind]
            self.y_train = y[:train_ind].reshape(-1,1) # assumes y 1-d, change this if vector output
            if val_frac > 0:
                self.X_val = X[train_ind:test_ind]
                self.y_val = y[train_ind:test_ind].reshape(-1,1) # assumes y 1-d, change this if vector output
            self.X_test = X[test_ind:]
            self.y_test = y[test_ind:].reshape(-1,1) # assumes y 1-d, change this if vector output

        # Print statements if verbose
        if verbose:
            print(f"Train index: 0 to {train_ind}")
            print(f"Validation index: {train_ind} to {test_ind}")
            print(f"Test index: {test_ind} to {self.hours}")

            if spatial:
                print("Subsetting locations into train/val/test")
                print(f"Total Locations: {len(locs)}")
                print(f"Train Locations: {len(train_locs)}")
                print(f"Val. Locations: {len(val_locs)}")
                print(f"Test Locations: {len(test_locs)}")
                print(f"X_train[0] shape: {self.X_train[0].shape}, y_train[0] shape: {self.y_train[0].shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val[0] shape: {self.X_val[0].shape}, y_val[0] shape: {self.y_val[0].shape}")
                print(f"X_test[0] shape: {self.X_test[0].shape}, y_test[0] shape: {self.y_test[0].shape}")
            else:
                print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
                print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")
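
    # Split sketch (assumes `dat` is an RNNData built from a hypothetical input dict):
    #   dat = RNNData(input_dict, scaler='standard', features_list=['Ed', 'Ew', 'rain'])
    #   dat.train_test_split(time_fracs=[0.9, 0.05, 0.05], space_fracs=[0.8, 0.1, 0.1])
    #   # For spatial data, validation/test sets come from held-out locations,
    #   # restricted to the validation/test time windows.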
    def scale_data(self, verbose=True):
        """
        Scales the training data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial
        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        # if hasattr(self.scaler, 'n_features_in_'):
        #     warnings.warn("scale_data has already been called. Exiting to prevent issues.")
        #     return
        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling training data with scaler {self.scaler}, fitting on X_train")

        if spatial:
            # Fit scaler on row-joined training data
            self.scaler.fit(np.vstack(self.X_train))
            # Transform data using fitted scaler
            self.X_train = [self.scaler.transform(Xi) for Xi in self.X_train]
            if hasattr(self, 'X_val'):
                self.X_val = [self.scaler.transform(Xi) for Xi in self.X_val]
            self.X_test = [self.scaler.transform(Xi) for Xi in self.X_test]
        else:
            # Fit the scaler on the training data
            self.scaler.fit(self.X_train)
            # Transform the data using the fitted scaler
            self.X_train = self.scaler.transform(self.X_train)
            if hasattr(self, 'X_val'):
                self.X_val = self.scaler.transform(self.X_val)
            self.X_test = self.scaler.transform(self.X_test)
    # NOTE: only works for non spatial
    def scale_all_X(self, verbose=True):
        """
        Scales all X data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.

        Returns:
        -------
        ndarray
            Scaled X matrix, subsetted to features_list.
        """
        if self.spatial:
            raise ValueError("Not implemented for spatial data")

        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        if verbose:
            print(f"Scaling all X data with scaler {self.scaler}, fitted on X_train")
        # Subset features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        X = self.scaler.transform(X)

        return X
    def inverse_scale(self, return_X = 'all_hours', save_changes=False, verbose=True):
        """
        Inversely scales the data to its original form.

        Parameters:
        -----------
        return_X : str, optional
            Specifies what data to return after inverse scaling. Default is 'all_hours'.
        save_changes : bool, optional
            If True, updates the internal data with the inversely scaled values. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        X_val = self.scaler.inverse_transform(self.X_val)
        X_test = self.scaler.inverse_transform(self.X_test)

        if save_changes:
            print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
        if verbose:
            print(f"Attempting to return {return_X}")
        if return_X == "all_hours":
            return np.concatenate((X_train, X_val, X_test), axis=0)
        else:
            print(f"Unrecognized or unimplemented return value {return_X}")
    def batch_reshape(self, timesteps, batch_size, hours=None, verbose=False):
        """
        Restructures input data for the RNN using batches and sequences.

        Parameters:
        ----------
        timesteps : int
            Number of timesteps, or sequence length, used for a single sequence in RNN training. Constitutes a single sample to the model.
        batch_size : int
            Number of sequences used within a batch of training.
        hours : int, optional
            Length of the time series to use per location; passed through to staircase_spatial for spatial data.

        Returns:
        -------
        None
            This method reshapes the data in place.

        Raises:
        ------
        AttributeError
            If either 'X_train' or 'y_train' attributes do not exist within the instance.

        Notes:
        ------
        The reshaping method depends on the param "spatial".
        - spatial == False: Reshapes data assuming no spatial dimensions.
        - spatial == True: Reshapes data considering spatial dimensions.
        """
        if not hasattr(self, 'X_train') or not hasattr(self, 'y_train'):
            raise AttributeError("Both 'X_train' and 'y_train' must be set before reshaping batches.")

        # Indicator of spatial training scheme or not
        spatial = self.spatial

        if spatial:
            print(f"Reshaping spatial training data using batch size: {batch_size} and timesteps: {timesteps}")
            self.X_train, self.y_train, self.n_seqs = staircase_spatial(self.X_train, self.y_train, timesteps = timesteps, batch_size=batch_size, hours=hours, verbose=verbose)
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val, _ = staircase_spatial(self.X_val, self.y_val, timesteps = timesteps, batch_size=batch_size, hours=None, verbose=verbose)
        else:
            print(f"Reshaping training data using batch size: {batch_size} and timesteps: {timesteps}")
            self.X_train, self.y_train = staircase_2(self.X_train, self.y_train, timesteps = timesteps, batch_size=batch_size, verbose=verbose)
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val = staircase_2(self.X_val, self.y_val, timesteps = timesteps, batch_size=batch_size, verbose=verbose)
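
    # Typical preparation pipeline sketch (assumes `dat` is an RNNData and
    # `params` an RNNParams as in the sketches above; values hypothetical):
    #   dat.train_test_split(time_fracs=params['time_fracs'])
    #   dat.scale_data()
    #   dat.batch_reshape(timesteps=params['timesteps'], batch_size=params['batch_size'])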
    def print_hashes(self, attrs_to_check = ['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
        """
        Prints the hash of specified data attributes.

        Parameters:
        -----------
        attrs_to_check : list, optional
            A list of attribute names to hash and print. Default includes 'X', 'y', and split data.
        """
        for attr in attrs_to_check:
            if hasattr(self, attr):
                value = getattr(self, attr)
                if self.spatial:
                    pass
                else:
                    print(f"Hash of {attr}: {hash_ndarray(value)}")
    def __getattr__(self, key):
        """
        Allows attribute-style access to dictionary keys, a.k.a. enables the "." operator for getting elements.
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'rnn_data' object has no attribute '{key}'")

    def __setitem__(self, key, value):
        """
        Ensures dictionary and attribute updates stay in sync for required keys.
        """
        super().__setitem__(key, value) # Update the dictionary
        if key in self.required_keys:
            super().__setattr__(key, value) # Ensure the attribute is updated as well

    def __setattr__(self, key, value):
        """
        Ensures dictionary keys are updated when setting attributes.
        """
        self[key] = value
# Function to check reproducibility hashes, environment info, and model parameters
def check_reproducibility(dict0, params, m_hash, w_hash):
    """
    Performs reproducibility checks on a model by comparing current settings and outputs with stored reproducibility information.

    Parameters:
    -----------
    dict0 : dict
        The data dictionary that should contain reproducibility information under the 'repro_info' attribute.
    params : dict
        The current model parameters to be checked against the reproducibility information.
    m_hash : str
        The hash of the current model predictions.
    w_hash : str
        The hash of the current fitted model weights.

    Returns:
    --------
    None
        The function returns None. It issues warnings if any reproducibility checks fail.

    Notes:
    ------
    - Checks are only performed if the `dict0` contains the 'repro_info' attribute.
    - Issues warnings for mismatches in model weights, predictions, Python version, TensorFlow version, and model parameters.
    - Skips checks if physics-based initialization is used (not implemented).
    """
    if not hasattr(dict0, "repro_info"):
        warnings.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproducibility checks.")
        return

    repro_info = dict0.repro_info
    # Check Hashes
    if params['phys_initialize']:
        hashes = repro_info['phys_initialize']
        warnings.warn("Physics Initialization not implemented yet. Not running reproducibility checks.")
    else:
        hashes = repro_info['rand_initialize']
        print(f"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
        print(f"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
        if (w_hash != hashes['fitted_weights_hash']) or (m_hash != hashes['preds_hash']):
            if w_hash != hashes['fitted_weights_hash']:
                warnings.warn("The fitted weights hash does not match the reproducibility weights hash.")
            if m_hash != hashes['preds_hash']:
                warnings.warn("The predictions hash does not match the reproducibility predictions hash.")
        else:
            print("***Reproducibility Checks passed - model weights and model predictions match expected.***")

    # Check Environment
    current_py_version = sys.version[0:6]
    current_tf_version = tf.__version__
    if current_py_version != repro_info['env_info']['py_version']:
        warnings.warn(f"Python version mismatch: Current Python version is {current_py_version}, "
                      f"expected {repro_info['env_info']['py_version']}.")

    if current_tf_version != repro_info['env_info']['tf_version']:
        warnings.warn(f"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
                      f"expected {repro_info['env_info']['tf_version']}.")

    # Check Params
    repro_params = repro_info.get('params', {})

    for key, repro_value in repro_params.items():
        if key in params:
            if params[key] != repro_value:
                warnings.warn(f"Parameter mismatch for '{key}': Current value is {params[key]}, "
                              f"repro value is {repro_value}.")
        else:
            warnings.warn(f"Parameter '{key}' is missing in the current params.")

    return
class RNNModel(ABC):
    """
    Abstract base class for RNN models, providing structure for training, predicting, and running reproducibility checks.
    """
    def __init__(self, params: dict):
        """
        Initializes the RNNModel with the given parameters.

        Parameters:
        -----------
        params : dict
            A dictionary containing model parameters.
        """
        self.params = params
        if type(self) is RNNModel:
            raise TypeError("RNNModel is an abstract class and cannot be instantiated directly")
        super().__init__()

    @abstractmethod
    def _build_model_train(self):
        """Abstract method to build the training model."""
        pass

    @abstractmethod
    def _build_model_predict(self, return_sequences=True):
        """Abstract method to build the prediction model. This model copies weights from the train model but with an input structure that allows for easier prediction of arbitrary length timeseries. This model is not to be used for training, i.e. don't use it with .fit calls."""
        pass

    def is_stateful(self):
        """
        Checks whether any of the layers in the internal model (self.model_train) are stateful.

        Returns:
        bool: True if at least one layer in the model is stateful, False otherwise.

        This method iterates over all the layers in the model and checks if any of them
        have the 'stateful' attribute set to True. This is useful for determining if
        the model is designed to maintain state across batches during training.

        Example:
        --------
        model.is_stateful()
        """
        for layer in self.model_train.layers:
            if hasattr(layer, 'stateful') and layer.stateful:
                return True
        return False
    def fit(self, X_train, y_train, plot_history=True, plot_title = '',
            weights=None, callbacks=[], validation_data=None, return_epochs=False, *args, **kwargs):
        """
        Trains the model on the provided training data. Uses the fit method of the training model and then copies the weights over to the prediction model, which has a less restrictive input shape. Formats a list of callbacks to use within the fit method based on params input.

        Parameters:
        -----------
        X_train : np.ndarray
            The input matrix data for training.
        y_train : np.ndarray
            The target vector data for training.
        plot_history : bool, optional
            If True, plots the training history. Default is True.
        plot_title : str, optional
            The title for the training plot. Default is an empty string.
        weights : optional
            Initial weights for the model. Default is None.
        callbacks : list, optional
            A list of callback functions to use during training. Default is an empty list.
        validation_data : tuple, optional
            Validation data to use during training, expected format (X_val, y_val). Default is None.
        return_epochs : bool
            If True, return the number of epochs that training took. Used to test and optimize early stopping.
        """
        # verbose_fit argument is for printing out an update after each epoch, which gets very long
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            print(f"Training simple RNN with params: {self.params}")

        # Setup callbacks
        if self.params["reset_states"]:
            callbacks = callbacks + [ResetStatesCallback(self.params), TerminateOnNaN()]

        # Early stopping callback requires validation data
        if validation_data is not None:
            X_val, y_val = validation_data[0], validation_data[1]
            print("Using early stopping callback.")
            early_stop = EarlyStoppingCallback(patience = self.params['early_stopping_patience'])
            callbacks = callbacks + [early_stop]
        if verbose_weights:
            print(f"Formatted X_train hash: {hash_ndarray(X_train)}")
            print(f"Formatted y_train hash: {hash_ndarray(y_train)}")
            if validation_data is not None:
                print(f"Formatted X_val hash: {hash_ndarray(X_val)}")
                print(f"Formatted y_val hash: {hash_ndarray(y_val)}")
            print(f"Initial weights before training hash: {hash_weights(self.model_train)}")

        ## TODO: Hidden State Initialization
        # Evaluate model once to set nonzero initial state
        # self.model_train(X_train[0:self.params['batch_size'],:,:])

        if validation_data is not None:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks = callbacks,
                verbose=verbose_fit,
                validation_data = (X_val, y_val),
                *args, **kwargs
            )
        else:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks = callbacks,
                verbose=verbose_fit,
                *args, **kwargs
            )

        if plot_history:
            self.plot_history(history,plot_title)

        if self.params["verbose_weights"]:
            print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")

        # Update weights for prediction model
        w_fitted = self.model_train.get_weights()
        self.model_predict.set_weights(w_fitted)

        if return_epochs:
            # Epoch counting starts at 0, adding 1 for the count
            # (assumes early stopping was used, i.e. validation data was provided)
            return early_stop.best_epoch + 1
    def predict(self, X_test):
        """
        Generates predictions on the provided test data using the internal prediction model.

        Parameters:
        -----------
        X_test : np.ndarray
            The input data for generating predictions.

        Returns:
        --------
        np.ndarray
            The predicted values.
        """
        print("Predicting test data")
        X_test = self._format_pred_data(X_test)
        preds = self.model_predict.predict(X_test).flatten()
        return preds

    def _format_pred_data(self, X):
        """
        Formats the prediction data for RNN input.

        Parameters:
        -----------
        X : np.ndarray
            The input data.

        Returns:
        --------
        np.ndarray
            The formatted input data.
        """
        return np.reshape(X,(1, X.shape[0], self.params['n_features']))
    def plot_history(self, history, plot_title, create_figure=True):
        """
        Plots the training history. Uses a log scale on the y axis for readability.

        Parameters:
        -----------
        history : History object
            The training history object from model fitting. Output of keras' .fit command.
        plot_title : str
            The title for the plot.
        """
        if create_figure:
            plt.figure(figsize=(10, 6))
        plt.semilogy(history.history['loss'], label='Training loss')
        if 'val_loss' in history.history:
            plt.semilogy(history.history['val_loss'], label='Validation loss')
        plt.title(f'{plot_title} Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(loc='upper left')
        plt.show()
    def run_model(self, dict0, reproducibility_run=False, plot_period='all', save_outputs=True, return_epochs=False):
        """
        Runs the RNN model on an input data dictionary, including training, prediction, and reproducibility checks.

        Parameters:
        -----------
        dict0 : RNNData (dict)
            The dictionary containing the input data and configuration.
        reproducibility_run : bool, optional
            If True, performs reproducibility checks after running the model. Default is False.
        save_outputs : bool
            If True, writes model outputs into the input dictionary.
        return_epochs : bool
            If True, returns how many epochs of training happened. Used to optimize params related to early stopping.

        Returns:
        --------
        tuple
            Model predictions and a dictionary of RMSE errors broken up by time period.
        """
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            dict0.print_hashes()
        # Extract datasets
        X_train, y_train, X_test, y_test = dict0.X_train, dict0.y_train, dict0.X_test, dict0.y_test
        if 'X_val' in dict0:
            X_val, y_val = dict0.X_val, dict0.y_val
        else:
            X_val = None
        if dict0.spatial:
            case_id = "Spatial Training Set"
        else:
            case_id = dict0.case

        # Fit model
        if X_val is None:
            eps = self.fit(X_train, y_train, plot_title=case_id, return_epochs=return_epochs)
        else:
            eps = self.fit(X_train, y_train, validation_data = (X_val, y_val), plot_title=case_id, return_epochs=return_epochs)

        # Generate predictions and evaluate test error
        if dict0.spatial:
            m, errs = self._eval_multi(dict0)
            if save_outputs:
                dict0['m'] = m
        else:
            m, errs = self._eval_single(dict0, verbose_weights, reproducibility_run)
            if save_outputs:
                dict0['m'] = m
            plot_data(dict0, title="RNN", title2=dict0.case, plot_period=plot_period)

        if return_epochs:
            return m, errs, eps
        else:
            return m, errs
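
    # End-to-end sketch, assuming a concrete subclass of RNNModel that implements
    # the two abstract builders (e.g., an RNN class defined elsewhere), plus the
    # `dat` and `params` objects from the sketches above:
    #   model = RNN(params)
    #   m, errs = model.run_model(dat, reproducibility_run=False)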
    def _eval_single(self, dict0, verbose_weights, reproducibility_run):
        # Generate predictions,
        # run through training to get the hidden state set properly for the forecast period
        print(f"Running prediction on all input data, training through test")
        X = dict0.scale_all_X()
        y = dict0.y.flatten()
        # Predict
        if verbose_weights:
            print(f"All X hash: {hash_ndarray(X)}")

        m = self.predict(X).flatten()
        if verbose_weights:
            print(f"Predictions Hash: {hash_ndarray(m)}")

        if reproducibility_run:
            print("Checking Reproducibility")
            check_reproducibility(dict0, self.params, hash_ndarray(m), hash_weights(self.model_predict))

        # print(dict0.keys())
        # Plot final fit and data
        # dict0['y'] = y
        # plot_data(dict0, title="RNN", title2=dict0['case'], plot_period=plot_period)

        # Calculate errors
        err = rmse(m, y)
        train_ind = dict0.train_ind # index of final training set value
        test_ind = dict0.test_ind # index of first test set value

        err_train = rmse(m[:train_ind], y[:train_ind].flatten())
        err_pred = rmse(m[test_ind:], y[test_ind:].flatten())
        rmse_dict = {
            'all': err,
            'training': err_train,
            'prediction': err_pred
        }
        return m, rmse_dict
    def _eval_multi(self, dict0):
        # Train Error: NOT DOING YET. DECIDE WHETHER THIS IS NEEDED

        # Test Error
        new_data = np.stack(dict0.X_test, axis=0)
        y_array = np.stack(dict0.y_test, axis=0)
        preds = self.model_predict.predict(new_data)

        # Calculate RMSE
        ## Note: not using the util rmse function since this approach is for 3d arrays
        # Compute the squared differences
        squared_diff = np.square(preds - y_array)

        # Mean squared error along the timesteps and dimensions (axis 1 and 2)
        mse = np.mean(squared_diff, axis=(1, 2))

        # Root mean squared error (RMSE) for each timeseries
        rmses = np.sqrt(mse)

        return preds, rmses
## Callbacks

# Helper functions for batch reset schedules
def calc_exp_intervals(bmin, bmax, n_epochs, force_bmax = True):
    # Calculate the exponential intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = epochs / n_epochs
    intervals = bmin * (bmax / bmin) ** factors
    if force_bmax:
        intervals[-1] = bmax # Ensure the last value is exactly bmax
    return intervals.astype(int)

def calc_log_intervals(bmin, bmax, n_epochs, force_bmax = True):
    # Calculate the logarithmic intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = np.log(1 + epochs) / np.log(1 + n_epochs)
    intervals = bmin + (bmax - bmin) * factors
    if force_bmax:
        intervals[-1] = bmax # Ensure the last value is exactly bmax
    return intervals.astype(int)
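
# Schedule sketch (values hypothetical): both schedules sweep the reset interval
# from bmin to bmax over the epochs, with exp growing slowly at first and log quickly.
#   calc_exp_intervals(bmin=10, bmax=200, n_epochs=5)
#   # -> array([ 10,  18,  33,  60, 200])   (last value forced to bmax)
#   calc_log_intervals(bmin=10, bmax=200, n_epochs=5)
#   # -> array([ 10,  83, 126, 157, 200])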
class ResetStatesCallback(Callback):
    """
    Custom callback to reset the states of RNN layers at the end of each epoch and optionally after a specified number of batches.

    Parameters:
    -----------
    batch_reset : int, optional
        If provided, resets the states of RNN layers after every `batch_reset` batches. Default is None.
    """
    # def __init__(self, bmin=None, bmax=None, epochs=None, loc_batch_reset = None, batch_schedule_type='linear', verbose=True):
    def __init__(self, params=None, verbose=True):
        """
        Initializes the ResetStatesCallback with an optional batch reset interval.

        Parameters:
        -----------
        params : dict, optional
            Dictionary of parameters. If None provided, only on_epoch_end will trigger a reset of hidden states.
            - bmin : int
                Minimum for the batch reset schedule.
            - bmax : int
                Maximum for the batch reset schedule.
            - epochs : int
                Number of training epochs.
            - loc_batch_reset : int
                Interval of batches after which to reset the states of RNN layers for location changes. Triggers a reset for training AND validation phases.
            - batch_schedule_type : str
                Type of batch scheduling to be used. Recognized methods are the following:
                - 'constant' : Uses a fixed batch reset interval throughout training.
                - 'linear'   : Increases the batch reset interval linearly over epochs from bmin to bmax.
                - 'exp'      : Increases the batch reset interval exponentially over epochs from bmin to bmax.
                - 'log'      : Increases the batch reset interval logarithmically over epochs from bmin to bmax.

        Returns:
        -----------
        Only in-place reset of the hidden states of the RNN that uses this callback.
        """
        super(ResetStatesCallback, self).__init__()

        # Check for optional arguments, set None if missing in input params
        arg_list = ['bmin', 'bmax', 'epochs', 'loc_batch_reset', 'batch_schedule_type']
        for arg in arg_list:
            setattr(self, arg, params.get(arg, None))

        self.verbose = verbose
        if self.verbose:
            print(f"Using ResetStatesCallback with Batch Reset Schedule: {self.batch_schedule_type}")
        # Calculate the reset intervals for each epoch during initialization
        if self.batch_schedule_type is not None:
            if self.epochs is None:
                raise ValueError(f"Argument `epochs` cannot be None with batch_schedule_type: {self.batch_schedule_type}")
            self.batch_reset_intervals = self._calc_reset_intervals(self.batch_schedule_type)
            if self.verbose:
                print(f"batch_reset_intervals: {self.batch_reset_intervals}")
        else:
            self.batch_reset_intervals = None
    def on_epoch_end(self, epoch, logs=None):
        """
        Resets the states of RNN layers at the end of each epoch.

        Parameters:
        -----------
        epoch : int
            The index of the current epoch.
        logs : dict, optional
            A dictionary containing metrics from the epoch. Default is None.
        """
        # print(f" Resetting hidden state after epoch: {epoch+1}", flush=True)
        # Iterate over each layer in the model
        for layer in self.model.layers:
            # Check if the layer has a reset_states method
            if hasattr(layer, 'reset_states'):
                layer.reset_states()
    def _calc_reset_intervals(self,batch_schedule_type):
        methods = ['constant', 'linear', 'exp', 'log']
        if batch_schedule_type not in methods:
            raise ValueError(f"Batch schedule method {batch_schedule_type} not recognized. \n Available methods: {methods}")
        if batch_schedule_type == "constant":
            return np.repeat(self.bmin, self.epochs).astype(int)
        elif batch_schedule_type == "linear":
            return np.linspace(self.bmin, self.bmax, self.epochs).astype(int)
        elif batch_schedule_type == "exp":
            return calc_exp_intervals(self.bmin, self.bmax, self.epochs)
        elif batch_schedule_type == "log":
            return calc_log_intervals(self.bmin, self.bmax, self.epochs)
    def on_epoch_begin(self, epoch, logs=None):
        # Set the reset interval for the current epoch
        if self.batch_reset_intervals is not None:
            self.current_batch_reset = self.batch_reset_intervals[epoch]
        else:
            self.current_batch_reset = None
1495 def on_train_batch_end(self, batch, logs=None):
1496 """
1497 Resets the states of RNN layers during training after a specified number of batches, if `batch_reset` or `loc_batch_reset` are provided. The `batch_reset` is used for stability and to avoid exploding gradients at the beginning of training, when a hidden state is being passed with weights that haven't learned yet. The `loc_batch_reset` is used to reset the states when a particular batch is from a new location, so the hidden state should not be carried across the boundary.
1499 Parameters:
1500 -----------
1501 batch : int
1502 The index of the current batch.
1503 logs : dict, optional
1504 A dictionary containing metrics from the batch. Default is None.
1505 """
1506 batch_reset = self.current_batch_reset
1507 if (batch_reset is not None and batch % batch_reset == 0):
1508 # print(f" Resetting states after batch {batch + 1}")
1509 # Iterate over each layer in the model
1510 for layer in self.model.layers:
1511 # Check if the layer has a reset_states method
1512 if hasattr(layer, 'reset_states'):
1513 layer.reset_states()
1514 def on_test_batch_end(self, batch, logs=None):
1515 """
1516 Resets the states of RNN layers during validation if `loc_batch_reset` is provided, to demarcate a new location and avoid passing a hidden state to the wrong location.
1518 Parameters:
1519 -----------
1520 batch : int
1521 The index of the current batch.
1522 logs : dict, optional
1523 A dictionary containing metrics from the batch. Default is None.
1524 """
1525 loc_batch_reset = self.loc_batch_reset
1526 if (loc_batch_reset is not None and batch % loc_batch_reset == 0):
1527 # print(f"Resetting states in Validation mode after batch {batch + 1}")
1528 # Iterate over each layer in the model
1529 for layer in self.model.layers:
1530 # Check if the layer has a reset_states method
1531 if hasattr(layer, 'reset_states'):
1532 layer.reset_states()
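# Example usage (sketch; parameter values are illustrative, not defaults):
# reset_cb = ResetStatesCallback(
#     params={'bmin': 32, 'bmax': 256, 'epochs': 20,
#             'loc_batch_reset': None, 'batch_schedule_type': 'log'},
#     verbose=True)
# model.fit(X_train, y_train, epochs=20, callbacks=[reset_cb])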
1534 ## Learning Schedules
1535 ## NOT TESTED YET
1536 lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
1537 initial_learning_rate=0.01,
1538 decay_steps=200,
1539 alpha=0.0,
1540 name='CosineDecay',
1541 # warmup_target=None,
1542 # warmup_steps=100
1543 )
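# Example usage (sketch): pass the schedule in place of a fixed learning rate,
# as in the commented-out optimizer line in RNN._build_model_train below.
# optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)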
1546 def EarlyStoppingCallback(patience=5):
1547 """
1548 Creates an EarlyStopping callback with the specified patience.
1550 Args:
1551 patience (int): Number of epochs with no improvement after which training will be stopped.
1553 Returns:
1554 EarlyStopping: Configured EarlyStopping callback.
1555 """
1556 return EarlyStopping(
1557 monitor='val_loss',
1558 patience=patience,
1559 verbose=1,
1560 mode='min',
1561 restore_best_weights=True
1562 )
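# Example usage (sketch): stop when validation loss stalls for 10 epochs and
# restore the best weights seen so far.
# model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=100,
#           callbacks=[EarlyStoppingCallback(patience=10)])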
1564 phys_params = {
1565 'DeltaE': [0,-1], # bias correction
1566 'T1': 0.1, # 1/fuel class (10)
1567 'fm_raise_vs_rain': 0.2 # fm increase per mm rain
1568 }
1572 def get_initial_weights(model_fit,params,scale_fm=1):
1573 # Given an RNN architecture and hyperparameter dictionary, return array of physics-initiated weights
1574 # Inputs:
1575 # model_fit: output of create_RNN_2 with no training
1576 # params: (dict) dictionary of hyperparameters
1577 # scale_fm: (float) scale factor applied to fuel moisture, default 1
1578 # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
1579 DeltaE = phys_params['DeltaE']
1580 T1 = phys_params['T1']
1581 fmr = phys_params['fm_raise_vs_rain']
1582 centering = params['centering'] # shift activation down
1584 w0_initial={'Ed':(1.-np.exp(-T1))/2,
1585 'Ew':(1.-np.exp(-T1))/2,
1586 'rain':fmr * scale_fm} # wx - input feature
1587 # remaining weights, in order: wh, bh (hidden layer), wd, bd (output layer bias)
1589 w_initial=np.array([np.nan, np.exp(-0.1), DeltaE[0]/scale_fm, # layer 0
1590 1.0, -centering[0] + DeltaE[1]/scale_fm]) # layer 1
1591 if params['verbose_weights']:
1592 print('Equilibrium moisture correction bias',DeltaE[0],
1593 'in the hidden layer and',DeltaE[1],' in the output layer')
1595 w_name = ['wx','wh','bh','wd','bd']
1597 w=model_fit.get_weights()
1598 for j in range(w[0].shape[0]):
1599 feature = params['features_list'][j]
1600 for k in range(w[0].shape[1]):
1601 w[0][j][k]=w0_initial[feature]
1602 for i in range(1,len(w)): # number of the weight
1603 for j in range(w[i].shape[0]): # number of the inputs
1604 if w[i].ndim==2:
1605 # initialize all entries of the weight matrix to the same number
1606 for k in range(w[i].shape[1]):
1607 w[i][j][k]=w_initial[i]/w[i].shape[0]
1608 elif w[i].ndim==1:
1609 w[i][j]=w_initial[i]
1610 else:
1611 print('weight',i,'shape',w[i].shape)
1612 raise ValueError("Only 1 or 2 dimensions supported")
1613 if params['verbose_weights']:
1614 print('weight',i,w_name[i],'shape',w[i].shape,'ndim',w[i].ndim,
1615 'initial: sum',np.sum(w[i],axis=0),'\nentries',w[i])
1617 return w, w_name
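# Example usage (sketch): 'model' is an untrained Keras model built with features
# ['Ed', 'Ew', 'rain']; 'params' must provide 'features_list', 'centering', and
# 'verbose_weights'. With T1 = 0.1, the Ed/Ew input weights start at
# (1 - exp(-0.1)) / 2, approximately 0.0476.
# w, w_name = get_initial_weights(model, params)
# model.set_weights(w)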
1619 class RNN(RNNModel):
1620 """
1621 A concrete implementation of the RNNModel abstract base class, using simple recurrent cells for hidden recurrent layers.
1623 Parameters:
1624 -----------
1625 params : dict
1626 A dictionary of model parameters.
1627 loss : str, optional
1628 The loss function to use during model training. Default is 'mean_squared_error'.
1629 """
1630 def __init__(self, params, loss='mean_squared_error'):
1631 """
1632 Initializes the RNN model by building the training and prediction models.
1634 Parameters:
1635 -----------
1636 params : dict or RNNParams
1637 A dictionary containing the model's parameters.
1638 loss : str, optional
1639 The loss function to use during model training. Default is 'mean_squared_error'.
1640 """
1641 super().__init__(params)
1642 self.model_train = self._build_model_train()
1643 self.model_predict = self._build_model_predict()
1645 def _build_model_train(self):
1646 """
1647 Builds and compiles the training model, with batch & sequence shape specifications for input.
1649 Returns:
1650 --------
1651 model : tf.keras.Model
1652 The compiled Keras model for training.
1653 """
1654 inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1655 x = inputs
1656 for i in range(self.params['rnn_layers']):
1657 # Return sequences True if recurrent layer feeds into another recurrent layer.
1658 # False if feeds into dense layer
1659 return_sequences = i < self.params['rnn_layers'] - 1
1660 x = SimpleRNN(
1661 units=self.params['rnn_units'],
1662 activation=self.params['activation'][0],
1663 dropout=self.params["dropout"][0],
1664 recurrent_dropout = self.params["recurrent_dropout"],
1665 stateful=self.params['stateful'],
1666 return_sequences=return_sequences)(x)
1667 if self.params["dropout"][1] > 0:
1668 x = Dropout(self.params["dropout"][1])(x)
1669 for i in range(self.params['dense_layers']):
1670 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1671 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1672 x = Dense(units=1, activation='linear')(x)
1673 model = tf.keras.Model(inputs=inputs, outputs=x)
1674 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1675 # optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
1676 model.compile(loss='mean_squared_error', optimizer=optimizer)
1678 if self.params["verbose_weights"]:
1679 print(f"Initial Weights Hash: {hash_weights(model)}")
1680 # print(model.get_weights())
1682 if self.params['phys_initialize']:
1683 assert self.params['scaler'] == 'reproducibility', f"Physics-based initialization is not yet implemented for data scaling method {self.params['scaler']}"
1684 assert self.params['features_list'] == ['Ed', 'Ew', 'rain'], f"Physics-based initialization can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
1685 print("Initializing model with physics-based weights")
1686 w, w_name=get_initial_weights(model, self.params)
1687 model.set_weights(w)
1688 print('initial weights hash =',hash_weights(model))
1689 return model
1691 def _build_model_predict(self, return_sequences=True):
1692 """
1693 Builds and compiles the prediction model. It does not use batch shape or sequence length, making it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1695 Parameters:
1696 -----------
1697 return_sequences : bool, optional
1698 Whether to return the full sequence of outputs. Default is True.
1700 Returns:
1701 --------
1702 model : tf.keras.Model
1703 The compiled Keras model for prediction.
1704 """
1705 inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1706 x = inputs
1707 for i in range(self.params['rnn_layers']):
1708 x = SimpleRNN(self.params['rnn_units'],activation=self.params['activation'][0],
1709 stateful=False,return_sequences=return_sequences)(x)
1710 for i in range(self.params['dense_layers']):
1711 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1712 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1713 x = Dense(units=1, activation='linear')(x)
1714 model = tf.keras.Model(inputs=inputs, outputs=x)
1715 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1716 model.compile(loss='mean_squared_error', optimizer=optimizer)
1718 # Set Weights to model_train
1719 w_fitted = self.model_train.get_weights()
1720 model.set_weights(w_fitted)
1722 return model
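# Example usage (sketch): the params dict below is illustrative, not a tested
# configuration; see RNNParams for the full set of expected keys.
# params = {'batch_shape': (32, 12, 3), 'rnn_layers': 1, 'rnn_units': 20,
#           'dense_layers': 1, 'dense_units': 5, 'activation': ['tanh', 'tanh'],
#           'dropout': [0.2, 0.2], 'recurrent_dropout': 0.2, 'stateful': True,
#           'learning_rate': 0.001, 'n_features': 3, 'features_list': ['Ed', 'Ew', 'rain'],
#           'verbose_weights': False, 'phys_initialize': False}
# rnn = RNN(params)
# preds = rnn.model_predict.predict(X_new)  # X_new: (batch, any timesteps, n_features)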
1725 class RNN_LSTM(RNNModel):
1726 """
1727 A concrete implementation of the RNNModel abstract base class, using LSTM cells for hidden recurrent layers.
1729 Parameters:
1730 -----------
1731 params : dict
1732 A dictionary of model parameters.
1733 loss : str, optional
1734 The loss function to use during model training. Default is 'mean_squared_error'.
1735 """
1736 def __init__(self, params, loss='mean_squared_error'):
1737 """
1738 Initializes the RNN model by building the training and prediction models.
1740 Parameters:
1741 -----------
1742 params : dict or RNNParams
1743 A dictionary containing the model's parameters.
1744 loss : str, optional
1745 The loss function to use during model training. Default is 'mean_squared_error'.
1746 """
1747 super().__init__(params)
1748 self.model_train = self._build_model_train()
1749 self.model_predict = self._build_model_predict()
1751 def _build_model_train(self):
1752 """
1753 Builds and compiles the training model, with batch & sequence shape specifications for input.
1755 Returns:
1756 --------
1757 model : tf.keras.Model
1758 The compiled Keras model for training.
1759 """
1760 inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1761 x = inputs
1762 for i in range(self.params['rnn_layers']):
1763 return_sequences = i < self.params['rnn_layers'] - 1
1764 x = LSTM(
1765 units=self.params['rnn_units'],
1766 activation=self.params['activation'][0],
1767 dropout=self.params["dropout"][0],
1768 recurrent_dropout = self.params["recurrent_dropout"],
1769 recurrent_activation=self.params["recurrent_activation"],
1770 stateful=self.params['stateful'],
1771 return_sequences=return_sequences)(x)
1772 if self.params["dropout"][1] > 0:
1773 x = Dropout(self.params["dropout"][1])(x)
1774 for i in range(self.params['dense_layers']):
1775 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
# Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
# (assumed intent, mirroring RNN._build_model_train above)
x = Dense(units=1, activation='linear')(x)
1776 model = tf.keras.Model(inputs=inputs, outputs=x)
1777 # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
1778 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1779 model.compile(loss='mean_squared_error', optimizer=optimizer)
1781 if self.params["verbose_weights"]:
1782 print(f"Initial Weights Hash: {hash_weights(model)}")
1783 return model
1784 def _build_model_predict(self, return_sequences=True):
1785 """
1786 Builds and compiles the prediction model. It does not use batch shape or sequence length, making it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1788 Parameters:
1789 -----------
1790 return_sequences : bool, optional
1791 Whether to return the full sequence of outputs. Default is True.
1793 Returns:
1794 --------
1795 model : tf.keras.Model
1796 The compiled Keras model for prediction.
1797 """
1798 inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1799 x = inputs
1800 for i in range(self.params['rnn_layers']):
1801 x = LSTM(
1802 units=self.params['rnn_units'],
1803 activation=self.params['activation'][0],
recurrent_activation=self.params["recurrent_activation"],  # match the gate activation used in training
1804 stateful=False,return_sequences=return_sequences)(x)
1805 for i in range(self.params['dense_layers']):
1806 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
# Add final output layer to match the training model architecture (see note in _build_model_train)
x = Dense(units=1, activation='linear')(x)
1807 model = tf.keras.Model(inputs=inputs, outputs=x)
1808 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1809 model.compile(loss='mean_squared_error', optimizer=optimizer)
1811 # Set Weights to model_train
1812 w_fitted = self.model_train.get_weights()
1813 model.set_weights(w_fitted)
1815 return model
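# Example usage (sketch): RNN_LSTM additionally reads 'recurrent_activation' from
# params; other keys are as in the illustrative RNN example above.
# lstm = RNN_LSTM({**params, 'recurrent_activation': 'sigmoid'})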