# v2 training and prediction class infrastructure

# Environment
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import sys
from tensorflow.keras.callbacks import Callback, EarlyStopping, TerminateOnNaN
# from sklearn.metrics import mean_squared_error
import logging
from tensorflow.keras.layers import LSTM, SimpleRNN, Input, Dropout, Dense
# Local modules
import reproducibility
# from utils import print_dict_summary
from abc import ABC, abstractmethod
from utils import hash2, all_items_exist, hash_ndarray, hash_weights, Dict
from data_funcs import rmse, plot_data, compare_dicts
import copy
# import yaml
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings

#*************************************************************************************
# Data Formatting Functions

def staircase(x, y, timesteps, datapoints, return_sequences=False, verbose=False):
    # x [datapoints, features]  all inputs
    # y [datapoints, outputs]
    # timesteps: split x and y into samples of length timesteps, shifted by 1
    # datapoints: number of timesteps to use for training, no more than y.shape[0]
    if verbose:
        print('staircase: shape x = ', x.shape)
        print('staircase: shape y = ', y.shape)
        print('staircase: timesteps=', timesteps)
        print('staircase: datapoints=', datapoints)
        print('staircase: return_sequences=', return_sequences)
    outputs = y.shape[1]
    features = x.shape[1]
    samples = datapoints - timesteps + 1
    if verbose:
        print('staircase: samples=', samples, 'timesteps=', timesteps, 'features=', features)
    x_train = np.empty([samples, timesteps, features])
    if return_sequences:
        if verbose:
            print('returning all timesteps in a sample')
        y_train = np.empty([samples, timesteps, outputs])  # all
        for i in range(samples):
            for k in range(timesteps):
                x_train[i, k, :] = x[i + k, :]
                y_train[i, k, :] = y[i + k, :]
    else:
        if verbose:
            print('returning only the last timestep in a sample')
        y_train = np.empty([samples, outputs])
        for i in range(samples):
            for k in range(timesteps):
                x_train[i, k, :] = x[i + k, :]
            y_train[i, :] = y[i + timesteps - 1, :]

    return x_train, y_train
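
# Illustrative sketch (hypothetical toy data, not part of the original module):
# a quick shape check for staircase(), using only numpy from the imports above.
def _demo_staircase():
    x = np.arange(20, dtype=float).reshape(10, 2)  # 10 datapoints, 2 features
    y = np.arange(10, dtype=float).reshape(10, 1)  # 10 datapoints, 1 output
    x_train, y_train = staircase(x, y, timesteps=3, datapoints=10)
    # samples = datapoints - timesteps + 1 = 8 overlapping windows shifted by 1
    print(x_train.shape)  # (8, 3, 2)
    print(y_train.shape)  # (8, 1)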

def staircase_2(x, y, timesteps, batch_size=None, trainsteps=np.inf, return_sequences=False, verbose=False):
    # create RNN training data in multiple batches
    # input:
    #     x (,features)
    #     y (,outputs)
    #     timesteps: split x and y into sequences length timesteps
    #                a.k.a. lookback or sequence_length

    # print params if verbose
    if batch_size is None:
        raise ValueError('staircase_2 requires batch_size')
    if verbose:
        print('staircase_2: shape x = ', x.shape)
        print('staircase_2: shape y = ', y.shape)
        print('staircase_2: timesteps=', timesteps)
        print('staircase_2: batch_size=', batch_size)
        print('staircase_2: return_sequences=', return_sequences)

    nx, features = x.shape
    ny, outputs = y.shape
    datapoints = min(nx, ny, trainsteps)
    if verbose:
        print('staircase_2: datapoints=', datapoints)

    # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
    # https://www.tensorflow.org/guide/keras/working_with_rnns  Cross-batch statefulness

    # example with timesteps=3 batch_size=3 datapoints=15
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]
    #     batch 3: [9 10 11]    [10 11 12]   [11 12 13]
    #     batch 4: [12 13 14]   [13 14 15]   when the data runs out this is the last batch, and it can be shorter

    # TODO: implement for multiple locations, same starting time for each batch
    #              Loc 1         Loc 2        Loc 3
    #     batch 0: [0 1 2]      [0 1 2]      [0 1 2]
    #     batch 1: [3 4 5]      [3 4 5]      [3 4 5]
    #     batch 2: [6 7 8]      [6 7 8]      [6 7 8]
    # TODO: second epoch, shift starting time at batch 0 in time

    # TODO: implement for multiple locations, different starting times for each batch
    #              Loc 1         Loc 2        Loc 3
    #     batch 0: [0 1 2]      [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]      [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]      [7 8 9]      [8 9 10]

    # the first sample in batch j starts from timesteps*j and ends with timesteps*(j+1)-1
    # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
    # becomes the starting hidden state of the rnn in the sequence of steps [3 4 5] in batch 1, etc.

    # sample [0 1 2] means the rnn is used twice to map state 0 -> 1 -> 2
    # the state at time 0 is fixed but the state is considered a variable at times 1 and 2
    # the loss is computed from the output at time 2, and the gradient of the loss function by the chain rule ends at time 0 because the state there is a constant -> derivative is zero
    # sample [3 4 5] means the rnn is used twice to map state 3 -> 4 -> 5; the state at time 3 is fixed to the output of the first sequence [0 1 2]
    # the loss is computed from the output at time 5, and the gradient of the loss function by the chain rule ends at time 3 because the state there is considered constant -> derivative is zero
    # how is the gradient computed? I suppose keras adds the gradient wrt the weights at 2 5 8 ... 3 6 9 ... 4 7 ... and uses that to update the weights
    # there is only one set of weights: h(2) = f(h(1),w), h(1) = f(h(0),w), but w is always the same
    # each column is one successive evaluation of h(n+1) = f(h(n),w) for n = n_start, n_start+1, ...
    # a single column cannot be evaluated efficiently on a gpu because a gpu is a parallel processor
    # think of it as each column served by one thread; the threads are independent because they execute in parallel, and there needs to be a large number of threads (32 is a good number)
    # each batch consists of independent calculations
    # but it can depend on the result of the previous batch (that's the recurrent part)

    max_batches = datapoints // timesteps
    max_sequences = max_batches * batch_size

    if verbose:
        print('staircase_2: max_batches=', max_batches)
        print('staircase_2: max_sequences=', max_sequences)

    x_train = np.zeros((max_sequences, timesteps, features))
    if return_sequences:
        y_train = np.empty((max_sequences, timesteps, outputs))
    else:
        y_train = np.empty((max_sequences, outputs))

    # build the sequences
    k = 0
    for i in range(max_batches):
        for j in range(batch_size):
            begin = i * timesteps + j
            next = begin + timesteps
            if next > datapoints:
                break
            if verbose:
                print('sequence', k, 'batch', i, 'sample', j, 'data', begin, 'to', next - 1)
            x_train[k, :, :] = x[begin:next, :]
            if return_sequences:
                y_train[k, :, :] = y[begin:next, :]
            else:
                y_train[k, :] = y[next - 1, :]
            k += 1
    if verbose:
        print('staircase_2: shape x_train = ', x_train.shape)
        print('staircase_2: shape y_train = ', y_train.shape)
        print('staircase_2: sequences generated', k)
        print('staircase_2: batch_size=', batch_size)
    k = (k // batch_size) * batch_size
    if verbose:
        print('staircase_2: removing partial and empty batches at the end, keeping', k)
    x_train = x_train[:k, :, :]
    if return_sequences:
        y_train = y_train[:k, :, :]
    else:
        y_train = y_train[:k, :]

    if verbose:
        print('staircase_2: shape x_train = ', x_train.shape)
        print('staircase_2: shape y_train = ', y_train.shape)

    return x_train, y_train
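
# Illustrative sketch (hypothetical toy data): staircase_2 with timesteps=3 and
# batch_size=3 reproduces the cross-batch layout sketched in the comments above;
# the partial final batch is trimmed so the sequence count is a multiple of batch_size.
def _demo_staircase_2():
    x = np.arange(30, dtype=float).reshape(15, 2)  # 15 datapoints, 2 features
    y = np.arange(15, dtype=float).reshape(15, 1)  # 15 datapoints, 1 output
    x_train, y_train = staircase_2(x, y, timesteps=3, batch_size=3, verbose=True)
    print(x_train.shape)  # (12, 3, 2): 13 sequences generated, trimmed to 12
    print(y_train.shape)  # (12, 1)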

# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler()
}

def batch_setup(ids, batch_size):
    """
    Sets up stateful batched training data scheme for RNN training.

    This function takes a list or array of identifiers (`ids`) and divides them into batches of a specified size (`batch_size`). If the last batch does not have enough elements to meet the `batch_size`, the function will loop back to the start of the identifiers and continue filling the batch until it reaches the required size.

    Parameters:
    -----------
    ids : list or numpy array
        A list or numpy array containing the ids to be batched.
    batch_size : int
        The desired size of each batch.

    Returns:
    --------
    batches : list of lists
        A list where each element is a batch (itself a list) of identifiers. Each batch will contain exactly `batch_size` elements.

    Example:
    --------
    >>> ids = [1, 2, 3, 4, 5]
    >>> batch_size = 3
    >>> batch_setup(ids, batch_size)
    [[1, 2, 3], [4, 5, 1]]

    Notes:
    ------
    - If `ids` is shorter than `batch_size`, the returned list will contain a single batch where identifiers are repeated from the start of `ids` until the batch is filled.
    """
    # Ensure ids is a numpy array
    x = np.array(ids)

    # Initialize the list to hold the batches
    batches = []

    # Use a loop to slice the list/array into batches
    for i in range(0, len(x), batch_size):
        batch = list(x[i:i + batch_size])

        # If the batch is not full, continue from the start
        while len(batch) < batch_size:
            # Calculate the remaining number of items needed
            remaining = batch_size - len(batch)
            # Append the needed number of items from the start of the array
            batch.extend(x[:remaining])

        batches.append(batch)

    return batches

def staircase_spatial(X, y, batch_size, timesteps, hours=None, start_times=None, verbose=True):
    """
    Prepares spatially formatted time series data for RNN training by creating batches of sequences across different locations, stacked to be compatible with stateful models.

    This function processes multi-location time series data by slicing it into batches and formatting it to fit into a recurrent neural network (RNN) model. It utilizes a staircase-like approach to prepare sequences for each location and then interlaces them to align with stateful RNN structures.

    Parameters:
    -----------
    X : list of numpy arrays
        A list where each element is a numpy array containing features for a specific location. The shape of each array is `(total_time_steps, features)`.
    y : list of numpy arrays
        A list where each element is a numpy array containing the target values for a specific location. The shape of each array is `(total_time_steps,)`.
    batch_size : int
        The number of sequences to include in each batch.
    timesteps : int
        The number of time steps to include in each sequence for the RNN.
    hours : int, optional
        The length of each time series to consider for each location. If `None`, it defaults to the minimum length of `y` across all locations.
    start_times : numpy array, optional
        The initial time step for each location. If `None`, it defaults to an array starting from 0 and incrementing by 1 for each location.
    verbose : bool, optional
        If `True`, prints additional information during processing. Default is `True`.

    Returns:
    --------
    XX : numpy array
        A 3D numpy array with shape `(total_sequences, timesteps, features)` containing the prepared feature sequences for all locations.
    yy : numpy array
        A 2D numpy array with shape `(total_sequences, 1)` containing the corresponding target values for all locations.
    n_seqs : int
        Number of sequences per location. Used to reset states when the location changes: the hidden state of the RNN will be reset after `n_seqs` batches.

    Notes:
    ------
    - The function handles spatially distributed time series data by batching and formatting it for stateful RNNs.
    - `hours` determines how much of the time series is used for each location. If not provided, it defaults to the shortest series in `y`.
    - If `start_times` is not provided, it assumes each location starts its series at progressively later time steps.
    - The `batch_setup` function is used internally to manage the creation of location and time step batches.
    - The returned feature sequences `XX` and target sequences `yy` are interlaced to align with the expected input format of stateful RNNs.
    """
    # Generate ids based on number of distinct timeseries provided
    n_loc = len(y)  # assuming each list entry for y is a separate location
    loc_ids = np.arange(n_loc)

    # Generate hours and start_times if None
    if hours is None:
        print("Setting total hours to minimum length of y in provided dictionary")
        hours = min(len(yi) for yi in y)
    if start_times is None:
        print(f"Setting start times to offset by 1 hour by location, from 0 through {batch_size} (batch_size)")
        start_times = np.tile(np.arange(batch_size), n_loc // batch_size + 1)[:n_loc]
    # Set up batches
    loc_batch, t_batch = batch_setup(loc_ids, batch_size), batch_setup(start_times, batch_size)
    if verbose:
        print(f"Location ID Batches: {loc_batch}")
        print(f"Start Times for Batches: {t_batch}")

    # Loop over batches and construct with staircase_2
    Xs = []
    ys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        ts = t_batch[i]
        for j in range(0, len(locs_i)):
            t0 = int(ts[j])
            tend = t0 + hours
            # Create RNNData Dict
            # Subset data to given location and time from t0 to t0+hours
            k = locs_i[j]  # Used to account for fewer locations than batch size
            X_temp = X[k][t0:tend, :]
            y_temp = y[k][t0:tend].reshape(-1, 1)

            # Format sequences
            Xi, yi = staircase_2(
                X_temp,
                y_temp,
                timesteps=timesteps,
                batch_size=1,  # note: using 1 here to format sequences for a single location, not the same as the target batch size for training data
                verbose=False)

            Xs.append(Xi)
            ys.append(yi)

    # Drop incomplete batches
    lens = [yi.shape[0] for yi in ys]
    n_seqs = min(lens)
    if verbose:
        print(f"Minimum number of sequences by location: {n_seqs}")
        print(f"Applying minimum length to other arrays.")
    Xs = [Xi[:n_seqs] for Xi in Xs]
    ys = [yi[:n_seqs] for yi in ys]

    # Interlace arrays to match stateful structure
    n_features = Xi.shape[2]
    XXs = []
    yys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        XXi = np.empty((Xs[0].shape[0] * batch_size, timesteps, n_features))
        yyi = np.empty((Xs[0].shape[0] * batch_size, 1))
        for j in range(0, len(locs_i)):
            XXi[j::(batch_size)] = Xs[locs_i[j]]
            yyi[j::(batch_size)] = ys[locs_i[j]]
        XXs.append(XXi)
        yys.append(yyi)
    yy = np.concatenate(yys, axis=0)
    XX = np.concatenate(XXs, axis=0)

    if verbose:
        print(f"Spatially Formatted X Shape: {XX.shape}")
        print(f"Spatially Formatted y Shape: {yy.shape}")

    return XX, yy, n_seqs
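
# Illustrative sketch (hypothetical toy data): staircase_spatial on 4 locations
# with batch_size=2 interlaces locations within batches, so consecutive batches
# continue the same locations and a stateful RNN can carry hidden state across them.
def _demo_staircase_spatial():
    rng = np.random.default_rng(0)
    X = [rng.random((48, 2)) for _ in range(4)]  # 4 locations, 48 hours, 2 features
    y = [rng.random(48) for _ in range(4)]
    XX, yy, n_seqs = staircase_spatial(X, y, batch_size=2, timesteps=6, verbose=True)
    print(XX.shape, yy.shape, n_seqs)  # (28, 6, 2) (28, 1) 7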

#***********************************************************************************************
### RNN Class Functionality

class RNNParams(dict):
    """
    A custom dictionary class for handling RNN parameters. Automatically calculates certain params based on others. Overwrites the update method to protect from incompatible parameter choices. Inherits from dict.
    """
    def __init__(self, input_dict):
        """
        Initializes the RNNParams instance and runs checks and shape calculations.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing RNN parameters.
        """
        super().__init__(input_dict)
        # Automatically run checks on initialization
        self._run_checks()
        # Automatically calculate shapes on initialization
        self.calc_param_shapes()

    def _run_checks(self, verbose=True):
        """
        Validates that required keys exist and are of the correct type.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        print("Checking params...")
        # Keys must exist and be integers
        int_keys = [
            'batch_size', 'timesteps', 'rnn_layers',
            'rnn_units', 'dense_layers', 'dense_units', 'epochs'
        ]

        for key in int_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], int), f"Key '{key}' must be an integer"

        # Keys must exist and be lists
        list_keys = ['activation', 'features_list', 'dropout', 'time_fracs']
        for key in list_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], list), f"Key '{key}' must be a list"

        # Keys must exist and be floats
        float_keys = ['learning_rate']
        for key in float_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], float), f"Key '{key}' must be a float"

        print("Input dictionary passed all checks.")

    def calc_param_shapes(self, verbose=True):
        """
        Calculates and updates the shapes of certain parameters based on input data.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Calculating shape params based on features list, timesteps, and batch size")
            print(f"Input Feature List: {self['features_list']}")
            print(f"Input Timesteps: {self['timesteps']}")
            print(f"Input Batch Size: {self['batch_size']}")

        n_features = len(self['features_list'])
        batch_shape = (self["batch_size"], self["timesteps"], n_features)
        if verbose:
            print("Calculated params:")
            print(f"Number of features: {n_features}")
            print(f"Batch Shape: {batch_shape}")

        # Update the dictionary
        super().update({
            'n_features': n_features,
            'batch_shape': batch_shape
        })
        if verbose:
            print(self)

    def update(self, *args, verbose=True, **kwargs):
        """
        Overwrites the standard update function from dict. This is to prevent certain keys from being modified directly and to automatically update keys to be compatible with each other. The keys handled relate to the shape of the input data to the RNN.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Prevent updating n_features and batch_shape
        restricted_keys = {'n_features', 'batch_shape'}
        keys_to_check = {'features_list', 'timesteps', 'batch_size'}

        # Check for restricted keys in args
        if args:
            if isinstance(args[0], dict):
                if restricted_keys & args[0].keys():
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                if restricted_keys & {k for k, v in args[0]}:
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")

        # Check for restricted keys in kwargs
        if restricted_keys & kwargs.keys():
            raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")

        # Track if specific keys are updated
        keys_updated = set()

        # Update using the standard dict update method
        if args:
            if isinstance(args[0], dict):
                keys_updated.update(args[0].keys() & keys_to_check)
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                keys_updated.update(k for k, v in args[0] if k in keys_to_check)

        if kwargs:
            keys_updated.update(kwargs.keys() & keys_to_check)

        # Call the parent update method
        super().update(*args, **kwargs)

        # Recalculate shapes if necessary
        if keys_updated:
            self.calc_param_shapes(verbose=verbose)
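
# Illustrative sketch (hypothetical parameter values): the minimal key set that
# RNNParams validates, and how update() recalculates the derived shape params.
def _demo_rnn_params():
    params = RNNParams({
        'batch_size': 32, 'timesteps': 5,
        'rnn_layers': 1, 'rnn_units': 20,
        'dense_layers': 1, 'dense_units': 5,
        'epochs': 20,
        'activation': ['tanh', 'tanh'],
        'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.2, 0.2],
        'time_fracs': [0.8, 0.1, 0.1],
        'learning_rate': 0.001
    })
    print(params['n_features'], params['batch_shape'])  # 3 (32, 5, 3)
    params.update({'timesteps': 10})  # derived batch_shape is recalculated
    # params.update({'batch_shape': (1, 1, 1)})  # would raise KeyError: restricted key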

## Class for handling input data
class RNNData(dict):
    """
    A custom dictionary class for managing RNN data, with validation, scaling, and train-test splitting functionality.
    """
    required_keys = {"loc", "time", "X", "y", "features_list"}

    def __init__(self, input_dict, scaler=None, features_list=None):
        """
        Initializes the RNNData instance, performs checks, and prepares data.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing the initial data.
        scaler : str, optional
            The name of the scaler to be used (e.g., 'minmax', 'standard'). Default is None.
        features_list : list, optional
            A subset of features to be used. Default is None, which means all features.
        """
        # Copy to avoid changing external input
        input_data = input_dict.copy()
        # Initialize inherited dict class
        super().__init__(input_data)

        # Check if input data is one timeseries dataset or multiple
        if type(self.loc['STID']) == str:
            self.spatial = False
            print("Input data is single timeseries.")
        elif type(self.loc['STID']) == list:
            self.spatial = True
            print("Input data from multiple timeseries.")
        else:
            raise KeyError(f"Input locations not list or single string")

        # Set up Data Scaling
        self.scaler = None
        if scaler is not None:
            self.set_scaler(scaler)

        # Rename and define derived keys
        if self.spatial:
            self['hours'] = min(arr.shape[0] for arr in self.y)
        else:
            self['hours'] = len(self['y'])

        self['all_features_list'] = self.pop('features_list')
        if features_list is None:
            print("Using all input features.")
            self.features_list = self.all_features_list
        else:
            self.features_list = features_list
            print(f"Setting features_list to {features_list}. \n NOTE: not subsetting features yet. That happens in train_test_split.")

        self._run_checks()
        self.__dict__.update(self)

    # TODO: Fix checks for multilocation
    def _run_checks(self, verbose=True):
        """
        Validates that required keys are present and checks the integrity of data shapes.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        missing_keys = self.required_keys - self.keys()
        if missing_keys:
            raise KeyError(f"Missing required keys: {missing_keys}")

        # # Check if 'hours' is provided and matches len(y)
        # if 'hours' in self:
        #     if self.hours != len(self.y):
        #         raise ValueError(f"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")

        # Check desired subset of features is in all input features
        if not all_items_exist(self.features_list, self.all_features_list):
            raise ValueError(f"Provided 'features_list' {self.features_list} has elements not in input features.")

    def set_scaler(self, scaler):
        """
        Sets the scaler to be used for data normalization.

        Parameters:
        -----------
        scaler : str
            The name of the scaler (e.g., 'minmax', 'standard').
        """
        recognized_scalers = ['minmax', 'standard']
        if scaler in recognized_scalers:
            print(f"Setting data scaler: {scaler}")
            self.scaler = scalers[scaler]
        else:
            raise ValueError(f"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")

    def train_test_split(self, time_fracs=[1., 0., 0.], space_fracs=[1., 0., 0.], subset_features=True, features_list=None, verbose=True):
        """
        Splits the data into training, validation, and test sets.

        Parameters:
        -----------
        time_fracs : list of float
            Fractions of the time dimension to use for train/validation/test. Must have length 3 and sum to 1.
        space_fracs : list of float
            Fractions of the locations to use for train/validation/test. Must have length 3 and sum to 1. Only used for spatial data.
        subset_features : bool, optional
            If True, subsets the data to the specified features list. Default is True.
        features_list : list, optional
            A list of features to use for subsetting. Default is None.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial

        # Set up
        assert np.sum(time_fracs) == np.sum(space_fracs) == 1., f"Provided cross validation params don't sum to 1"
        if (len(time_fracs) != 3) or (len(space_fracs) != 3):
            raise ValueError("Cross-validation params `time_fracs` and `space_fracs` must be lists of length 3, representing (train/validation/test)")

        train_frac = time_fracs[0]
        val_frac = time_fracs[1]
        test_frac = time_fracs[2]

        # Setup train/val/test in time
        train_ind = int(np.floor(self.hours * train_frac)); self.train_ind = train_ind
        test_ind = int(train_ind + round(self.hours * val_frac)); self.test_ind = test_ind
        # Check for any potential issues with indices
        if test_ind > self.hours:
            print(f"Setting test index to {self.hours}")
            test_ind = self.hours
        if train_ind > test_ind:
            raise ValueError("Train index must be less than test index.")

        # Setup train/val/test in space
        if spatial:
            train_frac_sp = space_fracs[0]
            val_frac_sp = space_fracs[1]
            locs = np.arange(len(self.loc['STID']))  # indices of locations
            train_size = int(len(locs) * train_frac_sp)
            val_size = int(len(locs) * val_frac_sp)
            random.shuffle(locs)
            train_locs = locs[:train_size]
            val_locs = locs[train_size:train_size + val_size]
            test_locs = locs[train_size + val_size:]
            # Store lists of IDs in loc subdictionary
            self.loc['train_locs'] = [self.case[i] for i in train_locs]
            self.loc['val_locs'] = [self.case[i] for i in val_locs]
            self.loc['test_locs'] = [self.case[i] for i in test_locs]

        # Extract data to desired features, copy to avoid changing input objects
        X = self.X.copy()
        y = self.y.copy()
        if subset_features:
            if verbose and self.features_list != self.all_features_list:
                print(f"Subsetting input data to features_list: {self.features_list}")
            # Indices to subset all features with based on params features
            indices = []
            for item in self.features_list:
                if item in self.all_features_list:
                    indices.append(self.all_features_list.index(item))
                else:
                    print(f"Warning: feature name '{item}' not found in list of all features from input data. Removing from internal features list")
                    # self.features_list.remove(item)
            if spatial:
                X = [Xi[:, indices] for Xi in X]
            else:
                X = X[:, indices]

        # Training data from 0 to train_ind
        # Validation data from train_ind to test_ind
        # Test data from test_ind to end
        if spatial:
            # Split by space
            X_train = [X[i] for i in train_locs]
            X_val = [X[i] for i in val_locs]
            X_test = [X[i] for i in test_locs]
            y_train = [y[i] for i in train_locs]
            y_val = [y[i] for i in val_locs]
            y_test = [y[i] for i in test_locs]

            # Split by time
            self.X_train = [Xi[:train_ind] for Xi in X_train]
            self.y_train = [yi[:train_ind].reshape(-1, 1) for yi in y_train]
            if (val_frac > 0) and (val_frac_sp > 0):
                self.X_val = [Xi[train_ind:test_ind] for Xi in X_val]
                self.y_val = [yi[train_ind:test_ind].reshape(-1, 1) for yi in y_val]
            self.X_test = [Xi[test_ind:] for Xi in X_test]
            self.y_test = [yi[test_ind:].reshape(-1, 1) for yi in y_test]
        else:
            self.X_train = X[:train_ind]
            self.y_train = y[:train_ind].reshape(-1, 1)  # assumes y 1-d, change this if vector output
            if val_frac > 0:
                self.X_val = X[train_ind:test_ind]
                self.y_val = y[train_ind:test_ind].reshape(-1, 1)  # assumes y 1-d, change this if vector output
            self.X_test = X[test_ind:]
            self.y_test = y[test_ind:].reshape(-1, 1)  # assumes y 1-d, change this if vector output

        # Print statements if verbose
        if verbose:
            print(f"Train index: 0 to {train_ind}")
            print(f"Validation index: {train_ind} to {test_ind}")
            print(f"Test index: {test_ind} to {self.hours}")

            if spatial:
                print("Subsetting locations into train/val/test")
                print(f"Total Locations: {len(locs)}")
                print(f"Train Locations: {len(train_locs)}")
                print(f"Val. Locations: {len(val_locs)}")
                print(f"Test Locations: {len(test_locs)}")
                print(f"X_train[0] shape: {self.X_train[0].shape}, y_train[0] shape: {self.y_train[0].shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val[0] shape: {self.X_val[0].shape}, y_val[0] shape: {self.y_val[0].shape}")
                print(f"X_test[0] shape: {self.X_test[0].shape}, y_test[0] shape: {self.y_test[0].shape}")
            else:
                print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
                print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")
        # Make test data None if zero length
        if len(self.X_test) == 0:
            self.X_test = None
            warnings.warn("Zero Test Data, setting X_test to None")

    def scale_data(self, verbose=True):
        """
        Scales the training data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial
        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        # if hasattr(self.scaler, 'n_features_in_'):
        #     warnings.warn("Scale_data has already been called. Exiting to prevent issues.")
        #     return
        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling training data with scaler {self.scaler}, fitting on X_train")

        if spatial:
            # Fit scaler on row-joined training data
            self.scaler.fit(np.vstack(self.X_train))
            # Transform data using fitted scaler
            self.X_train = [self.scaler.transform(Xi) for Xi in self.X_train]
            if hasattr(self, 'X_val'):
                if self.X_val is not None:
                    self.X_val = [self.scaler.transform(Xi) for Xi in self.X_val]
            if self.X_test is not None:
                self.X_test = [self.scaler.transform(Xi) for Xi in self.X_test]
        else:
            # Fit the scaler on the training data
            self.scaler.fit(self.X_train)
            # Transform the data using the fitted scaler
            self.X_train = self.scaler.transform(self.X_train)
            if hasattr(self, 'X_val'):
                self.X_val = self.scaler.transform(self.X_val)
            self.X_test = self.scaler.transform(self.X_test)

    # NOTE: only works for non-spatial data
    def scale_all_X(self, verbose=True):
        """
        Scales all of the X data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.

        Returns:
        -------
        ndarray
            Scaled X matrix, subsetted to features_list.
        """
        if self.spatial:
            raise ValueError("Not implemented for spatial data")

        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        if verbose:
            print(f"Scaling all X data with scaler {self.scaler}, fitted on X_train")
        # Subset features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        X = self.scaler.transform(X)

        return X

    def subset_X_features(self, verbose=True):
        if self.spatial:
            raise ValueError("Not implemented for spatial data")
        # Subset features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        return X

    def inverse_scale(self, return_X='all_hours', save_changes=False, verbose=True):
        """
        Inversely scales the data to its original form.

        Parameters:
        -----------
        return_X : str, optional
            Specifies what data to return after inverse scaling. Default is 'all_hours'.
        save_changes : bool, optional
            If True, updates the internal data with the inversely scaled values. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        X_val = self.scaler.inverse_transform(self.X_val)
        X_test = self.scaler.inverse_transform(self.X_test)

        if save_changes:
            print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
        if verbose:
            print(f"Attempting to return {return_X}")
        if return_X == "all_hours":
            return np.concatenate((X_train, X_val, X_test), axis=0)
        else:
            print(f"Unrecognized or unimplemented return value {return_X}")

    def batch_reshape(self, timesteps, batch_size, hours=None, verbose=False, start_times=None):
        """
        Restructures input data for the RNN using batches and sequences.

        Parameters:
        ----------
        timesteps : int
            Number of timesteps, or sequence length, used for a single sequence in RNN training. Constitutes a single sample to the model.
        batch_size : int
            Number of sequences used within a batch of training.

        Returns:
        -------
        None
            This method reshapes the data in place.

        Raises:
        ------
        AttributeError
            If either 'X_train' or 'y_train' attributes do not exist within the instance.

        Notes:
        ------
        The reshaping method depends on the "spatial" param of self.
        - spatial == False: Reshapes data assuming no spatial dimensions.
        - spatial == True: Reshapes data considering spatial dimensions.
        """
        if not hasattr(self, 'X_train') or not hasattr(self, 'y_train'):
            raise AttributeError("Both 'X_train' and 'y_train' must be set before reshaping batches.")

        # Indicator of spatial training scheme or not
        spatial = self.spatial

        if spatial:
            print(f"Reshaping spatial training data using batch size: {batch_size} and timesteps: {timesteps}")
            # Format training data
            self.X_train, self.y_train, self.n_seqs = staircase_spatial(self.X_train, self.y_train, timesteps=timesteps,
                                                                        batch_size=batch_size, hours=hours, verbose=verbose, start_times=start_times)
            # Format validation data if provided
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val, _ = staircase_spatial(self.X_val, self.y_val, timesteps=timesteps,
                                                              batch_size=batch_size, hours=None, verbose=verbose, start_times=start_times)
            # Format test data. Same (batch_size, timesteps, features) format, but for the prediction model batch_size and timesteps are unconstrained
            # So here batch_size will represent the number of locations, aka separate timeseries to predict
            if self.X_test is not None:
                print(f"Reshaping test data by stacking. Output dimension will be (n_locs, test_hours, features)")
                self.X_test = np.stack(self.X_test, axis=0)
                self.y_test = np.stack(self.y_test, axis=0)
                print(f"X_test shape: {self.X_test.shape}")
                print(f"y_test shape: {self.y_test.shape}")

        else:
            print(f"Reshaping training data using batch size: {batch_size} and timesteps: {timesteps}")
            # Format training data
            self.X_train, self.y_train = staircase_2(self.X_train, self.y_train, timesteps=timesteps,
                                                     batch_size=batch_size, verbose=verbose)
            # Format validation data if provided
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val = staircase_2(self.X_val, self.y_val, timesteps=timesteps,
                                                     batch_size=batch_size, verbose=verbose)
            # Format test data. Shape should be (1, hours, features)
            if self.X_test is not None:
                self.X_test = self.X_test.reshape((1, self.X_test.shape[0], len(self.features_list)))
                # self.y_test =
                print(f"X_test shape: {self.X_test.shape}")
                print(f"y_test shape: {self.y_test.shape}")
        if self.X_train.shape[0] == 0:
            raise ValueError("X_train has zero rows. Try a different combination of cross-validation fractions, batch size, or start_times. Train/val/test data was partially processed; rerun train_test_split.")

    def print_hashes(self, attrs_to_check=['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
        """
        Prints the hash of specified data attributes.

        Parameters:
        -----------
        attrs_to_check : list, optional
            A list of attribute names to hash and print. Default includes 'X', 'y', and split data.
        """
        for attr in attrs_to_check:
            if hasattr(self, attr):
                value = getattr(self, attr)
                if self.spatial:
                    pass
                else:
                    print(f"Hash of {attr}: {hash_ndarray(value)}")

    def __getattr__(self, key):
        """
        Allows attribute-style access to dictionary keys, i.e. enables the "." operator for getting elements.
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'rnn_data' object has no attribute '{key}'")

    def __setitem__(self, key, value):
        """
        Ensures dictionary and attribute updates stay in sync for required keys.
        """
        super().__setitem__(key, value)  # Update the dictionary
        if key in self.required_keys:
            super().__setattr__(key, value)  # Ensure the attribute is updated as well

    def __setattr__(self, key, value):
        """
        Ensures dictionary keys are updated when setting attributes.
        """
        self[key] = value
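
# Illustrative sketch (hypothetical single-location data): the intended call
# order for RNNData — construct, split, scale, then reshape into batches.
def _demo_rnn_data():
    rng = np.random.default_rng(0)
    dat = RNNData({
        'case': 'demo_case',
        'loc': {'STID': 'DEMO1'},
        'time': np.arange(100),
        'X': rng.random((100, 3)),
        'y': rng.random(100),
        'features_list': ['Ed', 'Ew', 'rain']
    }, scaler='standard')
    dat.train_test_split(time_fracs=[0.8, 0.1, 0.1])
    dat.scale_data()
    dat.batch_reshape(timesteps=5, batch_size=8)
    print(dat.X_train.shape)  # (120, 5, 3): sequences ready for a stateful fit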

# Function to check reproducibility hashes, environment info, and model parameters
def check_reproducibility(dict0, params, m_hash, w_hash):
    """
    Performs reproducibility checks on a model by comparing current settings and outputs with stored reproducibility information.

    Parameters:
    -----------
    dict0 : dict
        The data dictionary that should contain reproducibility information under the 'repro_info' attribute.
    params : dict
        The current model parameters to be checked against the reproducibility information.
    m_hash : str
        The hash of the current model predictions.
    w_hash : str
        The hash of the current fitted model weights.

    Returns:
    --------
    None
        The function returns None. It issues warnings if any reproducibility checks fail.

    Notes:
    ------
    - Checks are only performed if `dict0` contains the 'repro_info' attribute.
    - Issues warnings for mismatches in model weights, predictions, Python version, TensorFlow version, and model parameters.
    - Skips checks if physics-based initialization is used (not implemented).
    """
    if not hasattr(dict0, "repro_info"):
        warnings.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproducibility checks.")
        return

    repro_info = dict0.repro_info
    # Check hashes
    if params['phys_initialize']:
        hashes = repro_info['phys_initialize']
        warnings.warn("Physics initialization not implemented yet. Not running reproducibility checks.")
    else:
        hashes = repro_info['rand_initialize']
        print(f"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
        print(f"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
        if (w_hash != hashes['fitted_weights_hash']) or (m_hash != hashes['preds_hash']):
            if w_hash != hashes['fitted_weights_hash']:
                warnings.warn("The fitted weights hash does not match the reproducibility weights hash.")
            if m_hash != hashes['preds_hash']:
                warnings.warn("The predictions hash does not match the reproducibility predictions hash.")
        else:
            print("***Reproducibility checks passed - model weights and model predictions match expected.***")

    # Check environment
    current_py_version = sys.version[0:6]
    current_tf_version = tf.__version__
    if current_py_version != repro_info['env_info']['py_version']:
        warnings.warn(f"Python version mismatch: Current Python version is {current_py_version}, "
                      f"expected {repro_info['env_info']['py_version']}.")

    if current_tf_version != repro_info['env_info']['tf_version']:
        warnings.warn(f"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
                      f"expected {repro_info['env_info']['tf_version']}.")

    # Check params
    repro_params = repro_info.get('params', {})

    for key, repro_value in repro_params.items():
        if key in params:
            if params[key] != repro_value:
                warnings.warn(f"Parameter mismatch for '{key}': Current value is {params[key]}, "
                              f"repro value is {repro_value}.")
        else:
            warnings.warn(f"Parameter '{key}' is missing in the current params.")

    return
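
# Illustrative sketch (hypothetical skeleton, inferred from the checks above):
# the structure check_reproducibility expects under dict0.repro_info. Real hash
# strings and versions come from a stored reproducibility case, not from here.
def _demo_repro_info_skeleton():
    return {
        'rand_initialize': {'fitted_weights_hash': '<hash>', 'preds_hash': '<hash>'},
        'phys_initialize': {'fitted_weights_hash': '<hash>', 'preds_hash': '<hash>'},
        'env_info': {'py_version': sys.version[0:6], 'tf_version': tf.__version__},
        'params': {}
    }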

class RNNModel(ABC):
    """
    Abstract base class for RNN models, providing structure for training, predicting, and running reproducibility checks.
    """
    def __init__(self, params: dict):
        """
        Initializes the RNNModel with the given parameters.

        Parameters:
        -----------
        params : dict
            A dictionary containing model parameters.
        """
        self.params = params
        if type(self) is RNNModel:
            raise TypeError("RNNModel is an abstract class and cannot be instantiated directly")
        super().__init__()

    @abstractmethod
    def _build_model_train(self):
        """Abstract method to build the training model."""
        pass

    @abstractmethod
    def _build_model_predict(self, return_sequences=True):
        """Abstract method to build the prediction model. This model copies weights from the train model but with an input structure that allows for easier prediction of arbitrary-length timeseries. This model is not to be used for training; don't use it with .fit calls."""
        pass

    def is_stateful(self):
        """
        Checks whether any of the layers in the internal model (self.model_train) are stateful.

        Returns:
            bool: True if at least one layer in the model is stateful, False otherwise.

        This method iterates over all the layers in the model and checks if any of them
        have the 'stateful' attribute set to True. This is useful for determining if
        the model is designed to maintain state across batches during training.

        Example:
        --------
        model.is_stateful()
        """
        for layer in self.model_train.layers:
            if hasattr(layer, 'stateful') and layer.stateful:
                return True
        return False

    def fit(self, X_train, y_train, plot_history=True, plot_title='',
            weights=None, callbacks=[], validation_data=None, return_epochs=False, *args, **kwargs):
        """
        Trains the model on the provided training data. Uses the fit method of the training model and then copies the weights over to the prediction model, which has a less restrictive input shape. Formats a list of callbacks to use within the fit method based on params input.

        Parameters:
        -----------
        X_train : np.ndarray
            The input matrix data for training.
        y_train : np.ndarray
            The target vector data for training.
        plot_history : bool, optional
            If True, plots the training history. Default is True.
        plot_title : str, optional
            The title for the training plot. Default is an empty string.
        weights : optional
            Initial weights for the model. Default is None.
        callbacks : list, optional
            A list of callback functions to use during training. Default is an empty list.
        validation_data : tuple, optional
            Validation data to use during training, expected format (X_val, y_val). Default is None.
        return_epochs : bool
            If True, return the number of epochs that training took. Used to test and optimize early stopping.
        """
        # Check compatibility, assume features dimension is last in X_train array
        if X_train.shape[-1] != self.params['n_features']:
            raise ValueError(f"X_train and number of features from params not equal. \n X_train shape: {X_train.shape} \n params['n_features']: {self.params['n_features']}. \n Try recreating RNNData object with features list from params: `RNNData(..., features_list = params['features_list'])`")

        # verbose_fit argument is for printing out an update after each epoch, which gets very long
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            print(f"Training simple RNN with params: {self.params}")

        # Setup callbacks
        if self.params["reset_states"]:
            callbacks = callbacks + [ResetStatesCallback(self.params), TerminateOnNaN()]

        # Early stopping callback requires validation data
        if validation_data is not None:
            X_val, y_val = validation_data[0], validation_data[1]
            print("Using early stopping callback.")
            early_stop = EarlyStoppingCallback(patience=self.params['early_stopping_patience'])
            callbacks = callbacks + [early_stop]
        if verbose_weights:
            print(f"Formatted X_train hash: {hash_ndarray(X_train)}")
            print(f"Formatted y_train hash: {hash_ndarray(y_train)}")
            if validation_data is not None:
                print(f"Formatted X_val hash: {hash_ndarray(X_val)}")
                print(f"Formatted y_val hash: {hash_ndarray(y_val)}")
            print(f"Initial weights before training hash: {hash_weights(self.model_train)}")

        ## TODO: Hidden State Initialization
        # Evaluate Model once to set nonzero initial state
        # self.model_train(X_train[0:self.params['batch_size'],:,:])

        if validation_data is not None:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                validation_data=(X_val, y_val),
                *args, **kwargs
            )
        else:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks=callbacks,
                verbose=verbose_fit,
                *args, **kwargs
            )

        if plot_history:
            self.plot_history(history, plot_title)

        if self.params["verbose_weights"]:
            print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")

        # Update weights for prediction model
        w_fitted = self.model_train.get_weights()
        self.model_predict.set_weights(w_fitted)

        if return_epochs:
            # Epoch counting starts at 0, adding 1 for the count
            return early_stop.best_epoch + 1

    def predict(self, X_test):
        """
        Generates predictions on the provided test data using the internal prediction model.

        Parameters:
        -----------
        X_test : np.ndarray
            The input data for generating predictions.

        Returns:
        --------
        np.ndarray
            The predicted values.
        """
        print("Predicting test data")
        # X_test = self._format_pred_data(X_test)
        # preds = self.model_predict.predict(X_test).flatten()
        preds = self.model_predict.predict(X_test)
        return preds

    def _format_pred_data(self, X):
        """
        Formats the prediction data for RNN input.

        Parameters:
        -----------
        X : np.ndarray
            The input data.

        Returns:
        --------
        np.ndarray
            The formatted input data.
        """
        return np.reshape(X, (1, X.shape[0], self.params['n_features']))

    def plot_history(self, history, plot_title, create_figure=True):
        """
        Plots the training history. Uses a log scale on the y axis for readability.

        Parameters:
        -----------
        history : History object
            The training history object from model fitting. Output of keras' .fit command.
        plot_title : str
            The title for the plot.
        """
        if create_figure:
            plt.figure(figsize=(10, 6))
        plt.semilogy(history.history['loss'], label='Training loss')
        if 'val_loss' in history.history:
            plt.semilogy(history.history['val_loss'], label='Validation loss')
        plt.title(f'{plot_title} Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(loc='upper left')
        plt.show()

    def run_model(self, dict0, reproducibility_run=False, plot_period='all', save_outputs=True, return_epochs=False):
        """
        Runs the RNN model on an input data dictionary, including training, prediction, and reproducibility checks.

        Parameters:
        -----------
        dict0 : RNNData (dict)
            The dictionary containing the input data and configuration.
        reproducibility_run : bool, optional
            If True, performs reproducibility checks after running the model. Default is False.
        save_outputs : bool
            If True, writes model outputs into the input dictionary.
        return_epochs : bool
            If True, returns how many epochs of training happened. Used to optimize params related to early stopping.

        Returns:
        --------
        tuple
            Model predictions and a dictionary of RMSE errors broken up by time period.
        """
        if dict0.X_test is None:
            raise ValueError("No test data X_test in input dictionary. Provide test data or just run .fit")
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            dict0.print_hashes()
        # Extract datasets
        X_train, y_train, X_test, y_test = dict0.X_train, dict0.y_train, dict0.X_test, dict0.y_test
        if 'X_val' in dict0:
            X_val, y_val = dict0.X_val, dict0.y_val
        else:
            X_val = None
        if dict0.spatial:
            case_id = "Spatial Training Set"
        else:
            case_id = dict0.case

        # Fit model
        if X_val is None:
            eps = self.fit(X_train, y_train, plot_title=case_id, return_epochs=return_epochs)
        else:
            eps = self.fit(X_train, y_train, validation_data=(X_val, y_val), plot_title=case_id, return_epochs=return_epochs)

        # Generate predictions and evaluate test error
        if dict0.spatial:
            m, errs = self._eval_multi(dict0)
            if save_outputs:
                dict0['m'] = m
        else:
            m, errs = self._eval_single(dict0, verbose_weights, reproducibility_run)
            if save_outputs:
                dict0['m'] = m
            plot_data(dict0, title="RNN", title2=dict0.case, plot_period=plot_period)

        if return_epochs:
            return m, errs, eps
        else:
            return m, errs

    def _eval_single(self, dict0, verbose_weights, reproducibility_run):
        # Generate predictions,
        # run through training to get hidden state set properly for forecast period
        print(f"Running prediction on all input data, Training through Test")
        if dict0.scaler is not None:
            X = dict0.scale_all_X()
        else:
            X = dict0.subset_X_features()
        y = dict0.y.flatten()
        # Predict
        if verbose_weights:
            print(f"All X hash: {hash_ndarray(X)}")
        # Reshape X
        X = X.reshape(1, X.shape[0], X.shape[1])
        m = self.predict(X).flatten()
        if verbose_weights:
            print(f"Predictions Hash: {hash_ndarray(m)}")

        if reproducibility_run:
            print("Checking Reproducibility")
            check_reproducibility(dict0, self.params, hash_ndarray(m), hash_weights(self.model_predict))

        # print(dict0.keys())
        # Plot final fit and data
        # dict0['y'] = y
        # plot_data(dict0, title="RNN", title2=dict0['case'], plot_period=plot_period)

        # Calculate errors
        err = rmse(m, y)
        train_ind = dict0.train_ind  # index of final training set value
        test_ind = dict0.test_ind    # index of first test set value

        err_train = rmse(m[:train_ind], y[:train_ind].flatten())
        err_pred = rmse(m[test_ind:], y[test_ind:].flatten())
        rmse_dict = {
            'all': err,
            'training': err_train,
            'prediction': err_pred
        }
        return m, rmse_dict

    def _eval_multi(self, dict0):
        # Train Error: NOT DOING YET. DECIDE WHETHER THIS IS NEEDED

        # Test Error
        # new_data = np.stack(dict0.X_test, axis=0)
        # y_array = np.stack(dict0.y_test, axis=0)
        # preds = self.model_predict.predict(new_data)
        y_array = dict0.y_test
        preds = self.model_predict.predict(dict0.X_test)

        # Calculate RMSE
        ## Note: not using the util rmse function since this approach is for 3d arrays
        # Compute the squared differences
        squared_diff = np.square(preds - y_array)

        # Mean squared error along the timesteps and dimensions (axis 1 and 2)
        mse = np.mean(squared_diff, axis=(1, 2))

        # Root mean squared error (RMSE) for each timeseries
        rmses = np.sqrt(mse)

        return preds, rmses
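
# Illustrative sketch (hypothetical toy arrays): the per-timeseries RMSE used in
# _eval_multi, computed over arrays shaped (n_locs, hours, outputs).
def _demo_rmse_3d():
    preds = np.zeros((2, 4, 1))
    y_array = np.ones((2, 4, 1))
    rmses = np.sqrt(np.mean(np.square(preds - y_array), axis=(1, 2)))
    print(rmses)  # [1. 1.] -- one RMSE per location/timeseries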

## Callbacks

# Helper functions for batch reset schedules
def calc_exp_intervals(bmin, bmax, n_epochs, force_bmax=True):
    # Calculate the exponential intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = epochs / n_epochs
    intervals = bmin * (bmax / bmin) ** factors
    if force_bmax:
        intervals[-1] = bmax  # Ensure the last value is exactly bmax
    return intervals.astype(int)

def calc_log_intervals(bmin, bmax, n_epochs, force_bmax=True):
    # Calculate the logarithmic intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = np.log(1 + epochs) / np.log(1 + n_epochs)
    intervals = bmin + (bmax - bmin) * factors
    if force_bmax:
        intervals[-1] = bmax  # Ensure the last value is exactly bmax
    return intervals.astype(int)
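
# Illustrative sketch (hypothetical bmin/bmax/epochs): comparing the schedule
# helpers above with the linear schedule that ResetStatesCallback builds via
# np.linspace; all three ramp the reset interval from bmin to bmax over training.
def _demo_reset_schedules():
    bmin, bmax, n_epochs = 2, 32, 10
    print(np.linspace(bmin, bmax, n_epochs).astype(int))  # linear
    print(calc_exp_intervals(bmin, bmax, n_epochs))       # exponential ramp
    print(calc_log_intervals(bmin, bmax, n_epochs))       # logarithmic ramp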
1376 class ResetStatesCallback(Callback):
1378 Custom callback to reset the states of RNN layers at the end of each epoch and optionally after a specified number of batches.
1380 Parameters:
1381 -----------
1382 batch_reset : int, optional
1383 If provided, resets the states of RNN layers after every `batch_reset` batches. Default is None.
1384 """
1385 # def __init__(self, bmin=None, bmax=None, epochs=None, loc_batch_reset = None, batch_schedule_type='linear', verbose=True):
1386 def __init__(self, params=None, verbose=True):
1388 Initializes the ResetStatesCallback with an optional batch reset interval.
1390 Parameters:
1391 -----------
1392 params: dict, optional
1393 Dictionary of parameters. If None provided, only on_epoch_end will trigger reset of hidden states.
1394 - bmin : int
1395 Minimum for batch reset schedule
1396 - bmax : int
1397 Maximum for batch reset schedule
1398 - epochs : int
1399 Number of training epochs.
1400 - loc_batch_reset : int
1401 Interval of batches after which to reset the states of RNN layers for location changes. Triggers reset for training AND validation phases
1402 - batch_schedule_type : str
1403 Type of batch scheduling to be used. Recognized methods are following:
1404 - 'constant' : Used fixed batch reset interval throughout training
1405 - 'linear' : Increases the batch reset interval linearly over epochs from bmin to bmax.
1406 - 'exp' : Increases the batch reset interval exponentially over epochs from bmin to bmax.
1407 - 'log' : Increases the batch reset interval logarithmically over epochs from bmin to bmax.
1410 Returns:
1411 -----------
1412 Only in-place reset of hidden states of RNN that calls uses this callback.
1414 """
1415 super(ResetStatesCallback, self).__init__()
1417 # Check for optional arguments, set None if missing in input params
1418 arg_list = ['bmin', 'bmax', 'epochs', 'loc_batch_reset', 'batch_schedule_type']
1419 for arg in arg_list:
1420 setattr(self, arg, params.get(arg, None))
1422 self.verbose = verbose
1423 if self.verbose:
1424 print(f"Using ResetStatesCallback with Batch Reset Schedule: {self.batch_schedule_type}")
1425 # Calculate the reset intervals for each epoch during initialization
1426 if self.batch_schedule_type is not None:
1427 if self.epochs is None:
1428 raise ValueError(f"Arugment `epochs` cannot be none with self.batch_schedule_type: {self.batch_schedule_type}")
1429 self.batch_reset_intervals = self._calc_reset_intervals(self.batch_schedule_type)
1430 if self.verbose:
1431 print(f"batch_reset_intervals: {self.batch_reset_intervals}")
1432 else:
1433 self.batch_reset_intervals = None
1434 def on_epoch_end(self, epoch, logs=None):
1436 Resets the states of RNN layers at the end of each epoch.
1438 Parameters:
1439 -----------
1440 epoch : int
1441 The index of the current epoch.
1442 logs : dict, optional
1443 A dictionary containing metrics from the epoch. Default is None.
1444 """
1445 # print(f" Resetting hidden state after epoch: {epoch+1}", flush=True)
1446 # Iterate over each layer in the model
1447 for layer in self.model.layers:
1448 # Check if the layer has a reset_states method
1449 if hasattr(layer, 'reset_states'):
1450 layer.reset_states()
1451 def _calc_reset_intervals(self,batch_schedule_type):
1452 methods = ['constant', 'linear', 'exp', 'log']
1453 if batch_schedule_type not in methods:
1454 raise ValueError(f"Batch schedule method {batch_schedule_type} not recognized. \n Available methods: {methods}")
1455 if batch_schedule_type == "constant":
1457 return np.repeat(self.bmin, self.epochs).astype(int)
1458 elif batch_schedule_type == "linear":
1459 return np.linspace(self.bmin, self.bmax, self.epochs).astype(int)
1460 elif batch_schedule_type == "exp":
1461 return calc_exp_intervals(self.bmin, self.bmax, self.epochs)
1462 elif batch_schedule_type == "log":
1463 return calc_log_intervals(self.bmin, self.bmax, self.epochs)
1464 def on_epoch_begin(self, epoch, logs=None):
1465 # Set the reset interval for the current epoch
1466 if self.batch_reset_intervals is not None:
1467 self.current_batch_reset = self.batch_reset_intervals[epoch]
1468 else:
1469 self.current_batch_reset = None
1470 def on_train_batch_end(self, batch, logs=None):
1471 """
1472 Resets the states of RNN layers during training after a specified number of batches, if `batch_reset` or `loc_batch_reset` are provided. The `batch_reset` is used for stability, to avoid exploding gradients at the beginning of training when a hidden state is being passed through weights that haven't learned yet. The `loc_batch_reset` is used to reset the states when a particular batch is from a new location, so the hidden state should not be carried over.
1474 Parameters:
1475 -----------
1476 batch : int
1477 The index of the current batch.
1478 logs : dict, optional
1479 A dictionary containing metrics from the batch. Default is None.
1480 """
1481 batch_reset = self.current_batch_reset
1482 if (batch_reset is not None and batch % batch_reset == 0):
1483 # print(f" Resetting states after batch {batch + 1}")
1484 # Iterate over each layer in the model
1485 for layer in self.model.layers:
1486 # Check if the layer has a reset_states method
1487 if hasattr(layer, 'reset_states'):
1488 layer.reset_states()
1489 def on_test_batch_end(self, batch, logs=None):
1490 """
1491 Resets the states of RNN layers during validation if `loc_batch_reset` is provided, to demarcate a new location and thus avoid passing a hidden state to the wrong location.
1493 Parameters:
1494 -----------
1495 batch : int
1496 The index of the current batch.
1497 logs : dict, optional
1498 A dictionary containing metrics from the batch. Default is None.
1499 """
1500 loc_batch_reset = self.loc_batch_reset
1501 if (loc_batch_reset is not None and batch % loc_batch_reset == 0):
1502 # print(f"Resetting states in Validation mode after batch {batch + 1}")
1503 # Iterate over each layer in the model
1504 for layer in self.model.layers:
1505 # Check if the layer has a reset_states method
1506 if hasattr(layer, 'reset_states'):
1507 layer.reset_states()
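# Usage sketch for ResetStatesCallback (a comment-only example with hypothetical
# values; `model`, `X_train`, `y_train` are assumed to exist and the model's
# recurrent layers to be stateful):
# reset_cb = ResetStatesCallback(
#     params={'bmin': 20, 'bmax': 200, 'epochs': 100,
#             'loc_batch_reset': 50, 'batch_schedule_type': 'exp'},
#     verbose=True)
# model.fit(X_train, y_train, epochs=100, callbacks=[reset_cb])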
1509 ## Learning Schedules
1510 ## NOT TESTED YET
1511 lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
1512 initial_learning_rate=0.01,
1513 decay_steps=200,
1514 alpha=0.0,
1515 name='CosineDecay',
1516 # warmup_target=None,
1517 # warmup_steps=100
1518 )
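# Sketch of how the schedule could be used, mirroring the commented-out
# optimizer line in RNN._build_model_train below:
# optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)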
1521 def EarlyStoppingCallback(patience=5):
1522 """
1523 Creates an EarlyStopping callback with the specified patience.
1525 Args:
1526 patience (int): Number of epochs with no improvement after which training will be stopped.
1528 Returns:
1529 EarlyStopping: Configured EarlyStopping callback.
1530 """
1531 return EarlyStopping(
1532 monitor='val_loss',
1533 patience=patience,
1534 verbose=1,
1535 mode='min',
1536 restore_best_weights=True
1537 )
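# Usage sketch (hypothetical names): pass the configured callback to fit, e.g.
# model.fit(X_train, y_train, validation_data=(X_val, y_val),
#           callbacks=[EarlyStoppingCallback(patience=10)])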
1539 phys_params = {
1540 'DeltaE': [0,-1], # bias correction
1541 'T1': 0.1, # 1/fuel class (10)
1542 'fm_raise_vs_rain': 0.2 # fm increase per mm rain
1543 }
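# Reading of phys_params (illustrative): a 10-hour fuel class gives T1 = 1/10,
# and each mm of rain is assumed to raise fuel moisture by fm_raise_vs_rain = 0.2.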
1547 def get_initial_weights(model_fit,params,scale_fm=1):
1548 # Given a RNN architecture and hyperparameter dictionary, return array of physics-initialized weights
1549 # Inputs:
1550 # model_fit: output of create_RNN_2 with no training
1551 # params: (dict) dictionary of hyperparameters
1552 # scale_fm: (float) scale factor for the fuel moisture data; the physics-based weights are rescaled by it
1553 # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
1554 DeltaE = phys_params['DeltaE']
1555 T1 = phys_params['T1']
1556 fmr = phys_params['fm_raise_vs_rain']
1557 centering = [0] # shift activation down
1559 w0_initial={'Ed':(1.-np.exp(-T1))/2,
1560 'Ew':(1.-np.exp(-T1))/2,
1561 'rain':fmr * scale_fm} # wx - input feature
1562 # Remaining weights: wh, bh (layer 0) and wd, bd (layer 1); wx is filled per-feature from w0_initial below
1564 w_initial=np.array([np.nan, np.exp(-0.1), DeltaE[0]/scale_fm, # layer 0
1565 1.0, -centering[0] + DeltaE[1]/scale_fm]) # layer 1
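# Illustrative values: with T1 = 0.1, the Ed and Ew input weights start at
# (1 - exp(-0.1))/2 ~ 0.0476, and wh = exp(-0.1) ~ 0.905, so the hidden state
# initially decays slowly, roughly matching the moisture ODE time constant.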
1566 if params['verbose_weights']:
1567 print('Equilibrium moisture correction bias',DeltaE[0],
1568 'in the hidden layer and',DeltaE[1],' in the output layer')
1570 w_name = ['wx','wh','bh','wd','bd']
1572 w=model_fit.get_weights()
1573 for j in range(w[0].shape[0]):
1574 feature = params['features_list'][j]
1575 for k in range(w[0].shape[1]):
1576 w[0][j][k]=w0_initial[feature]
1578 for i in range(1,len(w)): # number of the weight
1579 for j in range(w[i].shape[0]): # number of the inputs
1580 if w[i].ndim==2:
1581 # initialize all entries of the weight matrix to the same number
1582 for k in range(w[i].shape[1]):
1583 w[i][j][k]=w_initial[i]/w[i].shape[0]
1584 elif w[i].ndim==1:
1585 w[i][j]=w_initial[i]
1586 else:
1587 print('weight',i,'shape',w[i].shape)
1588 raise ValueError("Only 1 or 2 dimensions supported")
1589 if params['verbose_weights']:
1590 print('weight',i,w_name[i],'shape',w[i].shape,'ndim',w[i].ndim,
1591 'initial: sum',np.sum(w[i],axis=0),'\nentries',w[i])
1593 return w, w_name
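# Usage sketch for physics-based initialization (mirrors its use in
# RNN._build_model_train below; `model` and `params` must satisfy the
# constraints asserted there, e.g. features ['Ed', 'Ew', 'rain']):
# w, w_name = get_initial_weights(model, params)
# model.set_weights(w)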
1595 class RNN(RNNModel):
1596 """
1597 A concrete implementation of the RNNModel abstract base class, using simple recurrent cells for hidden recurrent layers.
1599 Parameters:
1600 -----------
1601 params : dict
1602 A dictionary of model parameters.
1603 loss : str, optional
1604 The loss function to use during model training. Default is 'mean_squared_error'.
1605 """
1606 def __init__(self, params, loss='mean_squared_error'):
1607 """
1608 Initializes the RNN model by building the training and prediction models.
1610 Parameters:
1611 -----------
1612 params : dict or RNNParams
1613 A dictionary containing the model's parameters.
1614 loss : str, optional
1615 The loss function to use during model training. Default is 'mean_squared_error'.
1616 """
1617 super().__init__(params)
1618 self.model_train = self._build_model_train()
1619 self.model_predict = self._build_model_predict()
1621 def _build_model_train(self):
1622 """
1623 Builds and compiles the training model, with batch & sequence shape specifications for input.
1625 Returns:
1626 --------
1627 model : tf.keras.Model
1628 The compiled Keras model for training.
1629 """
1630 inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1631 x = inputs
1632 for i in range(self.params['rnn_layers']):
1633 # Return sequences True if recurrent layer feeds into another recurrent layer,
1634 # False if it feeds into a dense layer
1635 return_sequences = i < self.params['rnn_layers'] - 1
1636 x = SimpleRNN(
1637 units=self.params['rnn_units'],
1638 activation=self.params['activation'][0],
1639 dropout=self.params["dropout"][0],
1640 recurrent_dropout = self.params["recurrent_dropout"],
1641 stateful=self.params['stateful'],
1642 return_sequences=return_sequences)(x)
1643 if self.params["dropout"][1] > 0:
1644 x = Dropout(self.params["dropout"][1])(x)
1645 for i in range(self.params['dense_layers']):
1646 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1647 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1648 x = Dense(units=1, activation='linear')(x)
1649 model = tf.keras.Model(inputs=inputs, outputs=x)
1650 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1651 # optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
1652 model.compile(loss='mean_squared_error', optimizer=optimizer)
1654 if self.params["verbose_weights"]:
1655 print(f"Initial Weights Hash: {hash_weights(model)}")
1656 # print(model.get_weights())
1658 if self.params['phys_initialize']:
1659 assert self.params['scaler'] is None, f"Physics initialization is not yet implemented with data scaling {self.params['scaler']}"
1660 assert self.params['features_list'] == ['Ed', 'Ew', 'rain'], f"Physics initialization can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
1661 assert self.params['dense_layers'] == 0, f"Physics initialization requires no hidden dense layers, received params['dense_layers'] = {self.params['dense_layers']}"
1662 print("Initializing Model with Physics based weights")
1663 w, w_name=get_initial_weights(model, self.params)
1664 model.set_weights(w)
1665 print('initial weights hash =',hash_weights(model))
1666 return model
1668 def _build_model_predict(self, return_sequences=True):
1669 """
1670 Builds and compiles the prediction model. It uses neither batch shape nor sequence length, which makes it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1672 Parameters:
1673 -----------
1674 return_sequences : bool, optional
1675 Whether to return the full sequence of outputs. Default is True.
1677 Returns:
1678 --------
1679 model : tf.keras.Model
1680 The compiled Keras model for prediction.
1681 """
1682 inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1683 x = inputs
1684 for i in range(self.params['rnn_layers']):
1685 x = SimpleRNN(self.params['rnn_units'],activation=self.params['activation'][0],
1686 stateful=False,return_sequences=return_sequences)(x)
1687 for i in range(self.params['dense_layers']):
1688 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1689 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1690 x = Dense(units=1, activation='linear')(x)
1691 model = tf.keras.Model(inputs=inputs, outputs=x)
1692 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1693 model.compile(loss='mean_squared_error', optimizer=optimizer)
1695 # Set Weights to model_train
1696 w_fitted = self.model_train.get_weights()
1697 model.set_weights(w_fitted)
1699 return model
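# Usage sketch for the RNN class (assumes `params` carries the keys referenced
# above, e.g. 'batch_shape', 'rnn_layers', 'rnn_units', 'activation',
# 'learning_rate'; `X_new` is a hypothetical array of shape
# (n_samples, any_timesteps, n_features)):
# rnn = RNN(params)
# rnn.model_train.summary()
# preds = rnn.model_predict.predict(X_new)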
1702 class RNN_LSTM(RNNModel):
1703 """
1704 A concrete implementation of the RNNModel abstract base class, using LSTM cells for hidden recurrent layers.
1706 Parameters:
1707 -----------
1708 params : dict
1709 A dictionary of model parameters.
1710 loss : str, optional
1711 The loss function to use during model training. Default is 'mean_squared_error'.
1712 """
1713 def __init__(self, params, loss='mean_squared_error'):
1714 """
1715 Initializes the RNN_LSTM model by building the training and prediction models.
1717 Parameters:
1718 -----------
1719 params : dict or RNNParams
1720 A dictionary containing the model's parameters.
1721 loss : str, optional
1722 The loss function to use during model training. Default is 'mean_squared_error'.
1723 """
1724 super().__init__(params)
1725 self.model_train = self._build_model_train()
1726 self.model_predict = self._build_model_predict()
1728 def _build_model_train(self):
1729 """
1730 Builds and compiles the training model, with batch & sequence shape specifications for input.
1732 Returns:
1733 --------
1734 model : tf.keras.Model
1735 The compiled Keras model for training.
1736 """
1737 inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1738 x = inputs
1739 for i in range(self.params['rnn_layers']):
1740 return_sequences = i < self.params['rnn_layers'] - 1
1741 x = LSTM(
1742 units=self.params['rnn_units'],
1743 activation=self.params['activation'][0],
1744 dropout=self.params["dropout"][0],
1745 recurrent_dropout = self.params["recurrent_dropout"],
1746 recurrent_activation=self.params["recurrent_activation"],
1747 stateful=self.params['stateful'],
1748 return_sequences=return_sequences)(x)
1749 if self.params["dropout"][1] > 0:
1750 x = Dropout(self.params["dropout"][1])(x)
1751 for i in range(self.params['dense_layers']):
1752 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1753 x = Dense(units=1, activation="linear")(x)
1754 model = tf.keras.Model(inputs=inputs, outputs=x)
1755 # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
1756 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1757 model.compile(loss='mean_squared_error', optimizer=optimizer)
1759 if self.params["verbose_weights"]:
1760 print(f"Initial Weights Hash: {hash_weights(model)}")
1761 return model
1762 def _build_model_predict(self, return_sequences=True):
1763 """
1764 Builds and compiles the prediction model. It uses neither batch shape nor sequence length, which makes it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1766 Parameters:
1767 -----------
1768 return_sequences : bool, optional
1769 Whether to return the full sequence of outputs. Default is True.
1771 Returns:
1772 --------
1773 model : tf.keras.Model
1774 The compiled Keras model for prediction.
1775 """
1776 inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1777 x = inputs
1778 for i in range(self.params['rnn_layers']):
1779 x = LSTM(
1780 units=self.params['rnn_units'],
1781 activation=self.params['activation'][0],
1782 stateful=False,return_sequences=return_sequences)(x)
1783 for i in range(self.params['dense_layers']):
1784 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1785 x = Dense(units=1, activation="linear")(x)
1786 model = tf.keras.Model(inputs=inputs, outputs=x)
1787 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1788 model.compile(loss='mean_squared_error', optimizer=optimizer)
1790 # Set Weights to model_train
1791 w_fitted = self.model_train.get_weights()
1792 model.set_weights(w_fitted)
1794 return model
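# Usage sketch: RNN_LSTM is constructed the same way as RNN, with the additional
# 'recurrent_activation' key used above, e.g. (hypothetical):
# lstm = RNN_LSTM(params)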
1796 # Wrapper Functions putting everything together.
1797 # Useful for deploying from command line
1798 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1800 def rnn_data_wrap(dict0, params):
1801 rnn_dat = RNNData(
1802 Dict(dict0), # input dictionary
1803 scaler="standard", # data scaling type
1804 features_list = params['features_list'] # features for predicting outcome
1805 )
1808 rnn_dat.train_test_split(
1809 time_fracs = params['time_fracs'], # Percent of total time steps used for train/val/test
1810 space_fracs = params['space_fracs'] # Percent of spatial locations used for train/val/test
1811 )
1812 rnn_dat.scale_data()
1814 rnn_dat.batch_reshape(
1815 timesteps = params['timesteps'], # Timesteps aka sequence length for RNN input data.
1816 batch_size = params['batch_size'], # Number of samples of length timesteps for a single round of grad. descent
1817 start_times = np.zeros(len(rnn_dat.loc['train_locs']))
1818 )
1820 return rnn_dat
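# Usage sketch (hypothetical names; `dict0` is a raw data dictionary as read
# upstream, `params` an RNNParams dict with the keys used above):
# rnn_dat = rnn_data_wrap(dict0, params)
# model = RNN(params)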
1822 def forecast(model, data, spinup_hours = 0):
1823 return  # placeholder: forecast with spinup is not yet implemented