# fmda/moisture_rnn.py
# v2 training and prediction class infrastructure

# Environment
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import sys
from tensorflow.keras.callbacks import Callback, EarlyStopping, TerminateOnNaN
# from sklearn.metrics import mean_squared_error
import logging
from tensorflow.keras import layers,models
from tensorflow.keras.layers import LSTM, SimpleRNN, Input, Dropout, Dense
# Local modules
import reproducibility
# from utils import print_dict_summary
from abc import ABC, abstractmethod
from utils import hash2, all_items_exist, hash_ndarray, hash_weights, rmse_3d
from data_funcs import rmse, plot_data, compare_dicts
import copy
# import yaml
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings

#*************************************************************************************
# Data Formatting Functions

def staircase(x,y,timesteps,datapoints,return_sequences=False, verbose = False):
    # x [datapoints,features]    all inputs
    # y [datapoints,outputs]
    # timesteps: split x and y into samples of length timesteps, shifted by 1
    # datapoints: number of timesteps to use for training, no more than y.shape[0]
    if verbose:
        print('staircase: shape x = ',x.shape)
        print('staircase: shape y = ',y.shape)
        print('staircase: timesteps=',timesteps)
        print('staircase: datapoints=',datapoints)
        print('staircase: return_sequences=',return_sequences)
    outputs = y.shape[1]
    features = x.shape[1]
    samples = datapoints-timesteps+1
    if verbose:
        print('staircase: samples=',samples,'timesteps=',timesteps,'features=',features)
    x_train = np.empty([samples, timesteps, features])
    if return_sequences:
        if verbose:
            print('returning all timesteps in a sample')
        y_train = np.empty([samples, timesteps, outputs])  # all
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
                y_train[i,k,:] = y[i+k,:]
    else:
        if verbose:
            print('returning only the last timestep in a sample')
        y_train = np.empty([samples, outputs])
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
            y_train[i,:] = y[i+timesteps-1,:]

    return x_train, y_train
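
# Illustrative usage sketch (not from the original file): with synthetic
# data, the number of samples follows samples = datapoints - timesteps + 1.
#
#   x = np.random.rand(100, 3)  # (datapoints, features)
#   y = np.random.rand(100, 1)  # (datapoints, outputs)
#   x_train, y_train = staircase(x, y, timesteps=5, datapoints=100)
#   # x_train.shape == (96, 5, 3); y_train.shape == (96, 1)
#   x_seq, y_seq = staircase(x, y, timesteps=5, datapoints=100, return_sequences=True)
#   # y_seq.shape == (96, 5, 1)
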
def staircase_2(x,y,timesteps,batch_size=None,trainsteps=np.inf,return_sequences=False, verbose = False):
    # create RNN training data in multiple batches
    # input:
    #     x (,features)
    #     y (,outputs)
    #     timesteps: split x and y into sequences of length timesteps
    #                a.k.a. lookback or sequence_length

    # print params if verbose
    if batch_size is None:
        raise ValueError('staircase_2 requires batch_size')
    if verbose:
        print('staircase_2: shape x = ',x.shape)
        print('staircase_2: shape y = ',y.shape)
        print('staircase_2: timesteps=',timesteps)
        print('staircase_2: batch_size=',batch_size)
        print('staircase_2: return_sequences=',return_sequences)

    nx,features = x.shape
    ny,outputs = y.shape
    datapoints = min(nx,ny,trainsteps)
    if verbose:
        print('staircase_2: datapoints=',datapoints)

    # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
    # https://www.tensorflow.org/guide/keras/working_with_rnns Cross-batch statefulness

    # example with timesteps=3 batch_size=3 datapoints=15
    #   batch 0: [0 1 2]    [1 2 3]    [2 3 4]
    #   batch 1: [3 4 5]    [4 5 6]    [5 6 7]
    #   batch 2: [6 7 8]    [7 8 9]    [8 9 10]
    #   batch 3: [9 10 11]  [10 11 12] [11 12 13]
    #   batch 4: [12 13 14] [13 14 15] when the data runs out, this last batch can be shorter

    # TODO: implement for multiple locations, same starting time for each batch
    #            Loc 1      Loc 2      Loc 3
    #   batch 0: [0 1 2]    [0 1 2]    [0 1 2]
    #   batch 1: [3 4 5]    [3 4 5]    [3 4 5]
    #   batch 2: [6 7 8]    [6 7 8]    [6 7 8]
    # TODO: second epoch shift starting time at batch 0 in time

    # TODO: implement for multiple locations, different starting times for each batch
    #            Loc 1      Loc 2      Loc 3
    #   batch 0: [0 1 2]    [1 2 3]    [2 3 4]
    #   batch 1: [3 4 5]    [4 5 6]    [5 6 7]
    #   batch 2: [6 7 8]    [7 8 9]    [8 9 10]

    # the first sample in batch j starts from timesteps*j and ends with timesteps*(j+1)-1
    # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
    # becomes the starting hidden state of the rnn in the sequence of steps [3 4 5] in batch 1, etc.

    # sample [0 1 2] means the rnn is used twice to map state 0 -> 1 -> 2
    # the state at time 0 is fixed but the state is considered a variable at times 1 and 2
    # the loss is computed from the output at time 2, and the gradient of the loss function by the chain rule ends at time 0 because the state there is a constant -> derivative is zero
    # sample [3 4 5] means the rnn is used twice to map state 3 -> 4 -> 5
    # the state at time 3 is fixed to the output of the first sequence [0 1 2]
    # the loss is computed from the output at time 5, and the gradient of the loss function by the chain rule ends at time 3 because the state there is considered constant -> derivative is zero
    # how is the gradient computed? presumably keras adds the gradient wrt the weights at 2 5 8 ... 3 6 9 ... 4 7 ... and uses that to update the weights
    # there is only one set of weights: h(2) = f(h(1),w), h(1) = f(h(0),w), but w is always the same
    # each column is one successive evaluation of h(n+1) = f(h(n),w) for n = n_start, n_start+1, ...
    # a single column cannot be evaluated efficiently on a gpu because a gpu is a parallel processor
    # think of it as each column served by one thread; the threads are independent because they execute in parallel, and there needs to be a large number of threads (32 is a good number)
    # each batch consists of independent calculations
    # but a batch can depend on the result of the previous batch (that's the recurrent part)

    max_batches = datapoints // timesteps
    max_sequences = max_batches * batch_size

    if verbose:
        print('staircase_2: max_batches=',max_batches)
        print('staircase_2: max_sequences=',max_sequences)

    x_train = np.zeros((max_sequences, timesteps, features))
    if return_sequences:
        y_train = np.empty((max_sequences, timesteps, outputs))
    else:
        y_train = np.empty((max_sequences, outputs))

    # build the sequences
    k = 0  # initialize sequence counter
    for i in range(max_batches):
        for j in range(batch_size):
            begin = i*timesteps + j
            next = begin + timesteps
            if next > datapoints:
                break
            if verbose:
                print('sequence',k,'batch',i,'sample',j,'data',begin,'to',next-1)
            x_train[k,:,:] = x[begin:next,:]
            if return_sequences:
                y_train[k,:,:] = y[begin:next,:]
            else:
                y_train[k,:] = y[next-1,:]
            k += 1
    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)
        print('staircase_2: sequences generated',k)
        print('staircase_2: batch_size=',batch_size)
    k = (k // batch_size) * batch_size
    if verbose:
        print('staircase_2: removing partial and empty batches at the end, keeping',k)
    x_train = x_train[:k,:,:]
    if return_sequences:
        y_train = y_train[:k,:,:]
    else:
        y_train = y_train[:k,:]

    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)

    return x_train, y_train
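
# Illustrative usage sketch (not from the original file), following the
# timesteps=3, batch_size=3, datapoints=15 example in the comments above:
#
#   x = np.random.rand(15, 2)
#   y = np.random.rand(15, 1)
#   x_train, y_train = staircase_2(x, y, timesteps=3, batch_size=3)
#   # 13 sequences are generated; trimming partial batches keeps 12, so
#   # x_train.shape == (12, 3, 2) and y_train.shape == (12, 1)
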

# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler()
}

def batch_setup(ids, batch_size):
    """
    Sets up stateful batched training data scheme for RNN training.

    This function takes a list or array of identifiers (`ids`) and divides them into batches of a specified size (`batch_size`). If the last batch does not have enough elements to meet the `batch_size`, the function will loop back to the start of the identifiers and continue filling the batch until it reaches the required size.

    Parameters:
    -----------
    ids : list or numpy array
        A list or numpy array containing the ids to be batched.
    batch_size : int
        The desired size of each batch.

    Returns:
    --------
    batches : list of lists
        A list where each element is a batch (itself a list) of identifiers. Each batch will contain exactly `batch_size` elements.

    Example:
    --------
    >>> ids = [1, 2, 3, 4, 5]
    >>> batch_size = 3
    >>> batch_setup(ids, batch_size)
    [[1, 2, 3], [4, 5, 1]]

    Notes:
    ------
    - If `ids` is shorter than `batch_size`, the returned list will contain a single batch where identifiers are repeated from the start of `ids` until the batch is filled.
    """
    # Ensure ids is a numpy array
    x = np.array(ids)

    # Initialize the list to hold the batches
    batches = []

    # Use a loop to slice the list/array into batches
    for i in range(0, len(x), batch_size):
        batch = list(x[i:i + batch_size])

        # If the batch is not full, continue from the start
        while len(batch) < batch_size:
            # Calculate the remaining number of items needed
            remaining = batch_size - len(batch)
            # Append the needed number of items from the start of the array
            batch.extend(x[:remaining])

        batches.append(batch)

    return batches

def staircase_spatial(X, y, batch_size, timesteps, hours=None, start_times = None, verbose = True):
    """
    Prepares spatially formatted time series data for RNN training by creating batches of sequences across different locations, stacked to be compatible with stateful models.

    This function processes multi-location time series data by slicing it into batches and formatting it to fit into a recurrent neural network (RNN) model. It utilizes a staircase-like approach to prepare sequences for each location and then interlaces them to align with stateful RNN structures.

    Parameters:
    -----------
    X : list of numpy arrays
        A list where each element is a numpy array containing features for a specific location. The shape of each array is `(total_time_steps, features)`.
    y : list of numpy arrays
        A list where each element is a numpy array containing the target values for a specific location. The shape of each array is `(total_time_steps,)`.
    batch_size : int
        The number of sequences to include in each batch.
    timesteps : int
        The number of time steps to include in each sequence for the RNN.
    hours : int, optional
        The length of each time series to consider for each location. If `None`, it defaults to the minimum length of `y` across all locations.
    start_times : numpy array, optional
        The initial time step for each location. If `None`, it defaults to an array starting from 0 and incrementing by 1 for each location.
    verbose : bool, optional
        If `True`, prints additional information during processing. Default is `True`.

    Returns:
    --------
    XX : numpy array
        A 3D numpy array with shape `(total_sequences, timesteps, features)` containing the prepared feature sequences for all locations.
    yy : numpy array
        A 2D numpy array with shape `(total_sequences, 1)` containing the corresponding target values for all locations.
    n_seqs : int
        Number of sequences per location. Used to reset states when the location changes: the hidden state of the RNN will be reset after `n_seqs` batches.

    Notes:
    ------
    - The function handles spatially distributed time series data by batching and formatting it for stateful RNNs.
    - `hours` determines how much of the time series is used for each location. If not provided, it defaults to the shortest series in `y`.
    - If `start_times` is not provided, it assumes each location starts its series at progressively later time steps.
    - The `batch_setup` function is used internally to manage the creation of location and time step batches.
    - The returned feature sequences `XX` and target sequences `yy` are interlaced to align with the expected input format of stateful RNNs.
    """
    # Generate ids based on number of distinct timeseries provided
    n_loc = len(y) # assuming each list entry for y is a separate location
    loc_ids = np.arange(n_loc)

    # Generate hours and start_times if None
    if hours is None:
        print("Setting total hours to minimum length of y in provided dictionary")
        hours = min(len(yi) for yi in y)
    if start_times is None:
        print(f"Setting start times to offset by 1 hour by location, from 0 through {batch_size} (batch_size)")
        start_times = np.tile(np.arange(batch_size), n_loc // batch_size + 1)[:n_loc]
    # Set up batches
    loc_batch, t_batch = batch_setup(loc_ids, batch_size), batch_setup(start_times, batch_size)
    if verbose:
        print(f"Location ID Batches: {loc_batch}")
        print(f"Start Times for Batches: {t_batch}")

    # Loop over batches and construct with staircase_2
    Xs = []
    ys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        ts = t_batch[i]
        for j in range(0, len(locs_i)):
            t0 = int(ts[j])
            tend = t0 + hours
            # Create RNNData Dict
            # Subset data to given location and time from t0 to t0+hours
            k = locs_i[j] # Used to account for fewer locations than batch size
            X_temp = X[k][t0:tend,:]
            y_temp = y[k][t0:tend].reshape(-1,1)

            # Format sequences
            Xi, yi = staircase_2(
                X_temp,
                y_temp,
                timesteps = timesteps,
                batch_size = 1, # note: using 1 here to format sequences for a single location, not the same as the target batch size for training data
                verbose=False)

            Xs.append(Xi)
            ys.append(yi)

    # Drop incomplete batches
    lens = [yi.shape[0] for yi in ys]
    n_seqs = min(lens)
    if verbose:
        print(f"Minimum number of sequences by location: {n_seqs}")
        print(f"Applying minimum length to other arrays.")
    Xs = [Xi[:n_seqs] for Xi in Xs]
    ys = [yi[:n_seqs] for yi in ys]

    # Interlace arrays to match stateful structure
    n_features = Xi.shape[2]
    XXs = []
    yys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        XXi = np.empty((Xs[0].shape[0]*batch_size, timesteps, n_features))
        yyi = np.empty((Xs[0].shape[0]*batch_size, 1))
        for j in range(0, len(locs_i)):
            XXi[j::(batch_size)] = Xs[locs_i[j]]
            yyi[j::(batch_size)] = ys[locs_i[j]]
        XXs.append(XXi)
        yys.append(yyi)
    yy = np.concatenate(yys, axis=0)
    XX = np.concatenate(XXs, axis=0)

    if verbose:
        print(f"Spatially Formatted X Shape: {XX.shape}")
        print(f"Spatially Formatted y Shape: {yy.shape}")

    return XX, yy, n_seqs
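
# Illustrative usage sketch (not from the original file): three locations,
# each a (hours, features) array, interlaced for a stateful RNN.
#
#   X = [np.random.rand(100, 2) for _ in range(3)]
#   y = [np.random.rand(100) for _ in range(3)]
#   XX, yy, n_seqs = staircase_spatial(X, y, batch_size=3, timesteps=5)
#   # XX: (total_sequences, 5, 2); yy: (total_sequences, 1); n_seqs is the
#   # minimum number of sequences per location, used for state resets.
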

#***********************************************************************************************
### RNN Class Functionality

class RNNParams(dict):
    """
    A custom dictionary class for handling RNN parameters. Automatically calculates certain params based on others. Overwrites the update method to protect from incompatible parameter choices. Inherits from dict.
    """
    def __init__(self, input_dict):
        """
        Initializes the RNNParams instance and runs checks and shape calculations.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing RNN parameters.
        """
        super().__init__(input_dict)
        # Automatically run checks on initialization
        self._run_checks()
        # Automatically calculate shapes on initialization
        self.calc_param_shapes()
    def _run_checks(self, verbose=True):
        """
        Validates that required keys exist and are of the correct type.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        print("Checking params...")
        # Keys must exist and be integers
        int_keys = [
            'batch_size', 'timesteps', 'rnn_layers',
            'rnn_units', 'dense_layers', 'dense_units', 'epochs'
        ]
        for key in int_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], int), f"Key '{key}' must be an integer"

        # Keys must exist and be lists
        list_keys = ['activation', 'features_list', 'dropout', 'time_fracs']
        for key in list_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], list), f"Key '{key}' must be a list"

        # Keys must exist and be floats
        float_keys = ['learning_rate']
        for key in float_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], float), f"Key '{key}' must be a float"

        print("Input dictionary passed all checks.")
    def calc_param_shapes(self, verbose=True):
        """
        Calculates and updates the shapes of certain parameters based on input data.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Calculating shape params based on features list, timesteps, and batch size")
            print(f"Input Feature List: {self['features_list']}")
            print(f"Input Timesteps: {self['timesteps']}")
            print(f"Input Batch Size: {self['batch_size']}")

        n_features = len(self['features_list'])
        batch_shape = (self["batch_size"], self["timesteps"], n_features)
        if verbose:
            print("Calculated params:")
            print(f"Number of features: {n_features}")
            print(f"Batch Shape: {batch_shape}")

        # Update the dictionary
        super().update({
            'n_features': n_features,
            'batch_shape': batch_shape
        })
        if verbose:
            print(self)

    def update(self, *args, verbose=True, **kwargs):
        """
        Overwrites the standard update function from dict. This is to prevent certain keys from being modified directly and to automatically update keys to be compatible with each other. The keys handled relate to the shape of the input data to the RNN.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Prevent updating n_features and batch_shape
        restricted_keys = {'n_features', 'batch_shape'}
        keys_to_check = {'features_list', 'timesteps', 'batch_size'}

        # Check for restricted keys in args
        if args:
            if isinstance(args[0], dict):
                if restricted_keys & args[0].keys():
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                if restricted_keys & {k for k, v in args[0]}:
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")

        # Check for restricted keys in kwargs
        if restricted_keys & kwargs.keys():
            raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")

        # Track if specific keys are updated
        keys_updated = set()

        # Update using the standard dict update method
        if args:
            if isinstance(args[0], dict):
                keys_updated.update(args[0].keys() & keys_to_check)
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                keys_updated.update(k for k, v in args[0] if k in keys_to_check)

        if kwargs:
            keys_updated.update(kwargs.keys() & keys_to_check)

        # Call the parent update method
        super().update(*args, **kwargs)

        # Recalculate shapes if necessary
        if keys_updated:
            self.calc_param_shapes(verbose=verbose)
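
# Illustrative usage sketch (not from the original file); the key values are
# placeholders, not recommended settings:
#
#   params = RNNParams({
#       'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
#       'dense_layers': 1, 'dense_units': 5, 'epochs': 20,
#       'learning_rate': 0.001, 'activation': ['tanh', 'tanh'],
#       'features_list': ['Ed', 'Ew'], 'dropout': [0.2, 0.2],
#       'time_fracs': [0.8, 0.1, 0.1]
#   })
#   params.update({'timesteps': 10})   # triggers recalculation of batch_shape
#   params.update({'batch_shape': 1})  # raises KeyError: restricted key
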

## Class for handling input data
class RNNData(dict):
    """
    A custom dictionary class for managing RNN data, with validation, scaling, and train-test splitting functionality.
    """
    required_keys = {"loc", "time", "X", "y", "features_list"}
    def __init__(self, input_dict, scaler=None, features_list=None):
        """
        Initializes the RNNData instance, performs checks, and prepares data.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing the initial data.
        scaler : str, optional
            The name of the scaler to be used (e.g., 'minmax', 'standard'). Default is None.
        features_list : list, optional
            A subset of features to be used. Default is None, which means all features.
        """
        # Copy to avoid changing external input
        input_data = input_dict.copy()
        # Initialize inherited dict class
        super().__init__(input_data)

        # Check if input data is one timeseries dataset or multiple
        if type(self.loc['STID']) == str:
            self.spatial = False
            print("Input data is single timeseries.")
        elif type(self.loc['STID']) == list:
            self.spatial = True
            print("Input data from multiple timeseries.")
        else:
            raise KeyError("Input locations not a list or single string")

        # Set up Data Scaling
        self.scaler = None
        if scaler is not None:
            self.set_scaler(scaler)

        # Rename and define other stuff.
        if self.spatial:
            self['hours'] = min(arr.shape[0] for arr in self.y)
        else:
            self['hours'] = len(self['y'])

        self['all_features_list'] = self.pop('features_list')
        if features_list is None:
            print("Using all input features.")
            self.features_list = self.all_features_list
        else:
            self.features_list = features_list
            print(f"Setting features_list to {features_list}. \n NOTE: not subsetting features yet. That happens in train_test_split.")

        self._run_checks()
        self.__dict__.update(self)

    # TODO: Fix checks for multilocation
    def _run_checks(self, verbose=True):
        """
        Validates that required keys are present and checks the integrity of data shapes.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        missing_keys = self.required_keys - self.keys()
        if missing_keys:
            raise KeyError(f"Missing required keys: {missing_keys}")

        # # Check if 'hours' is provided and matches len(y)
        # if 'hours' in self:
        #     if self.hours != len(self.y):
        #         raise ValueError(f"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")

        # Check desired subset of features is in all input features
        if not all_items_exist(self.features_list, self.all_features_list):
            raise ValueError(f"Provided 'features_list' {self.features_list} has elements not in input features.")
    def set_scaler(self, scaler):
        """
        Sets the scaler to be used for data normalization.

        Parameters:
        -----------
        scaler : str
            The name of the scaler (e.g., 'minmax', 'standard').
        """
        recognized_scalers = ['minmax', 'standard']
        if scaler in recognized_scalers:
            print(f"Setting data scaler: {scaler}")
            self.scaler = scalers[scaler]
        else:
            raise ValueError(f"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")
    def train_test_split(self, time_fracs=[1.,0.,0.], space_fracs=[1.,0.,0.], subset_features=True, features_list=None, verbose=True):
        """
        Splits the data into training, validation, and test sets.

        Parameters:
        -----------
        time_fracs : list of float, optional
            Fractions of the time series used for train/validation/test. Must sum to 1. Default is [1., 0., 0.].
        space_fracs : list of float, optional
            Fractions of the locations used for train/validation/test, for spatial data. Must sum to 1. Default is [1., 0., 0.].
        subset_features : bool, optional
            If True, subsets the data to the specified features list. Default is True.
        features_list : list, optional
            A list of features to use for subsetting. Default is None.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial

        # Set up. Use a tolerance rather than exact equality to avoid floating point issues
        assert np.isclose(np.sum(time_fracs), 1.) and np.isclose(np.sum(space_fracs), 1.), "Provided cross validation params don't sum to 1"
        if (len(time_fracs) != 3) or (len(space_fracs) != 3):
            raise ValueError("Cross-validation params `time_fracs` and `space_fracs` must be lists of length 3, representing (train/validation/test)")

        train_frac = time_fracs[0]
        val_frac = time_fracs[1]
        test_frac = time_fracs[2]

        # Setup train/val/test in time
        train_ind = int(np.floor(self.hours * train_frac)); self.train_ind = train_ind
        test_ind = int(train_ind + round(self.hours * val_frac)); self.test_ind = test_ind
        # Check for any potential issues with indices
        if test_ind > self.hours:
            print(f"Setting test index to {self.hours}")
            test_ind = self.hours
            self.test_ind = test_ind
        if train_ind > test_ind:
            raise ValueError("Train index must be less than test index.")

        # Setup train/val/test in space
        if spatial:
            train_frac_sp = space_fracs[0]
            val_frac_sp = space_fracs[1]
            locs = np.arange(len(self.loc['STID'])) # indices of locations
            train_size = int(len(locs) * train_frac_sp)
            val_size = int(len(locs) * val_frac_sp)
            random.shuffle(locs)
            train_locs = locs[:train_size]
            val_locs = locs[train_size:train_size + val_size]
            test_locs = locs[train_size + val_size:]
            # Store lists of IDs in loc subdictionary
            self.loc['train_locs'] = [self.case[i] for i in train_locs]
            self.loc['val_locs'] = [self.case[i] for i in val_locs]
            self.loc['test_locs'] = [self.case[i] for i in test_locs]

        # Extract data to desired features, copy to avoid changing input objects
        X = self.X.copy()
        y = self.y.copy()
        if subset_features:
            if verbose and self.features_list != self.all_features_list:
                print(f"Subsetting input data to features_list: {self.features_list}")
            # Indices to subset all features with based on params features
            indices = []
            for item in self.features_list:
                if item in self.all_features_list:
                    indices.append(self.all_features_list.index(item))
                else:
                    print(f"Warning: feature name '{item}' not found in list of all features from input data. Removing from internal features list")
                    # self.features_list.remove(item)
            if spatial:
                X = [Xi[:, indices] for Xi in X]
            else:
                X = X[:, indices]

        # Training data from 0 to train_ind
        # Validation data from train_ind to test_ind
        # Test data from test_ind to end
        if spatial:
            # Split by space
            X_train = [X[i] for i in train_locs]
            X_val = [X[i] for i in val_locs]
            X_test = [X[i] for i in test_locs]
            y_train = [y[i] for i in train_locs]
            y_val = [y[i] for i in val_locs]
            y_test = [y[i] for i in test_locs]

            # Split by time
            self.X_train = [Xi[:train_ind] for Xi in X_train]
            self.y_train = [yi[:train_ind].reshape(-1,1) for yi in y_train]
            if (val_frac > 0) and (val_frac_sp > 0):
                self.X_val = [Xi[train_ind:test_ind] for Xi in X_val]
                self.y_val = [yi[train_ind:test_ind].reshape(-1,1) for yi in y_val]
            self.X_test = [Xi[test_ind:] for Xi in X_test]
            self.y_test = [yi[test_ind:].reshape(-1,1) for yi in y_test]
        else:
            self.X_train = X[:train_ind]
            self.y_train = y[:train_ind].reshape(-1,1) # assumes y 1-d, change this if vector output
            if val_frac > 0:
                self.X_val = X[train_ind:test_ind]
                self.y_val = y[train_ind:test_ind].reshape(-1,1) # assumes y 1-d, change this if vector output
            self.X_test = X[test_ind:]
            self.y_test = y[test_ind:].reshape(-1,1) # assumes y 1-d, change this if vector output

        # Print statements if verbose
        if verbose:
            print(f"Train index: 0 to {train_ind}")
            print(f"Validation index: {train_ind} to {test_ind}")
            print(f"Test index: {test_ind} to {self.hours}")

            if spatial:
                print("Subsetting locations into train/val/test")
                print(f"Total Locations: {len(locs)}")
                print(f"Train Locations: {len(train_locs)}")
                print(f"Val. Locations: {len(val_locs)}")
                print(f"Test Locations: {len(test_locs)}")
                print(f"X_train[0] shape: {self.X_train[0].shape}, y_train[0] shape: {self.y_train[0].shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val[0] shape: {self.X_val[0].shape}, y_val[0] shape: {self.y_val[0].shape}")
                print(f"X_test[0] shape: {self.X_test[0].shape}, y_test[0] shape: {self.y_test[0].shape}")
            else:
                print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
                print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")
        # Make Test Data None if zero length
        if len(self.X_test) == 0:
            self.X_test = None
            warnings.warn("Zero Test Data, setting X_test to None")
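
    # Illustrative usage sketch (not from the original file): an 80/10/10
    # split in both time and space; feature names are hypothetical.
    #
    #   dat = RNNData(input_dict, scaler='standard', features_list=['Ed', 'Ew'])
    #   dat.train_test_split(time_fracs=[.8, .1, .1], space_fracs=[.8, .1, .1])
    #   # Creates dat.X_train / dat.X_val / dat.X_test and matching y_* splits.
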
    def scale_data(self, verbose=True):
        """
        Scales the training data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial
        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        # if hasattr(self.scaler, 'n_features_in_'):
        #     warnings.warn("Scale_data has already been called. Exiting to prevent issues.")
        #     return
        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling training data with scaler {self.scaler}, fitting on X_train")

        if spatial:
            # Fit scaler on row-joined training data
            self.scaler.fit(np.vstack(self.X_train))
            # Transform data using fitted scaler
            self.X_train = [self.scaler.transform(Xi) for Xi in self.X_train]
            if hasattr(self, 'X_val'):
                if self.X_val is not None:
                    self.X_val = [self.scaler.transform(Xi) for Xi in self.X_val]
            if self.X_test is not None:
                self.X_test = [self.scaler.transform(Xi) for Xi in self.X_test]
        else:
            # Fit the scaler on the training data
            self.scaler.fit(self.X_train)
            # Transform the data using the fitted scaler
            self.X_train = self.scaler.transform(self.X_train)
            if hasattr(self, 'X_val'):
                self.X_val = self.scaler.transform(self.X_val)
            if self.X_test is not None:
                self.X_test = self.scaler.transform(self.X_test)
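
    # Illustrative sketch (not from the original file) of the intended call
    # order: the scaler is fit on X_train only, then applied to train/val/test.
    #
    #   dat.set_scaler('minmax')
    #   dat.train_test_split(time_fracs=[.8, .1, .1])
    #   dat.scale_data()
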

    # NOTE: only works for non spatial
    def scale_all_X(self, verbose=True):
        """
        Scales all X data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.

        Returns:
        -------
        ndarray
            Scaled X matrix, subsetted to features_list.
        """
        if self.spatial:
            raise ValueError("Not implemented for spatial data")

        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        if verbose:
            print(f"Scaling all X data with scaler {self.scaler}, fitted on X_train")
        # Subset features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        X = self.scaler.transform(X)

        return X

    def subset_X_features(self, verbose=True):
        if self.spatial:
            raise ValueError("Not implemented for spatial data")
        # Subset features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        return X

    def inverse_scale(self, return_X = 'all_hours', save_changes=False, verbose=True):
        """
        Inversely scales the data to its original form.

        Parameters:
        -----------
        return_X : str, optional
            Specifies what data to return after inverse scaling. Default is 'all_hours'.
        save_changes : bool, optional
            If True, updates the internal data with the inversely scaled values. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        X_val = self.scaler.inverse_transform(self.X_val)
        X_test = self.scaler.inverse_transform(self.X_test)

        if save_changes:
            print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
        if verbose:
            print(f"Attempting to return {return_X}")
        if return_X == "all_hours":
            return np.concatenate((X_train, X_val, X_test), axis=0)
        else:
            print(f"Unrecognized or unimplemented return value {return_X}")
    def batch_reshape(self, timesteps, batch_size, hours=None, verbose=False, start_times=None):
        """
        Restructures input data for RNN training using batches and sequences.

        Parameters:
        ----------
        timesteps : int
            Number of timesteps or sequence length used for a single sequence in RNN training. Constitutes a single sample to the model.
        batch_size : int
            Number of sequences used within a batch of training.
        hours : int, optional
            The length of the time series to use for each location. Passed through to `staircase_spatial` for spatial data.
        start_times : numpy array, optional
            The initial time step for each location. Passed through to `staircase_spatial` for spatial data.

        Returns:
        -------
        None
            This method reshapes the data in place.

        Raises:
        ------
        AttributeError
            If either 'X_train' or 'y_train' attributes do not exist within the instance.

        Notes:
        ------
        The reshaping method depends on the "spatial" param of self.
        - spatial == False: Reshapes data assuming no spatial dimensions.
        - spatial == True: Reshapes data considering spatial dimensions.
        """
        if not hasattr(self, 'X_train') or not hasattr(self, 'y_train'):
            raise AttributeError("Both 'X_train' and 'y_train' must be set before reshaping batches.")

        # Indicator of spatial training scheme or not
        spatial = self.spatial

        if spatial:
            print(f"Reshaping spatial training data using batch size: {batch_size} and timesteps: {timesteps}")
            # Format training data
            self.X_train, self.y_train, self.n_seqs = staircase_spatial(self.X_train, self.y_train, timesteps = timesteps,
                batch_size=batch_size, hours=hours, verbose=verbose, start_times=start_times)
            # Format validation data if provided
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val, _ = staircase_spatial(self.X_val, self.y_val, timesteps = timesteps,
                    batch_size=batch_size, hours=None, verbose=verbose, start_times=start_times)
            # Format test data. Same (batch_size, timesteps, features) format, but for the prediction model batch_size and timesteps are unconstrained
            # So here batch_size will represent the number of locations, aka separate timeseries to predict
            if self.X_test is not None:
                print(f"Reshaping test data by stacking. Output dimension will be (n_locs, test_hours, features)")
                self.X_test = np.stack(self.X_test, axis=0)
                self.y_test = np.stack(self.y_test, axis=0)
                print(f"X_test shape: {self.X_test.shape}")
                print(f"y_test shape: {self.y_test.shape}")

        else:
            print(f"Reshaping training data using batch size: {batch_size} and timesteps: {timesteps}")
            # Format training data
            self.X_train, self.y_train = staircase_2(self.X_train, self.y_train, timesteps = timesteps,
                batch_size=batch_size, verbose=verbose)
            # Format validation data if provided
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val = staircase_2(self.X_val, self.y_val, timesteps = timesteps,
                    batch_size=batch_size, verbose=verbose)
            # Format test data. Shape should be (1, hours, features)
            if self.X_test is not None:
                self.X_test = self.X_test.reshape((1, self.X_test.shape[0], len(self.features_list)))
                # self.y_test =
                print(f"X_test shape: {self.X_test.shape}")
                print(f"y_test shape: {self.y_test.shape}")
        if self.X_train.shape[0] == 0:
            raise ValueError("X_train has zero rows. Try a different combination of cross-validation fractions, batch size, or start_times. Train/val/test data were partially processed; rerun train_test_split before trying again.")
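
    # Illustrative usage sketch (not from the original file): reshape after
    # train_test_split and scale_data, using shape params from a hypothetical
    # RNNParams object `params`.
    #
    #   dat.batch_reshape(timesteps=params['timesteps'], batch_size=params['batch_size'])
    #   # X_train becomes (sequences, timesteps, features). X_test becomes
    #   # (1, hours, features) for a single location, or
    #   # (n_locs, test_hours, features) for spatial data.
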

    def print_hashes(self, attrs_to_check = ['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
        """
        Prints the hash of specified data attributes.

        Parameters:
        -----------
        attrs_to_check : list, optional
            A list of attribute names to hash and print. Default includes 'X', 'y', and split data.
        """
        for attr in attrs_to_check:
            if hasattr(self, attr):
                value = getattr(self, attr)
                print(f"Hash of {attr}: {hash_ndarray(value)}")
    def __getattr__(self, key):
        """
        Allows attribute-style access to dictionary keys, i.e. enables the "." operator for getting elements.
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'rnn_data' object has no attribute '{key}'")

    def __setitem__(self, key, value):
        """
        Ensures dictionary and attribute updates stay in sync for required keys.
        """
        super().__setitem__(key, value) # Update the dictionary
        if key in self.required_keys:
            super().__setattr__(key, value) # Ensure the attribute is updated as well

    def __setattr__(self, key, value):
        """
        Ensures dictionary keys are updated when setting attributes.
        """
        self[key] = value

# Function to check reproducibility hashes, environment info, and model parameters
def check_reproducibility(dict0, params, m_hash, w_hash):
    """
    Performs reproducibility checks on a model by comparing current settings and outputs with stored reproducibility information.

    Parameters:
    -----------
    dict0 : dict
        The data dictionary that should contain reproducibility information under the 'repro_info' attribute.
    params : dict
        The current model parameters to be checked against the reproducibility information.
    m_hash : str
        The hash of the current model predictions.
    w_hash : str
        The hash of the current fitted model weights.

    Returns:
    --------
    None
        The function returns None. It issues warnings if any reproducibility checks fail.

    Notes:
    ------
    - Checks are only performed if `dict0` contains the 'repro_info' attribute.
    - Issues warnings for mismatches in model weights, predictions, Python version, TensorFlow version, and model parameters.
    - Skips checks if physics-based initialization is used (not implemented).
    """
    if not hasattr(dict0, "repro_info"):
        warnings.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproducibility checks.")
        return

    repro_info = dict0.repro_info
    # Check hashes against the stored set matching the initialization scheme
    if params['phys_initialize']:
        hashes = repro_info['phys_initialize']
    else:
        hashes = repro_info['rand_initialize']
    print(f"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
    print(f"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
    if (w_hash != hashes['fitted_weights_hash']) or (m_hash != hashes['preds_hash']):
        if w_hash != hashes['fitted_weights_hash']:
            warnings.warn("The fitted weights hash does not match the reproducibility weights hash.")
        if m_hash != hashes['preds_hash']:
            warnings.warn("The predictions hash does not match the reproducibility predictions hash.")
    else:
        print("***Reproducibility Checks passed - model weights and model predictions match expected.***")

    # Check environment
    current_py_version = sys.version[0:6]
    current_tf_version = tf.__version__
    if current_py_version != repro_info['env_info']['py_version']:
        warnings.warn(f"Python version mismatch: Current Python version is {current_py_version}, "
                      f"expected {repro_info['env_info']['py_version']}.")

    if current_tf_version != repro_info['env_info']['tf_version']:
        warnings.warn(f"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
                      f"expected {repro_info['env_info']['tf_version']}.")

    # Check params
    repro_params = repro_info.get('params', {})

    for key, repro_value in repro_params.items():
        if key in params:
            if params[key] != repro_value:
                warnings.warn(f"Parameter mismatch for '{key}': Current value is {params[key]}, "
                              f"repro value is {repro_value}.")
        else:
            warnings.warn(f"Parameter '{key}' is missing in the current params.")

    return
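
# Illustrative usage sketch (not from the original file), mirroring the
# commented _eval_single code below; `model` here is a hypothetical fitted
# prediction model and `dat` an RNNData dictionary with repro_info:
#
#   m = model.predict(X).flatten()
#   check_reproducibility(dat, params, hash_ndarray(m), hash_weights(model))
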

# class RNNModel(ABC):
#     """
#     Abstract base class for RNN models, providing structure for training, predicting, and running reproducibility checks.
#     """
#     def __init__(self, params: dict):
#         """
#         Initializes the RNNModel with the given parameters.
#
#         Parameters:
#         -----------
#         params : dict
#             A dictionary containing model parameters.
#         """
#         self.params = params
#         if type(self) is RNNModel:
#             raise TypeError("RNNModel is an abstract class and cannot be instantiated directly")
#         super().__init__()
#
#     @abstractmethod
#     def _build_model_train(self):
#         """Abstract method to build the training model."""
#         pass
#
#     @abstractmethod
#     def _build_model_predict(self, return_sequences=True):
#         """Abstract method to build the prediction model. This model copies weights from the train model but with an input structure that allows for easier prediction of arbitrary length timeseries. This model is not to be used for training; don't use it with .fit calls"""
#         pass
#
#     def is_stateful(self):
#         """
#         Checks whether any of the layers in the internal model (self.model_train) are stateful.
#
#         Returns:
#         bool: True if at least one layer in the model is stateful, False otherwise.
#
#         This method iterates over all the layers in the model and checks if any of them
#         have the 'stateful' attribute set to True. This is useful for determining if
#         the model is designed to maintain state across batches during training.
#
#         Example:
#         --------
#         model.is_stateful()
#         """
#         for layer in self.model_train.layers:
#             if hasattr(layer, 'stateful') and layer.stateful:
#                 return True
#         return False
#
#     def fit(self, X_train, y_train, plot_history=True, plot_title = '',
#             weights=None, callbacks=[], validation_data=None, return_epochs=False, *args, **kwargs):
#         """
#         Trains the model on the provided training data. Uses the fit method of the training model and then copies the weights over to the prediction model, which has a less restrictive input shape. Formats a list of callbacks to use within the fit method based on params input.
#
#         Parameters:
#         -----------
#         X_train : np.ndarray
#             The input matrix data for training.
#         y_train : np.ndarray
#             The target vector data for training.
#         plot_history : bool, optional
#             If True, plots the training history. Default is True.
#         plot_title : str, optional
#             The title for the training plot. Default is an empty string.
#         weights : optional
#             Initial weights for the model. Default is None.
#         callbacks : list, optional
#             A list of callback functions to use during training. Default is an empty list.
#         validation_data : tuple, optional
#             Validation data to use during training, expected format (X_val, y_val). Default is None.
#         return_epochs : bool
#             If True, return the number of epochs that training took. Used to test and optimize early stopping.
#         """
#         # Check compatibility, assume features dimension is last in X_train array
#         if X_train.shape[-1] != self.params['n_features']:
#             raise ValueError(f"X_train and number of features from params not equal. \n X_train shape: {X_train.shape} \n params['n_features']: {self.params['n_features']}. \n Try recreating RNNData object with features list from params: `RNNData(..., features_list = params['features_list'])`")
#
#         # verbose_fit argument is for printing out update after each epoch, which gets very long
#         verbose_fit = self.params['verbose_fit']
#         verbose_weights = self.params['verbose_weights']
#         if verbose_weights:
#             print(f"Training simple RNN with params: {self.params}")
#
#         # Setup callbacks
#         if self.params["reset_states"]:
#             callbacks = callbacks + [ResetStatesCallback(self.params), TerminateOnNaN()]
#
#         # Early stopping callback requires validation data
#         if validation_data is not None:
#             X_val, y_val = validation_data[0], validation_data[1]
#             print("Using early stopping callback.")
#             early_stop = EarlyStoppingCallback(patience = self.params['early_stopping_patience'])
#             callbacks = callbacks + [early_stop]
#         if verbose_weights:
#             print(f"Formatted X_train hash: {hash_ndarray(X_train)}")
#             print(f"Formatted y_train hash: {hash_ndarray(y_train)}")
#             if validation_data is not None:
#                 print(f"Formatted X_val hash: {hash_ndarray(X_val)}")
#                 print(f"Formatted y_val hash: {hash_ndarray(y_val)}")
#             print(f"Initial weights before training hash: {hash_weights(self.model_train)}")
#
#         ## TODO: Hidden State Initialization
#         # Evaluate model once to set nonzero initial state
#         # self.model_train(X_train[0:self.params['batch_size'],:,:])
#
#         if validation_data is not None:
#             history = self.model_train.fit(
#                 X_train, y_train,
#                 epochs=self.params['epochs'],
#                 batch_size=self.params['batch_size'],
#                 callbacks = callbacks,
#                 verbose=verbose_fit,
#                 validation_data = (X_val, y_val),
#                 *args, **kwargs
#             )
#         else:
#             history = self.model_train.fit(
#                 X_train, y_train,
#                 epochs=self.params['epochs'],
#                 batch_size=self.params['batch_size'],
#                 callbacks = callbacks,
#                 verbose=verbose_fit,
#                 *args, **kwargs
#             )
#
#         if plot_history:
#             self.plot_history(history, plot_title)
#
#         if self.params["verbose_weights"]:
#             print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")
#
#         # Update weights for prediction model
#         w_fitted = self.model_train.get_weights()
#         self.model_predict.set_weights(w_fitted)
#
#         if return_epochs:
#             # Epoch counting starts at 0, adding 1 for the count
#             return early_stop.best_epoch + 1
#
#     def predict(self, X_test):
#         """
#         Generates predictions on the provided test data using the internal prediction model.
#
#         Parameters:
#         -----------
#         X_test : np.ndarray
#             The input data for generating predictions.
#
#         Returns:
#         --------
#         np.ndarray
#             The predicted values.
#         """
#         print("Predicting test data")
#         # X_test = self._format_pred_data(X_test)
#         # preds = self.model_predict.predict(X_test).flatten()
#         preds = self.model_predict.predict(X_test)
#         return preds
#
#     def _format_pred_data(self, X):
#         """
#         Formats the prediction data for RNN input.
#
#         Parameters:
#         -----------
#         X : np.ndarray
#             The input data.
#
#         Returns:
#         --------
#         np.ndarray
#             The formatted input data.
#         """
#         return np.reshape(X, (1, X.shape[0], self.params['n_features']))
#
#     def plot_history(self, history, plot_title, create_figure=True):
#         """
#         Plots the training history. Uses log scale on y axis for readability.
#
#         Parameters:
#         -----------
#         history : History object
#             The training history object from model fitting. Output of keras' .fit command.
#         plot_title : str
#             The title for the plot.
#         """
#         if create_figure:
#             plt.figure(figsize=(10, 6))
#         plt.semilogy(history.history['loss'], label='Training loss')
#         if 'val_loss' in history.history:
#             plt.semilogy(history.history['val_loss'], label='Validation loss')
#         plt.title(f'{plot_title} Model loss')
#         plt.ylabel('Loss')
#         plt.xlabel('Epoch')
#         plt.legend(loc='upper left')
#         plt.show()
#
#     def run_model(self, dict0, reproducibility_run=False, plot_period='all', save_outputs=True, return_epochs=False):
#         """
#         Runs the RNN model on an input data dictionary, including training, prediction, and reproducibility checks.
#
#         Parameters:
#         -----------
#         dict0 : RNNData (dict)
#             The dictionary containing the input data and configuration.
#         reproducibility_run : bool, optional
#             If True, performs reproducibility checks after running the model. Default is False.
#         save_outputs : bool
#             If True, writes model outputs into input dictionary.
#         return_epochs : bool
#             If True, returns how many epochs of training happened. Used to optimize params related to early stopping.
#
#         Returns:
#         --------
#         tuple
#             Model predictions and a dictionary of RMSE errors broken up by time period.
#         """
#         if dict0.X_test is None:
#             raise ValueError("No test data X_test in input dictionary. Provide test data or just run .fit")
#
#         verbose_fit = self.params['verbose_fit']
#         verbose_weights = self.params['verbose_weights']
#         if verbose_weights:
#             dict0.print_hashes()
#         # Extract datasets
#         X_train, y_train, X_test, y_test = dict0.X_train, dict0.y_train, dict0.X_test, dict0.y_test
#         if 'X_val' in dict0:
#             X_val, y_val = dict0.X_val, dict0.y_val
#         else:
#             X_val = None
#         if dict0.spatial:
#             case_id = "Spatial Training Set"
#         else:
#             case_id = dict0.case
#
#         # Fit model
#         if X_val is None:
#             eps = self.fit(X_train, y_train, plot_title=case_id, return_epochs=return_epochs)
#         else:
#             eps = self.fit(X_train, y_train, validation_data = (X_val, y_val), plot_title=case_id, return_epochs=return_epochs)
#
#         # Generate predictions and evaluate test error
#         if dict0.spatial:
#             m, errs = self._eval_multi(dict0, reproducibility_run)
#             if save_outputs:
#                 dict0['m'] = m
#         else:
#             m, errs = self._eval_single(dict0, verbose_weights, reproducibility_run)
#             if save_outputs:
#                 dict0['m'] = m
#             plot_data(dict0, title="RNN", title2=dict0.case, plot_period=plot_period)
#
#         # print(f"Mean Test Error: {errs.mean()}")
#
#         if return_epochs:
#             return m, errs, eps
#         else:
#             return m, errs
#
#     def _eval_single(self, dict0, verbose_weights, reproducibility_run):
#         # Generate predictions,
#         # run through training to get hidden state set properly for forecast period
#         print(f"Running prediction on all input data, Training through Test")
#         if dict0.scaler is not None:
#             X = dict0.scale_all_X()
#         else:
#             X = dict0.subset_X_features()
#         y = dict0.y.flatten()
#         # Predict
#         if verbose_weights:
#             print(f"All X hash: {hash_ndarray(X)}")
#         # Reshape X
#         X = X.reshape(1, X.shape[0], X.shape[1])
#         m = self.predict(X).flatten()
#         if verbose_weights:
#             print(f"Predictions Hash: {hash_ndarray(m)}")
#
#         if reproducibility_run:
#             print("Checking Reproducibility")
#             check_reproducibility(dict0, self.params, hash_ndarray(m), hash_weights(self.model_predict))
#
#         # print(dict0.keys())
#         # Plot final fit and data
#         # dict0['y'] = y
#         # plot_data(dict0, title="RNN", title2=dict0['case'], plot_period=plot_period)
#
#         # Calculate errors
#         err = rmse(m, y)
#         train_ind = dict0.train_ind # index of final training set value
#         test_ind = dict0.test_ind # index of first test set value
#
#         err_train = rmse(m[:train_ind], y[:train_ind].flatten())
#         err_pred = rmse(m[test_ind:], y[test_ind:].flatten())
#         rmse_dict = {
#             'all': err,
#             'training': err_train,
#             'prediction': err_pred
#         }
#         return m, rmse_dict
#
#     def _eval_multi(self, dict0, reproducibility_run):
#         # Train error: NOT DOING YET. DECIDE WHETHER THIS IS NEEDED
#
#         # Test error
#         # new_data = np.stack(dict0.X_test, axis=0)
#         # y_array = np.stack(dict0.y_test, axis=0)
#         # preds = self.model_predict.predict(new_data)
#         y_array = dict0.y_test
#         preds = self.model_predict.predict(dict0.X_test)
#
#         if reproducibility_run:
#             print("Checking Reproducibility")
#             check_reproducibility(dict0, self.params, hash_ndarray(preds), hash_weights(self.model_predict))
#
#         # Calculate RMSE
#         ## Note: not using util rmse function since this approach is for 3d arrays
#         # Compute the squared differences
#         squared_diff = np.square(preds - y_array)
#
#         # Mean squared error along the timesteps and dimensions (axis 1 and 2)
#         mse = np.mean(squared_diff, axis=(1, 2))
#
#         # Root mean squared error (RMSE) for each timeseries
#         rmses = np.sqrt(mse)
#
#         return preds, rmses

## Callbacks

# Helper functions for batch reset schedules
def calc_exp_intervals(bmin, bmax, n_epochs, force_bmax = True):
    # Calculate the exponential intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = epochs / n_epochs
    intervals = bmin * (bmax / bmin) ** factors
    if force_bmax:
        intervals[-1] = bmax # Ensure the last value is exactly bmax
    return intervals.astype(int)

def calc_log_intervals(bmin, bmax, n_epochs, force_bmax = True):
    # Calculate the logarithmic intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = np.log(1 + epochs) / np.log(1 + n_epochs)
    intervals = bmin + (bmax - bmin) * factors
    if force_bmax:
        intervals[-1] = bmax # Ensure the last value is exactly bmax
    return intervals.astype(int)

def calc_step_intervals(bmin, bmax, n_epochs, estep=None, force_bmax=True):
    # Set estep to 10% of the total number of epochs if not provided
    if estep is None:
        estep = int(0.1 * n_epochs)

    # Initialize intervals with bmin for the first part, then step up to bmax
    intervals = np.full(n_epochs, bmin)

    # Step up to bmax after 'estep' epochs
    intervals[estep:] = bmax

    # Force the last value to be exactly bmax if specified
    if force_bmax:
        intervals[-1] = bmax

    return intervals.astype(int)
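
# Illustrative comparison (not from the original file) of the schedule
# helpers with bmin=1, bmax=20, n_epochs=5:
#
#   calc_exp_intervals(1, 20, 5)            # -> [ 1,  1,  3,  6, 20]
#   calc_log_intervals(1, 20, 5)            # -> [ 1,  8, 12, 15, 20]
#   calc_step_intervals(1, 20, 5, estep=2)  # -> [ 1,  1, 20, 20, 20]
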
1407 class ResetStatesCallback(Callback):
1409 Custom callback to reset the states of RNN layers at the end of each epoch and optionally after a specified number of batches.
1411 Parameters:
1412 -----------
1413 batch_reset : int, optional
1414 If provided, resets the states of RNN layers after every `batch_reset` batches. Default is None.
1415 """
1416 # def __init__(self, bmin=None, bmax=None, epochs=None, loc_batch_reset = None, batch_schedule_type='linear', verbose=True):
1417 def __init__(self, params=None, verbose=True):
1419 Initializes the ResetStatesCallback with an optional batch reset interval.
1421 Parameters:
1422 -----------
1423 params: dict, optional
1424 Dictionary of parameters. If None provided, only on_epoch_end will trigger reset of hidden states.
1425 - bmin : int
1426 Minimum for batch reset schedule
1427 - bmax : int
1428 Maximum for batch reset schedule
1429 - epochs : int
1430 Number of training epochs.
1431 - loc_batch_reset : int
1432 Interval of batches after which to reset the states of RNN layers for location changes. Triggers reset for training AND validation phases
1433 - batch_schedule_type : str
1434 Type of batch scheduling to be used. Recognized methods are following:
1435 - 'constant' : Used fixed batch reset interval throughout training
1436 - 'linear' : Increases the batch reset interval linearly over epochs from bmin to bmax.
1437 - 'exp' : Increases the batch reset interval exponentially over epochs from bmin to bmax.
1438 - 'log' : Increases the batch reset interval logarithmically over epochs from bmin to bmax.
1439 - 'step' : Increases the batch reset interval from constant at bmin to constant at bmax after estep number o epochs
1442 Returns:
1443 -----------
1444 Only in-place reset of hidden states of RNN that calls uses this callback.
1446 """
1447 super(ResetStatesCallback, self).__init__()
1449 # Check for optional arguments, set None if missing in input params
1450 arg_list = ['bmin', 'bmax', 'epochs', 'estep', 'loc_batch_reset', 'batch_schedule_type']
1451 for arg in arg_list:
1452 setattr(self, arg, params.get(arg, None))
1454 self.verbose = verbose
1455 if self.verbose:
1456 print(f"Using ResetStatesCallback with Batch Reset Schedule: {self.batch_schedule_type}")
1457 # Calculate the reset intervals for each epoch during initialization
1458 if self.batch_schedule_type is not None:
1459 if self.epochs is None:
1460 raise ValueError(f"Argument `epochs` cannot be None with self.batch_schedule_type: {self.batch_schedule_type}")
1461 self.batch_reset_intervals = self._calc_reset_intervals(self.batch_schedule_type)
1462 if self.verbose:
1463 print(f"batch_reset_intervals: {self.batch_reset_intervals}")
1464 else:
1465 self.batch_reset_intervals = None
1466 def on_epoch_end(self, epoch, logs=None):
1467 """
1468 Resets the states of RNN layers at the end of each epoch.
1470 Parameters:
1471 -----------
1472 epoch : int
1473 The index of the current epoch.
1474 logs : dict, optional
1475 A dictionary containing metrics from the epoch. Default is None.
1476 """
1477 # print(f" Resetting hidden state after epoch: {epoch+1}", flush=True)
1478 # Iterate over each layer in the model
1479 for layer in self.model.layers:
1480 # Check if the layer has a reset_states method
1481 if hasattr(layer, 'reset_states'):
1482 layer.reset_states()
1483 def _calc_reset_intervals(self,batch_schedule_type):
1484 methods = ['constant', 'linear', 'exp', 'log', 'step']
1485 if batch_schedule_type not in methods:
1486 raise ValueError(f"Batch schedule method {batch_schedule_type} not recognized. \n Available methods: {methods}")
1487 if batch_schedule_type == "constant":
1489 return np.repeat(self.bmin, self.epochs).astype(int)
1490 elif batch_schedule_type == "linear":
1491 return np.linspace(self.bmin, self.bmax, self.epochs).astype(int)
1492 elif batch_schedule_type == "exp":
1493 return calc_exp_intervals(self.bmin, self.bmax, self.epochs)
1494 elif batch_schedule_type == "log":
1495 return calc_log_intervals(self.bmin, self.bmax, self.epochs)
1496 elif batch_schedule_type == "step":
1497 return calc_step_intervals(self.bmin, self.bmax, self.epochs, self.estep)
1498 def on_epoch_begin(self, epoch, logs=None):
1499 # Set the reset interval for the current epoch
1500 if self.batch_reset_intervals is not None:
1501 self.current_batch_reset = self.batch_reset_intervals[epoch]
1502 else:
1503 self.current_batch_reset = None
1504 def on_train_batch_end(self, batch, logs=None):
1505 """
1506 Resets the states of RNN layers during training after a specified number of batches, if `batch_reset` or `loc_batch_reset` are provided. The `batch_reset` interval promotes stability and avoids exploding gradients early in training, when the hidden state is passed through weights that have not yet learned. The `loc_batch_reset` interval resets states when a batch comes from a new location, so that the hidden state is not carried across locations.
1508 Parameters:
1509 -----------
1510 batch : int
1511 The index of the current batch.
1512 logs : dict, optional
1513 A dictionary containing metrics from the batch. Default is None.
1514 """
1515 batch_reset = self.current_batch_reset
1516 if (batch_reset is not None and batch % batch_reset == 0):
1517 # print(f" Resetting states after batch {batch + 1}")
1518 # Iterate over each layer in the model
1519 for layer in self.model.layers:
1520 # Check if the layer has a reset_states method
1521 if hasattr(layer, 'reset_states'):
1522 layer.reset_states()
1523 def on_test_batch_end(self, batch, logs=None):
1524 """
1525 Resets the states of RNN layers during validation if `loc_batch_reset` is provided, to demarcate a new location and avoid carrying a hidden state over from a different location.
1527 Parameters:
1528 -----------
1529 batch : int
1530 The index of the current batch.
1531 logs : dict, optional
1532 A dictionary containing metrics from the batch. Default is None.
1533 """
1534 loc_batch_reset = self.loc_batch_reset
1535 if (loc_batch_reset is not None and batch % loc_batch_reset == 0):
1536 # print(f"Resetting states in Validation mode after batch {batch + 1}")
1537 # Iterate over each layer in the model
1538 for layer in self.model.layers:
1539 # Check if the layer has a reset_states method
1540 if hasattr(layer, 'reset_states'):
1541 layer.reset_states()
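# Sketch of intended usage (parameter values are illustrative): reset hidden
# states every epoch, and within epochs on an exponentially growing interval.
# reset_cb = ResetStatesCallback({
#     'bmin': 10, 'bmax': 200, 'epochs': 100,
#     'batch_schedule_type': 'exp'
# })
# model.fit(X_train, y_train, epochs=100, callbacks=[reset_cb])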
1543 ## Learning Schedules
1544 ## NOT TESTED YET
1545 lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
1546 initial_learning_rate=0.01,
1547 decay_steps=200,
1548 alpha=0.0,
1549 name='CosineDecay',
1550 # warmup_target=None,
1551 # warmup_steps=100
1552 )
1555 def EarlyStoppingCallback(patience=5):
1556 """
1557 Creates an EarlyStopping callback with the specified patience.
1559 Args:
1560 patience (int): Number of epochs with no improvement after which training will be stopped.
1562 Returns:
1563 EarlyStopping: Configured EarlyStopping callback.
1564 """
1565 return EarlyStopping(
1566 monitor='val_loss',
1567 patience=patience,
1568 verbose=1,
1569 mode='min',
1570 restore_best_weights=True
1571 )
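# Sketch of intended usage: requires validation data so 'val_loss' is defined.
# model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=100,
#           callbacks=[EarlyStoppingCallback(patience=10)])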
1573 phys_params = {
1574 'DeltaE': [0,-1], # bias correction
1575 'T1': 0.1, # 1/fuel class (10)
1576 'fm_raise_vs_rain': 0.2 # fm increase per mm rain
1577 }
1581 def get_initial_weights(model_fit,params,scale_fm=1):
1582 # Given a RNN architecture and hyperparameter dictionary, return array of physics-initiated weights
1583 # Inputs:
1584 # model_fit: output of create_RNN_2 with no training
1585 # params: (dict) dictionary of hyperparameters
1586 # scale_fm: (float) scale factor for fuel-moisture data, applied to the physics-based weights
1587 # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
1588 DeltaE = phys_params['DeltaE']
1589 T1 = phys_params['T1']
1590 fmr = phys_params['fm_raise_vs_rain']
1591 centering = [0] # shift activation down
1593 w0_initial={'Ed':(1.-np.exp(-T1))/2,
1594 'Ew':(1.-np.exp(-T1))/2,
1595 'rain':fmr * scale_fm} # wx - input feature
1596 # remaining weights: wh, bh (layer 0), wd, bd (layer 1); bd includes the centering shift
1598 w_initial=np.array([np.nan, np.exp(-0.1), DeltaE[0]/scale_fm, # layer 0
1599 1.0, -centering[0] + DeltaE[1]/scale_fm]) # layer 1
1600 if params['verbose_weights']:
1601 print('Equilibrium moisture correction bias',DeltaE[0],
1602 'in the hidden layer and',DeltaE[1],' in the output layer')
1604 w_name = ['wx','wh','bh','wd','bd']
1606 w=model_fit.get_weights()
1607 for j in range(w[0].shape[0]):
1608 feature = params['features_list'][j]
1609 for k in range(w[0].shape[1]):
1610 w[0][j][k]=w0_initial[feature]
1612 for i in range(1,len(w)): # number of the weight
1613 for j in range(w[i].shape[0]): # number of the inputs
1614 if w[i].ndim==2:
1615 # initialize all entries of the weight matrix to the same number
1616 for k in range(w[i].shape[1]):
1617 w[i][j][k]=w_initial[i]/w[i].shape[0]
1618 elif w[i].ndim==1:
1619 w[i][j]=w_initial[i]
1620 else:
1621 print('weight',i,'shape',w[i].shape)
1622 raise ValueError("Only 1 or 2 dimensions supported")
1623 if params['verbose_weights']:
1624 print('weight',i,w_name[i],'shape',w[i].shape,'ndim',w[i].ndim,
1625 'initial: sum',np.sum(w[i],axis=0),'\nentries',w[i])
1627 return w, w_name
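# Sanity-check sketch (assumes params['features_list'] == ['Ed', 'Ew', 'rain'],
# as required for physics initialization, and that params has 'verbose_weights'):
# the Ed and Ew input weights each equal (1 - exp(-T1))/2, so together they
# recover the ODE factor 1 - exp(-T1).
# w, w_name = get_initial_weights(model_fit, params)
# assert np.isclose(w[0][0, 0] + w[0][1, 0], 1 - np.exp(-phys_params['T1']))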
1629 # class RNN(RNNModel):
1630 # """
1631 # A concrete implementation of the RNNModel abstract base class, using simple recurrent cells for hidden recurrent layers.
1633 # Parameters:
1634 # -----------
1635 # params : dict
1636 # A dictionary of model parameters.
1637 # loss : str, optional
1638 # The loss function to use during model training. Default is 'mean_squared_error'.
1639 # """
1640 # def __init__(self, params, loss='mean_squared_error'):
1641 # """
1642 # Initializes the RNN model by building the training and prediction models.
1644 # Parameters:
1645 # -----------
1646 # params : dict or RNNParams
1647 # A dictionary containing the model's parameters.
1648 # loss : str, optional
1649 # The loss function to use during model training. Default is 'mean_squared_error'.
1650 # """
1651 # super().__init__(params)
1652 # self.model_train = self._build_model_train()
1653 # self.model_predict = self._build_model_predict()
1655 # def _build_model_train(self):
1656 # """
1657 # Builds and compiles the training model, with batch & sequence shape specifications for input.
1659 # Returns:
1660 # --------
1661 # model : tf.keras.Model
1662 # The compiled Keras model for training.
1663 # """
1664 # inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1665 # x = inputs
1666 # for i in range(self.params['rnn_layers']):
1667 # # Return sequences True if recurrent layer feeds into another recurrent layer.
1668 # # False if feeds into dense layer
1669 # return_sequences = True if i < self.params['rnn_layers'] - 1 else False
1670 # x = SimpleRNN(
1671 # units=self.params['rnn_units'],
1672 # activation=self.params['activation'][0],
1673 # dropout=self.params["dropout"][0],
1674 # recurrent_dropout = self.params["recurrent_dropout"],
1675 # stateful=self.params['stateful'],
1676 # return_sequences=return_sequences)(x)
1677 # if self.params["dropout"][1] > 0:
1678 # x = Dropout(self.params["dropout"][1])(x)
1679 # for i in range(self.params['dense_layers']):
1680 # x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1681 # # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1682 # x = Dense(units=1, activation='linear')(x)
1683 # model = tf.keras.Model(inputs=inputs, outputs=x)
1684 # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1685 # # optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
1686 # model.compile(loss='mean_squared_error', optimizer=optimizer)
1688 # if self.params["verbose_weights"]:
1689 # print(f"Initial Weights Hash: {hash_weights(model)}")
1690 # # print(model.get_weights())
1692 # if self.params['phys_initialize']:
1693 # assert self.params['scaler'] is None, f"Not implemented yet to do physics initialize with given data scaling {self.params['scaler']}"
1694 # assert self.params['features_list'] == ['Ed', 'Ew', 'rain'], f"Physics initiation can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
1695 # assert self.params['dense_layers'] == 0, f"Physics initiation requires no hidden dense layers, received params['dense_layers'] = {self.params['dense_layers']}"
1696 # print("Initializing Model with Physics based weights")
1697 # w, w_name=get_initial_weights(model, self.params)
1698 # model.set_weights(w)
1699 # print('initial weights hash =',hash_weights(model))
1700 # return model
1702 # def _build_model_predict(self, return_sequences=True):
1703 # """
1704 # Builds and compiles the prediction model. It doesn't use batch shape or sequence length, making it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1706 # Parameters:
1707 # -----------
1708 # return_sequences : bool, optional
1709 # Whether to return the full sequence of outputs. Default is True.
1711 # Returns:
1712 # --------
1713 # model : tf.keras.Model
1714 # The compiled Keras model for prediction.
1715 # """
1716 # inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1717 # x = inputs
1718 # for i in range(self.params['rnn_layers']):
1719 # x = SimpleRNN(self.params['rnn_units'],activation=self.params['activation'][0],
1720 # stateful=False,return_sequences=return_sequences)(x)
1721 # for i in range(self.params['dense_layers']):
1722 # x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1723 # # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1724 # x = Dense(units=1, activation='linear')(x)
1725 # model = tf.keras.Model(inputs=inputs, outputs=x)
1726 # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1727 # model.compile(loss='mean_squared_error', optimizer=optimizer)
1729 # # Set Weights to model_train
1730 # w_fitted = self.model_train.get_weights()
1731 # model.set_weights(w_fitted)
1733 # return model
1736 # class RNN_LSTM(RNNModel):
1737 # """
1738 # A concrete implementation of the RNNModel abstract base class, using LSTM cells for hidden recurrent layers.
1740 # Parameters:
1741 # -----------
1742 # params : dict
1743 # A dictionary of model parameters.
1744 # loss : str, optional
1745 # The loss function to use during model training. Default is 'mean_squared_error'.
1746 # """
1747 # def __init__(self, params, loss='mean_squared_error'):
1748 # """
1749 # Initializes the RNN model by building the training and prediction models.
1751 # Parameters:
1752 # -----------
1753 # params : dict or RNNParams
1754 # A dictionary containing the model's parameters.
1755 # loss : str, optional
1756 # The loss function to use during model training. Default is 'mean_squared_error'.
1757 # """
1758 # super().__init__(params)
1759 # self.model_train = self._build_model_train()
1760 # self.model_predict = self._build_model_predict()
1762 # def _build_model_train(self):
1763 # """
1764 # Builds and compiles the training model, with batch & sequence shape specifications for input.
1766 # Returns:
1767 # --------
1768 # model : tf.keras.Model
1769 # The compiled Keras model for training.
1770 # """
1771 # inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1772 # x = inputs
1773 # for i in range(self.params['rnn_layers']):
1774 # return_sequences = True if i < self.params['rnn_layers'] - 1 else False
1775 # x = LSTM(
1776 # units=self.params['rnn_units'],
1777 # activation=self.params['activation'][0],
1778 # dropout=self.params["dropout"][0],
1779 # recurrent_dropout = self.params["recurrent_dropout"],
1780 # recurrent_activation=self.params["recurrent_activation"],
1781 # stateful=self.params['stateful'],
1782 # return_sequences=return_sequences)(x)
1783 # if self.params["dropout"][1] > 0:
1784 # x = Dropout(self.params["dropout"][1])(x)
1785 # for i in range(self.params['dense_layers']):
1786 # x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1787 # x = Dense(units=1, activation="linear")(x)
1788 # model = tf.keras.Model(inputs=inputs, outputs=x)
1789 # # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
1790 # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1791 # model.compile(loss='mean_squared_error', optimizer=optimizer)
1793 # if self.params["verbose_weights"]:
1794 # print(f"Initial Weights Hash: {hash_weights(model)}")
1795 # return model
1796 # def _build_model_predict(self, return_sequences=True):
1797 # """
1798 # Builds and compiles the prediction model. It doesn't use batch shape or sequence length, making it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1800 # Parameters:
1801 # -----------
1802 # return_sequences : bool, optional
1803 # Whether to return the full sequence of outputs. Default is True.
1805 # Returns:
1806 # --------
1807 # model : tf.keras.Model
1808 # The compiled Keras model for prediction.
1809 # """
1810 # inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1811 # x = inputs
1812 # for i in range(self.params['rnn_layers']):
1813 # x = LSTM(
1814 # units=self.params['rnn_units'],
1815 # activation=self.params['activation'][0],
1816 # stateful=False,return_sequences=return_sequences)(x)
1817 # for i in range(self.params['dense_layers']):
1818 # x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1819 # x = Dense(units=1, activation="linear")(x)
1820 # model = tf.keras.Model(inputs=inputs, outputs=x)
1821 # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1822 # model.compile(loss='mean_squared_error', optimizer=optimizer)
1824 # # Set Weights to model_train
1825 # w_fitted = self.model_train.get_weights()
1826 # model.set_weights(w_fitted)
1828 # return model
1830 # Wrapper Functions putting everything together.
1831 # Useful for deploying from command line
1832 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1834 def rnn_data_wrap(dict0, params, features_subset=None):
1835 dict1 = dict0.copy()
1837 rnn_dat = RNNData(
1838 dict1, # input dictionary
1839 scaler=params['scaler'], # data scaling type
1840 features_list = params['features_list'] # features for predicting outcome
1841 )
1844 rnn_dat.train_test_split(
1845 time_fracs = params['time_fracs'], # Percent of total time steps used for train/val/test
1846 space_fracs = params['space_fracs'] # Percent of total timeseries used for train/val/test
1847 )
1848 if rnn_dat.scaler is not None:
1849 rnn_dat.scale_data()
1851 rnn_dat.batch_reshape(
1852 timesteps = params['timesteps'], # Timesteps aka sequence length for RNN input data.
1853 batch_size = params['batch_size'], # Number of samples of length timesteps for a single round of grad. descent
1854 start_times = np.zeros(len(rnn_dat.loc['train_locs']))
1855 )
1857 return rnn_dat
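# Sketch of intended usage (illustrative; `dat` is a data dictionary in the
# format RNNData expects, and params must supply scaler, features_list,
# time_fracs, space_fracs, timesteps, and batch_size):
# rnn_dat = rnn_data_wrap(dat, params)
# X_train, y_train = rnn_dat.X_train, rnn_dat.y_train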
1859 def forecast(model, data, spinup_hours = 0):
1860 return # NOTE: placeholder; spinup-based forecasting not implemented yet
1863 # v2.3 method
1864 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1866 class RNN():
1867 """
1868 A flexible RNN class (v2.3) that builds a stateful training model and a matching stateless prediction model from a params dictionary.
1869 """
1870 def __init__(self, params: dict):
1871 """
1872 Initializes the RNN with the given parameters.
1874 Parameters:
1875 -----------
1876 params : dict
1877 A dictionary containing model parameters.
1878 """
1879 self.params = params
1880 # Build model architectures based on input params
1881 self.model_train = self._build_model_train()
1882 self.model_predict = self._build_model_predict()
1883 # Compile Models
1884 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1885 self.model_train.compile(loss='mean_squared_error', optimizer=optimizer)
1886 self.model_predict.compile(loss='mean_squared_error', optimizer=optimizer)
1888 def _build_hidden_layers(self, x, stateful=True, return_sequences=True):
1889 params = self.params
1890 last_recurrent = None
1892 # Identify the last RNN/LSTM layer, unless an Attention layer follows it
1893 for i, layer_type in enumerate(params['hidden_layers']):
1894 if layer_type in ['rnn', 'lstm']:
1895 # Check if there's an Attention layer following the current RNN/LSTM layer
1896 if i < len(params['hidden_layers']) - 1 and params['hidden_layers'][i + 1] == 'attention':
1897 continue
1898 last_recurrent = i
1900 # Loop over each layer specified in 'hidden_layers'
1901 for i, layer_type in enumerate(params['hidden_layers']):
1902 units = params['hidden_units'][i]
1903 activation = params['hidden_activation'][i]
1905 if layer_type == 'dense':
1906 x = layers.Dense(units=units, activation=activation)(x)
1908 elif layer_type == 'dropout':
1909 x = layers.Dropout(params['dropout'])(x)
1911 elif layer_type == 'rnn':
1915 is_last_recurrent = (i == last_recurrent)
1916 return_seqs_logic = not is_last_recurrent or return_sequences
1917 x = layers.SimpleRNN(units=units, activation=activation, dropout=params['dropout'], recurrent_dropout=params['recurrent_dropout'], stateful=stateful,
1918 return_sequences=return_seqs_logic)(x)
1920 elif layer_type == 'lstm':
1921 is_last_recurrent = (i == last_recurrent)
1922 return_seqs_logic = not is_last_recurrent or return_sequences
1923 x = layers.LSTM(units=units, activation=activation, dropout=params['dropout'], recurrent_dropout=params['recurrent_dropout'], stateful=stateful,
1924 return_sequences=return_seqs_logic)(x)
1926 elif layer_type == 'attention':
1927 # Self-attention mechanism
1928 x = layers.Attention()([x, x])
1929 elif layer_type == 'conv1d':
1930 kernel_size = params.get('kernel_size', 3) # Check for kernel size, use 3 if missing
1931 x = layers.Conv1D(filters=units, kernel_size=kernel_size, activation=activation, padding='same')(x)
1932 else:
1933 raise ValueError(f"Unrecognized layer type: {layer_type}")
1935 return x
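# Illustrative hidden-layer spec consumed by _build_hidden_layers: the three
# lists are aligned by position, with None entries for layer types that take
# no units/activation ('attention', 'dropout').
# 'hidden_layers':     ['conv1d', 'lstm', 'attention', 'dropout', 'dense']
# 'hidden_units':      [32,       64,     None,        None,      16]
# 'hidden_activation': ['relu',   'tanh', None,        None,      'relu']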
1937 def _build_model_train(self):
1938 params = self.params
1940 # Define the input layer with the specified batch size, timesteps, and features
1941 inputs = tf.keras.Input(batch_shape=(params['batch_size'], params['timesteps'], params['n_features']))
1942 x = inputs
1943 # Build hidden layers
1944 x = self._build_hidden_layers(x, stateful = params['stateful'], return_sequences = params['return_sequences'])
1946 # Add the output layer
1947 if params['output_layer'] == 'dense':
1948 outputs = layers.Dense(units=params['output_dimension'], activation=params['output_activation'])(x)
1949 else:
1950 raise ValueError("Unsupported output layer type: {}".format(params['output_layer']))
1952 # Create the model
1953 model = models.Model(inputs=inputs, outputs=outputs)
1955 if self.params["verbose_weights"]:
1956 print(f"Initial Weights Hash: {hash_weights(model)}")
1958 return model
1960 def _build_model_predict(self, return_sequences=True):
1961 params = self.params
1963 # Define the input layer with flexible batch size and sequence length
1964 inputs = tf.keras.Input(shape=(None, params['n_features']))
1965 x = inputs
1966 # Build hidden layers
1967 x = self._build_hidden_layers(x, stateful=False, return_sequences = True)
1969 # Add the output layer
1970 if params['output_layer'] == 'dense':
1971 outputs = layers.Dense(units=params['output_dimension'], activation=params['output_activation'])(x)
1972 else:
1973 raise ValueError("Unsupported output layer type: {}".format(params['output_layer']))
1975 # Create the prediction model
1976 model = models.Model(inputs=inputs, outputs=outputs)
1977 return model
1979 def _setup_callbacks(self, val=False):
1981 callbacks = [TerminateOnNaN()]
1983 if self.params["reset_states"]:
1984 print("Using ResetStatesCallback.")
1985 callbacks=callbacks+[ResetStatesCallback(self.params)]
1987 if val:
1988 print("Using EarlyStoppingCallback")
1989 early_stop = EarlyStoppingCallback(patience = self.params['early_stopping_patience'])
1990 callbacks=callbacks+[early_stop]
1991 else:
1992 early_stop = None
1994 return callbacks, early_stop
1996 def is_stateful(self):
1997 """
1998 Checks whether any of the layers in the internal model (self.model_train) are stateful.
2000 Returns:
2001 bool: True if at least one layer in the model is stateful, False otherwise.
2003 This method iterates over all the layers in the model and checks if any of them
2004 have the 'stateful' attribute set to True. This is useful for determining if
2005 the model is designed to maintain state across batches during training.
2007 Example:
2008 --------
2009 model.is_stateful()
2010 """
2011 for layer in self.model_train.layers:
2012 if hasattr(layer, 'stateful') and layer.stateful:
2013 return True
2014 return False
2016 def plot_history(self, history, plot_title, create_figure=True):
2017 """
2018 Plots the training history. Uses log scale on y axis for readability.
2020 Parameters:
2021 -----------
2022 history : History object
2023 The training history object from model fitting. Output of keras' .fit command
2024 plot_title : str
2025 The title for the plot.
2026 """
2028 if create_figure:
2029 plt.figure(figsize=(10, 6))
2030 plt.semilogy(history.history['loss'], label='Training loss')
2031 if 'val_loss' in history.history:
2032 plt.semilogy(history.history['val_loss'], label='Validation loss')
2033 plt.title(f'{plot_title} Model loss')
2034 plt.ylabel('Loss')
2035 plt.xlabel('Epoch')
2036 plt.legend(loc='upper left')
2037 plt.show()
2040 def fit(self, X_train, y_train, verbose_fit = False, verbose_weights=False,
2041 plot_history=True, plot_title = '',
2042 weights=None, callbacks=[], validation_data=None, return_epochs=False, *args, **kwargs):
2043 """
2044 Trains the model on the provided training data. Uses the fit method of the training model, then copies the weights over to the prediction model, which has a less restrictive input shape. Builds the list of callbacks used within the fit call from the params input.
2046 Parameters:
2047 -----------
2048 X_train : np.ndarray
2049 The input matrix data for training.
2050 y_train : np.ndarray
2051 The target vector data for training.
2052 plot_history : bool, optional
2053 If True, plots the training history. Default is True.
2054 plot_title : str, optional
2055 The title for the training plot. Default is an empty string.
2056 weights : optional
2057 Initial weights for the model. Default is None.
2058 callbacks : list, optional
2059 A list of callback functions to use during training. Default is an empty list.
2060 validation_data : tuple, optional
2061 Validation data to use during training, expected format (X_val, y_val). Default is None.
2062 return_epochs : bool
2063 If True, return the number of epochs that training took. Used to test and optimize early stopping
2064 """
2065 if verbose_weights:
2066 print(f"Training simple RNN with params: {self.params}")
2068 # Setup callbacks
2069 # if self.params["reset_states"]:
2070 # callbacks=callbacks+[ResetStatesCallback(self.params), TerminateOnNaN()]
2072 # Check if validation data exists to modify callbacks
2073 val = validation_data is not None
2074 callbacks, early_stop = self._setup_callbacks(val)
2076 # if validation_data is not None:
2077 history = self.model_train.fit(
2078 X_train, y_train,
2079 epochs=self.params['epochs'],
2080 batch_size=self.params['batch_size'],
2081 callbacks = callbacks,
2082 verbose=verbose_fit,
2083 validation_data = validation_data,
2084 *args, **kwargs
2085 )
2087 if plot_history:
2088 self.plot_history(history,plot_title)
2090 if verbose_weights:
2091 print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")
2093 # Update Weights for Prediction Model
2094 w_fitted = self.model_train.get_weights()
2095 self.model_predict.set_weights(w_fitted)
2097 if return_epochs:
2098 # Epoch counting starts at 0, adding 1 for the count
2099 return early_stop.best_epoch + 1
2101 def predict(self, X_test, verbose=True):
2102 if verbose:
2103 print("Predicting test data")
2104 preds = self.model_predict.predict(X_test)
2106 return preds
2108 def run_model(self, data, reproducibility_run=False, plot_period='all', return_epochs=False):
2110 # Set up print statements with verbose args
2111 verbose_fit = self.params['verbose_fit']
2112 verbose_weights = self.params['verbose_weights']
2113 if verbose_weights:
2114 data.print_hashes()
2116 # Set up name for run, used for plotting
2117 case_id = "Spatial Training Set" if data.spatial else data.id
2118 print(f"Running {case_id}")
2120 # Extract Datasets
2121 X_train, y_train, X_test, y_test = data.X_train, data.y_train, data.X_test, data.y_test
2122 X_val, y_val = data.get('X_val', None), data.get('y_val', None)
2123 validation_data = (X_val, y_val) if X_val is not None and y_val is not None else None
2125 # Fit model, assign epochs to object; will just assign None if return_epochs is False
2126 # NOTE: when using early stopping, the number of epochs must be extracted here at the fit call
2127 eps = self.fit(X_train, y_train, validation_data=validation_data, plot_title=case_id, return_epochs=return_epochs, verbose_fit=verbose_fit, verbose_weights=verbose_weights)
2129 # Generate Predictions
2130 m = self.predict(X_test)
2131 errs = rmse_3d(m, y_test)
2133 if reproducibility_run:
2134 print("Checking Reproducibility")
2135 check_reproducibility(data, self.params, hash_ndarray(m), hash_weights(self.model_predict))
2137 if return_epochs:
2138 return m, errs, eps
2139 else:
2140 return m, errs
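# Sketch of the end-to-end flow this class supports (names are illustrative;
# see rnn_data_wrap above):
# params = RNNParams(input_dict)           # validated hyperparameter dict
# rnn_dat = rnn_data_wrap(dat, params)     # scaled, batched RNNData object
# mod = RNN(params)
# preds, errs = mod.run_model(rnn_dat)     # fit, predict test set, RMSE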
2145 class RNNParams(dict):
2146 """
2147 A custom dictionary class for handling RNN parameters. Automatically calculates certain params based on others. Overwrites the update method to protect from incompatible parameter choices. Inherits from dict.
2148 """
2149 def __init__(self, input_dict):
2150 """
2151 Initializes the RNNParams instance and runs checks and shape calculations.
2153 Parameters:
2154 -----------
2155 input_dict : dict,
2156 A dictionary containing RNN parameters.
2157 """
2158 super().__init__(input_dict)
2159 # Automatically run checks on initialization
2160 self._run_checks()
2161 # Automatically calculate shapes on initialization
2162 self.calc_param_shapes()
2163 def _run_checks(self, verbose=True):
2164 """
2165 Validates that required keys exist and are of the correct type.
2167 Parameters:
2168 -----------
2169 verbose : bool, optional
2170 If True, prints status messages. Default is True.
2171 """
2172 if verbose:
2173 print("Checking params...")
2175 ### Check required keys and data types
2176 # Keys must exist and be integers
2177 int_keys = ['batch_size', 'timesteps', 'epochs', 'output_dimension']
2178 # Keys must exist and be lists
2179 list_keys = ['hidden_layers', 'hidden_units', 'hidden_activation', 'time_fracs', 'features_list']
2180 # Keys must exist and be floats
2181 float_keys = ['learning_rate']
2182 # Keys must exist and be strings
2183 str_keys = ['output_layer', 'output_activation']
2185 for key in int_keys:
2186 assert key in self, f"Missing required key: {key}"
2187 assert isinstance(self[key], int), f"Key '{key}' must be an integer"
2189 for key in list_keys:
2190 assert key in self, f"Missing required key: {key}"
2191 assert isinstance(self[key], list), f"Key '{key}' must be a list"
2193 for key in float_keys:
2194 assert key in self, f"Missing required key: {key}"
2195 assert isinstance(self[key], float), f"Key '{key}' must be a float"
2197 for key in str_keys:
2198 assert key in self, f"Missing required key: {key}"
2199 assert isinstance(self[key], str), f"Key '{key}' must be a string"
2201 ### Check Hidden layer lists
2202 hidden_keys = ['hidden_layers', 'hidden_units', 'hidden_activation']
2203 # Check same lengths and proper data types
2204 lengths = [len(self[key]) for key in hidden_keys]
2205 assert all(length == lengths[0] for length in lengths), "Keys 'hidden_layers', 'hidden_units', 'hidden_activation' must have the same length"
2206 # Check that 'hidden_units' are integers and 'hidden_activation' are strings,
2207 # unless 'hidden_layers' is 'attention' or 'dropout', in which case both must be None.
2208 for i, layer in enumerate(self['hidden_layers']):
2209 if layer == 'attention' or layer == 'dropout':
2210 assert self['hidden_units'][i] is None, f"hidden_units[{i}] must be None for '{layer}' layers"
2211 assert self['hidden_activation'][i] is None, f"hidden_activation[{i}] must be None for '{layer}' layers"
2212 else:
2213 assert isinstance(self['hidden_units'][i], int), f"hidden_units[{i}] must be an integer for '{layer}' layers"
2214 assert isinstance(self['hidden_activation'][i], str), f"hidden_activation[{i}] must be a string for '{layer}' layers"
2216 if verbose:
2217 print("Input dictionary passed all checks.")
2219 def calc_param_shapes(self, verbose=True):
2220 """
2221 Calculates and updates the shapes of certain parameters based on input data.
2223 Parameters:
2224 -----------
2225 verbose : bool, optional
2226 If True, prints status messages. Default is True.
2227 """
2228 if verbose:
2229 print("Calculating shape params based on features list, timesteps, and batch size")
2230 print(f"Input Feature List: {self['features_list']}")
2231 print(f"Input Timesteps: {self['timesteps']}")
2232 print(f"Input Batch Size: {self['batch_size']}")
2234 n_features = len(self['features_list'])
2235 batch_shape = (self["batch_size"], self["timesteps"], n_features)
2236 if verbose:
2237 print("Calculated params:")
2238 print(f"Number of features: {n_features}")
2239 print(f"Batch Shape: {batch_shape}")
2241 # Update the dictionary
2242 super().update({
2243 'n_features': n_features,
2244 'batch_shape': batch_shape
2245 })
2246 if verbose:
2247 print(self)
2249 def update(self, *args, verbose=True, **kwargs):
2250 """
2251 Overwrites the standard update function from dict. This prevents certain keys from being modified directly and automatically updates dependent keys to stay compatible with each other. The keys handled relate to the shape of the input data to the RNN.
2253 Parameters:
2254 -----------
2255 verbose : bool, optional
2256 If True, prints status messages. Default is True.
2257 """
2258 # Prevent updating n_features and batch_shape
2259 restricted_keys = {'n_features', 'batch_shape'}
2260 keys_to_check = {'features_list', 'timesteps', 'batch_size'}
2262 # Check for restricted keys in args
2263 if args:
2264 if isinstance(args[0], dict):
2265 if restricted_keys & args[0].keys():
2266 raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
2267 elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
2268 if restricted_keys & {k for k, v in args[0]}:
2269 raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")
2271 # Check for restricted keys in kwargs
2272 if restricted_keys & kwargs.keys():
2273 raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")
2276 # Track if specific keys are updated
2277 keys_updated = set()
2279 # Update using the standard dict update method
2280 if args:
2281 if isinstance(args[0], dict):
2282 keys_updated.update(args[0].keys() & keys_to_check)
2283 elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
2284 keys_updated.update(k for k, v in args[0] if k in keys_to_check)
2286 if kwargs:
2287 keys_updated.update(kwargs.keys() & keys_to_check)
2289 # Call the parent update method
2290 super().update(*args, **kwargs)
2292 # Recalculate shapes if necessary
2293 if keys_updated:
2294 self.calc_param_shapes(verbose=verbose)
2296 # Run checks again to make sure nothing changed to incompatible
2297 self._run_checks(verbose=False)
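# Sketch of RNNParams behavior (illustrative): derived shape keys are
# protected, but updating one of their inputs recalculates them automatically.
# params = RNNParams(input_dict)
# params.update({'timesteps': 12})               # OK: batch_shape recalculated
# params.update({'batch_shape': (32, 12, 3)})    # raises KeyError (restricted)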