# v2 training and prediction class infrastructure

# Environment
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import sys
from tensorflow.keras.callbacks import Callback, EarlyStopping, TerminateOnNaN
# from sklearn.metrics import mean_squared_error
import logging
from tensorflow.keras.layers import LSTM, SimpleRNN, Input, Dropout, Dense
# Local modules
import reproducibility
# from utils import print_dict_summary
from abc import ABC, abstractmethod
from utils import hash2, all_items_exist, hash_ndarray, hash_weights
from data_funcs import rmse, plot_data, compare_dicts
import copy
# import yaml
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings
#*************************************************************************************
# Data Formatting Functions
def staircase(x,y,timesteps,datapoints,return_sequences=False, verbose = False):
    # x [datapoints,features]    all inputs
    # y [datapoints,outputs]
    # timesteps: split x and y into samples length timesteps, shifted by 1
    # datapoints: number of timesteps to use for training, no more than y.shape[0]
    if verbose:
        print('staircase: shape x = ',x.shape)
        print('staircase: shape y = ',y.shape)
        print('staircase: timesteps=',timesteps)
        print('staircase: datapoints=',datapoints)
        print('staircase: return_sequences=',return_sequences)
    outputs = y.shape[1]
    features = x.shape[1]
    samples = datapoints-timesteps+1
    if verbose:
        print('staircase: samples=',samples,'timesteps=',timesteps,'features=',features)
    x_train = np.empty([samples, timesteps, features])
    if return_sequences:
        if verbose:
            print('returning all timesteps in a sample')
        y_train = np.empty([samples, timesteps, outputs])  # all
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
                y_train[i,k,:] = y[i+k,:]
    else:
        if verbose:
            print('returning only the last timestep in a sample')
        y_train = np.empty([samples, outputs])
        for i in range(samples):
            for k in range(timesteps):
                x_train[i,k,:] = x[i+k,:]
            y_train[i,:] = y[i+timesteps-1,:]

    return x_train, y_train
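
# Illustrative usage sketch of staircase with toy data (not part of the
# original module): 10 datapoints with timesteps=3 yield
# samples = datapoints - timesteps + 1 = 8 overlapping sequences.
if __name__ == '__main__':
    x_demo = np.arange(20, dtype=float).reshape(10, 2)   # 10 datapoints, 2 features
    y_demo = np.arange(10, dtype=float).reshape(10, 1)   # 10 datapoints, 1 output
    X_s, y_s = staircase(x_demo, y_demo, timesteps=3, datapoints=10)
    print(X_s.shape)  # (8, 3, 2)
    print(y_s.shape)  # (8, 1)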
def staircase_2(x,y,timesteps,batch_size=None,trainsteps=np.inf,return_sequences=False, verbose = False):
    # create RNN training data in multiple batches
    # input:
    #     x (,features)
    #     y (,outputs)
    #     timesteps: split x and y into sequences length timesteps
    #                a.k.a. lookback or sequence_length

    # print params if verbose
    if batch_size is None:
        raise ValueError('staircase_2 requires batch_size')
    if verbose:
        print('staircase_2: shape x = ',x.shape)
        print('staircase_2: shape y = ',y.shape)
        print('staircase_2: timesteps=',timesteps)
        print('staircase_2: batch_size=',batch_size)
        print('staircase_2: return_sequences=',return_sequences)

    nx,features = x.shape
    ny,outputs = y.shape
    datapoints = min(nx,ny,trainsteps)
    if verbose:
        print('staircase_2: datapoints=',datapoints)

    # sequence j in a given batch is assumed to be the continuation of sequence j in the previous batch
    # https://www.tensorflow.org/guide/keras/working_with_rnns  Cross-batch statefulness

    # example with timesteps=3 batch_size=3 datapoints=15
    #     batch 0: [0 1 2]     [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]     [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]     [7 8 9]      [8 9 10]
    #     batch 3: [9 10 11]   [10 11 12]   [11 12 13]
    #     batch 4: [12 13 14]  [13 14 15]   when data runs out this last batch can be shorter

    # TODO: implement for multiple locations, same starting time for each batch
    #              Loc 1        Loc 2        Loc 3
    #     batch 0: [0 1 2]     [0 1 2]      [0 1 2]
    #     batch 1: [3 4 5]     [3 4 5]      [3 4 5]
    #     batch 2: [6 7 8]     [6 7 8]      [6 7 8]
    # TODO: second epoch shift starting time at batch 0 in time

    # TODO: implement for multiple locations, different starting times for each batch
    #              Loc 1        Loc 2        Loc 3
    #     batch 0: [0 1 2]     [1 2 3]      [2 3 4]
    #     batch 1: [3 4 5]     [4 5 6]      [5 6 7]
    #     batch 2: [6 7 8]     [7 8 9]      [8 9 10]

    # the first sample in batch j starts from timesteps*j and ends with timesteps*(j+1)-1
    # e.g. the final hidden state of the rnn after the sequence of steps [0 1 2] in batch 0
    # becomes the starting hidden state of the rnn in the sequence of steps [3 4 5] in batch 1, etc.

    # sample [0 1 2] means the rnn is used twice to map state 0 -> 1 -> 2
    # the state at time 0 is fixed but the state is considered a variable at times 1 and 2
    # the loss is computed from the output at time 2, and the gradient of the loss by the chain rule ends at time 0 because the state there is a constant -> derivative is zero
    # sample [3 4 5] means the rnn is used twice to map state 3 -> 4 -> 5; the state at time 3 is fixed to the output of the first sequence [0 1 2]
    # the loss is computed from the output at time 5, and the gradient of the loss by the chain rule ends at time 3 because the state there is considered constant -> derivative is zero
    # how is the gradient computed? presumably keras adds the gradients wrt the weights at 2 5 8 ... 3 6 9 ... 4 7 ... and uses that to update the weights
    # there is only one set of weights: h(2) = f(h(1),w), h(1) = f(h(0),w), but w is always the same
    # each column is one successive evaluation of h(n+1) = f(h(n),w) for n = n_start, n_start+1, ...
    # a single column cannot be evaluated efficiently on a gpu because a gpu is a parallel processor
    # think of it as each column being served by one thread; the threads are independent because they execute in parallel, and there needs to be a large number of threads (32 is a good number)
    # each batch consists of independent calculations
    # but it can depend on the result of the previous batch (that's the recurrent part)
    max_batches = datapoints // timesteps
    max_sequences = max_batches * batch_size

    if verbose:
        print('staircase_2: max_batches=',max_batches)
        print('staircase_2: max_sequences=',max_sequences)

    x_train = np.zeros((max_sequences, timesteps, features))
    if return_sequences:
        y_train = np.empty((max_sequences, timesteps, outputs))
    else:
        y_train = np.empty((max_sequences, outputs))

    # build the sequences
    k = 0  # number of sequences generated so far
    for i in range(max_batches):
        for j in range(batch_size):
            begin = i*timesteps + j
            end = begin + timesteps
            if end > datapoints:
                break
            if verbose:
                print('sequence',k,'batch',i,'sample',j,'data',begin,'to',end-1)
            x_train[k,:,:] = x[begin:end,:]
            if return_sequences:
                y_train[k,:,:] = y[begin:end,:]
            else:
                y_train[k,:] = y[end-1,:]
            k += 1
    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)
        print('staircase_2: sequences generated',k)
        print('staircase_2: batch_size=',batch_size)
    k = (k // batch_size) * batch_size  # keep only full batches
    if verbose:
        print('staircase_2: removing partial and empty batches at the end, keeping',k)
    x_train = x_train[:k,:,:]
    if return_sequences:
        y_train = y_train[:k,:,:]
    else:
        y_train = y_train[:k,:]

    if verbose:
        print('staircase_2: shape x_train = ',x_train.shape)
        print('staircase_2: shape y_train = ',y_train.shape)

    return x_train, y_train
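
# Illustrative usage sketch of staircase_2 with toy data (not part of the
# original module): with timesteps=3 and batch_size=3, sequence j of batch i
# starts at i*timesteps + j, matching the cross-batch statefulness layout in
# the comments above. With 15 datapoints, 13 sequences are generated and the
# trailing partial batch is dropped, leaving 4 full batches of 3.
if __name__ == '__main__':
    x_demo = np.arange(30, dtype=float).reshape(15, 2)
    y_demo = np.arange(15, dtype=float).reshape(15, 1)
    X_s, y_s = staircase_2(x_demo, y_demo, timesteps=3, batch_size=3, verbose=True)
    print(X_s.shape)  # (12, 3, 2)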
# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler()
}
def batch_setup(ids, batch_size):
    """
    Sets up stateful batched training data scheme for RNN training.

    This function takes a list or array of identifiers (`ids`) and divides them into batches of a specified size (`batch_size`). If the last batch does not have enough elements to meet the `batch_size`, the function will loop back to the start of the identifiers and continue filling the batch until it reaches the required size.

    Parameters:
    -----------
    ids : list or numpy array
        A list or numpy array containing the ids to be batched.
    batch_size : int
        The desired size of each batch.

    Returns:
    --------
    batches : list of lists
        A list where each element is a batch (itself a list) of identifiers. Each batch will contain exactly `batch_size` elements.

    Example:
    --------
    >>> ids = [1, 2, 3, 4, 5]
    >>> batch_size = 3
    >>> batch_setup(ids, batch_size)
    [[1, 2, 3], [4, 5, 1]]

    Notes:
    ------
    - If `ids` is shorter than `batch_size`, the returned list will contain a single batch where identifiers are repeated from the start of `ids` until the batch is filled.
    """
    # Ensure ids is a numpy array
    x = np.array(ids)

    # Initialize the list to hold the batches
    batches = []

    # Use a loop to slice the list/array into batches
    for i in range(0, len(x), batch_size):
        batch = list(x[i:i + batch_size])

        # If the batch is not full, continue from the start
        while len(batch) < batch_size:
            # Calculate the remaining number of items needed
            remaining = batch_size - len(batch)
            # Append the needed number of items from the start of the array
            batch.extend(x[:remaining])

        batches.append(batch)

    return batches
def staircase_spatial(X, y, batch_size, timesteps, hours=None, start_times = None, verbose = True):
    """
    Prepares spatially formatted time series data for RNN training by creating batches of sequences across different locations, stacked to be compatible with stateful models.

    This function processes multi-location time series data by slicing it into batches and formatting it to fit into a recurrent neural network (RNN) model. It utilizes a staircase-like approach to prepare sequences for each location and then interlaces them to align with stateful RNN structures.

    Parameters:
    -----------
    X : list of numpy arrays
        A list where each element is a numpy array containing features for a specific location. The shape of each array is `(total_time_steps, features)`.
    y : list of numpy arrays
        A list where each element is a numpy array containing the target values for a specific location. The shape of each array is `(total_time_steps,)`.
    batch_size : int
        The number of sequences to include in each batch.
    timesteps : int
        The number of time steps to include in each sequence for the RNN.
    hours : int, optional
        The length of each time series to consider for each location. If `None`, it defaults to the minimum length of `y` across all locations.
    start_times : numpy array, optional
        The initial time step for each location. If `None`, it defaults to an array starting from 0 and incrementing by 1 for each location.
    verbose : bool, optional
        If `True`, prints additional information during processing. Default is `True`.

    Returns:
    --------
    XX : numpy array
        A 3D numpy array with shape `(total_sequences, timesteps, features)` containing the prepared feature sequences for all locations.
    yy : numpy array
        A 2D numpy array with shape `(total_sequences, 1)` containing the corresponding target values for all locations.
    n_seqs : int
        Number of sequences per location. Used to reset states when location changes. Hidden state of RNN will be reset after n_seqs number of batches.

    Notes:
    ------
    - The function handles spatially distributed time series data by batching and formatting it for stateful RNNs.
    - `hours` determines how much of the time series is used for each location. If not provided, it defaults to the shortest series in `y`.
    - If `start_times` is not provided, it assumes each location starts its series at progressively later time steps.
    - The `batch_setup` function is used internally to manage the creation of location and time step batches.
    - The returned feature sequences `XX` and target sequences `yy` are interlaced to align with the expected input format of stateful RNNs.
    """
    # Generate ids based on number of distinct timeseries provided
    n_loc = len(y) # assuming each list entry for y is a separate location
    loc_ids = np.arange(n_loc)

    # Generate hours and start_times if None
    if hours is None:
        print("Setting total hours to minimum length of y in provided dictionary")
        hours = min(len(yi) for yi in y)
    if start_times is None:
        print(f"Setting start times to offset by 1 hour by location, from 0 through {batch_size} (batch_size)")
        start_times = np.tile(np.arange(batch_size), n_loc // batch_size + 1)[:n_loc]
    # Set up batches
    loc_batch, t_batch = batch_setup(loc_ids, batch_size), batch_setup(start_times, batch_size)
    if verbose:
        print(f"Location ID Batches: {loc_batch}")
        print(f"Start Times for Batches: {t_batch}")

    # Loop over batches and construct with staircase_2
    Xs = []
    ys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        ts = t_batch[i]
        for j in range(0, len(locs_i)):
            t0 = int(ts[j])
            tend = t0 + hours
            # Create RNNData Dict
            # Subset data to given location and time from t0 to t0+hours
            k = locs_i[j] # Used to account for fewer locations than batch size
            X_temp = X[k][t0:tend,:]
            y_temp = y[k][t0:tend].reshape(-1,1)

            # Format sequences
            Xi, yi = staircase_2(
                X_temp,
                y_temp,
                timesteps = timesteps,
                batch_size = 1,  # note: using 1 here to format sequences for a single location, not same as target batch size for training data
                verbose=False)

            Xs.append(Xi)
            ys.append(yi)

    # Drop incomplete batches
    lens = [yi.shape[0] for yi in ys]
    n_seqs = min(lens)
    if verbose:
        print(f"Minimum number of sequences by location: {n_seqs}")
        print(f"Applying minimum length to other arrays.")
    Xs = [Xi[:n_seqs] for Xi in Xs]
    ys = [yi[:n_seqs] for yi in ys]

    # Interlace arrays to match stateful structure
    n_features = Xs[0].shape[2]
    XXs = []
    yys = []
    for i in range(0, len(loc_batch)):
        locs_i = loc_batch[i]
        XXi = np.empty((Xs[0].shape[0]*batch_size, timesteps, n_features))
        yyi = np.empty((Xs[0].shape[0]*batch_size, 1))
        for j in range(0, len(locs_i)):
            XXi[j::(batch_size)] = Xs[locs_i[j]]
            yyi[j::(batch_size)] = ys[locs_i[j]]
        XXs.append(XXi)
        yys.append(yyi)
    yy = np.concatenate(yys, axis=0)
    XX = np.concatenate(XXs, axis=0)

    if verbose:
        print(f"Spatially Formatted X Shape: {XX.shape}")
        print(f"Spatially Formatted y Shape: {yy.shape}")

    return XX, yy, n_seqs
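
# Illustrative usage sketch of staircase_spatial with toy data (not part of
# the original module): 4 locations, 2 features, 50 hours each. With
# batch_size=2 and timesteps=5, the offset start times leave 9 sequences per
# location, interlaced into XX of shape (36, 5, 2).
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    X_locs = [rng.random((50, 2)) for _ in range(4)]
    y_locs = [rng.random(50) for _ in range(4)]
    XX_s, yy_s, n_seqs_s = staircase_spatial(X_locs, y_locs, batch_size=2, timesteps=5)
    print(XX_s.shape, yy_s.shape, n_seqs_s)  # (36, 5, 2) (36, 1) 9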
#***********************************************************************************************
### RNN Class Functionality
class RNNParams(dict):
    """
    A custom dictionary class for handling RNN parameters. Automatically calculates certain params based on others. Overwrites the update method to protect from incompatible parameter choices. Inherits from dict.
    """
    def __init__(self, input_dict):
        """
        Initializes the RNNParams instance and runs checks and shape calculations.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing RNN parameters.
        """
        super().__init__(input_dict)
        # Automatically run checks on initialization
        self._run_checks()
        # Automatically calculate shapes on initialization
        self.calc_param_shapes()
    def _run_checks(self, verbose=True):
        """
        Validates that required keys exist and are of the correct type.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        print("Checking params...")
        # Keys must exist and be integers
        int_keys = [
            'batch_size', 'timesteps', 'rnn_layers',
            'rnn_units', 'dense_layers', 'dense_units', 'epochs'
        ]
        for key in int_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], int), f"Key '{key}' must be an integer"

        # Keys must exist and be lists
        list_keys = ['activation', 'features_list', 'dropout', 'time_fracs']
        for key in list_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], list), f"Key '{key}' must be a list"

        # Keys must exist and be floats
        float_keys = ['learning_rate']
        for key in float_keys:
            assert key in self, f"Missing required key: {key}"
            assert isinstance(self[key], float), f"Key '{key}' must be a float"

        print("Input dictionary passed all checks.")
    def calc_param_shapes(self, verbose=True):
        """
        Calculates and updates the shapes of certain parameters based on input data.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Calculating shape params based on features list, timesteps, and batch size")
            print(f"Input Feature List: {self['features_list']}")
            print(f"Input Timesteps: {self['timesteps']}")
            print(f"Input Batch Size: {self['batch_size']}")

        n_features = len(self['features_list'])
        batch_shape = (self["batch_size"], self["timesteps"], n_features)
        if verbose:
            print("Calculated params:")
            print(f"Number of features: {n_features}")
            print(f"Batch Shape: {batch_shape}")

        # Update the dictionary
        super().update({
            'n_features': n_features,
            'batch_shape': batch_shape
        })
        if verbose:
            print(self)
    def update(self, *args, verbose=True, **kwargs):
        """
        Overwrites the standard update function from dict. This is to prevent certain keys from being modified directly and to automatically update keys to be compatible with each other. The keys handled relate to the shape of the input data to the RNN.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Prevent updating n_features and batch_shape
        restricted_keys = {'n_features', 'batch_shape'}
        keys_to_check = {'features_list', 'timesteps', 'batch_size'}

        # Check for restricted keys in args
        if args:
            if isinstance(args[0], dict):
                if restricted_keys & args[0].keys():
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & args[0].keys()}, \n Instead update one of: {keys_to_check}")
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                if restricted_keys & {k for k, v in args[0]}:
                    raise KeyError(f"Cannot directly update keys: {restricted_keys & {k for k, v in args[0]}}, \n Instead update one of: {keys_to_check}")

        # Check for restricted keys in kwargs
        if restricted_keys & kwargs.keys():
            raise KeyError(f"Cannot update restricted keys: {restricted_keys & kwargs.keys()}")

        # Track if specific keys are updated
        keys_updated = set()

        # Update using the standard dict update method
        if args:
            if isinstance(args[0], dict):
                keys_updated.update(args[0].keys() & keys_to_check)
            elif isinstance(args[0], (tuple, list)) and all(isinstance(i, tuple) and len(i) == 2 for i in args[0]):
                keys_updated.update(k for k, v in args[0] if k in keys_to_check)

        if kwargs:
            keys_updated.update(kwargs.keys() & keys_to_check)

        # Call the parent update method
        super().update(*args, **kwargs)

        # Recalculate shapes if necessary
        if keys_updated:
            self.calc_param_shapes(verbose=verbose)
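
# Illustrative usage sketch of RNNParams (not part of the original module).
# The keys below mirror the required keys checked in _run_checks; the feature
# names are placeholders. Updating 'timesteps' recalculates 'batch_shape'
# automatically, while updating 'batch_shape' directly raises a KeyError.
if __name__ == '__main__':
    p = RNNParams({
        'batch_size': 32, 'timesteps': 5, 'rnn_layers': 1, 'rnn_units': 20,
        'dense_layers': 1, 'dense_units': 5, 'epochs': 10,
        'activation': ['tanh', 'tanh'], 'features_list': ['Ed', 'Ew', 'rain'],
        'dropout': [0.2, 0.2], 'time_fracs': [.9, .05, .05],
        'learning_rate': 0.001
    })
    p.update({'timesteps': 10})        # batch_shape becomes (32, 10, 3)
    # p.update({'batch_shape': None})  # would raise KeyError: restricted key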
## Class for handling input data
class RNNData(dict):
    """
    A custom dictionary class for managing RNN data, with validation, scaling, and train-test splitting functionality.
    """
    required_keys = {"loc", "time", "X", "y", "features_list"}
    def __init__(self, input_dict, scaler=None, features_list=None):
        """
        Initializes the RNNData instance, performs checks, and prepares data.

        Parameters:
        -----------
        input_dict : dict
            A dictionary containing the initial data.
        scaler : str, optional
            The name of the scaler to be used (e.g., 'minmax', 'standard'). Default is None.
        features_list : list, optional
            A subset of features to be used. Default is None, which means all features.
        """
        # Copy to avoid changing external input
        input_data = input_dict.copy()
        # Initialize inherited dict class
        super().__init__(input_data)

        # Check if input data is one timeseries dataset or multiple
        if isinstance(self.loc['STID'], str):
            self.spatial = False
            print("Input data is single timeseries.")
        elif isinstance(self.loc['STID'], list):
            self.spatial = True
            print("Input data from multiple timeseries.")
        else:
            raise KeyError("Input locations not list or single string")

        # Set up data scaling
        self.scaler = None
        if scaler is not None:
            self.set_scaler(scaler)

        # Derive total hours and rename features_list
        if self.spatial:
            self['hours'] = min(arr.shape[0] for arr in self.y)
        else:
            self['hours'] = len(self['y'])

        self['all_features_list'] = self.pop('features_list')
        if features_list is None:
            print("Using all input features.")
            self.features_list = self.all_features_list
        else:
            self.features_list = features_list
            print(f"Setting features_list to {features_list}. \n NOTE: not subsetting features yet. That happens in train_test_split.")

        self._run_checks()
        self.__dict__.update(self)
    # TODO: Fix checks for multilocation
    def _run_checks(self, verbose=True):
        """
        Validates that required keys are present and checks the integrity of data shapes.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        missing_keys = self.required_keys - self.keys()
        if missing_keys:
            raise KeyError(f"Missing required keys: {missing_keys}")

        # # Check if 'hours' is provided and matches len(y)
        # if 'hours' in self:
        #     if self.hours != len(self.y):
        #         raise ValueError(f"Provided 'hours' value {self.hours} does not match the length of 'y', which is {len(self.y)}.")

        # Check desired subset of features is in all input features
        if not all_items_exist(self.features_list, self.all_features_list):
            raise ValueError(f"Provided 'features_list' {self.features_list} has elements not in input features.")
    def set_scaler(self, scaler):
        """
        Sets the scaler to be used for data normalization.

        Parameters:
        -----------
        scaler : str
            The name of the scaler (e.g., 'minmax', 'standard').
        """
        recognized_scalers = ['minmax', 'standard']
        if scaler in recognized_scalers:
            print(f"Setting data scaler: {scaler}")
            self.scaler = scalers[scaler]
        else:
            raise ValueError(f"Unrecognized scaler '{scaler}'. Recognized scalers are: {recognized_scalers}.")
    def train_test_split(self, time_fracs=[1.,0.,0.], space_fracs=[1.,0.,0.], subset_features=True, features_list=None, verbose=True):
        """
        Splits the data into training, validation, and test sets.

        Parameters:
        -----------
        time_fracs : list of float, optional
            Fractions of the time series to use for train/validation/test. Must have length 3 and sum to 1. Default is [1., 0., 0.].
        space_fracs : list of float, optional
            Fractions of the locations to use for train/validation/test. Must have length 3 and sum to 1. Only used for spatial data. Default is [1., 0., 0.].
        subset_features : bool, optional
            If True, subsets the data to the specified features list. Default is True.
        features_list : list, optional
            A list of features to use for subsetting. Default is None.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial

        # Set up
        assert np.sum(time_fracs) == np.sum(space_fracs) == 1., "Provided cross validation params don't sum to 1"
        if (len(time_fracs) != 3) or (len(space_fracs) != 3):
            raise ValueError("Cross-validation params `time_fracs` and `space_fracs` must be lists of length 3, representing (train/validation/test)")

        train_frac = time_fracs[0]
        val_frac = time_fracs[1]
        test_frac = time_fracs[2]

        # Setup train/val/test in time
        train_ind = int(np.floor(self.hours * train_frac)); self.train_ind = train_ind
        test_ind = int(train_ind + round(self.hours * val_frac)); self.test_ind = test_ind
        # Check for any potential issues with indices
        if test_ind > self.hours:
            print(f"Setting test index to {self.hours}")
            test_ind = self.hours
        if train_ind > test_ind:
            raise ValueError("Train index must be less than test index.")

        # Setup train/val/test in space
        if spatial:
            train_frac_sp = space_fracs[0]
            val_frac_sp = space_fracs[1]
            locs = np.arange(len(self.loc['STID'])) # indices of locations
            train_size = int(len(locs) * train_frac_sp)
            val_size = int(len(locs) * val_frac_sp)
            random.shuffle(locs)
            train_locs = locs[:train_size]
            val_locs = locs[train_size:train_size + val_size]
            test_locs = locs[train_size + val_size:]
            # Store lists of IDs in loc subdictionary
            self.loc['train_locs'] = [self.case[i] for i in train_locs]
            self.loc['val_locs'] = [self.case[i] for i in val_locs]
            self.loc['test_locs'] = [self.case[i] for i in test_locs]

        # Extract data to desired features, copy to avoid changing input objects
        X = self.X.copy()
        y = self.y.copy()
        if subset_features:
            if verbose and self.features_list != self.all_features_list:
                print(f"Subsetting input data to features_list: {self.features_list}")
            # Indices to subset all features with based on params features
            indices = []
            for item in self.features_list:
                if item in self.all_features_list:
                    indices.append(self.all_features_list.index(item))
                else:
                    print(f"Warning: feature name '{item}' not found in list of all features from input data. Removing from internal features list")
                    # self.features_list.remove(item)
            if spatial:
                X = [Xi[:, indices] for Xi in X]
            else:
                X = X[:, indices]

        # Training data from 0 to train_ind
        # Validation data from train_ind to test_ind
        # Test data from test_ind to end
        if spatial:
            # Split by space
            X_train = [X[i] for i in train_locs]
            X_val = [X[i] for i in val_locs]
            X_test = [X[i] for i in test_locs]
            y_train = [y[i] for i in train_locs]
            y_val = [y[i] for i in val_locs]
            y_test = [y[i] for i in test_locs]

            # Split by time
            self.X_train = [Xi[:train_ind] for Xi in X_train]
            self.y_train = [yi[:train_ind].reshape(-1,1) for yi in y_train]
            if (val_frac > 0) and (val_frac_sp > 0):
                self.X_val = [Xi[train_ind:test_ind] for Xi in X_val]
                self.y_val = [yi[train_ind:test_ind].reshape(-1,1) for yi in y_val]
            self.X_test = [Xi[test_ind:] for Xi in X_test]
            self.y_test = [yi[test_ind:].reshape(-1,1) for yi in y_test]
        else:
            self.X_train = X[:train_ind]
            self.y_train = y[:train_ind].reshape(-1,1) # assumes y 1-d, change this if vector output
            if val_frac > 0:
                self.X_val = X[train_ind:test_ind]
                self.y_val = y[train_ind:test_ind].reshape(-1,1) # assumes y 1-d, change this if vector output
            self.X_test = X[test_ind:]
            self.y_test = y[test_ind:].reshape(-1,1) # assumes y 1-d, change this if vector output

        # Print statements if verbose
        if verbose:
            print(f"Train index: 0 to {train_ind}")
            print(f"Validation index: {train_ind} to {test_ind}")
            print(f"Test index: {test_ind} to {self.hours}")

            if spatial:
                print("Subsetting locations into train/val/test")
                print(f"Total Locations: {len(locs)}")
                print(f"Train Locations: {len(train_locs)}")
                print(f"Val. Locations: {len(val_locs)}")
                print(f"Test Locations: {len(test_locs)}")
                print(f"X_train[0] shape: {self.X_train[0].shape}, y_train[0] shape: {self.y_train[0].shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val[0] shape: {self.X_val[0].shape}, y_val[0] shape: {self.y_val[0].shape}")
                print(f"X_test[0] shape: {self.X_test[0].shape}, y_test[0] shape: {self.y_test[0].shape}")
            else:
                print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
                if hasattr(self, "X_val"):
                    print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
                print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")
        # Make test data None if zero length
        if len(self.X_test) == 0:
            self.X_test = None
            warnings.warn("Zero Test Data, setting X_test to None")
    def scale_data(self, verbose=True):
        """
        Scales the training data using the set scaler.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        # Indicate whether multi timeseries or not
        spatial = self.spatial
        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        # if hasattr(self.scaler, 'n_features_in_'):
        #     warnings.warn("Scale_data has already been called. Exiting to prevent issues.")
        #     return
        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling training data with scaler {self.scaler}, fitting on X_train")

        if spatial:
            # Fit scaler on row-joined training data
            self.scaler.fit(np.vstack(self.X_train))
            # Transform data using fitted scaler
            self.X_train = [self.scaler.transform(Xi) for Xi in self.X_train]
            if hasattr(self, 'X_val'):
                if self.X_val is not None:
                    self.X_val = [self.scaler.transform(Xi) for Xi in self.X_val]
            if self.X_test is not None:
                self.X_test = [self.scaler.transform(Xi) for Xi in self.X_test]
        else:
            # Fit the scaler on the training data
            self.scaler.fit(self.X_train)
            # Transform the data using the fitted scaler
            self.X_train = self.scaler.transform(self.X_train)
            if hasattr(self, 'X_val'):
                self.X_val = self.scaler.transform(self.X_val)
            self.X_test = self.scaler.transform(self.X_test)
    # NOTE: only works for non spatial
    def scale_all_X(self, verbose=True):
        """
        Scales all X data using the set scaler, which was fitted on X_train.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.
        Returns:
        -------
        ndarray
            Scaled X matrix, subsetted to features_list.
        """
        if self.spatial:
            raise ValueError("Not implemented for spatial data")

        if self.scaler is None:
            raise ValueError("Scaler is not set. Use 'set_scaler' method to set a scaler before scaling data.")
        if verbose:
            print(f"Scaling all X data with scaler {self.scaler}, fitted on X_train")
        # Subset features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        X = self.scaler.transform(X)

        return X

    def subset_X_features(self, verbose=True):
        """
        Subsets the full X matrix to the features in features_list, without scaling.
        """
        if self.spatial:
            raise ValueError("Not implemented for spatial data")
        # Subset features
        indices = []
        for item in self.features_list:
            if item in self.all_features_list:
                indices.append(self.all_features_list.index(item))
            else:
                print(f"Warning: feature name '{item}' not found in list of all features from input data")
        X = self.X[:, indices]
        return X
    def inverse_scale(self, return_X = 'all_hours', save_changes=False, verbose=True):
        """
        Inversely scales the data to its original form.

        Parameters:
        -----------
        return_X : str, optional
            Specifies what data to return after inverse scaling. Default is 'all_hours'.
        save_changes : bool, optional
            If True, updates the internal data with the inversely scaled values. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        X_val = self.scaler.inverse_transform(self.X_val)
        X_test = self.scaler.inverse_transform(self.X_test)

        if save_changes:
            print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
        if verbose:
            print(f"Attempting to return {return_X}")
        if return_X == "all_hours":
            return np.concatenate((X_train, X_val, X_test), axis=0)
        else:
            print(f"Unrecognized or unimplemented return value {return_X}")
    def batch_reshape(self, timesteps, batch_size, hours=None, verbose=False, start_times=None):
        """
        Restructures input data to RNN using batches and sequences.

        Parameters:
        ----------
        timesteps : int
            Number of timesteps or sequence length used for a single sequence in RNN training. Constitutes a single sample to the model.
        batch_size : int
            Number of sequences used within a batch of training.
        hours : int, optional
            The length of the time series to use per location; passed through to `staircase_spatial`.
        verbose : bool, optional
            If True, prints status messages. Default is False.
        start_times : numpy array, optional
            The initial time step for each location; passed through to `staircase_spatial`.

        Returns:
        -------
        None
            This method reshapes the data in place.
        Raises:
        ------
        AttributeError
            If either 'X_train' or 'y_train' attributes do not exist within the instance.

        Notes:
        ------
        The reshaping method depends on self param "spatial".
        - spatial == False: Reshapes data assuming no spatial dimensions.
        - spatial == True: Reshapes data considering spatial dimensions.
        """
        if not hasattr(self, 'X_train') or not hasattr(self, 'y_train'):
            raise AttributeError("Both 'X_train' and 'y_train' must be set before reshaping batches.")

        # Indicator of spatial training scheme or not
        spatial = self.spatial

        if spatial:
            print(f"Reshaping spatial training data using batch size: {batch_size} and timesteps: {timesteps}")
            # Format training data
            self.X_train, self.y_train, self.n_seqs = staircase_spatial(self.X_train, self.y_train, timesteps = timesteps,
                                                                        batch_size=batch_size, hours=hours, verbose=verbose, start_times=start_times)
            # Format validation data if provided
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val, _ = staircase_spatial(self.X_val, self.y_val, timesteps = timesteps,
                                                              batch_size=batch_size, hours=None, verbose=verbose, start_times=start_times)
            # Format test data. Same (batch_size, timesteps, features) format, but for the prediction model batch_size and timesteps are unconstrained
            # So here batch_size will represent the number of locations aka separate timeseries to predict
            if self.X_test is not None:
                print("Reshaping test data by stacking. Output dimension will be (n_locs, test_hours, features)")
                self.X_test = np.stack(self.X_test, axis=0)
                self.y_test = np.stack(self.y_test, axis=0)
                print(f"X_test shape: {self.X_test.shape}")
                print(f"y_test shape: {self.y_test.shape}")

        else:
            print(f"Reshaping training data using batch size: {batch_size} and timesteps: {timesteps}")
            # Format training data
            self.X_train, self.y_train = staircase_2(self.X_train, self.y_train, timesteps = timesteps,
                                                     batch_size=batch_size, verbose=verbose)
            # Format validation data if provided
            if hasattr(self, "X_val"):
                print(f"Reshaping validation data using batch size: {batch_size} and timesteps: {timesteps}")
                self.X_val, self.y_val = staircase_2(self.X_val, self.y_val, timesteps = timesteps,
                                                     batch_size=batch_size, verbose=verbose)
            # Format test data. Shape should be (1, hours, features)
            if self.X_test is not None:
                self.X_test = self.X_test.reshape((1, self.X_test.shape[0], len(self.features_list)))
                print(f"X_test shape: {self.X_test.shape}")
                print(f"y_test shape: {self.y_test.shape}")
        if self.X_train.shape[0] == 0:
            raise ValueError("X_train has zero rows. Try different combo of cross-validation fractions, batch size or start_times. Train/val/test data partially processed, need to rerun train_test_split")
    def print_hashes(self, attrs_to_check = ['X', 'y', 'X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']):
        """
        Prints the hash of specified data attributes.

        Parameters:
        -----------
        attrs_to_check : list, optional
            A list of attribute names to hash and print. Default includes 'X', 'y', and split data.
        """
        for attr in attrs_to_check:
            if hasattr(self, attr):
                value = getattr(self, attr)
                if self.spatial:
                    pass
                else:
                    print(f"Hash of {attr}: {hash_ndarray(value)}")
    def __getattr__(self, key):
        """
        Allows attribute-style access to dictionary keys, a.k.a. enables the "." operator for get elements
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'rnn_data' object has no attribute '{key}'")

    def __setitem__(self, key, value):
        """
        Ensures dictionary and attribute updates stay in sync for required keys.
        """
        super().__setitem__(key, value)  # Update the dictionary
        if key in self.required_keys:
            super().__setattr__(key, value)  # Ensure the attribute is updated as well

    def __setattr__(self, key, value):
        """
        Ensures dictionary keys are updated when setting attributes.
        """
        self[key] = value
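
# Illustrative construction sketch of RNNData for a single timeseries (not
# part of the original module). The dict layout mirrors required_keys above;
# the station ID, feature names, and random values are placeholders. The
# fractions 0.5/0.25/0.25 are binary-exact so the sum-to-1 assert holds.
if __name__ == '__main__':
    demo_dict = {
        'loc': {'STID': 'DEMO1'},
        'time': np.arange(100),
        'X': np.random.rand(100, 3),
        'y': np.random.rand(100),
        'features_list': ['Ed', 'Ew', 'rain'],
        'case': 'DEMO1'
    }
    dat = RNNData(demo_dict, scaler='standard')
    dat.train_test_split(time_fracs=[.5, .25, .25])
    dat.scale_data()
    print(dat.X_train.shape, dat.X_val.shape, dat.X_test.shape)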
# Function to check reproducibility hashes, environment info, and model parameters
def check_reproducibility(dict0, params, m_hash, w_hash):
    """
    Performs reproducibility checks on a model by comparing current settings and outputs with stored reproducibility information.

    Parameters:
    -----------
    dict0 : dict
        The data dictionary that should contain reproducibility information under the 'repro_info' attribute.
    params : dict
        The current model parameters to be checked against the reproducibility information.
    m_hash : str
        The hash of the current model predictions.
    w_hash : str
        The hash of the current fitted model weights.

    Returns:
    --------
    None
        The function returns None. It issues warnings if any reproducibility checks fail.

    Notes:
    ------
    - Checks are only performed if `dict0` contains the 'repro_info' attribute.
    - Issues warnings for mismatches in model weights, predictions, Python version, TensorFlow version, and model parameters.
    - Skips checks if physics-based initialization is used (not implemented).
    """
    if not hasattr(dict0, "repro_info"):
        warnings.warn("The provided data dictionary does not have the required 'repro_info' attribute. Not running reproducibility checks.")
        return

    repro_info = dict0.repro_info
    # Check hashes
    if params['phys_initialize']:
        hashes = repro_info['phys_initialize']
        warnings.warn("Physics Initialization not implemented yet. Not running reproducibility checks.")
    else:
        hashes = repro_info['rand_initialize']
        print(f"Fitted weights hash: {w_hash} \n Reproducibility weights hash: {hashes['fitted_weights_hash']}")
        print(f"Model predictions hash: {m_hash} \n Reproducibility preds hash: {hashes['preds_hash']}")
        if (w_hash != hashes['fitted_weights_hash']) or (m_hash != hashes['preds_hash']):
            if w_hash != hashes['fitted_weights_hash']:
                warnings.warn("The fitted weights hash does not match the reproducibility weights hash.")
            if m_hash != hashes['preds_hash']:
                warnings.warn("The predictions hash does not match the reproducibility predictions hash.")
        else:
            print("***Reproducibility Checks passed - model weights and model predictions match expected.***")

    # Check environment
    current_py_version = sys.version[0:6]
    current_tf_version = tf.__version__
    if current_py_version != repro_info['env_info']['py_version']:
        warnings.warn(f"Python version mismatch: Current Python version is {current_py_version}, "
                      f"expected {repro_info['env_info']['py_version']}.")

    if current_tf_version != repro_info['env_info']['tf_version']:
        warnings.warn(f"TensorFlow version mismatch: Current TensorFlow version is {current_tf_version}, "
                      f"expected {repro_info['env_info']['tf_version']}.")

    # Check params
    repro_params = repro_info.get('params', {})

    for key, repro_value in repro_params.items():
        if key in params:
            if params[key] != repro_value:
                warnings.warn(f"Parameter mismatch for '{key}': Current value is {params[key]}, "
                              f"repro value is {repro_value}.")
        else:
            warnings.warn(f"Parameter '{key}' is missing in the current params.")

    return
class RNNModel(ABC):
    """
    Abstract base class for RNN models, providing structure for training, predicting, and running reproducibility checks.
    """
    def __init__(self, params: dict):
        """
        Initializes the RNNModel with the given parameters.

        Parameters:
        -----------
        params : dict
            A dictionary containing model parameters.
        """
        self.params = params
        if type(self) is RNNModel:
            raise TypeError("RNNModel is an abstract class and cannot be instantiated directly")
        super().__init__()

    @abstractmethod
    def _build_model_train(self):
        """Abstract method to build the training model."""
        pass

    @abstractmethod
    def _build_model_predict(self, return_sequences=True):
        """Abstract method to build the prediction model. This model copies weights from the train model but with an input structure that allows for easier prediction of arbitrary length timeseries. This model is not to be used for training; don't use it with .fit calls."""
        pass
    def is_stateful(self):
        """
        Checks whether any of the layers in the internal model (self.model_train) are stateful.

        Returns:
            bool: True if at least one layer in the model is stateful, False otherwise.

        This method iterates over all the layers in the model and checks if any of them
        have the 'stateful' attribute set to True. This is useful for determining if
        the model is designed to maintain state across batches during training.

        Example:
        --------
        model.is_stateful()
        """
        for layer in self.model_train.layers:
            if hasattr(layer, 'stateful') and layer.stateful:
                return True
        return False
    def fit(self, X_train, y_train, plot_history=True, plot_title = '',
            weights=None, callbacks=[], validation_data=None, return_epochs=False, *args, **kwargs):
        """
        Trains the model on the provided training data. Uses the fit method of the training model and then copies the weights over to the prediction model, which has a less restrictive input shape. Formats a list of callbacks to use within the fit method based on params input.

        Parameters:
        -----------
        X_train : np.ndarray
            The input matrix data for training.
        y_train : np.ndarray
            The target vector data for training.
        plot_history : bool, optional
            If True, plots the training history. Default is True.
        plot_title : str, optional
            The title for the training plot. Default is an empty string.
        weights : optional
            Initial weights for the model. Default is None.
        callbacks : list, optional
            A list of callback functions to use during training. Default is an empty list.
        validation_data : tuple, optional
            Validation data to use during training, expected format (X_val, y_val). Default is None.
        return_epochs : bool
            If True, return the number of epochs that training took. Used to test and optimize early stopping.
        """
        # Check compatibility, assume features dimension is last in X_train array
        if X_train.shape[-1] != self.params['n_features']:
            raise ValueError(f"X_train and number of features from params not equal. \n X_train shape: {X_train.shape} \n params['n_features']: {self.params['n_features']}. \n Try recreating RNNData object with features list from params: `RNNData(..., features_list = params['features_list'])`")

        # verbose_fit argument is for printing out update after each epoch, which gets very long
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            print(f"Training simple RNN with params: {self.params}")

        # Setup callbacks
        if self.params["reset_states"]:
            callbacks = callbacks + [ResetStatesCallback(self.params), TerminateOnNaN()]

        # Early stopping callback requires validation data
        if validation_data is not None:
            X_val, y_val = validation_data[0], validation_data[1]
            print("Using early stopping callback.")
            early_stop = EarlyStoppingCallback(patience = self.params['early_stopping_patience'])
            callbacks = callbacks + [early_stop]
        if verbose_weights:
            print(f"Formatted X_train hash: {hash_ndarray(X_train)}")
            print(f"Formatted y_train hash: {hash_ndarray(y_train)}")
            if validation_data is not None:
                print(f"Formatted X_val hash: {hash_ndarray(X_val)}")
                print(f"Formatted y_val hash: {hash_ndarray(y_val)}")
            print(f"Initial weights before training hash: {hash_weights(self.model_train)}")

        ## TODO: Hidden State Initialization
        # Evaluate model once to set nonzero initial state
        # self.model_train(X_train[0:self.params['batch_size'],:,:])

        if validation_data is not None:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks = callbacks,
                verbose=verbose_fit,
                validation_data = (X_val, y_val),
                *args, **kwargs
            )
        else:
            history = self.model_train.fit(
                X_train, y_train,
                epochs=self.params['epochs'],
                batch_size=self.params['batch_size'],
                callbacks = callbacks,
                verbose=verbose_fit,
                *args, **kwargs
            )

        if plot_history:
            self.plot_history(history, plot_title)

        if self.params["verbose_weights"]:
            print(f"Fitted Weights Hash: {hash_weights(self.model_train)}")

        # Update weights for prediction model
        w_fitted = self.model_train.get_weights()
        self.model_predict.set_weights(w_fitted)

        if return_epochs:
            # Epoch counting starts at 0, adding 1 for the count
            return early_stop.best_epoch + 1
    def predict(self, X_test):
        """
        Generates predictions on the provided test data using the internal prediction model.

        Parameters:
        -----------
        X_test : np.ndarray
            The input data for generating predictions.

        Returns:
        --------
        np.ndarray
            The predicted values.
        """
        print("Predicting test data")
        # X_test = self._format_pred_data(X_test)
        # preds = self.model_predict.predict(X_test).flatten()
        preds = self.model_predict.predict(X_test)
        return preds

    def _format_pred_data(self, X):
        """
        Formats the prediction data for RNN input.

        Parameters:
        -----------
        X : np.ndarray
            The input data.

        Returns:
        --------
        np.ndarray
            The formatted input data.
        """
        return np.reshape(X,(1, X.shape[0], self.params['n_features']))
    def plot_history(self, history, plot_title, create_figure=True):
        """
        Plots the training history. Uses log scale on y axis for readability.

        Parameters:
        -----------
        history : History object
            The training history object from model fitting. Output of keras' .fit command
        plot_title : str
            The title for the plot.
        """
        if create_figure:
            plt.figure(figsize=(10, 6))
        plt.semilogy(history.history['loss'], label='Training loss')
        if 'val_loss' in history.history:
            plt.semilogy(history.history['val_loss'], label='Validation loss')
        plt.title(f'{plot_title} Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(loc='upper left')
        plt.show()
    def run_model(self, dict0, reproducibility_run=False, plot_period='all', save_outputs=True, return_epochs=False):
        """
        Runs the RNN model on an input data dictionary, including training, prediction, and reproducibility checks.

        Parameters:
        -----------
        dict0 : RNNData (dict)
            The dictionary containing the input data and configuration.
        reproducibility_run : bool, optional
            If True, performs reproducibility checks after running the model. Default is False.
        save_outputs : bool
            If True, writes model outputs into input dictionary.
        return_epochs : bool
            If True, returns how many epochs of training happened. Used to optimize params related to early stopping.

        Returns:
        --------
        tuple
            Model predictions and a dictionary of RMSE errors broken up by time period.
        """
        if dict0.X_test is None:
            raise ValueError("No test data X_test in input dictionary. Provide test data or just run .fit")
        verbose_fit = self.params['verbose_fit']
        verbose_weights = self.params['verbose_weights']
        if verbose_weights:
            dict0.print_hashes()
        # Extract datasets
        X_train, y_train, X_test, y_test = dict0.X_train, dict0.y_train, dict0.X_test, dict0.y_test
        if 'X_val' in dict0:
            X_val, y_val = dict0.X_val, dict0.y_val
        else:
            X_val = None
        if dict0.spatial:
            case_id = "Spatial Training Set"
        else:
            case_id = dict0.case

        # Fit model
        if X_val is None:
            eps = self.fit(X_train, y_train, plot_title=case_id, return_epochs=return_epochs)
        else:
            eps = self.fit(X_train, y_train, validation_data = (X_val, y_val), plot_title=case_id, return_epochs=return_epochs)

        # Generate predictions and evaluate test error
        if dict0.spatial:
            m, errs = self._eval_multi(dict0)
            if save_outputs:
                dict0['m'] = m
        else:
            m, errs = self._eval_single(dict0, verbose_weights, reproducibility_run)
            if save_outputs:
                dict0['m'] = m
            plot_data(dict0, title="RNN", title2=dict0.case, plot_period=plot_period)

        # print(f"Mean Test Error: {errs.mean()}")

        if return_epochs:
            return m, errs, eps
        else:
            return m, errs
    def _eval_single(self, dict0, verbose_weights, reproducibility_run):
        # Generate predictions:
        # run through training to get hidden state set properly for forecast period
        print("Running prediction on all input data, training through test")
        if dict0.scaler is not None:
            X = dict0.scale_all_X()
        else:
            X = dict0.subset_X_features()
        y = dict0.y.flatten()
        # Predict
        if verbose_weights:
            print(f"All X hash: {hash_ndarray(X)}")
        # Reshape X
        X = X.reshape(1, X.shape[0], X.shape[1])
        m = self.predict(X).flatten()
        if verbose_weights:
            print(f"Predictions Hash: {hash_ndarray(m)}")

        if reproducibility_run:
            print("Checking Reproducibility")
            check_reproducibility(dict0, self.params, hash_ndarray(m), hash_weights(self.model_predict))

        # print(dict0.keys())
        # Plot final fit and data
        # dict0['y'] = y
        # plot_data(dict0, title="RNN", title2=dict0['case'], plot_period=plot_period)

        # Calculate errors
        err = rmse(m, y)
        train_ind = dict0.train_ind # index of final training set value
        test_ind = dict0.test_ind # index of first test set value

        err_train = rmse(m[:train_ind], y[:train_ind].flatten())
        err_pred = rmse(m[test_ind:], y[test_ind:].flatten())
        rmse_dict = {
            'all': err,
            'training': err_train,
            'prediction': err_pred
        }
        return m, rmse_dict
    def _eval_multi(self, dict0):
        # Train Error: NOT DOING YET. DECIDE WHETHER THIS IS NEEDED

        # Test Error
        # new_data = np.stack(dict0.X_test, axis=0)
        # y_array = np.stack(dict0.y_test, axis=0)
        # preds = self.model_predict.predict(new_data)
        y_array = dict0.y_test
        preds = self.model_predict.predict(dict0.X_test)

        # Calculate RMSE
        ## Note: not using util rmse function since this approach is for 3d arrays
        # Compute the squared differences
        squared_diff = np.square(preds - y_array)

        # Mean squared error along the timesteps and dimensions (axis 1 and 2)
        mse = np.mean(squared_diff, axis=(1, 2))

        # Root mean squared error (RMSE) for each timeseries
        rmses = np.sqrt(mse)

        return preds, rmses
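
# Minimal concrete subclass sketch (illustrative only, not one of this
# project's real model classes): one way to satisfy the abstract interface
# above, pairing a stateful training model (fixed batch_shape) with a
# non-stateful prediction model (flexible batch size and sequence length)
# that receives the fitted weights. Assumes params carries 'batch_shape',
# 'rnn_units', 'n_features', and 'learning_rate' as produced by RNNParams.
class DemoRNN(RNNModel):
    def __init__(self, params: dict):
        super().__init__(params)
        self.model_train = self._build_model_train()
        self.model_predict = self._build_model_predict()
    def _build_model_train(self):
        # Stateful model with fixed (batch_size, timesteps, features) input
        inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
        x = SimpleRNN(self.params['rnn_units'], stateful=True)(inputs)
        outputs = Dense(1)(x)
        model = tf.keras.Model(inputs, outputs)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
        model.compile(optimizer=optimizer, loss='mean_squared_error')
        return model
    def _build_model_predict(self, return_sequences=True):
        # Non-stateful copy accepting any batch size and sequence length;
        # fit() copies the trained weights into this model after training
        inputs = tf.keras.Input(shape=(None, self.params['n_features']))
        x = SimpleRNN(self.params['rnn_units'], return_sequences=return_sequences)(inputs)
        outputs = Dense(1)(x)
        model = tf.keras.Model(inputs, outputs)
        model.compile(loss='mean_squared_error')
        return model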
## Callbacks

# Helper functions for batch reset schedules
def calc_exp_intervals(bmin, bmax, n_epochs, force_bmax = True):
    # Calculate the exponential intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = epochs / n_epochs
    intervals = bmin * (bmax / bmin) ** factors
    if force_bmax:
        intervals[-1] = bmax  # Ensure the last value is exactly bmax
    return intervals.astype(int)

def calc_log_intervals(bmin, bmax, n_epochs, force_bmax = True):
    # Calculate the logarithmic intervals for each epoch
    epochs = np.arange(n_epochs)
    factors = np.log(1 + epochs) / np.log(1 + n_epochs)
    intervals = bmin + (bmax - bmin) * factors
    if force_bmax:
        intervals[-1] = bmax  # Ensure the last value is exactly bmax
    return intervals.astype(int)

def calc_step_intervals(bmin, bmax, n_epochs, estep=None, force_bmax=True):
    # Set estep to 10% of the total number of epochs if not provided
    if estep is None:
        estep = int(0.1 * n_epochs)

    # Initialize intervals with bmin for the first part, then step up to bmax
    intervals = np.full(n_epochs, bmin)

    # Step up to bmax after 'estep' epochs
    intervals[estep:] = bmax

    # Force the last value to be exactly bmax if specified
    if force_bmax:
        intervals[-1] = bmax

    return intervals.astype(int)
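
# Quick comparison of the schedule helpers (illustrative sketch, not part of
# the original module): each returns one reset interval per epoch, ramping
# from bmin to bmax in its own shape.
if __name__ == '__main__':
    print(calc_exp_intervals(1, 32, 10))   # exponential ramp from bmin to bmax
    print(calc_log_intervals(1, 32, 10))   # logarithmic ramp
    print(calc_step_intervals(1, 32, 10))  # constant at bmin, then jump to bmax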
class ResetStatesCallback(Callback):
    """
    Custom callback to reset the states of RNN layers at the end of each epoch and optionally after a specified number of batches.

    Parameters:
    -----------
    batch_reset : int, optional
        If provided, resets the states of RNN layers after every `batch_reset` batches. Default is None.
    """
    # def __init__(self, bmin=None, bmax=None, epochs=None, loc_batch_reset = None, batch_schedule_type='linear', verbose=True):
    def __init__(self, params=None, verbose=True):
        """
        Initializes the ResetStatesCallback with an optional batch reset interval.

        Parameters:
        -----------
        params : dict, optional
            Dictionary of parameters. If None provided, only on_epoch_end will trigger reset of hidden states.
            - bmin : int
                Minimum for batch reset schedule
            - bmax : int
                Maximum for batch reset schedule
            - epochs : int
                Number of training epochs.
            - loc_batch_reset : int
                Interval of batches after which to reset the states of RNN layers for location changes. Triggers reset for training AND validation phases.
            - batch_schedule_type : str
                Type of batch scheduling to be used. Recognized methods are the following:
                - 'constant' : Uses a fixed batch reset interval throughout training
                - 'linear'   : Increases the batch reset interval linearly over epochs from bmin to bmax.
                - 'exp'      : Increases the batch reset interval exponentially over epochs from bmin to bmax.
                - 'log'      : Increases the batch reset interval logarithmically over epochs from bmin to bmax.
                - 'step'     : Increases the batch reset interval from constant at bmin to constant at bmax after estep number of epochs

        Returns:
        -----------
        None. Resets hidden states in place for the RNN model that uses this callback.
        """
        super(ResetStatesCallback, self).__init__()

        # Check for optional arguments, set None if missing in input params
        arg_list = ['bmin', 'bmax', 'epochs', 'estep', 'loc_batch_reset', 'batch_schedule_type']
        if params is None:
            params = {}  # tolerate params=None as documented: only epoch-end resets
        for arg in arg_list:
            setattr(self, arg, params.get(arg, None))

        self.verbose = verbose
        if self.verbose:
            print(f"Using ResetStatesCallback with Batch Reset Schedule: {self.batch_schedule_type}")
        # Calculate the reset intervals for each epoch during initialization
        if self.batch_schedule_type is not None:
            if self.epochs is None:
                raise ValueError(f"Argument `epochs` cannot be None with batch_schedule_type: {self.batch_schedule_type}")
            self.batch_reset_intervals = self._calc_reset_intervals(self.batch_schedule_type)
            if self.verbose:
                print(f"batch_reset_intervals: {self.batch_reset_intervals}")
        else:
            self.batch_reset_intervals = None
    def on_epoch_end(self, epoch, logs=None):
        """
        Resets the states of RNN layers at the end of each epoch.

        Parameters:
        -----------
        epoch : int
            The index of the current epoch.
        logs : dict, optional
            A dictionary containing metrics from the epoch. Default is None.
        """
        # print(f" Resetting hidden state after epoch: {epoch+1}", flush=True)
        # Iterate over each layer in the model
        for layer in self.model.layers:
            # Check if the layer has a reset_states method
            if hasattr(layer, 'reset_states'):
                layer.reset_states()
    def _calc_reset_intervals(self, batch_schedule_type):
        methods = ['constant', 'linear', 'exp', 'log', 'step']
        if batch_schedule_type not in methods:
            raise ValueError(f"Batch schedule method {batch_schedule_type} not recognized. \n Available methods: {methods}")
        if batch_schedule_type == "constant":
            return np.repeat(self.bmin, self.epochs).astype(int)
        elif batch_schedule_type == "linear":
            return np.linspace(self.bmin, self.bmax, self.epochs).astype(int)
        elif batch_schedule_type == "exp":
            return calc_exp_intervals(self.bmin, self.bmax, self.epochs)
        elif batch_schedule_type == "log":
            return calc_log_intervals(self.bmin, self.bmax, self.epochs)
        elif batch_schedule_type == "step":
            return calc_step_intervals(self.bmin, self.bmax, self.epochs, self.estep)
    def on_epoch_begin(self, epoch, logs=None):
        # Set the reset interval for the current epoch
        if self.batch_reset_intervals is not None:
            self.current_batch_reset = self.batch_reset_intervals[epoch]
        else:
            self.current_batch_reset = None
1493 def on_train_batch_end(self, batch, logs=None):
1494 """
1495 Resets the states of RNN layers during training after a specified number of batches, if `batch_reset` or `loc_batch_reset` are provided. The `batch_reset` is used for stability and to avoid exploding gradients at the beginning of training, when a hidden state is being passed with weights that haven't learned yet. The `loc_batch_reset` is used to reset the states when a particular batch is from a new location, so the hidden state should not be carried over.
1497 Parameters:
1498 -----------
1499 batch : int
1500 The index of the current batch.
1501 logs : dict, optional
1502 A dictionary containing metrics from the batch. Default is None.
1503 """
1504 batch_reset = self.current_batch_reset
1505 if (batch_reset is not None and batch % batch_reset == 0):
1506 # print(f" Resetting states after batch {batch + 1}")
1507 # Iterate over each layer in the model
1508 for layer in self.model.layers:
1509 # Check if the layer has a reset_states method
1510 if hasattr(layer, 'reset_states'):
1511 layer.reset_states()
1512 def on_test_batch_end(self, batch, logs=None):
1513 """
1514 Resets the states of RNN layers during validation if `loc_batch_reset` is provided, to demarcate a new location and avoid passing a hidden state to the wrong location.
1516 Parameters:
1517 -----------
1518 batch : int
1519 The index of the current batch.
1520 logs : dict, optional
1521 A dictionary containing metrics from the batch. Default is None.
1522 """
1523 loc_batch_reset = self.loc_batch_reset
1524 if (loc_batch_reset is not None and batch % loc_batch_reset == 0):
1525 # print(f"Resetting states in Validation mode after batch {batch + 1}")
1526 # Iterate over each layer in the model
1527 for layer in self.model.layers:
1528 # Check if the layer has a reset_states method
1529 if hasattr(layer, 'reset_states'):
1530 layer.reset_states()
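# Example usage (editor's sketch, not from this repo): the parameter values
# below are illustrative assumptions for a batch reset schedule.
#
#     reset_cb = ResetStatesCallback(params={
#         'bmin': 20, 'bmax': 200, 'epochs': 100,
#         'estep': None, 'loc_batch_reset': None,
#         'batch_schedule_type': 'exp'
#     })
#     model.fit(x_train, y_train, epochs=100, batch_size=32,
#               callbacks=[reset_cb], validation_data=(x_val, y_val))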
1532 ## Learning Schedules
1533 ## NOT TESTED YET
1534 lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
1535 initial_learning_rate=0.01,
1536 decay_steps=200,
1537 alpha=0.0,
1538 name='CosineDecay',
1539 # warmup_target=None,
1540 # warmup_steps=100
1541 )
1544 def EarlyStoppingCallback(patience=5):
1545 """
1546 Creates an EarlyStopping callback with the specified patience.
1548 Args:
1549 patience (int): Number of epochs with no improvement after which training will be stopped.
1551 Returns:
1552 EarlyStopping: Configured EarlyStopping callback.
1553 """
1554 return EarlyStopping(
1555 monitor='val_loss',
1556 patience=patience,
1557 verbose=1,
1558 mode='min',
1559 restore_best_weights=True
1560 )
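# Example usage (editor's sketch): pass alongside the reset callback, e.g.
#     model.fit(x_train, y_train, validation_data=(x_val, y_val),
#               callbacks=[ResetStatesCallback(params), EarlyStoppingCallback(patience=10)])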
1562 phys_params = {
1563 'DeltaE': [0,-1], # bias correction
1564 'T1': 0.1, # 1/fuel class (10)
1565 'fm_raise_vs_rain': 0.2 # fm increase per mm rain
1566 }
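# Editor's note: these parameters encode a discretized fuel-moisture time-lag
# model; a sketch of the recursion the physics-initialized weights below are
# meant to reproduce (h = modeled fuel moisture at timestep t):
#
#     h_{t+1} = exp(-T1)*h_t + (1 - exp(-T1))*(Ed_t + Ew_t)/2 + fm_raise_vs_rain*rain_t
#
# plus the DeltaE bias corrections, so wh = exp(-T1), the Ed/Ew input weights
# are (1 - exp(-T1))/2, and the biases carry DeltaE, matching w0_initial and
# w_initial in get_initial_weights below.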
1570 def get_initial_weights(model_fit,params,scale_fm=1):
1571 # Given a RNN architecture and hyperparameter dictionary, return array of physics-initiated weights
1572 # Inputs:
1573 # model_fit: output of create_RNN_2 with no training
1574 # params: (dict) dictionary of hyperparameters
1575 # scale_fm: (float) scaling factor applied to fuel moisture data; physics weights are rescaled to match
1576 # Returns: numpy ndarray of weights that should be a rough solution to the moisture ODE
1577 DeltaE = phys_params['DeltaE']
1578 T1 = phys_params['T1']
1579 fmr = phys_params['fm_raise_vs_rain']
1580 centering = [0] # shift activation down
1582 w0_initial={'Ed':(1.-np.exp(-T1))/2,
1583 'Ew':(1.-np.exp(-T1))/2,
1584 'rain':fmr * scale_fm} # wx - input feature
1585 # remaining weights: wh, bh (layer 0 recurrent weight and bias), wd, bd (layer 1 weight and bias)
1587 w_initial=np.array([np.nan, np.exp(-0.1), DeltaE[0]/scale_fm, # layer 0
1588 1.0, -centering[0] + DeltaE[1]/scale_fm]) # layer 1
1589 if params['verbose_weights']:
1590 print('Equilibrium moisture correction bias',DeltaE[0],
1591 'in the hidden layer and',DeltaE[1],' in the output layer')
1593 w_name = ['wx','wh','bh','wd','bd']
1595 w=model_fit.get_weights()
1596 for j in range(w[0].shape[0]):
1597 feature = params['features_list'][j]
1598 for k in range(w[0].shape[1]):
1599 w[0][j][k]=w0_initial[feature]
1601 for i in range(1,len(w)): # number of the weight
1602 for j in range(w[i].shape[0]): # number of the inputs
1603 if w[i].ndim==2:
1604 # initialize all entries of the weight matrix to the same number
1605 for k in range(w[i].shape[1]):
1606 w[i][j][k]=w_initial[i]/w[i].shape[0]
1607 elif w[i].ndim==1:
1608 w[i][j]=w_initial[i]
1609 else:
1610 print('weight',i,'shape',w[i].shape)
1611 raise ValueError("Only 1 or 2 dimensions supported")
1612 if params['verbose_weights']:
1613 print('weight',i,w_name[i],'shape',w[i].shape,'ndim',w[i].ndim,
1614 'initial: sum',np.sum(w[i],axis=0),'\nentries',w[i])
1616 return w, w_name
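# Example usage (editor's sketch): seed an untrained stateful model with the
# physics-based weights, assuming params['features_list'] == ['Ed','Ew','rain']:
#
#     w, w_name = get_initial_weights(model, params)
#     model.set_weights(w)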
1618 class RNN(RNNModel):
1619 """
1620 A concrete implementation of the RNNModel abstract base class, using simple recurrent cells for hidden recurrent layers.
1622 Parameters:
1623 -----------
1624 params : dict
1625 A dictionary of model parameters.
1626 loss : str, optional
1627 The loss function to use during model training. Default is 'mean_squared_error'.
1628 """
1629 def __init__(self, params, loss='mean_squared_error'):
1630 """
1631 Initializes the RNN model by building the training and prediction models.
1633 Parameters:
1634 -----------
1635 params : dict or RNNParams
1636 A dictionary containing the model's parameters.
1637 loss : str, optional
1638 The loss function to use during model training. Default is 'mean_squared_error'.
1639 """
1640 super().__init__(params)
1641 self.model_train = self._build_model_train()
1642 self.model_predict = self._build_model_predict()
1644 def _build_model_train(self):
1645 """
1646 Builds and compiles the training model, with batch & sequence shape specifications for input.
1648 Returns:
1649 --------
1650 model : tf.keras.Model
1651 The compiled Keras model for training.
1652 """
1653 inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1654 x = inputs
1655 for i in range(self.params['rnn_layers']):
1656 # Return sequences True if recurrent layer feeds into another recurrent layer.
1657 # False if feeds into dense layer
1658 return_sequences = i < self.params['rnn_layers'] - 1
1659 x = SimpleRNN(
1660 units=self.params['rnn_units'],
1661 activation=self.params['activation'][0],
1662 dropout=self.params["dropout"][0],
1663 recurrent_dropout = self.params["recurrent_dropout"],
1664 stateful=self.params['stateful'],
1665 return_sequences=return_sequences)(x)
1666 if self.params["dropout"][1] > 0:
1667 x = Dropout(self.params["dropout"][1])(x)
1668 for i in range(self.params['dense_layers']):
1669 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1670 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1671 x = Dense(units=1, activation='linear')(x)
1672 model = tf.keras.Model(inputs=inputs, outputs=x)
1673 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1674 # optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
1675 model.compile(loss='mean_squared_error', optimizer=optimizer)
1677 if self.params["verbose_weights"]:
1678 print(f"Initial Weights Hash: {hash_weights(model)}")
1679 # print(model.get_weights())
1681 if self.params['phys_initialize']:
1682 assert self.params['scaler'] is None, f"Not implemented yet to do physics initialize with given data scaling {self.params['scaler']}"
1683 assert self.params['features_list'] == ['Ed', 'Ew', 'rain'], f"Physics initialization can only be done with features ['Ed', 'Ew', 'rain'], but given features {self.params['features_list']}"
1684 assert self.params['dense_layers'] == 0, f"Physics initialization requires no hidden dense layers, received params['dense_layers'] = {self.params['dense_layers']}"
1685 print("Initializing Model with Physics based weights")
1686 w, w_name=get_initial_weights(model, self.params)
1687 model.set_weights(w)
1688 print('initial weights hash =',hash_weights(model))
1689 return model
1691 def _build_model_predict(self, return_sequences=True):
1692 """
1693 Builds and compiles the prediction model. It uses neither batch shape nor sequence length, to make it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1695 Parameters:
1696 -----------
1697 return_sequences : bool, optional
1698 Whether to return the full sequence of outputs. Default is True.
1700 Returns:
1701 --------
1702 model : tf.keras.Model
1703 The compiled Keras model for prediction.
1704 """
1705 inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1706 x = inputs
1707 for i in range(self.params['rnn_layers']):
1708 x = SimpleRNN(self.params['rnn_units'],activation=self.params['activation'][0],
1709 stateful=False,return_sequences=return_sequences)(x)
1710 for i in range(self.params['dense_layers']):
1711 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1712 # Add final output layer, must be 1 dense cell with linear activation if continuous scalar output
1713 x = Dense(units=1, activation='linear')(x)
1714 model = tf.keras.Model(inputs=inputs, outputs=x)
1715 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1716 model.compile(loss='mean_squared_error', optimizer=optimizer)
1718 # Set Weights to model_train
1719 w_fitted = self.model_train.get_weights()
1720 model.set_weights(w_fitted)
1722 return model
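# Example usage (editor's sketch; params keys follow those accessed in the
# builders above, and training/prediction entry points live on the RNNModel
# base class defined earlier in this module):
#
#     rnn = RNN(params)
#     # rnn.model_train  : stateful, fixed batch_shape, used for fitting
#     # rnn.model_predict: stateless, accepts arbitrary-length input, with
#     #                    weights copied over from model_train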
1725 class RNN_LSTM(RNNModel):
1726 """
1727 A concrete implementation of the RNNModel abstract base class, using LSTM cells for hidden recurrent layers.
1729 Parameters:
1730 -----------
1731 params : dict
1732 A dictionary of model parameters.
1733 loss : str, optional
1734 The loss function to use during model training. Default is 'mean_squared_error'.
1735 """
1736 def __init__(self, params, loss='mean_squared_error'):
1737 """
1738 Initializes the RNN_LSTM model by building the training and prediction models.
1740 Parameters:
1741 -----------
1742 params : dict or RNNParams
1743 A dictionary containing the model's parameters.
1744 loss : str, optional
1745 The loss function to use during model training. Default is 'mean_squared_error'.
1746 """
1747 super().__init__(params)
1748 self.model_train = self._build_model_train()
1749 self.model_predict = self._build_model_predict()
1751 def _build_model_train(self):
1752 """
1753 Builds and compiles the training model, with batch & sequence shape specifications for input.
1755 Returns:
1756 --------
1757 model : tf.keras.Model
1758 The compiled Keras model for training.
1759 """
1760 inputs = tf.keras.Input(batch_shape=self.params['batch_shape'])
1761 x = inputs
1762 for i in range(self.params['rnn_layers']):
1763 return_sequences = i < self.params['rnn_layers'] - 1
1764 x = LSTM(
1765 units=self.params['rnn_units'],
1766 activation=self.params['activation'][0],
1767 dropout=self.params["dropout"][0],
1768 recurrent_dropout = self.params["recurrent_dropout"],
1769 recurrent_activation=self.params["recurrent_activation"],
1770 stateful=self.params['stateful'],
1771 return_sequences=return_sequences)(x)
1772 if self.params["dropout"][1] > 0:
1773 x = Dropout(self.params["dropout"][1])(x)
1774 for i in range(self.params['dense_layers']):
1775 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1776 x = Dense(units=1, activation="linear")(x)
1777 model = tf.keras.Model(inputs=inputs, outputs=x)
1778 # optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'], clipvalue=self.params['clipvalue'])
1779 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1780 model.compile(loss='mean_squared_error', optimizer=optimizer)
1782 if self.params["verbose_weights"]:
1783 print(f"Initial Weights Hash: {hash_weights(model)}")
1784 return model
1785 def _build_model_predict(self, return_sequences=True):
1786 """
1787 Builds and compiles the prediction model. It uses neither batch shape nor sequence length, to make it easier to predict an arbitrary number of timesteps. This model has weights copied over from the training model and is not directly used for training itself.
1789 Parameters:
1790 -----------
1791 return_sequences : bool, optional
1792 Whether to return the full sequence of outputs. Default is True.
1794 Returns:
1795 --------
1796 model : tf.keras.Model
1797 The compiled Keras model for prediction.
1798 """
1799 inputs = tf.keras.Input(shape=(None,self.params['n_features']))
1800 x = inputs
1801 for i in range(self.params['rnn_layers']):
1802 x = LSTM(
1803 units=self.params['rnn_units'],
1804 activation=self.params['activation'][0],
1805 stateful=False,return_sequences=return_sequences)(x)
1806 for i in range(self.params['dense_layers']):
1807 x = Dense(self.params['dense_units'], activation=self.params['activation'][1])(x)
1808 x = Dense(units=1, activation="linear")(x)
1809 model = tf.keras.Model(inputs=inputs, outputs=x)
1810 optimizer=tf.keras.optimizers.Adam(learning_rate=self.params['learning_rate'])
1811 model.compile(loss='mean_squared_error', optimizer=optimizer)
1813 # Set Weights to model_train
1814 w_fitted = self.model_train.get_weights()
1815 model.set_weights(w_fitted)
1817 return model
1819 # Wrapper Functions putting everything together.
1820 # Useful for deploying from command line
1821 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1823 def rnn_data_wrap(dict0, params, features_subset=None):
1824 dict1 = dict0.copy()
1826 rnn_dat = RNNData(
1827 dict1, # input dictionary
1828 scaler=params['scaler'], # data scaling type
1829 features_list = params['features_list'] # features for predicting outcome
1830 )
1833 rnn_dat.train_test_split(
1834 time_fracs = params['time_fracs'], # fraction of total time steps used for train/val/test
1835 space_fracs = params['space_fracs'] # fraction of locations (timeseries) used for train/val/test
1836 )
1837 if rnn_dat.scaler is not None:
1838 rnn_dat.scale_data()
1840 rnn_dat.batch_reshape(
1841 timesteps = params['timesteps'], # Timesteps aka sequence length for RNN input data.
1842 batch_size = params['batch_size'], # Number of samples of length timesteps for a single round of grad. descent
1843 start_times = np.zeros(len(rnn_dat.loc['train_locs']))
1844 )
1846 return rnn_dat
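# Example usage (editor's sketch): dict0 is assumed to be a formatted data
# dictionary (e.g. read from a pickle built elsewhere in this repo) and params
# an RNNParams/dict with scaler, features_list, time_fracs, space_fracs,
# timesteps, and batch_size set:
#
#     rnn_dat = rnn_data_wrap(dict0, params)
#     rnn = RNN(params)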
1848 def forecast(model, data, spinup_hours = 0):
1849 return