# utility.py
"""
All utility functions that didn't fit anywhere else
"""
import sys
from typing import Union
import numpy as np
import torch
import pandas as pd
from utils.timefeatures import time_features
def fill_missing_values(df: pd.DataFrame, max_fill: int = 10) -> pd.DataFrame:
    """Resample a DataFrame to hourly steps and fill the resulting gaps.

    Columns recognized as precipitation columns (name contains 'NEW' or 'NVh')
    have their missing values replaced by 0. Every other gap is then
    interpolated (pandas default method) with ``limit=max_fill`` and
    ``limit_direction='both'``.

    Args:
        df: DataFrame that might be missing values. It is resampled with
            ``resample('h')``, so it needs a datetime-like index.
        max_fill: Value passed as ``limit`` to the interpolation, i.e. how many
            consecutive missing values may be filled per direction.

    Returns:
        DataFrame: The hourly resampled DataFrame without missing values.

    Raises:
        LargeGapError: If values are still missing after interpolation.
    """
    old_size = df.shape[0]
    df = df.resample('h').mean()  # TODO maybe not 'h' for other datasets?

    # Precipitation columns: a missing value is filled with 0 instead of
    # being interpolated.
    prec_mask = df.columns.str.contains('NEW') | df.columns.str.contains('NVh')
    if df.loc[:, prec_mask].isna().any().any():
        df.loc[:, prec_mask] = df.loc[:, prec_mask].fillna(0)

    # Interpolate data in all other columns.
    df = df.interpolate(limit=max_fill, limit_direction='both')

    still_missing = df.isna().sum().sum()
    if still_missing > 0:
        # Resampling may have inserted rows for timestamps that were absent,
        # which is reported as a hint in the message.
        raise LargeGapError(
            f"Some columns were missing more than {max_fill} continuous values, "
            "either raise the limit or fill values manually. "
            f"{still_missing} still missing, maybe due to {len(df)-old_size} missing timestamps?"
        )
    return df
class LargeGapError(Exception):
    """Custom error raised when a gap in a dataset is too large to be filled."""
def scale_minmax(x, xmin: float, xmax: float):
    """Minmax scaling, also unpacks torch tensors if needed.

    Args:
        x (np.array, torch.Tensor or similar): values to scale
        xmin (float): minimum value of a (training) dataset
        xmax (float): maximum value of a (training) dataset

    Returns:
        (np.array, torch.Tensor or similar): minmax scaled values
    """
    everything_is_tensor = all(
        isinstance(value, torch.Tensor) for value in (x, xmin, xmax)
    )
    if not everything_is_tensor:
        # Mixed input types: turn tensor bounds into plain Python numbers.
        if isinstance(xmin, torch.Tensor):
            xmin = xmin.item()
        if isinstance(xmax, torch.Tensor):
            xmax = xmax.item()

    shifted = x - xmin
    return shifted / (xmax - xmin)
def inv_minmax(x, xmin: float, xmax: float):
    """Inverse minmax scaling, also unpacks torch tensors if needed.

    Args:
        x (np.array, torch.Tensor or similar): minmax scaled values to transform back
        xmin (float): minimum value of a (training) dataset
        xmax (float): maximum value of a (training) dataset

    Returns:
        (np.array, torch.Tensor or similar): values on the original scale
    """
    everything_is_tensor = all(
        isinstance(value, torch.Tensor) for value in (x, xmin, xmax)
    )
    if not everything_is_tensor:
        # Mixed input types: turn tensor bounds into plain Python numbers.
        if isinstance(xmin, torch.Tensor):
            xmin = xmin.item()
        if isinstance(xmax, torch.Tensor):
            xmax = xmax.item()

    value_range = xmax - xmin
    return x * value_range + xmin
def scale_standard(x, mean: float, std: float):
    """Standard scaling, also unpacks torch tensors if needed.

    Args:
        x (np.array, torch.Tensor or similar): values to scale
        mean (float): mean value of a (training) dataset
        std (float): standard deviation value of a (training) dataset

    Returns:
        (np.array, torch.Tensor or similar): standard scaled values
    """
    pure_tensor_input = (
        isinstance(x, torch.Tensor)
        and isinstance(mean, torch.Tensor)
        and isinstance(std, torch.Tensor)
    )
    if not pure_tensor_input:
        # Mixed input types: turn tensor statistics into plain Python numbers.
        mean = mean.item() if isinstance(mean, torch.Tensor) else mean
        std = std.item() if isinstance(std, torch.Tensor) else std

    centered = x - mean
    return centered / std
def inv_standard(x, mean: float, std: float):
    """Inverse standard scaling, also unpacks torch tensors if needed.

    Args:
        x (np.array, torch.Tensor or similar): standard scaled values to transform back
        mean (float): mean value of a (training) dataset
        std (float): standard deviation value of a (training) dataset

    Returns:
        (np.array, torch.Tensor or similar): values on the original scale
    """
    pure_tensor_input = (
        isinstance(x, torch.Tensor)
        and isinstance(mean, torch.Tensor)
        and isinstance(std, torch.Tensor)
    )
    if not pure_tensor_input:
        # Mixed input types: turn tensor statistics into plain Python numbers.
        mean = mean.item() if isinstance(mean, torch.Tensor) else mean
        std = std.item() if isinstance(std, torch.Tensor) else std

    rescaled = x * std
    return rescaled + mean
