Commit 47d39af9 authored by Michel Spils

ensemble bug fixes

parent 35146d34
@@ -133,6 +133,7 @@ class WaVoLightningEnsemble(pl.LightningModule):
         self.model_list = model_list
         self.max_in_size = max([model.hparams['in_size'] for model in self.model_list])
+        self.in_size = self.max_in_size
         self.out_size = model_list[0].hparams['out_size']
         self.feature_count = model_list[0].hparams['feature_count']
@@ -125,7 +125,7 @@ class WaVoLightningModule(pl.LightningModule):
             x = batch[0]
             #This is not great, but necessary if i want to use a scaler.
             pred = self.model(x)
-            self.scaler.inverse_transform(x.reshape(-1, self.feature_count).cpu()).reshape(-1,self.in_size,self.feature_count)
+            #self.scaler.inverse_transform(x.reshape(-1, self.feature_count).cpu()).reshape(-1,self.in_size,self.feature_count)
             pred = ut.inv_standard(pred, self.scaler.mean_[self.target_idx], self.scaler.scale_[self.target_idx])
         elif self.model_architecture == "dlinear":
@@ -147,7 +147,8 @@ class WaVoLightningModule(pl.LightningModule):
         if self.differencing == 1:
             #This is only tested with the lstms
-            x_org = self.scaler.inverse_transform(x.reshape(-1, self.feature_count).cpu()).reshape(-1,self.in_size,self.feature_count)
+            x_org = self.scaler.inverse_transform(x.reshape(-1, self.feature_count).cpu()).reshape(-1,self.in_size,self.feature_count).astype('float32')
             x_base = x_org[:,-1,self.gauge_idx].round(2) #round to get rid of floating point errors, the measured value is always an integer or has at most 1 decimal
             x_base = torch.from_numpy(x_base).unsqueeze(1)
             x_base = x_base.to(pred.device)
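The hunk above inverts first-order differencing: the network predicts scaled differences, and the absolute forecast is recovered by adding their cumulative sum to the last observed gauge value. A minimal standalone sketch of that reconstruction, with illustrative tensor values (not taken from the repository):

import torch

# Minimal sketch of undoing first-order differencing (illustrative values).
# pred_diff: (batch, horizon) predicted differences in the original scale
# x_base:    (batch,) last observed gauge value before the forecast window
pred_diff = torch.tensor([[0.1, -0.2, 0.3]])
x_base = torch.tensor([5.0])

# Absolute forecast = base value + running sum of the predicted differences.
pred_abs = x_base.unsqueeze(1) + pred_diff.cumsum(dim=1)
print(pred_abs)  # tensor([[5.1000, 4.9000, 5.2000]])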
import argparse
import logging
import pickle
import random
import sys
from pathlib import Path
import lightning.pytorch as pl
import optuna
import torch
import yaml
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
from lightning.pytorch.loggers import TensorBoardLogger
import utils.helpers as hp
import utils.utility as ut
from utils.callbacks import OptunaPruningCallback, WaVoCallback
from data_tools.data_module import WaVoDataModule
from models.ensemble_models import WaVoLightningEnsemble, WaVoLightningAttentionEnsemble
use_cuda = torch.cuda.is_available()
if use_cuda:
accelerator = "cuda"
torch.set_float32_matmul_precision("high")
    # Pick the CUDA device with the most free memory.
    max_free = 0
    best_device = None
for j in range(torch.cuda.device_count()):
free, _ = torch.cuda.mem_get_info(j)
if free > max_free:
max_free = free
best_device = j
devices = [best_device]
else:
accelerator = "cpu"
devices = "auto"
storage_base = "sqlite:///../../../../data-project/KIWaVo/models/optuna/"
if ut.debugger_is_active():
max_epochs = 2
default_storage_name = "sqlite:///../../../data-project/KIWaVo/models/optuna/icaart_ensemble_debug_01.db"
else:
max_epochs = 2000
default_storage_name = (
"sqlite:///../../../../data-project/KIWaVo/models/optuna/icaart_ensemble_01.db"
)
class Objective:
"""
    This class defines the objective function for hyperparameter tuning with the Optuna library.
Args:
        filename (str|Path): Path to a .csv file; the first column should be a time index
        modeldir (str|Path): Path to the directory containing the base models
        logdir (str|Path): Path to the logging directory
        select_strat (str): How to select the base models (random, first n, or hardcoded)
model_count (int): How many base models to use
monitor (str, optional): metric to monitor. Defaults to 'hp/val_loss'.
"""
def __init__(
self,
filename,
modeldir,
logdir,
select_strat,
model_count,
monitor="hp/val_loss",
**kwargs,
):
# Hold these implementation specific arguments as the fields of the class.
self.filename = filename
self.model_dir = modeldir
self.log_dir = logdir
self.select_strat = select_strat
self.model_count = model_count
self.monitor = monitor
def _get_callbacks(self, trial):
checkpoint_callback = ModelCheckpoint(
save_top_k=1, monitor=self.monitor, save_weights_only=True
)
pruning_callback = OptunaPruningCallback(trial, monitor=self.monitor)
early_stop_callback = EarlyStopping(
monitor=self.monitor, mode="min", patience=3
)
my_callback = WaVoCallback(ensemble=True)
return my_callback, [
checkpoint_callback,
pruning_callback,
early_stop_callback,
my_callback,
]
def _get_model_params(self, trial):
model_params = dict(
hidden_size=trial.suggest_int("hidden_size", 32, 512),
num_layers=trial.suggest_int("n_layers", 2, 4),
dropout=0.25,
learning_rate=trial.suggest_float("lr", 0.00001, 0.01),
norm_func=trial.suggest_categorical("norm_func", ["softmax", "minmax"]),
)
return model_params
def __call__(self, trial):
model_params = self._get_model_params(trial)
# log_dir = '../../../data-project/KIWaVo/models/ensemble_debug/'
# model_dir = Path('../../../data-project/KIWaVo/models/icaarts_hollingstedt/lightning_logs/')
model_list = []
model_path_list = []
yaml_data = None
        if self.select_strat == "random":
            # Sample `model_count` version ids from the model directory at random;
            # without this assignment the loop below would fail on this branch.
            all_models = [x.name.split("_")[1] for x in self.model_dir.iterdir()]
            model_choice = random.sample(all_models, self.model_count)
        elif self.select_strat == "first":
            model_choice = list(range(self.model_count))
elif self.select_strat == "hardcoded":
model_choice = [37,88,106,110,116,137,171,175,181,186]
else:
raise ValueError("Invalid selection strategy")
# for s in [0,1,2]:
for s in model_choice:
temp_dir = self.model_dir / f"version_{s}/"
model_list.append(
hp.load_model_cuda(temp_dir, use_cuda=use_cuda, devices=devices)
)
model_path_list.append(str(temp_dir.resolve()))
if yaml_data is None:
yaml_data = hp.load_settings_model(temp_dir)
# with open(temp_dir / 'hparams.yaml', 'r') as file:
# yaml_data = yaml.load(file, Loader=yaml.FullLoader)
# yaml_data['scaler'] = pickle.loads(yaml_data['scaler'])
config = {
"scaler": yaml_data[
"scaler"
], # TODO test how this works without giving scaler etc. outside of jupyterlab
#'filename' : yaml_data['filename'],
"filename": str(self.filename),
"level_name_org": yaml_data["level_name_org"],
"out_size": yaml_data["out_size"],
"threshold": yaml_data["threshold"],
"feature_count": yaml_data["feature_count"],
"differencing": yaml_data["differencing"],
"model_architecture": "ensemble",
}
print("ACHTUNG GGF. FALSCHES MODELL")
ensemble_model = WaVoLightningEnsemble(model_list,model_path_list,**model_params)#TODO 'ÄNDERN!
#ensemble_model = WaVoLightningAttentionEnsemble(
# model_list, model_path_list, **model_params
#)
config["in_size"] = ensemble_model.max_in_size
data_module = WaVoDataModule(**config)
logging.info("Params: %s", trial.params)
my_callback, callbacks = self._get_callbacks(trial)
logger = TensorBoardLogger(self.log_dir, default_hp_metric=False)
trainer = pl.Trainer(
default_root_dir=self.log_dir,
gradient_clip_val=0.5,
logger=logger,
accelerator=accelerator,
devices=devices,
callbacks=callbacks,
max_epochs=max_epochs,
log_every_n_steps=10,
)
trainer.fit(ensemble_model, data_module)
# save metrics to optuna
model_path = str(Path(trainer.log_dir).resolve())
logging.info("model_path: %s", model_path)
trial.set_user_attr("model_path", model_path)
for metric in ["hp/val_nse", "hp/val_mae", "hp/val_mae_flood"]:
for i in [23, 47]:
trial.set_user_attr(
f"{metric}_{i}", my_callback.metrics[metric][i].item()
)
return my_callback.metrics[self.monitor].item()
def parse_args() -> argparse.Namespace:
"""Parse all the arguments and provides some help in the command line"""
parser: argparse.ArgumentParser = argparse.ArgumentParser(
description="Execute experiments for exp_icaart."
)
parser.add_argument(
"filename", metavar="datafile", type=Path, help="The path to your input data."
)
parser.add_argument(
"modeldir", metavar="modeldir", type=Path, help="The path to your base models."
)
parser.add_argument(
"logdir", type=Path, help="set a directory for logs and model checkpoints."
)
parser.add_argument(
"trials",
metavar="trials",
type=int,
default=100,
help="How many trials to run.",
)
parser.add_argument("select_strat", choices=["random", "first","hardcoded"], help="How to select the base models.")
parser.add_argument(
"model_count",
metavar="mc",
type=int,
default=5,
help="How many base models to use.",
)
parser.add_argument(
"--expname",
metavar="experiment_name",
type=str,
default="nameless",
help="The name of the experiment.",
)
parser.add_argument(
"--storagename",
metavar="storage_name",
type=str,
default=None,
help="The database for the experiment.",
)
return parser.parse_args()
def main():
parsed_args = parse_args()
if not parsed_args.logdir.exists():
parsed_args.logdir.mkdir(parents=True)
    # Manual toggle: pruning is currently disabled; flip this to enable Hyperband.
    if False:
pruner = optuna.pruners.HyperbandPruner(
min_resource=1, max_resource="auto", reduction_factor=3, bootstrap_count=0
)
else:
pruner = optuna.pruners.NopPruner()
study_name = f"{parsed_args.filename.stem} {parsed_args.expname}" # Unique identifier of the study.
storage_name = (
default_storage_name
if parsed_args.storagename is None
else f"{storage_base}{parsed_args.storagename}.db"
)
# Logging, add stream handler of stdout to show the messages
logging.basicConfig(level=logging.INFO)
logFormatter = logging.Formatter(
"%(asctime)s;%(levelname)s;%(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler = logging.FileHandler(
parsed_args.logdir / "ensemble_hyper.log",
)
consoleHandler = logging.StreamHandler(sys.stdout)
fileHandler.setFormatter(logFormatter)
consoleHandler.setFormatter(logFormatter)
logging.getLogger().addHandler(fileHandler)
# logging.getLogger().addHandler(consoleHandler)
optuna.logging.get_logger("optuna").addHandler(fileHandler)
optuna.logging.get_logger("optuna").addHandler(consoleHandler)
logging.info(
"Start of this execution======================================================================"
)
logging.info(
"Executing %s with device %s and parameters %s ",
sys.argv[0],
devices,
sys.argv[1:],
)
study = optuna.create_study(
study_name=study_name,
storage=storage_name,
direction="minimize",
pruner=pruner,
load_if_exists=True,
)
study.set_metric_names(["hp/val_loss"])
    objective = Objective(**vars(parsed_args))
    study.optimize(
        objective,
        n_trials=parsed_args.trials,
        timeout=None,
        gc_after_trial=True,  # run garbage collection between trials
        callbacks=[lambda study, trial: torch.cuda.empty_cache()],
    )
if __name__ == "__main__":
    main()
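For orientation, a hypothetical invocation of this script would look like `python ensemble_hyper.py data/gauge.csv models/lightning_logs logs/ 100 first 5 --expname ensemble_test` (the script name and paths are illustrative; the positional arguments follow parse_args above: datafile, modeldir, logdir, trials, select_strat, model_count).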
@@ -10,6 +10,7 @@ import numpy as np
 #import pylab as plt
 import torch
 from torch.utils.data import DataLoader
+from pathlib import Path
 from lightning.pytorch.callbacks import Callback
 from optuna.integration import PyTorchLightningPruningCallback
@@ -17,6 +18,9 @@ from tqdm import tqdm
 import utils.metrics as mt
 import utils.utility as ut
+import utils.helpers as hp
+from models.lightning_module import WaVoLightningModule
+from models.ensemble_models import WaVoLightningEnsemble
 class WaVoCallback(Callback):
@@ -24,7 +28,9 @@ class WaVoCallback(Callback):
     First initialize all metrics and losses so that they are displayed in the tensorboard hyperparameter tab.
     After fitting the model, metrics are calculated for each forecast horizon on the predict, val and test dataloaders
     provided by the used trainer's DataModule.
     Each metric is also calculated for the subset of samples that are defined as flood samples.
+    METRICS ARE CALCULATED FOR THE FIRST MODEL CHECKPOINT FOUND!
     SHOULD ALWAYS BE THE LAST CALLBACK IN THE CALLBACK LIST. Uses predict, which overwrites some values in the trainer.
@@ -67,8 +73,18 @@ class WaVoCallback(Callback):
         metric_placeholder = trainer.callback_metrics.copy()
         self.metrics = {}
-        model = trainer.model
+        #model = trainer.model
+        #model.eval()
+        if isinstance(pl_module,WaVoLightningEnsemble):
+            checkpoint_path = next((Path(trainer.log_dir) / "checkpoints").iterdir())
+            model = WaVoLightningEnsemble.load_from_checkpoint(checkpoint_path,model_list=pl_module.model_list)
+        elif isinstance(pl_module,WaVoLightningModule):
+            model = hp.load_model(Path(trainer.log_dir)) #TODO consider using trainer.predict ckpt_path param
+        else:
+            raise ValueError("Model is not of type WaVoLightningModule or WaVoLightningEnsemble")
         model.eval()
         datamodule = trainer.datamodule
         for mode in ['train','val','test']:
@@ -163,18 +179,20 @@ class WaVoCallback(Callback):
         elif mode == 'test':
             dataset = data_module.test_set
             data_loader = data_module.test_dataloader()
+        else:
+            raise ValueError(f"Mode {mode} not recognized")
-        if ut.need_classic_input(model.model_architecture):
+        if ut.need_classic_input(data_module.model_architecture):
             y_true = torch.stack([y for _,y in dataset])
         else:
             y_true = torch.stack([y for _,y,_,_ in dataset])
-        y_true = y_true[:,-model.out_size:,model.target_idx]
-        y_true = ut.inv_standard(y_true, model.scaler.mean_[model.target_idx], model.scaler.scale_[model.target_idx])
+        y_true = y_true[:,-data_module.out_size:,data_module.target_idx]
+        y_true = ut.inv_standard(y_true, data_module.scaler.mean_[data_module.target_idx], data_module.scaler.scale_[data_module.target_idx])
         if data_module.differencing == 1:
             x_true = torch.stack([x for x,*_ in dataset])
-            x_org = model.scaler.inverse_transform(x_true.reshape(-1,model.feature_count)).reshape(-1,model.in_size,model.feature_count)
-            x_base = x_org[:,-1,model.gauge_idx].round(2)
+            x_org = data_module.scaler.inverse_transform(x_true.reshape(-1,data_module.feature_count)).reshape(-1,data_module.in_size,data_module.feature_count)
+            x_base = x_org[:,-1,data_module.gauge_idx].round(2)
             y_true = torch.from_numpy(x_base).unsqueeze(1) + y_true.cumsum(1)
         return data_loader, y_true.cuda()
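The hunk above reloads the best saved checkpoint before computing metrics instead of evaluating the in-memory weights from the last epoch. The pattern can be sketched in isolation; the module class and log path below are illustrative placeholders, not repository code:

from pathlib import Path

import lightning.pytorch as pl
import torch

class TinyModule(pl.LightningModule):
    """Illustrative stand-in for WaVoLightningModule."""

    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(4, 1)

# Reload the best checkpoint written by ModelCheckpoint before evaluating.
# With save_top_k=1 the checkpoints directory holds exactly one file.
ckpt_dir = Path("lightning_logs/version_0/checkpoints")  # illustrative path
checkpoint_path = next(ckpt_dir.iterdir())
model = TinyModule.load_from_checkpoint(checkpoint_path)
model.eval()  # disable dropout etc. so metrics are deterministic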
@@ -64,7 +64,7 @@ def load_lightning_modules(
 def get_pred(
-    model: WaVoLightningModule, data_loader: DataLoader, y_true: pd.Series,devices=None
+    model: WaVoLightningModule, data_loader: DataLoader, y_true: pd.Series,devices=None,verbose=True
 ) -> pd.DataFrame:
     """Makes predictions for the given model and data_loader and returns them as a DataFrame with the same index as y_true.
@@ -73,6 +73,7 @@ def get_pred(
         data_loader (DataLoader): data loader
         y_true (pd.Series): Ground truth and index
         devices (optional,list): List of devices to use for prediction
+        verbose (bool, optional): Whether to print the progress bar etc. Defaults to True.
     Returns:
         pd.DataFrame: Prediction with correct index and ground truth as first column
@@ -80,7 +81,7 @@ def get_pred(
     if devices is None:
         devices=1
-    trainer = pl.Trainer(accelerator="auto", devices=devices, logger=False)
+    trainer = pl.Trainer(accelerator="auto", devices=devices, logger=False,enable_model_summary=verbose,enable_progress_bar=verbose)
     # for whatever reason the model is moved to cpu in the predict function
     # I'm moving it back manually for now
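A hypothetical call of the extended signature, with verbose=False to suppress Lightning's model summary and progress bar; `model`, `data_loader`, and `y_true` are placeholders for objects created elsewhere:

# Hypothetical call; `model`, `data_loader`, and `y_true` come from elsewhere.
df = get_pred(model, data_loader, y_true, devices=[0], verbose=False)
print(df.head())  # ground truth in the first column, forecasts in the rest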