diff --git a/src/utils/db_tools.py b/src/utils/db_tools.py index 2601a25c97736280ee7b7363e6b2d7e2d4fb59d2..043c2e2a911345b5259d004765c3b857ec222a9d 100644 --- a/src/utils/db_tools.py +++ b/src/utils/db_tools.py @@ -5,6 +5,7 @@ A collection of classes and functions for interacting with the oracle based Wavo import logging from datetime import datetime from pathlib import Path +from re import I from typing import List import warnings @@ -28,7 +29,8 @@ from utils.orm_classes import ( PegelForecasts, Log, Sensor, - InputForecastsMeta + InputForecastsMeta, + Modell ) # pylint: disable=unsupported-assignment-operation @@ -164,7 +166,7 @@ class OracleWaVoConnection: y = pred_single_db(model, df_input) finally: self.insert_forecast( - y, gauge_config["gauge"], gauge_config["model_folder"].name, end_time, member, created + y,gauge_config,end_time, member, created ) def load_input_db(self, in_size, end_time, gauge_config,created) -> pd.DataFrame: @@ -261,8 +263,7 @@ class OracleWaVoConnection: try: self.insert_forecast( y, - gauge_config["gauge"], - gauge_config["model_folder"].name, + gauge_config, end_time, member, created, @@ -409,8 +410,7 @@ class OracleWaVoConnection: def insert_forecast( self, forecast: torch.Tensor, - sensor_name: str, - model_name: str, + gauge_config: dict, end_time: pd.Timestamp, member: int, created: pd.Timestamp = None, @@ -421,40 +421,22 @@ class OracleWaVoConnection: Args: forecast (Tensor): The forecast data to be inserted. + gauge_config (dict): The configuration for the gauge. + end_time (pd.Timestamp): The timestamp of the forecast. member (int): The member identifier. - + created (pd.Timestamp, optional): The timestamp of the forecast creation. Returns: None """ + sensor_name = gauge_config["gauge"] + model_name = gauge_config["model_folder"].name + # Turn tensor into dict to if forecast.isnan().any(): fcst_values = {f"h{i}": None for i in range(1, 49)} else: fcst_values = {f"h{i}": forecast[i - 1].item() for i in range(1, 49)} - - if self.main_config.get("export_zrxp"): - target_file =self.main_config["zrxp_out_folder"] / f"{end_time.strftime("%Y%m%d%H")}_{sensor_name}_{model_name}_{member}.zrx" - #2023 11 19 03 - df_zrxp = pd.DataFrame(forecast,columns=["value"]) - df_zrxp["timestamp"] = end_time - df_zrxp["forecast"] = pd.date_range(start=end_time,periods=49,freq="1h")[1:] - df_zrxp["member"] = member - df_zrxp = df_zrxp[['timestamp','forecast','member','value']] - - - with open(target_file , 'w', encoding="utf-8") as file: - file.write('#REXCHANGEWISKI.' + model_name.split("_")[0] + '.W.KNN|*|\n') - file.write('#RINVAL-777|*|\n') - file.write('#LAYOUT(timestamp,forecast, member,value)|*|\n') - - df_zrxp.to_csv(path_or_buf = target_file, - header = False, - index=False, - mode='a', - sep = ' ', - date_format = '%Y%m%d%H%M') - - + self._export_zrxp(forecast, gauge_config, end_time, member, sensor_name, model_name, created) stmt = select(PegelForecasts).where( PegelForecasts.tstamp == bindparam("tstamp"), @@ -501,7 +483,39 @@ class OracleWaVoConnection: #try: session.commit() - + + def _export_zrxp(self, forecast, gauge_config, end_time, member, sensor_name, model_name, created): + if self.main_config.get("export_zrxp") and member == 0: + with Session(self.engine) as session: + stmt = select(Modell).where(Modell.modelldatei == str(gauge_config["model_folder"].resolve())) + model_obj = session.scalar(stmt) + + if model_obj is None: + logging.error("Model %s not found in database, skipping zrxp export",gauge_config["model_folder"]) + return + + target_file =self.main_config["zrxp_out_folder"] / f"{end_time.strftime("%Y%m%d%H")}_{sensor_name.replace(",","_")}_{model_name}.zrx" + #2023 11 19 03 + df_zrxp = pd.DataFrame(forecast,columns=["value"]) + df_zrxp["timestamp"] = end_time + df_zrxp["forecast"] = pd.date_range(start=end_time,periods=49,freq="1h")[1:] + df_zrxp["member"] = member + df_zrxp = df_zrxp[['timestamp','forecast','member','value']] + + + with open(target_file , 'w', encoding="utf-8") as file: + file.write('#REXCHANGEWISKI.' + model_obj.modellname + '.W.KNN|*|\n') + file.write('#RINVAL-777|*|\n') + file.write('#LAYOUT(timestamp,forecast, member,value)|*|\n') + + df_zrxp.to_csv(path_or_buf = target_file, + header = False, + index=False, + mode='a', + sep = ' ', + date_format = '%Y%m%d%H%M') + + def maybe_update_tables(self) -> None: """ Updates the database tables if necessary based on the configuration settings.