diff --git a/configs/db_import_model.yaml b/configs/db_import_model.yaml new file mode 100644 index 0000000000000000000000000000000000000000..aad2e3c6dcc5a9cfabffd5bccee6eac5ac69ebef --- /dev/null +++ b/configs/db_import_model.yaml @@ -0,0 +1,27 @@ +db_params: + user: c##mspils + #Passwort ist für eine lokale Datenbank ohne verbindung zum internet. Kann ruhig online stehen. + password: cobalt_deviancy + dsn: localhost/XE +models: + - gauge: mstr2,parm_sh,ts-sh + model_name : mein_model_unfug + model_folder: ../models_torch/tarp_version_49/ + columns: + - a,b,c + - mstr2,parm_sh,ts-sh + external_fcst: + - a,b,c + - gauge: 114547,S,60m.Cmd + model_name : mein_model + model_folder: ../models_torch/tarp_version_49/ + columns: + - 4466,SHum,vwsl + - 4466,SHum,bfwls + - 4466,AT,h.Cmd + - 114435,S,60m.Cmd + - 114050,S,60m.Cmd + - 114547,S,60m.Cmd + - 114547,Precip,h.Cmd + external_fcst: + - 114547,Precip,h.Cmd \ No newline at end of file diff --git a/src/dash_tools/style_configs.py b/src/dash_tools/style_configs.py index 3b7dfac4ef0b76ed6111a4064caf76bc08cd5af6..05b9d7edf1604ab249abd0f946db94b1c3d8773f 100644 --- a/src/dash_tools/style_configs.py +++ b/src/dash_tools/style_configs.py @@ -208,10 +208,11 @@ def create_inp_forecast_status_table(df_forecast): for sensor in sensor_names: sensor_data = df_forecast[df_forecast['sensor_name'] == sensor] for timestamp in time_range: - #TODO genauer hier + #TODO ENSEMBLES has_data = any( - (sensor_data['tstamp'] <= timestamp) & - (sensor_data['tstamp'] + timedelta(hours=48) >= timestamp) + sensor_data['tstamp'] == timestamp + #(sensor_data['tstamp'] <= timestamp) & + #(sensor_data['tstamp'] + timedelta(hours=48) >= timestamp) ) status_df.loc[timestamp, sensor] = 'OK' if has_data else 'Missing' diff --git a/src/dashboard.py b/src/dashboard.py index 025e736cd2d231921b135d325fea324c0bd471e2..c899b0eca5b82fc231544ee89c06482ff92823b7 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -6,7 +6,7 @@ import pandas as pd from datetime 
import datetime, timedelta from sqlalchemy import create_engine, select, and_, desc from sqlalchemy.orm import Session -from utils.orm_classes import Base, InputForecasts, Modell, PegelForecasts, Sensor, Log, ModellSensor, SensorData +from utils.db_tools.orm_classes import Base, InputForecasts, Modell, PegelForecasts, Sensor, Log, ModellSensor, SensorData import oracledb import os from sqlalchemy import select, func @@ -461,7 +461,7 @@ class ForecastMonitor: return {'display': 'none'}, '▶' def run(self, host='0.0.0.0', port=8050, debug=True): - self.app.run_server(host=host, port=port, debug=debug) + self.app.run(host=host, port=port, debug=debug) if __name__ == '__main__': monitor = ForecastMonitor( @@ -469,4 +469,6 @@ if __name__ == '__main__': password="cobalt_deviancy", dsn="localhost/XE" ) - monitor.run() \ No newline at end of file + #monitor.run(host="172.17.0.1", port=8050, debug=True) + monitor.run(host="134.245.232.166", port=8050, debug=True) + \ No newline at end of file diff --git a/src/import_model.py b/src/import_model.py new file mode 100644 index 0000000000000000000000000000000000000000..15781b11529efa09a291356a96776930f03bcdc3 --- /dev/null +++ b/src/import_model.py @@ -0,0 +1,284 @@ +"""Short script to add models to the database.""" +import argparse +import logging +import sys +from pathlib import Path + +import oracledb +import yaml +from sqlalchemy import create_engine, func, select +from sqlalchemy.orm import Session + +import utils.helpers as hp +from utils.db_tools.db_logging import OracleDBHandler +from utils.db_tools.orm_classes import (InputForecastsMeta, Modell, + ModellSensor, Sensor) + + +def parse_args() -> argparse.Namespace: + """Parse all the arguments and provides some help in the command line""" + parser: argparse.ArgumentParser = argparse.ArgumentParser( + description="Add models to the database. 
Needs a YAML File" + ) + + parser.add_argument( + "yaml_path", + metavar="yaml_path", + type=Path, + help="""The path to your config file. Example config file: + db_params: + user: c##mspils + #Passwort ist für eine lokale Datenbank ohne verbindung zum internet. Kann ruhig online stehen. + password: cobalt_deviancy + dsn: localhost/XE + models: + - gauge: 114547,S,60m.Cmd + model_name : mein_model + model_folder: ../../models_torch/tarp_version_49/ + columns: + - 4466,SHum,vwsl + - 4466,SHum,bfwls + - 4466,AT,h.Cmd + - 114435,S,60m.Cmd + - 114050,S,60m.Cmd + - 114547,S,60m.Cmd + - 114547,Precip,h.Cmd + external_fcst: + - 114547,Precip,h.Cmd + """ + ) + return parser.parse_args() + + +def get_configs(yaml_path: Path): + """ + Retrieves the configurations from a YAML file. + Also converts the paths to Path objects and the dates to datetime objects. + + Args: + passed_args (argparse.Namespace): The command-line arguments. + + Returns: + Tuple[dict, List[dict]]: A tuple containing the main configuration dictionary + and a list of gauge configuration dictionaries. 
+ """ + + with open(yaml_path, "r", encoding="utf-8") as file: + config = yaml.safe_load(file) + + for model_config in config["models"]: + model_config["model_folder"] = Path(model_config["model_folder"]) + assert all([isinstance(c,str) for c in model_config["columns"]]) + assert isinstance(model_config["gauge"],str) + #assert all([isinstance(k,str) and isinstance(v,str) for k,v in gauge_config["external_fcst"].items()]) + assert all([isinstance(c,str) for c in model_config["external_fcst"]]) + return config + +def _prepare_logging(con) -> None: + old_factory = logging.getLogRecordFactory() + def record_factory(*args, **kwargs): + record = old_factory(*args, **kwargs) + record.gauge_id = kwargs.pop("gauge_id", None) + return record + logging.setLogRecordFactory(record_factory) + + log_formatter = logging.Formatter( + "%(asctime)s;%(levelname)s;%(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + db_handler = OracleDBHandler(con) + db_handler.setFormatter(log_formatter) + + db_handler.setLevel(logging.INFO) + logging.getLogger().addHandler(db_handler) + logging.info("Executing %s with parameters %s ", sys.argv[0], sys.argv[1:]) + + +class ModelImporter: + """Class for handling the import of models into the database. + """ + def __init__(self, con: oracledb.Connection): + self.con = con + self.engine = create_engine("oracle+oracledb://", creator=lambda: con) + + def process_model(self, session: Session, model_config: dict): + """Process a model configuration and add it to the database. This includes adding the model, sensors,model sensors and input_forecast_meta objects. 
+ + Args: + session (Session): orm session + model_config (dict): model configuration dictionary.Example: + + "gauge": "mstr2,parm_sh,ts-sh" + "model_name": "mein_model_unfug" + "model_folder": "../../models_torch/tarp_version_49/" + "columns": ["a,b,c", + "mstr2,parm_sh,ts-sh"] + "external_fcst": ["a,b,c"] + """ + self._validate_config(model_config) + model_hparams = hp.load_settings_model(model_config["model_folder"]) + + # Check if model already exists in database + if (old_model := self._get_model_maybe(session, model_config["model_name"])) is not None: + logging.warning("Model with name %s already exists in database,skipping",old_model) + #TODO updating requires also updating the ModellSensor table, otherwise we get 2 rows with the same modell_id sensor_name combination. + #if mi._get_user_confirmation(f"Press y to confirm changing the model {old_model} or n to skip"): + # logging.info(f"Updating model {old_model}") + # sensor_obj = mi._parse_sensor_maybe_add(session,target_sensor_name) + # model_id = old_model.id + # mi._update_model(old_model,sensor_obj,model_config,model_hparams) + #else: + # continue + return + else: + # Add Model to database + model_id = self._add_model(session,model_config,model_hparams) + + for i,col in enumerate(model_config["columns"]): + _ = self._parse_sensor_maybe_add(session,col) + inp_fcst_meta = self._maybe_add_inp_fcst_meta(session,model_config,col) + vhs_gebiet = None if inp_fcst_meta is None else inp_fcst_meta.vhs_gebiet + _ = self._maybe_add_modelsensor(session,model_id,col,vhs_gebiet,i) + + + def _get_next_model_id(self, session: Session) -> float: + """Get next available model ID""" + #TODO the database should use an IDENTITY column for the model table, then we could remove this + result = session.execute(select(func.max(Modell.id))).scalar() + return 1 if result is None else result + 1 + + def _get_sensor_maybe(self, session: Session, sensor_name: str): + """Check if model exists in database""" + stmt = 
select(Sensor).where(Sensor.sensor_name == sensor_name) + return session.scalars(stmt).first() + + def _get_model_maybe(self, session: Session, model_name: str) -> Modell: + """Check if model exists in database""" + stmt = select(Modell).where(Modell.modellname == model_name) + return session.scalars(stmt).first() + + def _get_user_confirmation(self, message: str) -> bool: + """Get user confirmation for action""" + response = input(f"{message} (y/n): ").lower() + return response == 'y' + + def _get_user_input(self, message: str) -> str: + """Get user input for action""" + response = input(f"{message}: ") + return response + + def _parse_sensor_maybe_add(self,session,sensor_name: str) -> Sensor: + """ Tries to get sensor from SENSOR table, if not found, tries to interpolate values from sensor name or user input. + Also creates and adds sensor to SENSOR table if not found. + """ + if (sensor_obj := self._get_sensor_maybe(session,sensor_name)) is None: + logging.info("Sensor %s not found in table SENSOR, attempting interpolation from string, adding to database",sensor_name) + try: + mstnr, type_short, ts_short = sensor_name.split(",") + except ValueError as e: + logging.error("Could not interpolate values from sensor name %s, expected format is 'mstnr, parameter_kurzname, TS Shortname' comma separated",sensor_name) + raise ValueError(f"Invalid sensor name format: {sensor_name}") from e + + sensor_obj = Sensor(mstnr=mstnr,parameter_kurzname=type_short,ts_shortname=ts_short,beschreibung="Automatisch generierter Eintrag",sensor_name=sensor_name) + session.add(sensor_obj) + logging.info(msg=f"Added Sensor {sensor_obj} to database") + return sensor_obj + + def _maybe_add_modelsensor(self,session,model_id,col,vhs_gebiet,ix) -> ModellSensor: + """ Creates a ModellSensor object and adds it to the database. 
+ """ + model_sensor = ModellSensor( + modell_id = model_id, + sensor_name = col, + vhs_gebiet = vhs_gebiet, + ix = ix + ) + session.add(model_sensor) + return model_sensor + + def _maybe_add_inp_fcst_meta(self,session,model_config:dict,col:str) -> InputForecastsMeta: + """Get or add input forecast metadata to database""" + if col not in model_config["external_fcst"]: + return None + if (inp_fcst := session.scalars(select(InputForecastsMeta).where(InputForecastsMeta.sensor_name == col)).first()) is None: + logging.info("Input forecast %s not found in table INPUT_FORECASTS_META, adding to database",col) + vhs_gebiet = self._get_user_input(f"Enter the name (vhs_gebiet) of the area or station for the input forecast {col}") + ensemble_members = self._get_user_input(f"Enter the number of ensemble members for the input forecast {col}, 1 for deterministic, 21 for ICON, other values not supported") + ensemble_members = int(ensemble_members) + assert ensemble_members in [1,21], "Only 1 (deterministic) and 21 (ICON) ensemble + deterministic are supported" + + inp_fcst = InputForecastsMeta(sensor_name=col,vhs_gebiet=vhs_gebiet,ensemble_members=ensemble_members) + session.add(inp_fcst) + logging.info(msg=f"Added Input_Forecasts_Meta {inp_fcst} to database") + + return inp_fcst + + def _validate_config(self,model_config): + if not set(model_config["external_fcst"]) <= set(model_config["columns"]): + logging.error("Not all external forecasts are in columns") + raise ValueError + if model_config["gauge"] not in model_config["columns"]: + logging.error("Gauge not in columns") + raise ValueError("Gauge %s not in columns" % model_config["gauge"]) + + def _add_model(self,session,model_config,model_hparams) -> int: + target_sensor_name = model_config["gauge"] + sensor_obj = self._parse_sensor_maybe_add(session,target_sensor_name) + model_id = self._get_next_model_id(session) + model_obj = Modell(id=model_id, + mstnr=sensor_obj.mstnr, + modellname=model_config["model_name"], + 
modelldatei=str(model_config["model_folder"].resolve()), + aktiv=1, + kommentar=' \n'.join(model_hparams["scaler"].feature_names_in_), + in_size=model_hparams["in_size"], + target_sensor_name=target_sensor_name, + ) + session.add(model_obj) + logging.info("Added model %s to database",model_obj) + return model_obj.id + + + + def _update_model(self,old_model,sensor_obj,model_config,model_hparams): + #def update_log_lambda(name,old,new): kurz, aber hacky. Müsster außerdim mit __dict__ oder so angepasst werden + # if old != new: + # logging.info(f"Updating {old_model} {name} from {old} to {new}") + # old = new + if old_model.mstnr !=sensor_obj.mstnr: + logging.info(msg=f"Updating {old_model} mstnr to {sensor_obj.mstnr}") + old_model.mstnr =sensor_obj.mstnr + if old_model.modelldatei != (modelldatei:=str(model_config["model_folder"].resolve())): + logging.info(msg=f"Updating {old_model} modelldatei to {modelldatei}") + old_model.modelldatei=modelldatei + if old_model.kommentar != (kommentar:= ' \n'.join(model_hparams["scaler"].feature_names_in_)): + logging.info(msg=f"Updating {old_model} kommentar to {kommentar}") + old_model.kommentar=kommentar + if old_model.in_size != model_hparams["in_size"]: + logging.info(msg=f"Updating {old_model} in_size to {model_hparams['in_size']}") + old_model.in_size=model_hparams["in_size"] + if old_model.target_sensor_name != model_config["gauge"]: + logging.info(msg=f"Updating {old_model} target_sensor_name to {model_config['gauge']}") + old_model.target_sensor_name= model_config["gauge"] + + +def main(): + """Main function to import models into the database""" + passed_args = parse_args() + + config = get_configs(passed_args.yaml_path) + if "config_dir" in config["db_params"]: + logging.info("Initiating Thick mode with executable %s",config["db_params"]["config_dir"],) + oracledb.init_oracle_client(lib_dir=config["db_params"]["config_dir"]) + else: + logging.info("Initiating Thin mode") + con = oracledb.connect(**config["db_params"]) 
+ _prepare_logging(con) + + mi = ModelImporter(con) + with Session(mi.engine) as session: + for model_config in config["models"]: + mi.process_model(session,model_config) + session.commit() + +if __name__ == "__main__": + main() diff --git a/src/predict_database.py b/src/predict_database.py index c67e59f35602a1ffcfa97a5403ced072b2865bf6..63afc47eb4d783d41ab4c64523dfbc6e7c6f41bb 100644 --- a/src/predict_database.py +++ b/src/predict_database.py @@ -11,7 +11,8 @@ import oracledb import pandas as pd import yaml -from utils.db_tools.db_tools import OracleDBHandler, OracleWaVoConnection +from utils.db_tools.db_tools import OracleWaVoConnection +from utils.db_tools.db_logging import OracleDBHandler def get_configs(passed_args: argparse.Namespace) -> Tuple[dict, List[dict]]: diff --git a/src/utils/db_tools/db_logging.py b/src/utils/db_tools/db_logging.py index a66640383684f8a158af064dd4d9fbf7b61db9e6..66e15c7e9a50bde1e4809d4efa109400bae23c41 100644 --- a/src/utils/db_tools/db_logging.py +++ b/src/utils/db_tools/db_logging.py @@ -1,5 +1,5 @@ import logging -from utils.orm_classes import Log +from utils.db_tools.orm_classes import Log from sqlalchemy import create_engine from sqlalchemy.orm import Session import pandas as pd diff --git a/src/utils/db_tools/db_tools.py b/src/utils/db_tools/db_tools.py index b04ec5ed4fcdea5432cb713413571f03d281cf1f..7842b476f7dd24e9c00753cef600be863190438e 100644 --- a/src/utils/db_tools/db_tools.py +++ b/src/utils/db_tools/db_tools.py @@ -21,7 +21,7 @@ from sqlalchemy.exc import IntegrityError import torch import utils.helpers as hp -from utils.orm_classes import ( +from utils.db_tools.orm_classes import ( ModellSensor, SensorData, InputForecasts, diff --git a/src/utils/orm_classes.py b/src/utils/db_tools/orm_classes.py similarity index 90% rename from src/utils/orm_classes.py rename to src/utils/db_tools/orm_classes.py index a80a75f680486c6657a02544e5bf27309cef5722..c871ace891cd6499a2a3057ccb71e978161faa85 100644 --- 
a/src/utils/orm_classes.py +++ b/src/utils/db_tools/orm_classes.py @@ -4,7 +4,7 @@ The alternative is to use table reflection from sqlalchemy. from typing import List, Optional import datetime -from sqlalchemy import Double, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint, TIMESTAMP, VARCHAR,Identity +from sqlalchemy import Double, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint, TIMESTAMP, VARCHAR,Identity, String, UniqueConstraint from sqlalchemy.dialects.oracle import NUMBER from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship @@ -40,6 +40,11 @@ class InputForecastsMeta(Base): vhs_gebiet : Mapped[str] = mapped_column(VARCHAR(256), primary_key=True) ensemble_members : Mapped[int] = mapped_column(NUMBER(5, 0, False)) + def __repr__(self) -> str: + return (f"InputForecastsMeta(sensor_name='{self.sensor_name}', " + f"vhs_gebiet='{self.vhs_gebiet}', " + f"ensemble_members={self.ensemble_members})") + class Modell(Base): __tablename__ = 'modell' @@ -48,15 +53,37 @@ class Modell(Base): Index('id_modellname', 'modellname', unique=True) ) - id: Mapped[float] = mapped_column(NUMBER(10, 0, False), primary_key=True) + #id: Mapped[float] = mapped_column(NUMBER(10, 0, False), primary_key=True) + id: Mapped[float] = mapped_column( + NUMBER(10, 0, False), + Identity( + start=1, + increment=1, + minvalue=1, + maxvalue=9999999999, + cycle=False, + cache=20, + order=False + ), + primary_key=True + ) + mstnr: Mapped[Optional[str]] = mapped_column(VARCHAR(30)) modellname: Mapped[Optional[str]] = mapped_column(VARCHAR(256)) modelldatei: Mapped[Optional[str]] = mapped_column(VARCHAR(256)) aktiv: Mapped[Optional[float]] = mapped_column(NUMBER(1, 0, False)) kommentar: Mapped[Optional[str]] = mapped_column(VARCHAR(1024)) + in_size: Mapped[Optional[int]] = mapped_column( NUMBER(38,0)) + target_sensor_name: Mapped[Optional[str]] = mapped_column(VARCHAR(256)) + modell_sensor: Mapped[List['ModellSensor']] = relationship('ModellSensor', 
back_populates='modell') + def __repr__(self) -> str: + return (f"Modell(id={self.id}, " + f"mstnr='{self.mstnr}', " + f"modellname='{self.modellname}', " + f"aktiv={self.aktiv})") class Sensor(Base): __tablename__ = 'sensor' @@ -77,6 +104,11 @@ class Sensor(Base): pegel_forecasts: Mapped[List['PegelForecasts']] = relationship('PegelForecasts', back_populates='sensor') sensor_data: Mapped[List['SensorData']] = relationship('SensorData', back_populates='sensor') + def __repr__(self) -> str: + return (f"Sensor(sensor_name='{self.sensor_name}', " + f"mstnr='{self.mstnr}', " + f"parameter_kurzname='{self.parameter_kurzname}', " + f"ts_shortname='{self.ts_shortname}')") class TmpSensorData(Base): __tablename__ = 'tmp_sensor_data'