def is_float(s) -> bool:
    """Tests if a value can be casted to float.

    Args:
        s: value to test, typically a string

    Returns:
        Boolean: True if s is castable to float and is not an int
    """
    # ints (bools included) are reported as "not float" even though
    # float() would accept them.
    if isinstance(s, int):
        return False

    try:
        float(s)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_int(s):
    """Tests if a value can be casted to int.

    Floats are always rejected. Strings must consist of an optional leading
    sign ('+' or '-') followed by numeric characters; the sign handling makes
    negative numbers such as "-5" count as integers.

    Args:
        s: value to test, typically a string

    Returns:
        Boolean: True if s is castable to int
    """
    if isinstance(s, float):
        return False

    if isinstance(s, str):
        # str.isnumeric() is False for '+'/'-', so strip one leading sign
        # before checking the remaining characters.
        unsigned = s[1:] if s[:1] in ('+', '-') else s
        if not unsigned.isnumeric():
            return False

    try:
        int(s)
    except (ValueError, TypeError):
        # e.g. None, or numeric characters int() cannot parse
        return False
    return True
def get_objective_metric(s):
    """Return a reasonable 0th value for metrics, to not mess with chart scaling.

    Without a 0th value available at the start the hyperparameter tab doesn't work.

    Args:
        s (string): metric name

    Returns:
        int: 1 for the metrics 'nse' and 'kge', 0 for every other metric
    """
    return 1 if s in ('nse', 'kge') else 0
def debugger_is_active() -> bool:
    """Return True if a trace function is currently set (sys.gettrace()), as debuggers do."""
    # Some interpreters may not provide sys.gettrace at all.
    if not hasattr(sys, 'gettrace'):
        return False
    return sys.gettrace() is not None
def need_classic_input(s: str) -> bool:
    """Return True if s is None or one of the model names listed below.

    Args:
        s (str): model name (None is accepted as well)

    Returns:
        bool: True for None, "classic_lstm", "last_lstm", "tsmixer",
            "chained_dense" and "ensemble", False otherwise
    """
    if s is None:
        return True
    classic_models = ("classic_lstm", "last_lstm", "tsmixer", "chained_dense", "ensemble")
    return s in classic_models
def encode_time(df_stamp, encoding='fixed'):
    """Encode the timestamps in the first column of a DataFrame as a tensor.

    Args:
        df_stamp (pd.DataFrame): DataFrame whose first column holds the
            timestamps. It is not modified.
        encoding (str): 'timeF' delegates to ``time_features`` with hourly
            frequency; 'fixed' and 'neural' both produce the calendar columns
            month, day, weekday and hour.

    Returns:
        torch.Tensor: the encoded time stamps. For 'fixed'/'neural' these are
            all columns of df_stamp except the first one, plus the calendar
            columns.

    Raises:
        ValueError: If encoding is not one of 'timeF', 'fixed' or 'neural'.
    """
    if encoding == 'timeF':
        # NOTE(review): time_features is a project helper; the transpose
        # suggests it returns (features, time) - confirm.
        data_stamp = time_features(pd.to_datetime(df_stamp.iloc[:, 0].values), freq='h').astype('float32')
        data_stamp = data_stamp.transpose(1, 0)
    elif encoding in ['fixed', 'neural']:
        stamps = pd.to_datetime(df_stamp.iloc[:, 0])
        # drop() + assign() build a new frame, so the caller's DataFrame does
        # not get the extra columns; the .dt accessors replace the row-wise
        # apply(lambda ..., 1) calls.
        features = df_stamp.drop(columns=[df_stamp.columns[0]]).assign(
            month=stamps.dt.month,
            day=stamps.dt.day,
            weekday=stamps.dt.weekday,
            hour=stamps.dt.hour,
        )
        data_stamp = features.values
    else:
        raise ValueError(
            f"Unknown time encoding '{encoding}', expected 'timeF', 'fixed' or 'neural'."
        )
    return torch.Tensor(data_stamp)
def softmax(x, axis=0):
    """Compute softmax values for each set of scores in x along an axis.

    Args:
        x (np.array or array-like): scores, converted with np.asarray
        axis (int): axis along which the softmax is normalized

    Returns:
        np.array: array of the same shape as x whose values sum to 1 along axis
    """
    scores = np.asarray(x)
    # keepdims=True keeps the reduced axis with length 1, so the maximum and
    # the sum broadcast against the scores for every axis, not just axis 0.
    # Subtracting the maximum avoids overflow in exp without changing the result.
    e_x = np.exp(scores - np.max(scores, axis=axis, keepdims=True))
    return e_x / e_x.sum(axis=axis, keepdims=True)