diff --git a/configs/db_new.yaml b/configs/db_new.yaml index 08bed34b38c26c63b517faca51963961f34b82b0..dd07f1acb2c833fe4b7d033e2a46459c8d13e822 100644 --- a/configs/db_new.yaml +++ b/configs/db_new.yaml @@ -12,34 +12,23 @@ main_config: #If True exports the forecasts if successful to zrxp export_zrxp : True #if True tries to put all files in folder in the external forecast table + fake_external : False load_sensor : False load_zrxp : True ensemble : True - single : False + single : True dummy : False #start: 2019-12-01 02:00 #start: 2023-04-19 03:00 #start: 2023-11-19 03:00 start: 2024-09-13 09:00 + end: !!str 2024-09-14 09:00 #end: !!str 2021-02-06 # if range is true will try all values from start to end for predictions - #range: !!bool True - range: !!bool False + range: !!bool True + #range: !!bool False zrxp: !!bool True gauge_configs: - - gauge: 114547,S,60m.Cmd - model_folder: ../models_torch/tarp_version_49/ - columns: - - 4466,SHum,vwsl - - 4466,SHum,bfwls - - 4466,AT,h.Cmd - - 114435,S,60m.Cmd - - 114050,S,60m.Cmd - - 114547,S,60m.Cmd - - 114547,Precip,h.Cmd - external_fcst: - - 114547,Precip,h.Cmd - #114547,Precip,h.Cmd : Tarp - gauge: 112211,S,5m.Cmd model_folder: ../models_torch/hollingstedt_version_37 columns: @@ -55,6 +44,19 @@ gauge_configs: external_fcst: - 9530010,S_Tide,1m.Cmd - 112211,Precip,h.Cmd + - gauge: 114547,S,60m.Cmd + model_folder: ../models_torch/tarp_version_49/ + columns: + - 4466,SHum,vwsl + - 4466,SHum,bfwls + - 4466,AT,h.Cmd + - 114435,S,60m.Cmd + - 114050,S,60m.Cmd + - 114547,S,60m.Cmd + - 114547,Precip,h.Cmd + external_fcst: + - 114547,Precip,h.Cmd + #114547,Precip,h.Cmd : Tarp - gauge: 114069,Precip,h.Cmd model_folder: ../models_torch/version_74_treia/ columns: diff --git a/src/app.py b/src/app.py new file mode 100644 index 0000000000000000000000000000000000000000..bb7532c31a291db5d84b3669e4793e165eaeddc3 --- /dev/null +++ b/src/app.py @@ -0,0 +1,509 @@ +import os +from datetime import datetime, timedelta + +import dash_bootstrap_components as dbc +import oracledb +import pandas as pd +#import plotly.express as px +#import plotly.graph_objs as go +import dash +from dash import Dash, dash_table, dcc, html,ctx +from dash.dependencies import MATCH, Input, Output, State +from sqlalchemy import and_, create_engine, desc, func, select,distinct +from sqlalchemy.orm import Session + +from dashboard.styles import TABLE_STYLE +#from dash_tools.layout_helper import create_collapsible_section +from dashboard.tables_and_plots import (create_historical_plot, + create_historical_table, + create_inp_forecast_status_table, + create_input_forecasts_plot, + create_log_table, + create_model_forecasts_plot, + create_status_summary_chart) + +from dashboard.constants import NUM_RECENT_FORECASTS, NUM_RECENT_LOGS, BUFFER_TIME,LOOKBACK_EXTERNAL_FORECAST_EXISTENCE +from utils.db_tools.orm_classes import (InputForecasts, Log, Modell, + ModellSensor, PegelForecasts, + SensorData) + + + +#TODO add logging to dashboard? 
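+# Quick usage sketch (illustrative credentials/DSN only -- not real ones):
+#
+#   monitor = ForecastMonitor(username="user", password="secret", dsn="localhost/XE")
+#   monitor.run(host="0.0.0.0", port=8050, debug=False)
+#
+# ForecastMonitor owns a single oracledb connection and hands it to SQLAlchemy
+# via create_engine(..., creator=...), so all Sessions and pd.read_sql calls
+# below share that one connection.
+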
+class ForecastMonitor: + def __init__(self, username, password, dsn): + self.db_params = { + "user": username, + "password": password, + "dsn": dsn + } + try: + self.con = oracledb.connect(**self.db_params) + self.engine = create_engine("oracle+oracledb://", creator=lambda: self.con) + except oracledb.OperationalError as e: + print(f"Error connecting to database: {str(e)}") + self.con = None + self.engine = None + + self.app = Dash(__name__, + suppress_callback_exceptions=True, + external_stylesheets=[dbc.themes.BOOTSTRAP], + assets_url_path='dashboard') + self.setup_layout() + self.setup_callbacks() + + + def _make_section(self,title,section_id,view_id): + # Input Forecasts Section + section_card = dbc.Card([ + dbc.CardHeader([ + dbc.Button( + children=[ + html.Span(title, style={'flex': '1'}), + html.Span("▼", className="ms-2") + ], + color="link", + id={'type': 'collapse-button', 'section': section_id}, + className="text-decoration-none text-dark h5 mb-0 w-100 d-flex align-items-center", + ) + ]), + dbc.Collapse( + dbc.CardBody(html.Div(id=view_id)), + id={'type': 'collapse-content', 'section': section_id}, + is_open=True + ) + ], className="mb-4") + return section_card + + + def setup_layout(self): + # Header + header = dbc.Navbar( + dbc.Container( + dbc.Row([ + dbc.Col( + html.H1("Pegel Dashboard", className="mb-0 text-white"), + width="auto" + ) + ]), + fluid=True + ), + color="primary", + dark=True, + className="mb-4" + ) + + # Main content + main_content = dbc.Container([ + # Row for the entire dashboard content + dbc.Row([ + # Left column - fixed width + dbc.Col([ + dbc.Card([ + dbc.CardHeader(html.H3("Modell Status", className="h5 mb-0")), + dbc.CardBody([ + html.Div(id='model-status-table'), + dcc.Interval( + id='status-update', + interval=600000 # Update every 10 minutes + ) + ]) + ], className="mb-4"), + dbc.Card([ + dbc.CardHeader(html.H3("Status Übersicht", className="h5 mb-0")), + dbc.CardBody([dcc.Graph(id='status-summary-chart')]) + ]) + ], xs=12, md=5, className="mb-4 mb-md-0"), # Responsive column + # Right column + dbc.Col([ + dcc.Store(id='current-sensor-names'), + # Model Forecasts Section + self._make_section("Vorhersagen",'forecasts','fcst-view'), + # Historical Data Section + self._make_section("Messwerte",'historical','historical-view'), + # Input Forecasts Section + #self._make_section("Input Forecasts",'input-forecasts','inp-fcst-view'), + # Recent Logs Section + self._make_section("Logs",'logs','log-view'), + ], xs=12, md=7) # Responsive column + ]) + ], fluid=True, className="py-4") + + # Combine all elements + self.app.layout = html.Div([ + header, + main_content + ], className="min-vh-100 bg-light") + + def get_model_name_from_path(self, modelldatei): + if not modelldatei: + return None + return os.path.basename(modelldatei) + + def get_active_models_status(self): + try: + now = datetime.now() + last_required_hour = now -BUFFER_TIME + last_required_hour = last_required_hour.replace(minute=0, second=0, microsecond=0) + while last_required_hour.hour % 3 != 0: + last_required_hour -= timedelta(hours=1) + + with Session(self.engine) as session: + active_models = session.query(Modell).filter(Modell.aktiv == 1).all() + + model_status = [] + for model in active_models: + actual_model_name = self.get_model_name_from_path(model.modelldatei) + if not actual_model_name: + continue + + current_forecast = session.query(PegelForecasts).filter( + and_( + PegelForecasts.model == actual_model_name, + PegelForecasts.tstamp == last_required_hour + ) + ).first() + + 
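                # The newest forecast row with a non-null h1 identifies the
+                # target sensor and the last successful run for this model.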
+                last_valid_forecast = session.query(PegelForecasts).filter(
+                    and_(
+                        PegelForecasts.model == actual_model_name,
+                        PegelForecasts.h1.isnot(None)  # .isnot() so the NULL check runs in SQL; a plain `is not None` is always True in Python
+                    )
+                ).order_by(PegelForecasts.tstamp.desc()).first()
+
+                model_status.append({
+                    'model_name': model.modellname,
+                    'actual_model_name': actual_model_name,
+                    'model_id': model.id,  # Added for historical data lookup
+                    'sensor_name': last_valid_forecast.sensor_name if last_valid_forecast else None,
+                    'has_current_forecast': current_forecast is not None,
+                    'last_forecast_time': last_valid_forecast.tstamp if last_valid_forecast else None,
+                    'forecast_created': last_valid_forecast.created if last_valid_forecast else None,
+                })
+
+            return {
+                'model_status': model_status,
+                'last_check_time': now,
+                'required_timestamp': last_required_hour
+            }
+
+        except Exception as e:
+            print(f"Error getting model status: {str(e)}")
+            return None
+
+    def get_recent_forecasts(self, actual_model_name):
+        """Get recent forecasts for a specific model and sensor"""
+        try:
+            subq = (
+                select(
+                    PegelForecasts,
+                    func.row_number()
+                    .over(
+                        partition_by=[PegelForecasts.member],
+                        order_by=PegelForecasts.tstamp.desc()
+                    ).label('rn'))
+                .where(PegelForecasts.model == actual_model_name)
+                .subquery()
+            )
+
+            stmt = (
+                select(subq)
+                .where(subq.c.rn <= NUM_RECENT_FORECASTS)
+                .order_by(
+                    subq.c.member,
+                    subq.c.tstamp.desc()
+                )
+            )
+            df = pd.read_sql(sql=stmt, con=self.engine)
+            df.drop(columns=['rn'], inplace=True)
+            df.dropna(inplace=True)
+            return df
+
+        except Exception as e:
+            print(f"Error getting recent forecasts: {str(e)}")
+            return pd.DataFrame()
+
+    def get_input_forecasts(self, sensor_names):
+        """Get input forecasts from the last (NUM_RECENT_FORECASTS + 1) * 3 hours for the given sensor names"""
+        try:
+            stmt = select(InputForecasts).where(
+                InputForecasts.sensor_name.in_(sensor_names),
+                InputForecasts.tstamp > (datetime.now() - (NUM_RECENT_FORECASTS + 1) * pd.Timedelta(hours=3)))
+            df = pd.read_sql(sql=stmt, con=self.engine)
+
+            with Session(self.engine) as session:
+                existing = session.query(distinct(InputForecasts.sensor_name))\
+                    .filter(InputForecasts.sensor_name.in_(sensor_names))\
+                    .filter(InputForecasts.tstamp >= datetime.now() - timedelta(days=LOOKBACK_EXTERNAL_FORECAST_EXISTENCE))\
+                    .all()
+                ext_forecast_names = [x[0] for x in existing]
+
+            return df, ext_forecast_names
+        except Exception as e:
+            raise RuntimeError(f"Error getting input forecasts: {str(e)}") from e
+
+    def get_recent_logs(self, sensor_name):
+        """Get the NUM_RECENT_LOGS most recent logs for a specific sensor"""
+        if sensor_name is None:
+            return []
+        try:
+            with Session(self.engine) as session:
+                logs = session.query(Log).filter(
+                    Log.gauge == sensor_name
+                ).order_by(
+                    desc(Log.created)
+                ).limit(NUM_RECENT_LOGS).all()
+
+                return [
+                    {
+                        'timestamp': log.created.strftime('%Y-%m-%d %H:%M:%S'),
+                        'level': log.loglevelname,
+                        'sensor': log.gauge,
+                        'message': log.message,
+                        'module': log.module,
+                        'function': log.funcname,
+                        'line': log.lineno,
+                        'exception': log.exception or ''
+                    }
+                    for log in logs
+                ]
+        except Exception as e:
+            print(f"Error getting logs: {str(e)}")
+            return []
+
+    def get_historical_data(self, model_id):
+        """Get last 144 hours of sensor data for all sensors associated with the model"""
+        try:
+            with Session(self.engine) as session:
+                # Get all sensors for this model
+                model_sensors = session.query(ModellSensor).filter(
+                    ModellSensor.modell_id == model_id
+                ).all()
+
+                sensor_names = [ms.sensor_name for ms in model_sensors]
+
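+                # Note: this 144 h window only feeds the dashboard plots and
+                # tables; the models load their own input windows elsewhere.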
+                # Get last 144 hours of sensor data
+                time_threshold = datetime.now() - timedelta(hours=144)
+                time_threshold = pd.to_datetime("2024-09-13 14:00:00.000") - timedelta(hours=144)  #TODO remove, debug override
+
+                stmt = select(SensorData).where(
+                    SensorData.tstamp >= time_threshold,
+                    SensorData.sensor_name.in_(sensor_names))
+
+                df = pd.read_sql(sql=stmt, con=self.engine, index_col="tstamp")
+                df = df.pivot(columns="sensor_name", values="sensor_value")[sensor_names]
+
+                return df
+
+        except Exception as e:
+            print(f"Error getting historical data: {str(e)}")
+            return pd.DataFrame()
+
+    def setup_callbacks(self):
+        @self.app.callback(
+            [Output('model-status-table', 'children'),
+             Output('status-summary-chart', 'figure')],
+            Input('status-update', 'n_intervals')
+        )
+        def update_dashboard(n):
+            if self.con is None:
+                try:
+                    self.con = oracledb.connect(**self.db_params)
+                    self.engine = create_engine("oracle+oracledb://", creator=lambda: self.con)
+                except oracledb.OperationalError:
+                    return dbc.Alert("Error connecting to database", color="danger"), {}
+
+            status = self.get_active_models_status()
+            if not status:
+                return dbc.Alert("Error fetching data", color="danger"), {}
+
+            header = html.Div([
+                html.H4(f"Status am {status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S')}", className="h6 mb-2"),
+                html.P(f"Vorhersagen erwartet für: {status['required_timestamp'].strftime('%Y-%m-%d %H:%M:00')}", className="text-muted small")
+            ])
+
+            table = dash_table.DataTable(
+                id='status-table',
+                columns=[
+                    {'name': 'Modell', 'id': 'model_name'},
+                    {'name': 'Status', 'id': 'has_current_forecast'},
+                    {'name': 'Neueste Vorhersage', 'id': 'last_forecast_time'},
+                    {'name': 'Vorhersage erstellt', 'id': 'forecast_created'},
+                    {'name': 'Pegel', 'id': 'sensor_name'},
+                    {'name': 'model_id', 'id': 'model_id', 'hideable': True},
+                    {'name': 'actual_model_name', 'id': 'actual_model_name', 'hideable': True}
+                ],
+                data=[{
+                    'model_name': row['model_name'],
+                    'has_current_forecast': '✓' if row['has_current_forecast'] else '✗',
+                    'last_forecast_time': row['last_forecast_time'].strftime('%Y-%m-%d %H:%M:%S') if row['last_forecast_time'] else 'No valid forecast',
+                    'forecast_created': row['forecast_created'].strftime('%Y-%m-%d %H:%M:%S') if row['forecast_created'] else 'N/A',
+                    'sensor_name': row['sensor_name'],
+                    'model_id': row['model_id'],
+                    'actual_model_name': row['actual_model_name']
+                } for row in status['model_status']],
+                hidden_columns=['model_id', 'actual_model_name'],  # Specify hidden columns here
+                **TABLE_STYLE,
+                row_selectable='single',
+                selected_rows=[],
+            )
+
+            fig = create_status_summary_chart(status)
+
+            return html.Div([header, table]), fig
+
+        @self.app.callback(
+            [Output('fcst-view', 'children'),
+             Output('historical-view', 'children'),
+             Output('log-view', 'children'),
+             #Output('inp-fcst-view', 'children'),
+             Output('current-sensor-names', 'data')],
+            [Input('status-table', 'selected_rows')],
+            [State('status-table', 'data')]
+        )
+        def update_right_column(selected_rows, table_data):
+            if not selected_rows:
+                return (html.Div("Wähle ein Modell um Vorhersagen anzuzeigen."),
+                        html.Div("Wähle ein Modell um Messwerte anzuzeigen."),
+                        #html.Div("Wähle ein Modell um Eingangsvorhersagen anzuzeigen."),
+                        html.Div("Wähle ein Modell um Logs anzuzeigen."),
+                        None)
+
+            selected_row = table_data[selected_rows[0]]
+            sensor_name = selected_row['sensor_name']
+            model_id = selected_row['model_id']
+
model_name = selected_row['model_name'] + actual_model_name = selected_row['actual_model_name'] + + # Get logs + logs = self.get_recent_logs(sensor_name) + log_table = create_log_table(logs) + log_view = html.Div([ + html.H4(f"Neueste Logs für {model_name}"), + log_table + ]) + + # Get historical data + df_historical = self.get_historical_data(model_id) + df_filtered = df_historical[df_historical.isna().any(axis=1)] + + sensor_names = list(df_historical.columns) + + if not df_historical.empty: + fig = create_historical_plot(df_historical, model_name) + historical_table = create_historical_table(df_filtered) + + + historical_view = html.Div([ + #html.H4(f"Messdaten {model_name}"), + dcc.Graph(figure=fig), + html.H4("Zeitpunkte mit fehlenden Messdaten", style={'marginTop': '20px', 'marginBottom': '10px'}), + html.Div(historical_table, style={'width': '100%', 'padding': '10px'}) + ]) + else: + historical_view = html.Div("Keine Messdaten verfügbar") + + + ## Get Input Forecasts + #df_inp_fcst = self.get_input_forecasts(sensor_names) + #if not df_inp_fcst.empty: + # fig_inp_fcst = create_input_forecasts_plot(df_inp_fcst,df_historical) + # inp_fcst_table = create_inp_forecast_status_table(df_inp_fcst) + # inp_fcst_view = html.Div([ + # html.H4(f"Input Forecasts for {model_name}"), + # dcc.Graph(figure=fig_inp_fcst), + # html.H4("Input Forecast Status", style={'marginTop': '20px', 'marginBottom': '10px'}), + # html.Div(inp_fcst_table, style={'width': '100%', 'padding': '10px'}) + # ]) + #else: + # inp_fcst_view = html.Div("No input forecasts available") + + + # Get forecast data + df_inp_fcst,ext_forecast_names = self.get_input_forecasts(sensor_names) + inp_fcst_table = create_inp_forecast_status_table(df_inp_fcst,ext_forecast_names) + df_forecasts = self.get_recent_forecasts(actual_model_name) + if not df_forecasts.empty: + fig_fcst = create_model_forecasts_plot(df_forecasts,df_historical,df_inp_fcst) + fcst_view = html.Div([ + html.H4(f"Pegelvorhersage Modell {model_name}"), + dcc.Graph(figure=fig_fcst), + html.H4("Status Eingangsvorhersagen", style={'marginTop': '20px', 'marginBottom': '10px'}), + html.Div(inp_fcst_table, style={'width': '100%', 'padding': '10px'}) + ]) + else: + fcst_view = html.Div("No forecasts available") + + + return (fcst_view, + historical_view, + log_view, + #inp_fcst_view, + sensor_names) # Removed input forecasts return + + + @self.app.callback( + Output('historical-table', 'data'), + [Input('toggle-missing-values', 'n_clicks'), + Input('status-table', 'selected_rows')], + [State('status-table', 'data'), + State('toggle-missing-values', 'children')] + ) + def update_historical_table(n_clicks, selected_rows, table_data, button_text): + #print("update_historical_table") + if not selected_rows: + return [] + + selected_row = table_data[selected_rows[0]] + model_id = selected_row['model_id'] + + df_historical = self.get_historical_data(model_id) + if df_historical.empty: + return [] + + df_table = df_historical.reset_index() + df_table['tstamp'] = df_table['tstamp'].dt.strftime('%Y-%m-%d %H:%M') + + # Show filtered data by default, show all data after first click + if n_clicks and n_clicks % 2 == 1: + df_filtered = df_table + else: + df_filtered = df_table[df_table.isna().any(axis=1)] + + return df_filtered.to_dict('records') + + + # Update the collapse callback + @self.app.callback( + [Output({'type': 'collapse-content', 'section': MATCH}, 'is_open'), + Output({'type': 'collapse-button', 'section': MATCH}, 'children')], + [Input({'type': 'collapse-button', 'section': 
MATCH}, 'n_clicks')], + [State({'type': 'collapse-content', 'section': MATCH}, 'is_open'), + State({'type': 'collapse-button', 'section': MATCH}, 'children')] + ) + def toggle_collapse(n_clicks, is_open, current_children): + if n_clicks: + title = current_children[0]['props']['children'] + if is_open: + return False, [html.Span(title, style={'flex': '1'}), html.Span("►", className="ms-2")] + else: + return True, [html.Span(title, style={'flex': '1'}), html.Span("▼", className="ms-2")] + return is_open, current_children + + def run(self, host='0.0.0.0', port=8050, debug=True): + self.app.run(host=host, port=port, debug=debug) + +if __name__ == '__main__': + monitor = ForecastMonitor( + username="c##mspils", + password="cobalt_deviancy", + dsn="localhost/XE" + ) + #monitor.run(host="172.17.0.1", port=8050, debug=True) + #monitor.run(host="134.245.232.166", port=8050, debug=True) + monitor.run() \ No newline at end of file diff --git a/src/dashboard.py b/src/dashboard.py deleted file mode 100644 index 544ae7c39c865c88fef8a8e3181569a28e211bd2..0000000000000000000000000000000000000000 --- a/src/dashboard.py +++ /dev/null @@ -1,541 +0,0 @@ -import os -from datetime import datetime, timedelta -import oracledb -import pandas as pd -import plotly.express as px -import plotly.graph_objs as go -from dash import Dash, dash_table, dcc, html -from dash.dependencies import MATCH, Input, Output, State -from sqlalchemy import and_, create_engine, desc, func, select -from sqlalchemy.orm import Session - -from dash_tools.layout_helper import create_collapsible_section -from dash_tools.style_configs import (create_historical_plot, - create_historical_table, - create_inp_forecast_status_table, - create_input_forecasts_plot, - create_log_table, - create_model_forecasts_plot) -from utils.db_tools.orm_classes import (InputForecasts, Log, Modell, - ModellSensor, PegelForecasts, - SensorData) - -NUM_RECENT_INPUT_FORECASTS = 12 -NUM_RECENT_PEGEL_FORECASTS = 5 - - -class ForecastMonitor: - def __init__(self, username, password, dsn): - self.db_params = { - "user": username, - "password": password, - "dsn": dsn - } - self.con = oracledb.connect(**self.db_params) - self.engine = create_engine("oracle+oracledb://", creator=lambda: self.con) - - self.app = Dash(__name__, suppress_callback_exceptions=True) - self.setup_layout() - self.setup_callbacks() - - def get_model_name_from_path(self, modelldatei): - if not modelldatei: - return None - return os.path.basename(modelldatei) - - def get_active_models_status(self): - try: - now = datetime.now() - last_required_hour = now.replace(minute=0, second=0, microsecond=0) - while last_required_hour.hour % 3 != 0: - last_required_hour -= timedelta(hours=1) - - with Session(self.engine) as session: - active_models = session.query(Modell).filter(Modell.aktiv == 1).all() - - model_status = [] - for model in active_models: - actual_model_name = self.get_model_name_from_path(model.modelldatei) - if not actual_model_name: - continue - - current_forecast = session.query(PegelForecasts).filter( - and_( - PegelForecasts.model == actual_model_name, - PegelForecasts.tstamp == last_required_hour - ) - ).first() - - last_valid_forecast = session.query(PegelForecasts).filter( - and_( - PegelForecasts.model == actual_model_name, - PegelForecasts.h1 != None - ) - ).order_by(PegelForecasts.tstamp.desc()).first() - - model_status.append({ - 'model_name': model.modellname, - 'actual_model_name': actual_model_name, - 'model_id': model.id, # Added for historical data lookup - 'sensor_name': 
last_valid_forecast.sensor_name if last_valid_forecast else None, - 'has_current_forecast': current_forecast is not None, - 'last_forecast_time': last_valid_forecast.tstamp if last_valid_forecast else None, - 'forecast_created': last_valid_forecast.created if last_valid_forecast else None, - }) - - return { - 'model_status': model_status, - 'last_check_time': now, - 'required_timestamp': last_required_hour - } - - except Exception as e: - print(f"Error getting model status: {str(e)}") - return None - - def get_recent_forecasts(self, actual_model_name, limit=NUM_RECENT_PEGEL_FORECASTS): - """Get recent forecasts for a specific model and sensor""" - try: - subq = ( - select( - PegelForecasts, - func.row_number() - .over( - partition_by=[PegelForecasts.member], - order_by=PegelForecasts.tstamp.desc() - ).label('rn')) - .where(PegelForecasts.model == actual_model_name) - .subquery() - ) - - stmt = ( - select(subq) - .where(subq.c.rn <= NUM_RECENT_PEGEL_FORECASTS) - .order_by( - subq.c.member, - subq.c.tstamp.desc() - ) - ) - df = pd.read_sql(sql=stmt, con=self.engine) - df.drop(columns=['rn'], inplace=True) - return df - - - except Exception as e: - print(f"Error getting recent forecasts: {str(e)}") - return pd.DataFrame() - - def get_input_forecasts(self, sensor_names): - """Get 3 most recent input forecasts for the given sensor names""" - try: - with Session(self.engine) as session: - # Subquery to rank rows by timestamp for each sensor/member combination - subq = ( - select( - InputForecasts, - func.row_number() - .over( - partition_by=[InputForecasts.sensor_name, InputForecasts.member], - order_by=InputForecasts.tstamp.desc() - ).label('rn') - ) - .where(InputForecasts.sensor_name.in_(sensor_names)) - .subquery() - ) - - # Main query to get only the top 3 rows - stmt = ( - select(subq) - .where(subq.c.rn <= NUM_RECENT_INPUT_FORECASTS) - .order_by( - subq.c.sensor_name, - subq.c.member, - subq.c.tstamp.desc() - ) - ) - - df = pd.read_sql(sql=stmt, con=self.engine) - df.drop(columns=['rn'], inplace=True) - return df - - except Exception as e: - raise RuntimeError(f"Error getting input forecasts: {str(e)}") from e - - - def get_recent_logs(self, sensor_name): - if sensor_name is None: - return [] - try: - with Session(self.engine) as session: - logs = session.query(Log).filter( - Log.gauge == sensor_name - ).order_by( - desc(Log.created) - ).limit(10).all() - - return [ - { - 'timestamp': log.created.strftime('%Y-%m-%d %H:%M:%S'), - 'level': log.loglevelname, - 'sensor': log.gauge, - 'message': log.message, - 'module': log.module, - 'function': log.funcname, - 'line': log.lineno, - 'exception': log.exception or '' - } - for log in logs - ] - except Exception as e: - print(f"Error getting logs: {str(e)}") - return [] - - def get_historical_data(self, model_id): - """Get last 144 hours of sensor data for all sensors associated with the model""" - try: - with Session(self.engine) as session: - # Get all sensors for this model - model_sensors = session.query(ModellSensor).filter( - ModellSensor.modell_id == model_id - ).all() - - sensor_names = [ms.sensor_name for ms in model_sensors] - - # Get last 144 hours of sensor data - time_threshold = datetime.now() - timedelta(hours=144) - #time_threshold= pd.to_datetime("2024-09-13 14:00:00.000") - timedelta(hours=144) #TODO rausnehmen - - stmt = select(SensorData).where( - SensorData.tstamp >= time_threshold, - SensorData.sensor_name.in_(sensor_names)) - - df = pd.read_sql(sql=stmt,con=self.engine,index_col="tstamp") - df = 
df.pivot(columns="sensor_name", values="sensor_value")[sensor_names] - - return df - - except Exception as e: - print(f"Error getting historical data: {str(e)}") - return pd.DataFrame() - - def setup_layout(self): - self.app.layout = html.Div([ - # Add CSS for resizable columns in the app's assets folder instead of inline - html.Div([ - html.H1("Forecast Monitoring Dashboard", style={'padding': '20px'}), - - # Main container with resizable columns - html.Div([ - # Left column - html.Div([ - html.Div([ - html.H3("Active Models Status"), - html.Div(id='model-status-table'), - dcc.Interval( - id='status-update', - interval=600000, # Update every 10 minutes - ) - ]), - - html.Div([ - html.H3("Status Summary"), - dcc.Graph(id='status-summary-chart') - ]), - ], id='left-column', className='column', style={ - 'width': '40%', - 'minWidth': '200px', - 'height': 'calc(100vh - 100px)', - 'overflow': 'auto' - }), - - # Resizer - html.Div(id='resizer', style={ - 'cursor': 'col-resize', - 'width': '10px', - 'backgroundColor': '#f0f0f0', - 'transition': 'background-color 0.3s', - ':hover': { - 'backgroundColor': '#ccc' - } - }), - # Right column - # Right column - html.Div([ - dcc.Store(id='current-sensor-names'), # Store current sensor names - create_collapsible_section( - "Model Forecasts", - html.Div(id='fcst-view'), - is_open=True - ), - create_collapsible_section( - "Recent Logs", - html.Div(id='log-view'), - is_open=True - ), - create_collapsible_section( - "Input Forecasts", - html.Div(id='inp-fcst-view'), - is_open=True - ), - create_collapsible_section( - "Historical Data", - html.Div(id='historical-view'), - is_open=True - ), - - ], id='right-column', className='column', style={ - 'width': '60%', - 'minWidth': '400px', - 'height': 'calc(100vh - 100px)', - 'overflow': 'auto' - }), - - ], style={ - 'display': 'flex', - 'flexDirection': 'row', - 'width': '100%', - 'height': 'calc(100vh - 100px)' - }), - - # Add JavaScript for column resizing using dcc.Store to trigger clientside callback - dcc.Store(id='column-widths', data={'left': 40, 'right': 60}), - ]), - ]) - - # Add clientside callback for resizing - self.app.clientside_callback( - """ - function(trigger) { - if (!window.resizeInitialized) { - const resizer = document.getElementById('resizer'); - const leftColumn = document.getElementById('left-column'); - const rightColumn = document.getElementById('right-column'); - - let x = 0; - let leftWidth = 0; - - const mouseDownHandler = function(e) { - x = e.clientX; - leftWidth = leftColumn.getBoundingClientRect().width; - - document.addEventListener('mousemove', mouseMoveHandler); - document.addEventListener('mouseup', mouseUpHandler); - }; - - const mouseMoveHandler = function(e) { - const dx = e.clientX - x; - const newLeftWidth = ((leftWidth + dx) / resizer.parentNode.getBoundingClientRect().width) * 100; - - if (newLeftWidth > 20 && newLeftWidth < 80) { - leftColumn.style.width = `${newLeftWidth}%`; - rightColumn.style.width = `${100 - newLeftWidth}%`; - } - }; - - const mouseUpHandler = function() { - document.removeEventListener('mousemove', mouseMoveHandler); - document.removeEventListener('mouseup', mouseUpHandler); - }; - - resizer.addEventListener('mousedown', mouseDownHandler); - window.resizeInitialized = true; - } - return window.dash_clientside.no_update; - } - """, - Output('column-widths', 'data'), - Input('column-widths', 'data'), - ) - - - def setup_callbacks(self): - @self.app.callback( - [Output('model-status-table', 'children'), - Output('status-summary-chart', 'figure')], - 
Input('status-update', 'n_intervals') - ) - def update_dashboard(n): - status = self.get_active_models_status() - if not status: - return html.Div("Error fetching data"), go.Figure() - - header = html.Div([ - html.H4(f"Status as of {status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S')}"), - html.P(f"Checking forecasts for timestamp: {status['required_timestamp'].strftime('%Y-%m-%d %H:%M:00')}") - ]) - - table = dash_table.DataTable( - id='status-table', - columns=[ - {'name': 'Model', 'id': 'model_name'}, - {'name': 'Status', 'id': 'has_current_forecast'}, - {'name': 'Last Valid', 'id': 'last_forecast_time'}, - {'name': 'Created', 'id': 'forecast_created'}, - {'name': 'Target Sensor', 'id': 'sensor_name'}, - {'name': 'model_id', 'id': 'model_id', 'hideable': True}, - #{'name': 'actual_model_name', 'id': 'actual_model_name', 'hidden': True} - #{'name': 'actual_model_name', 'id': 'actual_model_name', 'visible': False} - {'name': 'actual_model_name', 'id': 'actual_model_name', 'hideable': True} - ], - data=[{ - 'model_name': row['model_name'], - 'has_current_forecast': '✓' if row['has_current_forecast'] else '✗', - 'last_forecast_time': row['last_forecast_time'].strftime('%Y-%m-%d %H:%M:%S') if row['last_forecast_time'] else 'No valid forecast', - 'forecast_created': row['forecast_created'].strftime('%Y-%m-%d %H:%M:%S') if row['forecast_created'] else 'N/A', - 'sensor_name': row['sensor_name'], - 'model_id': row['model_id'], - 'actual_model_name': row['actual_model_name'] - } for row in status['model_status']], - hidden_columns=['model_id', 'actual_model_name'], # Specify hidden columns here - style_data_conditional=[ - { - 'if': {'filter_query': '{has_current_forecast} = "✓"', "column_id": "has_current_forecast"}, - 'color': 'green' - }, - { - 'if': {'filter_query': '{has_current_forecast} = "✗"', "column_id": "has_current_forecast"}, - 'color': 'red' - } - ], - style_table={'overflowX': 'auto'}, - style_cell={ - 'textAlign': 'left', - 'padding': '8px', - 'whiteSpace': 'normal', - 'height': 'auto', - 'fontSize': '14px', - }, - style_header={ - 'backgroundColor': '#f4f4f4', - 'fontWeight': 'bold' - }, - row_selectable='single', - selected_rows=[], - ) - - df_status = pd.DataFrame(status['model_status']) - fig = px.bar( - df_status.groupby('model_name')['has_current_forecast'].apply(lambda x: (x.sum()/len(x))*100).reset_index(), - x='model_name', - y='has_current_forecast', - title='Forecast Completion Rate by Model (%)', - labels={'has_current_forecast': 'Completion Rate (%)', 'model_name': 'Model Name'} - ) - fig.update_layout(yaxis_range=[0, 100]) - - return html.Div([header, table]), fig - - - - @self.app.callback( - [Output('fcst-view', 'children'), - Output('log-view', 'children'), - Output('historical-view', 'children'), - Output('inp-fcst-view', 'children'), - Output('current-sensor-names', 'data')], # Removed input-forecasts-view - [Input('status-table', 'selected_rows')], - [State('status-table', 'data')] - ) - def update_right_column(selected_rows, table_data): - if not selected_rows: - return (html.Div("Select a model to view Forecasts"), - html.Div("Select a model to view logs"), - html.Div("Select a model to view Input Forecasts"), - html.Div("Select a model to view historical data"), - None) # Removed input forecasts return - - selected_row = table_data[selected_rows[0]] - sensor_name = selected_row['sensor_name'] - model_id = selected_row['model_id'] - model_name = selected_row['model_name'] - actual_model_name = selected_row['actual_model_name'] - - # Get logs - logs = 
self.get_recent_logs(sensor_name)
-            log_table = create_log_table(logs)
-            log_view = html.Div([
-                html.H4(f"Recent Logs for {model_name}"),
-                log_table
-            ])
-
-
-
-            # Get historical data
-            df_historical = self.get_historical_data(model_id)
-            sensor_names = list(df_historical.columns)
-
-            if not df_historical.empty:
-                fig = create_historical_plot(df_historical, model_name)
-                historical_table = create_historical_table(df_historical)
-                historical_view = html.Div([
-                    html.H4(f"Historical Data for {model_name}"),
-                    dcc.Graph(figure=fig),
-                    html.H4("Sensor Data Table", style={'marginTop': '20px', 'marginBottom': '10px'}),
-                    html.Div(historical_table, style={'width': '100%', 'padding': '10px'})
-                ])
-            else:
-                historical_view = html.Div("No historical data available")
-
-            # Get forecast data
-            df_forecasts = self.get_recent_forecasts(actual_model_name)
-            if not df_forecasts.empty:
-                fig_fcst = create_model_forecasts_plot(df_forecasts,df_historical)
-                fcst_view = html.Div([
-                    html.H4(f"Gauge Forecasts for {model_name}"),
-                    dcc.Graph(figure=fig_fcst),
-                    #html.H4("Input Forecast Status", style={'marginTop': '20px', 'marginBottom': '10px'}),
-                    #html.Div(inp_fcst_table, style={'width': '100%', 'padding': '10px'})
-                ])
-            else:
-                fcst_view = html.Div("No forecasts available")
-
-            # Get Input Forecasts
-            df_inp_fcst = self.get_input_forecasts(sensor_names)
-            if not df_inp_fcst.empty:
-                fig_inp_fcst = create_input_forecasts_plot(df_inp_fcst,df_historical)
-                inp_fcst_table = create_inp_forecast_status_table(df_inp_fcst)
-                inp_fcst_view = html.Div([
-                    html.H4(f"Input Forecasts for {model_name}"),
-                    dcc.Graph(figure=fig_inp_fcst),
-                    html.H4("Input Forecast Status", style={'marginTop': '20px', 'marginBottom': '10px'}),
-                    html.Div(inp_fcst_table, style={'width': '100%', 'padding': '10px'})
-                ])
-            else:
-                inp_fcst_view = html.Div("No input forecasts available")
-
-            return (fcst_view,
-                    log_view,
-                    historical_view,
-                    inp_fcst_view,
-                    sensor_names) # Removed input forecasts return
-
-
-
-        @self.app.callback(
-            Output({'type': 'collapse-content', 'section': MATCH}, 'style'),
-            Output({'type': 'collapse-button', 'section': MATCH}, 'children'),
-            Input({'type': 'collapse-button', 'section': MATCH}, 'n_clicks'),
-            State({'type': 'collapse-content', 'section': MATCH}, 'style'),
-            prevent_initial_call=True
-        )
-        def toggle_collapse(n_clicks, current_style):
-            if current_style is None:
-                current_style = {}
-
-            if current_style.get('display') == 'none':
-                return {'display': 'block'}, '▼'
-            else:
-                return {'display': 'none'}, '▶'
-
-    def run(self, host='0.0.0.0', port=8050, debug=True):
-        self.app.run(host=host, port=port, debug=debug)
-
-if __name__ == '__main__':
-    monitor = ForecastMonitor(
-        username="c##mspils",
-        password="cobalt_deviancy",
-        dsn="localhost/XE"
-    )
-    #monitor.run(host="172.17.0.1", port=8050, debug=True)
-    #monitor.run(host="134.245.232.166", port=8050, debug=True)
-    monitor.run()
\ No newline at end of file
diff --git a/src/dashboard/constants.py b/src/dashboard/constants.py
new file mode 100644
index 0000000000000000000000000000000000000000..2aa1f61f34bae35ad4b461529ad869eaff95fef8
--- /dev/null
+++ b/src/dashboard/constants.py
@@ -0,0 +1,11 @@
+from datetime import timedelta
+
+NUM_RECENT_FORECASTS = 5
+NUM_RECENT_LOGS = 10
+BUFFER_TIME = timedelta(hours=2,minutes=30)
+PERIODS_EXT_FORECAST_TABLE = 10
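+# Rows shown in the external-forecast status table
+# (pd.date_range(..., periods=PERIODS_EXT_FORECAST_TABLE, freq='3h')).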
+
+
+# How many days to look back to check whether a model input has external forecasts at all.
+# This would also work without the cutoff, but it would be considerably less efficient.
+# If the external forecasts arrive reliably, this can be reduced to a few days.
+LOOKBACK_EXTERNAL_FORECAST_EXISTENCE = 100
\ No newline at end of file
diff --git a/src/dash_tools/layout_helper.py b/src/dashboard/layout_helper.py
similarity index 100%
rename from src/dash_tools/layout_helper.py
rename to src/dashboard/layout_helper.py
diff --git a/src/dashboard/styles.py b/src/dashboard/styles.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ac83dba402dcde5e26c064262f2cf114e0f5d40
--- /dev/null
+++ b/src/dashboard/styles.py
@@ -0,0 +1,187 @@
+from typing import Dict, List, Any
+import plotly.express as px
+
+# Common colors (using Bootstrap colors for consistency)
+COLORS = {
+    'success': '#198754',
+    'danger': '#dc3545',
+    'light_gray': '#f4f4f4',
+    'border': 'rgba(0,0,0,.125)',
+    'background': '#f8f9fa'
+}
+
+COLOR_SET = px.colors.qualitative.Set3
+
+# Table Styles
+TABLE_STYLE = {
+    'style_table': {
+        'overflowX': 'auto',
+        'borderRadius': '4px',
+        'border': f'1px solid {COLORS["border"]}'
+    },
+    'style_header': {
+        'backgroundColor': COLORS['background'],
+        'fontWeight': '600',
+        'textAlign': 'left',
+        'padding': '12px 16px',
+        'borderBottom': f'1px solid {COLORS["border"]}'
+    },
+    'style_cell': {
+        'textAlign': 'left',
+        'padding': '12px 16px',
+        'fontSize': '14px'
+    },
+    'style_data_conditional': [
+        {
+            'if': {'row_index': 'odd'},
+            'backgroundColor': 'rgb(248, 249, 250)'
+        },
+        {
+            'if': {'filter_query': '{has_current_forecast} = "✓"', "column_id": "has_current_forecast"},
+            'color': COLORS['success']
+        },
+        {
+            'if': {'filter_query': '{has_current_forecast} = "✗"', "column_id": "has_current_forecast"},
+            'color': COLORS['danger']
+        }
+    ]
+}
+
+# Historical Table Style
+HISTORICAL_TABLE_STYLE = {
+    'style_table': {
+        'maxHeight': '1200px',
+        'maxWidth': '1600px',
+        'overflowY': 'auto',
+        'width': '100%'
+    },
+    'style_cell': {
+        'textAlign': 'right',
+        'padding': '5px',
+        'minWidth': '100px',
+        'whiteSpace': 'normal',
+        'fontSize': '12px'
+    },
+    'style_header': {
+        'backgroundColor': COLORS['light_gray'],
+        'fontWeight': 'bold',
+        'textAlign': 'center'
+    }
+}
+
+# Forecast Status Table Style
+FORECAST_STATUS_TABLE_STYLE = {
+    'style_table': {
+        'overflowX': 'auto',
+        'width': '100%'
+    },
+    'style_cell': {
+        'textAlign': 'center',
+        'padding': '5px',
+        'minWidth': '100px',
+        'fontSize': '12px'
+    },
+    'style_header': {
+        'backgroundColor': COLORS['light_gray'],
+        'fontWeight': 'bold',
+        'textAlign': 'center'
+    }
+}
+
+# Plot Layouts
+STATUS_SUMMARY_LAYOUT = {
+    'height': 300,
+    'margin': dict(l=30, r=30, t=50, b=30),
+    'showlegend': True
+}
+
+MODEL_FORECASTS_LAYOUT = {
+    'height': 600,
+    'xaxis_title': 'Time',
+    'yaxis_title': 'Gauge [cm]',
+    'legend': dict(
+        yanchor="top",
+        y=0.99,
+        xanchor="left",
+        x=1.05
+    )
+}
+
+HISTORICAL_PLOT_LAYOUT = {
+    'xaxis_title': 'Time',
+    'yaxis_title': 'Value',
+    'height': 600,
+    'showlegend': True,
+    'legend': dict(
+        yanchor="top",
+        y=0.99,
+        xanchor="left",
+        x=1.05
+    ),
+    'margin': dict(r=150)
+}
+
+INPUT_FORECASTS_LAYOUT = {
+    'showlegend': True,
+    'legend': dict(
+        yanchor="top",
+        y=0.99,
+        xanchor="left",
+        x=1.05,
+        groupclick="togglegroup",
+        itemsizing='constant',
+        tracegroupgap=5
+    ),
+    'margin': dict(r=150)
+}
+
+# Plot Colors and Traces
+FORECAST_COLORS = {
+    'historical': 'black',
+    'historical_width': 2
+}
+
+def get_conditional_styles_for_columns(columns: List[str], timestamp_col: str = 'tstamp') -> List[Dict[str, Any]]:
+    """Generate conditional styles for columns with blank values."""
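+    # '{{{}}} is blank'.format(col) yields the literal token '{<col>} is blank',
+    # which is the placeholder syntax Dash filter_query expects.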
+ styles = [ + { + 'if': { + 'filter_query': '{{{}}} is blank'.format(col), + 'column_id': col + }, + 'backgroundColor': '#ffebee', + 'color': '#c62828' + } for col in columns if col != timestamp_col + ] + + styles.append({ + 'if': {'column_id': timestamp_col}, + 'textAlign': 'left', + 'minWidth': '150px' + }) + + return styles + +def get_forecast_status_conditional_styles(sensor_names: List[str]) -> List[Dict[str, Any]]: + """Generate conditional styles for forecast status table.""" + styles = [] + for col in sensor_names: + styles.extend([ + { + 'if': { + 'filter_query': '{{{col}}} = "Missing"'.format(col=col), + 'column_id': col + }, + 'backgroundColor': '#ffebee', + 'color': '#c62828' + }, + { + 'if': { + 'filter_query': '{{{col}}} = "OK"'.format(col=col), + 'column_id': col + }, + 'backgroundColor': '#e8f5e9', + 'color': '#2e7d32' + } + ]) + return styles \ No newline at end of file diff --git a/src/dash_tools/style_configs.py b/src/dashboard/tables_and_plots.py similarity index 52% rename from src/dash_tools/style_configs.py rename to src/dashboard/tables_and_plots.py index 785e6090fb7be1dd8ea4119890ff1dc2d8f44f28..300fad2b55d69e5250181ee9d237b7f32c8169c6 100644 --- a/src/dash_tools/style_configs.py +++ b/src/dashboard/tables_and_plots.py @@ -1,93 +1,180 @@ +from datetime import datetime, timedelta + +import pandas as pd +import plotly.express as px import plotly.graph_objects as go -from dash import html, dash_table -import plotly.graph_objects as go +from dash import dash_table, html from plotly.subplots import make_subplots -import datetime -from datetime import timedelta, datetime -import plotly.express as px -import pandas as pd +import dash_bootstrap_components as dbc + +from dashboard.styles import (FORECAST_STATUS_TABLE_STYLE, + HISTORICAL_TABLE_STYLE, + STATUS_SUMMARY_LAYOUT, + TABLE_STYLE, + COLORS, + COLOR_SET, + get_conditional_styles_for_columns, + get_forecast_status_conditional_styles) +from dashboard.constants import PERIODS_EXT_FORECAST_TABLE + +def create_status_summary_chart(status): + """Creates a simple pie chart showing the percentage of models with current forecasts.""" + total_models = len(status['model_status']) + models_with_forecast = sum(1 for model in status['model_status'] if model['has_current_forecast']) + models_without_forecast = total_models - models_with_forecast + + fig = go.Figure(data=[go.Pie( + labels=['Current Forecast', 'Missing Forecast'], + values=[models_with_forecast, models_without_forecast], + hole=0.4, + marker_colors=[COLORS['success'], COLORS['danger']], + textinfo='percent', + hovertemplate="Status: %{label}<br>Count: %{value}<br>Percentage: %{percent}<extra></extra>" + )]) + + fig.update_layout( + title={ + 'text': f'Model Status ({models_with_forecast} of {total_models} models have current forecasts)', + 'y': 0.95 + }, + **STATUS_SUMMARY_LAYOUT + ) + + return fig -def create_model_forecasts_plot(df_forecasts, df_historical): +def create_model_forecasts_plot(df_forecasts, df_historical,df_inp_fcst): """Create a plot showing recent model forecasts alongside historical data""" - #TODO drop empty rows? 
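+    # One color per forecast start time, shared between the gauge forecast and
+    # the input forecasts, so members of the same run are visually grouped.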
sensor_name = df_forecasts["sensor_name"][0] - - colors = px.colors.qualitative.Set3 - start_times = sorted(df_forecasts["tstamp"].unique()) - start_time_colors = {t: colors[i % len(colors)] for i, t in enumerate(start_times)} + start_times = sorted(set(df_forecasts["tstamp"]) | set(df_inp_fcst["tstamp"])) + start_time_colors = {t: COLOR_SET[i % len(COLOR_SET)] for i, t in enumerate(start_times)} fig = go.Figure() - df_measured = df_historical[start_times[0]-timedelta(days=3):] - # Add historical data - fig.add_trace(go.Scatter( - x=df_measured.index, - y=df_measured[sensor_name], - name=f"Historical - {sensor_name}", - line=dict(color='black', width=2), - mode='lines' - )) + #fig = make_subplots(specs=[[{"secondary_y": True}]]) + + if not df_historical.empty: + df_measured = df_historical[start_times[0]-timedelta(days=3):] + # Add historical data + fig.add_trace(go.Scatter( + x=df_measured.index, + y=df_measured[sensor_name], + name=f"Messwerte - {sensor_name}", + line=dict(color='black', width=2), + mode='lines' + )) df_forecasts = df_forecasts.round(1) members = df_forecasts['member'].unique() + show_legend_check = set() for member in members: member_data = df_forecasts[df_forecasts['member'] == member] - - - for _, row in member_data.iterrows(): + for _, row in filter(lambda x: x[1]["h1"] is not None,member_data.iterrows()): start_time = row['tstamp'] - legend_group = f'{start_time.strftime("%Y%m%d%H%M")}' - legendgrouptitle_text = f'{start_time.strftime("%Y-%m-%d %H:%M")}' - - # Convert h1-h48 columns to forecast points - timestamps = [start_time + timedelta(hours=i) for i in range(1, 49)] - #values = row[5:] - values = [row[f'h{i}'] for i in range(1, 49)] + if start_time in show_legend_check: + show_legend = False + else: + show_legend = True + show_legend_check.add(start_time) + + legend_group = f'{start_time.strftime("%Y-%m-%d %H:%M")}' fig.add_trace( go.Scatter( - x=timestamps, - y=values, - name=f'{start_time.strftime("%Y-%m-%d %H:%M")}', - #name=f'M{member} {start_time.strftime("%Y-%m-%d %H:%M")}', + x=[start_time + timedelta(hours=i) for i in range(1, 49)], + y=[row[f'h{i}'] for i in range(1, 49)], + name=legend_group, legendgroup=legend_group, - showlegend=(member == members[0]).item(),# and sensor_idx == 1, # Show all traces in legend + showlegend=show_legend, line=dict( color=start_time_colors[start_time], - width=1, - #dash='solid' if member == members[0] else 'dot' + width=3 if member == 0 else 1, ), hovertemplate=( 'Time: %{x}<br>' 'Value: %{y:.2f}<br>' f'Member: {member}<br>' f'Start: {start_time}<extra></extra>' - ) + ), + visible= True if start_time == df_forecasts["tstamp"].max() else 'legendonly' ), ) + temp_layout_dict = {"yaxis": {"title" : {"text":"Pegel [cm]"}}} + + fig.update_layout(**temp_layout_dict) + + + ##### + # Add input forecast + if True: + sensors = df_inp_fcst['sensor_name'].unique() + show_legend_check = set() + + # For each sensor + for sensor_idx, sensor in enumerate(sensors, 1): + temp_layout_dict = {f"yaxis{sensor_idx+1}": { + "title" : {"text":"Pegel [cm]"}, + "anchor" : "free", + "overlaying" : "y", + "autoshift":True, + } + } + + fig.update_layout(**temp_layout_dict) + + + sensor_data = df_inp_fcst[df_inp_fcst['sensor_name'] == sensor] + members = sensor_data['member'].unique() + - #fig.add_trace(go.Scatter( - # x=timestamps, - # y=row[5:], - # name=f"Forecast Member {member} ({row['tstamp']})", - # line=dict(color=colors[member % len(colors)], width=1, dash='dot'), - # opacity=1, - # showlegend=True - #)) - - #### - #sensor_data = 
df_forecast[df_forecast['sensor_name'] == sensor] - #members = sensor_data['member'].unique() - # Add trace to the subplot - - #### + # Get unique forecast start times for color assignment + # For each member + for member in members: + member_data = sensor_data[sensor_data['member'] == member] + + # For each forecast (row) + for _, row in member_data.iterrows(): + start_time = row['tstamp'] + #legend_group = f'{sensor} {start_time.strftime("%Y%m%d%H%M")}' + legend_group = f'{start_time.strftime("%Y-%m-%d %H:%M")} {sensor}' + if start_time in show_legend_check: + show_legend = False + else: + show_legend = True + show_legend_check.add(start_time) + + timestamps = [start_time + timedelta(hours=i) for i in range(1, 49)] + values = [row[f'h{i}'] for i in range(1, 49)] + + fig.add_trace( + go.Scatter( + x=timestamps, + y=values, + name=legend_group, + legendgroup=legend_group, + showlegend=show_legend, + line=dict( + color=start_time_colors[start_time], + width=3 if member == 0 else 1, + dash='solid' if member == 0 else 'dot' + ), + hovertemplate=( + 'Time: %{x}<br>' + 'Value: %{y:.2f}<br>' + f'Member: {member}<br>' + f'Start: {start_time}<extra></extra>' + ), + visible= True if start_time == df_forecasts["tstamp"].max() else 'legendonly', + yaxis=f"y{sensor_idx+1}" + ) + ) + fig.update_layout( - height=600, - title='Model Forecasts', - xaxis_title='Time', - yaxis_title='Gauge [cm]', + #height=600, + #title='Model Forecasts', + #xaxis_title='Time', + #yaxis_title='', #hovermode='x unified', legend=dict( yanchor="top", @@ -117,38 +204,14 @@ def create_log_table(logs_data): {'name': 'Message', 'id': 'message'}, {'name': 'Exception', 'id': 'exception'} ] - - style_data_conditional = [ - { - 'if': {'filter_query': '{level} = "ERROR"'}, - 'backgroundColor': '#ffebee', - 'color': '#c62828' - }, - { - 'if': {'filter_query': '{level} = "WARNING"'}, - 'backgroundColor': '#fff3e0', - 'color': '#ef6c00' - } - ] - - style_table = { - 'overflowX': 'auto' - } - - style_cell = { - 'textAlign': 'left', - 'padding': '8px', - 'whiteSpace': 'normal', - 'height': 'auto', - 'fontSize': '12px', - } - + return dash_table.DataTable( columns=columns, data=logs_data, - style_data_conditional=style_data_conditional, - style_table=style_table, - style_cell=style_cell + **TABLE_STYLE + #style_data_conditional=style_data_conditional, + #style_table=style_table, + #style_cell=style_cell ) @@ -181,7 +244,7 @@ def create_historical_plot(df_historical, model_name): # Update layout fig.update_layout( - title=f'Sensor Measurements - Last 144 Hours for {model_name}', + #title=f'Sensor Measurements - Last 144 Hours for {model_name}', xaxis_title='Time', yaxis_title='Value', height=600, @@ -198,84 +261,70 @@ def create_historical_plot(df_historical, model_name): return fig def create_historical_table(df_historical): - """ - Creates a formatted DataTable for historical sensor data - - Args: - df_historical: DataFrame with sensor measurements - - Returns: - dash.html.Div: Div containing configured DataTable - """ + """Creates a formatted DataTable for historical sensor data.""" df_table = df_historical.reset_index() df_table['tstamp'] = df_table['tstamp'].dt.strftime('%Y-%m-%d %H:%M') - # Table styles - style_table = { - 'maxHeight': '1200px', - 'maxWidth': '1600px', - 'overflowY': 'auto', - 'width': '100%' - } - - style_cell = { - 'textAlign': 'right', - 'padding': '5px', - 'minWidth': '100px', - 'whiteSpace': 'normal', - 'fontSize': '12px', - } - - style_header = { - 'backgroundColor': '#f4f4f4', - 'fontWeight': 'bold', - 
'textAlign': 'center', - } - - # Create conditional styles for blank values and timestamp - style_data_conditional = [ - { - 'if': { - 'filter_query': '{{{}}} is blank'.format(col), - 'column_id': col - }, - 'backgroundColor': '#ffebee', - 'color': '#c62828' - } for col in df_table.columns if col != 'tstamp' - ] + [ - { - 'if': {'column_id': 'tstamp'}, - 'textAlign': 'left', - 'minWidth': '150px' - } - ] - - # Create table columns configuration columns = [{'name': col, 'id': col} for col in df_table.columns] + style_data_conditional = get_conditional_styles_for_columns(df_table.columns) - return html.Div( - dash_table.DataTable( + #TODO move buttons etc. to app.py + hist_table = dash_table.DataTable( id='historical-table', columns=columns, data=df_table.to_dict('records'), - style_table=style_table, - style_cell=style_cell, - style_header=style_header, style_data_conditional=style_data_conditional, fixed_columns={'headers': True, 'data': 1}, sort_action='native', sort_mode='single', + **HISTORICAL_TABLE_STYLE ) - ) + + #return dbc.Card([ + # dbc.CardHeader([ + # dbc.Row([ + # dbc.Col( + # dbc.Button( + # "Show All Data", + # id='toggle-missing-values', + # color="primary", + # className="mb-3", + # n_clicks=0 + # ), + # width="auto" + # ) + # ]) + # ]), + # dbc.CardBody(hist_table) + #]) + return html.Div([ + dbc.Row([ + dbc.Col( + dbc.Button( + "Show All Data", + id='toggle-missing-values', + color="primary", + className="mb-3", + n_clicks=0 + ), + width="auto" + ) + ]), + hist_table + + ]) + + #return hist_table -def create_inp_forecast_status_table(df_forecast): +def create_inp_forecast_status_table(df_forecast,ext_forecast_names): """ Creates a status table showing availability of forecasts for each sensor at 3-hour intervals Args: df_forecast: DataFrame with columns tstamp, sensor_name containing forecast data + ext_forecast_names: List of external forecast names Returns: dash.html.Div: Div containing configured DataTable @@ -291,15 +340,17 @@ def create_inp_forecast_status_table(df_forecast): time_range = pd.date_range( end=last_required_hour, - periods=17, # 48 hours / 3 + 1 + periods=PERIODS_EXT_FORECAST_TABLE, # 48 hours / 3 + 1 freq='3h' ) # Initialize result DataFrame with NaN - status_df = pd.DataFrame(index=time_range, columns=sensor_names) + #status_df = pd.DataFrame(index=time_range, columns=sensor_names) + status_df = pd.DataFrame(index=time_range, columns=ext_forecast_names) # For each sensor and timestamp, check if data exists - for sensor in sensor_names: + #for sensor in sensor_names: + for sensor in ext_forecast_names: sensor_data = df_forecast[df_forecast['sensor_name'] == sensor] for timestamp in time_range: #TODO ENSEMBLES @@ -315,53 +366,21 @@ def create_inp_forecast_status_table(df_forecast): status_df['index'] = status_df['index'].dt.strftime('%Y-%m-%d %H:%M') # Configure table styles - style_data_conditional = [ - { - 'if': { - 'filter_query': '{{{col}}} = "Missing"'.format(col=col), - 'column_id': col - }, - 'backgroundColor': '#ffebee', - 'color': '#c62828' - } for col in sensor_names - ] + [ - { - 'if': { - 'filter_query': '{{{col}}} = "OK"'.format(col=col), - 'column_id': col - }, - 'backgroundColor': '#e8f5e9', - 'color': '#2e7d32' - } for col in sensor_names - ] + style_data_conditional = get_forecast_status_conditional_styles(ext_forecast_names) return html.Div( dash_table.DataTable( id='forecast-status-table', columns=[ {'name': 'Timestamp', 'id': 'index'}, - *[{'name': col, 'id': col} for col in sensor_names] + *[{'name': col, 'id': col} for col in 
ext_forecast_names] ], data=status_df.to_dict('records'), - style_table={ - 'overflowX': 'auto', - 'width': '100%' - }, - style_cell={ - 'textAlign': 'center', - 'padding': '5px', - 'minWidth': '100px', - 'fontSize': '12px', - }, - style_header={ - 'backgroundColor': '#f4f4f4', - 'fontWeight': 'bold', - 'textAlign': 'center', - }, style_data_conditional=style_data_conditional, fixed_columns={'headers': True, 'data': 1}, sort_action='native', sort_mode='single', + **FORECAST_STATUS_TABLE_STYLE ) ) @@ -384,12 +403,12 @@ def create_input_forecasts_plot(df_forecast, df_historical): # Create subplot figure fig = make_subplots( - rows=len(sensors), + rows=len(sensors), cols=1, subplot_titles=[f'Sensor: {sensor}' for sensor in sensors], vertical_spacing=0.1 ) - + # For each sensor for sensor_idx, sensor in enumerate(sensors, 1): # Add historical data @@ -414,7 +433,7 @@ def create_input_forecasts_plot(df_forecast, df_historical): # Get unique forecast start times for color assignment start_times = sorted(sensor_data['tstamp'].unique()) - start_time_colors = {t: colors[i % len(colors)] for i, t in enumerate(start_times)} + start_time_colors = {t: COLOR_SET[i % len(COLOR_SET)] for i, t in enumerate(start_times)} # For each member for member in members: @@ -424,25 +443,17 @@ def create_input_forecasts_plot(df_forecast, df_historical): for _, row in member_data.iterrows(): start_time = row['tstamp'] legend_group = f'{sensor} {start_time.strftime("%Y%m%d%H%M")}' - legendgrouptitle_text = f'{sensor}: {start_time.strftime("%Y-%m-%d %H:%M")}' - - # Create x values (timestamps) for this forecast + timestamps = [start_time + timedelta(hours=i) for i in range(1, 49)] - - # Get y values (h1 to h48) values = [row[f'h{i}'] for i in range(1, 49)] - # Add trace to the subplot fig.add_trace( go.Scatter( x=timestamps, y=values, name=f'{start_time.strftime("%Y-%m-%d %H:%M")}', - #name=f'M{member} {start_time.strftime("%Y-%m-%d %H:%M")}', legendgroup=legend_group, - #legendgrouptitle_text=legendgrouptitle_text, - #showlegend=True, # Show all traces in legend - showlegend=(member == members[0]).item(),# and sensor_idx == 1, # Show all traces in legend + showlegend=(member == members[0]).item(), line=dict( color=start_time_colors[start_time], width=1, @@ -462,7 +473,7 @@ def create_input_forecasts_plot(df_forecast, df_historical): # Update layout fig.update_layout( height=400 * len(sensors), # Adjust height based on number of sensors - title='Forecast Values by Sensor with Historical Data', + #title='Forecast Values by Sensor with Historical Data', showlegend=True, legend=dict( yanchor="top", @@ -470,7 +481,6 @@ def create_input_forecasts_plot(df_forecast, df_historical): xanchor="left", x=1.05, groupclick="togglegroup", # Allows clicking group title to toggle all traces - #groupclick="toggleitem", # Allows clicking group title to toggle all traces itemsizing='constant', # Makes legend items constant size tracegroupgap=5 # Add small gap between groups ), diff --git a/src/import_model.py b/src/import_model.py index 15781b11529efa09a291356a96776930f03bcdc3..cf40609089f652fad3af821c8a707ad0aa81d205 100644 --- a/src/import_model.py +++ b/src/import_model.py @@ -3,16 +3,16 @@ import argparse import logging import sys from pathlib import Path - +import pandas as pd import oracledb import yaml from sqlalchemy import create_engine, func, select from sqlalchemy.orm import Session +from tbparse import SummaryReader import utils.helpers as hp from utils.db_tools.db_logging import OracleDBHandler -from utils.db_tools.orm_classes 
import (InputForecastsMeta, Modell,
-                                        ModellSensor, Sensor)
+from utils.db_tools.orm_classes import InputForecastsMeta, Modell, ModellSensor, Sensor, Metric
 
 
 def parse_args() -> argparse.Namespace:
@@ -133,13 +133,56 @@ class ModelImporter:
 
         # Add Model to database
         model_id = self._add_model(session,model_config,model_hparams)
+        self._maybe_add_metrics(session,model_id,model_config,model_hparams)
+
         for i,col in enumerate(model_config["columns"]):
             _ = self._parse_sensor_maybe_add(session,col)
             inp_fcst_meta = self._maybe_add_inp_fcst_meta(session,model_config,col)
             vhs_gebiet = None if inp_fcst_meta is None else inp_fcst_meta.vhs_gebiet
             _ = self._maybe_add_modelsensor(session,model_id,col,vhs_gebiet,i)
-
+    def _maybe_add_metrics(self,session,model_id,model_config,model_hparams):
+        #TODO error handling and updating old models with metrics
+        reader = SummaryReader(model_config["model_folder"],pivot=True)
+        df = reader.scalars
+        loss_cols = df.columns[df.columns.str.contains("loss")]
+        df = df.drop(["step","epoch"] + list(loss_cols),axis=1)[1:49]
+
+        metrics = []
+        for col in df:
+            for i,row in enumerate(df[col]):
+                col_name = col
+                if col_name.startswith("hp/"):
+                    col_name = col_name[3:]
+
+                set_name = col_name.split("_")[0]
+                if set_name == "train":
+                    start_time = pd.to_datetime(model_hparams["train_start"])
+                    end_time = pd.to_datetime(model_hparams["val_start"])
+                elif set_name == "val":
+                    start_time = pd.to_datetime(model_hparams["val_start"])
+                    end_time = pd.to_datetime(model_hparams["test_start"])
+                elif set_name == "test":
+                    start_time = pd.to_datetime(model_hparams["test_start"])
+
+                    #TODO imprecise hack, better to log the actual end_time during training.
+                    # e.g. train+val share = 0.8 over 4 years of non-test data
+                    # -> estimated test window = 4 years * 0.2 / 0.8 = 1 year
+                    not_test_time = (pd.to_datetime(model_hparams["test_start"]) - pd.to_datetime(model_hparams["train_start"]))
+                    not_test_share = model_hparams["train"]+model_hparams["val"]
+                    test_share = 1-not_test_share
+
+                    end_time = start_time + (not_test_time/not_test_share*test_share)
+                    end_time = end_time.round("h")
+                if "flood" in col_name:
+                    tag = "flood"
+                else:
+                    tag = "normal"
+
+                metric = Metric(model_id=model_id,metric_name=col_name,timestep=i+1,start_time=start_time,end_time=end_time,value=row,tag=tag)
+                metrics.append(metric)
+
+        session.add_all(metrics)
+
     def _get_next_model_id(self, session: Session) -> float:
         """Get next available model ID"""
         #TODO the database should use an IDENTITY column for the model table, then we could remove this
@@ -163,7 +206,7 @@ class ModelImporter:
 
     def _get_user_input(self, message: str) -> str:
         """Get user input for action"""
-        response = input(f"{message} (y/n): ")
+        response = input(f"{message}: ")
         return response
 
     def _parse_sensor_maybe_add(self,session,sensor_name: str) -> Sensor:
diff --git a/src/predict_database.py b/src/predict_database.py
index 63afc47eb4d783d41ab4c64523dfbc6e7c6f41bb..5ef0d5b27fd49f6f0a940b98ca6b69da81a90983 100644
--- a/src/predict_database.py
+++ b/src/predict_database.py
@@ -40,6 +40,10 @@ def get_configs(passed_args: argparse.Namespace) -> Tuple[dict, List[dict]]:
         main_config["start"] = pd.to_datetime(main_config["start"])
     if "end" in main_config:
         main_config["end"] = pd.to_datetime(main_config["end"])
+
+    if main_config.get("fake_external"):
+        assert main_config["range"] == True, "fake_external only works with range=True"
+
     if passed_args.start is not None:
         main_config["start"] = passed_args.start
         main_config.pop("end", None)
@@ -131,7 +135,6 @@ def main(passed_args) -> None:
     else:
         logging.info("Initiating Thin mode")
     connector = OracleWaVoConnection(con, main_config)
-    # 
diff --git a/src/predict_database.py b/src/predict_database.py
index 63afc47eb4d783d41ab4c64523dfbc6e7c6f41bb..5ef0d5b27fd49f6f0a940b98ca6b69da81a90983 100644
--- a/src/predict_database.py
+++ b/src/predict_database.py
@@ -40,6 +40,10 @@ def get_configs(passed_args: argparse.Namespace) -> Tuple[dict, List[dict]]:
         main_config["start"] = pd.to_datetime(main_config["start"])
     if "end" in main_config:
         main_config["end"] = pd.to_datetime(main_config["end"])
+
+    if main_config.get("fake_external"):
+        assert main_config["range"] == True, "fake_external only works with range=True"
+
     if passed_args.start is not None:
         main_config["start"] = passed_args.start
         main_config.pop("end", None)
@@ -131,7 +135,6 @@ def main(passed_args) -> None:
     else:
         logging.info("Initiating Thin mode")
     connector = OracleWaVoConnection(con, main_config)
-    # connector.maybe_create_tables() # Create tables if they don't exist
 
     connector.maybe_update_tables()  # Potentially load new data
     for gauge_config in gauge_configs:
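# Sketch of the config invariant introduced above: fake_external presumably
# back-tests with measured values standing in for external forecasts, which
# only makes sense when predicting over a start..end range. A plain dict
# stands in for the parsed YAML main_config:
main_config = {"start": "2024-09-13 09:00", "end": "2024-09-14 09:00",
               "range": True, "fake_external": True}
if main_config.get("fake_external"):
    assert main_config["range"] == True, "fake_external only works with range=True"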
diff --git a/src/utils/db_tools/db_tools.py b/src/utils/db_tools/db_tools.py
index 7842b476f7dd24e9c00753cef600be863190438e..b64a16cba96248c3d0671aa1bb493869a25afd30 100644
--- a/src/utils/db_tools/db_tools.py
+++ b/src/utils/db_tools/db_tools.py
@@ -10,6 +10,7 @@ import warnings
 
 # from warnings import deprecated (works from python 3.13 on)
+from functools import reduce
 import numpy as np
 import oracledb
 import pandas as pd
@@ -31,12 +32,15 @@ from utils.db_tools.orm_classes import (
     InputForecastsMeta,
     Modell
 )
-
+from torch.utils.data import DataLoader, Dataset
+#from data_tools.datasets import TimeSeriesDataSet
 
 # pylint: disable=unsupported-assignment-operation
 # pylint: disable=unsubscriptable-object
 # pylint: disable=line-too-long
 
+
 class OracleWaVoConnection:
     """Class for writing/reading from the Wavo Database."""
 
@@ -67,42 +71,64 @@ class OracleWaVoConnection:
         model.eval()
         created = datetime.now()
 
-        for end_time in self.times:
-            # load sensor sensor input data
-            df_base_input = self.load_input_db(
-                in_size=model.in_size,
-                end_time=end_time,
-                gauge_config=gauge_config,
-                created=created,
-            )
-            if df_base_input is None:
-                continue
-
-            # predict
-            # Get all external forecasts for this gauge
-            stmt = select(InputForecastsMeta).where(InputForecastsMeta.sensor_name.in_(bindparam("external_fcst")))
-            params = {"external_fcst" : gauge_config["external_fcst"]}
-            with Session(self.engine) as session:
-                input_meta_data = session.scalars(statement=stmt, params=params).fetchall()
-
-            stmst2 = select(InputForecasts).where(InputForecasts.sensor_name.in_(bindparam("ext_forecasts")),InputForecasts.tstamp == (bindparam("tstamp")))
-            params2 = {"ext_forecasts" : gauge_config["external_fcst"], "tstamp" : end_time}
-            df_temp = pd.read_sql(sql=stmst2,con=self.engine,index_col="tstamp",params=params2)
-            if len(gauge_config["external_fcst"]) != len(input_meta_data):
-                logging.error("Not all external can be found in InputForecastsMeta, only %s",[x.sensor_name for x in input_meta_data],extra={"gauge" : gauge_config["gauge"]})
-                return
-
-            for member in self.members:
-                self.handle_member(
-                    gauge_config,
-                    model,
-                    df_base_input,
-                    member,
-                    end_time,
-                    input_meta_data,
-                    df_temp,
-                    created
-                )
+        # get the metadata for the external forecasts (to access them later from the db)
+        input_meta_data = self._ext_fcst_metadata(gauge_config)
+
+        if len(self.times) > 1:
+            df_base_input = self.load_input_db(model.in_size, self.times[-1], gauge_config, created)
+
+            # load external forecasts
+            if self.main_config.get("fake_external"):
+                df_temp = None
+            else:
+                stmt = select(InputForecasts).where(
+                    InputForecasts.sensor_name.in_(gauge_config["external_fcst"]),
+                    between(InputForecasts.tstamp, self.times[0], self.times[-1]),
+                    InputForecasts.member.in_(self.members),
+                )
+                df_temp = pd.read_sql(sql=stmt, con=self.engine, index_col="tstamp")
+
+            dataset = self._make_dataset(df_base_input, model, input_meta_data, df_temp)
+            #TODO batched prediction over the whole range is still unfinished
+            pred_multiple_db(model, df_base_input)
+        else:
+            #TODO this branch is redundant, self.times has exactly one entry here
+            for end_time in self.times:
+                # load sensor input data
+                df_base_input = self.load_input_db(
+                    in_size=model.in_size,
+                    end_time=end_time,
+                    gauge_config=gauge_config,
+                    created=created,
+                )
+                if df_base_input is None:
+                    continue
+
+                # load external forecasts
+                stmt = select(InputForecasts).where(
+                    InputForecasts.sensor_name.in_(bindparam("ext_forecasts")),
+                    InputForecasts.tstamp == bindparam("tstamp"),
+                )
+                params = {"ext_forecasts": gauge_config["external_fcst"], "tstamp": end_time}
+                df_temp = pd.read_sql(sql=stmt, con=self.engine, index_col="tstamp", params=params)
+
+                if len(gauge_config["external_fcst"]) != len(input_meta_data):
+                    logging.error("Not all external forecasts can be found in InputForecastsMeta, only %s", [x.sensor_name for x in input_meta_data], extra={"gauge": gauge_config["gauge"]})
+                    return
+
+                for member in self.members:
+                    self.handle_member(
+                        gauge_config,
+                        model,
+                        df_base_input,
+                        member,
+                        end_time,
+                        input_meta_data,
+                        df_temp,
+                        created
+                    )
+
 
     def handle_member(
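# Sketch of how the len(self.times) > 1 branch could batch its predictions once
# pred_multiple_db is finished; the names follow the draft code above, and the
# collate function is an assumption (PyTorch's default collation cannot stack
# the None targets that DBDataSet.__getitem__ returns):
import torch
from torch.utils.data import DataLoader

def collate(batch):
    # drop the None placeholder targets, stack only the inputs
    return torch.stack([torch.as_tensor(x, dtype=torch.float32) for x, _ in batch])

loader = DataLoader(dataset, batch_size=256, collate_fn=collate)
with torch.no_grad():
    preds = [model.predict_step((x, None), 0) for x in loader]
index_df = dataset.get_whole_index()  # maps each output row to (tstamp, member)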
@@ -170,6 +196,7 @@ class OracleWaVoConnection:
             y, gauge_config, end_time, member, created
         )
 
+
     def load_input_db(self, in_size, end_time, gauge_config, created) -> pd.DataFrame:
         """
         Loads input data from the database based on the current configuration.
@@ -186,7 +213,14 @@ class OracleWaVoConnection:
         columns = gauge_config["columns"]
         external_fcst = gauge_config["external_fcst"]
 
-        start_time = end_time - pd.Timedelta(in_size - 1, "hours")
+        if len(self.times) > 1:
+            first_end_time = self.times[0]
+            start_time = first_end_time - pd.Timedelta(in_size - 1, "hours")
+            if self.main_config.get("fake_external"):
+                end_time = end_time + pd.Timedelta(48, "hours")
+        else:
+            start_time = end_time - pd.Timedelta(in_size - 1, "hours")
 
         stmt = select(SensorData).where(
             between(SensorData.tstamp, bindparam("start_time"), bindparam("end_time")),
@@ -211,7 +245,7 @@ class OracleWaVoConnection:
             df_rest = df_main[end_time + pd.Timedelta("1h") :]
             df_rest.index = df_rest.index - pd.Timedelta("48h")
 
-            if len(df_input) != in_size:
+            if len(self.times) == 1 and len(df_input) != in_size:
                 raise MissingTimestampsError(
                     f"Some timestamps are completely missing, found {len(df_input)}/{in_size} hours from {start_time} to {end_time}"
                 )
@@ -244,6 +278,9 @@ class OracleWaVoConnection:
                 # This is a little messy because we don't want to interpolate those values and need to be careful about missing timestamps
                 df_input.loc[df_rest.index, col] = df_rest[col]
 
+            if self.main_config.get("fake_external"):
+                df_input = df_input.iloc[:-48]
+
         except MissingTimestampsError as e:
             logging.error(e.args[0], extra={"gauge": gauge_config["gauge"]})
             df_input = None
@@ -251,14 +288,18 @@ class OracleWaVoConnection:
             logging.error(e.args[0], extra={"gauge": gauge_config["gauge"]})
             df_input = None
         except KeyError as e:
             logging.error(
                 "Some columns are missing in the database, found %s",
-                df_main.columns, extra={"gauge": gauge_config["gauge"]})
+                df_main["sensor_name"].unique(), extra={"gauge": gauge_config["gauge"]})
             logging.error(e.args[0])
             df_input = None
 
         if df_input is None:
             logging.error("Input sensordata could not be loaded, inserting empty forecasts", extra={"gauge": gauge_config["gauge"]})
+            #TODO logging here is weird/wrong with multiple predictions
             y = torch.Tensor(48).fill_(np.nan)
             for member in self.members:
                 try:
@@ -517,6 +558,72 @@ class OracleWaVoConnection:
                 date_format='%Y%m%d%H%M')
 
 
+    def _ext_fcst_metadata(self, gauge_config):
+        # Get all external forecasts for this gauge
+        stmt = select(InputForecastsMeta).where(InputForecastsMeta.sensor_name.in_(bindparam("external_fcst")))
+        with Session(self.engine) as session:
+            input_meta_data = session.scalars(statement=stmt, params={"external_fcst": gauge_config["external_fcst"]}).fetchall()
+
+        return input_meta_data
+
+    def _make_dataset(self, df_input, model, input_meta_data, df_ext_forecasts):
+        with warnings.catch_warnings():
+            warnings.filterwarnings(action="ignore", category=UserWarning)
+            if model.differencing == 1:
+                df_input["d1"] = df_input.iloc[:, model.gauge_idx].diff().fillna(0)
+            #TODO time embedding stuff
+            df_scaled = pd.DataFrame(model.scaler.transform(df_input.values), index=df_input.index, columns=df_input.columns)
+            #TODO scale df_ext_forecasts as well
+
+        class DBDataSet(Dataset):
+            def __init__(self, df_scaled, in_size, ext_meta, df_ext) -> None:
+                self.df_scaled = df_scaled
+                self.in_size = in_size
+                self.df_ext = df_ext
+                self.ext_meta = ext_meta  #TODO rename
+
+                self.max_ens_size = max(i_meta.ensemble_members for i_meta in ext_meta)
+
+                self.unique_indexes = [df_ext[df_ext["sensor_name"] == meta.sensor_name].index.unique() for meta in ext_meta]
+                self.common_indexes = reduce(np.intersect1d, self.unique_indexes)
+
+            def __len__(self) -> int:
+                return len(self.common_indexes) * self.max_ens_size
+
+            def get_whole_index(self):
+                rows = [(self.common_indexes[idx // self.max_ens_size], idx % self.max_ens_size) for idx in range(len(self))]
+                return pd.DataFrame(rows, columns=["tstamp", "member"])
+
+            def __getitem__(self, idx):
+                if idx >= len(self):
+                    raise IndexError(f"Index {idx} is out of range, dataset has length {len(self)}")
+
+                tstamp = self.common_indexes[idx // self.max_ens_size]
+                ens_num = idx % self.max_ens_size
+
+                x = self.df_scaled[:tstamp][-self.in_size:]
+
+                for meta in self.ext_meta:
+                    # skip the sensor_name and member columns, keep h1..h48
+                    if meta.ensemble_members == 1:
+                        ext_values = self.df_ext[self.df_ext["sensor_name"] == meta.sensor_name].loc[tstamp].iloc[2:].values
+                    else:
+                        ext_values = self.df_ext[(self.df_ext["sensor_name"] == meta.sensor_name) & (self.df_ext["member"] == ens_num)].loc[tstamp].iloc[2:].values
+
+                    col_idx = x.columns.get_loc(meta.sensor_name)
+                    x.iloc[-48:, col_idx] = ext_values.astype("float32")
+
+                return x.values, None  #TODO also return y?
+
+        return DBDataSet(df_scaled, model.in_size, input_meta_data, df_ext_forecasts)
+
 
     def maybe_update_tables(self) -> None:
         """
         Updates the database tables if necessary based on the configuration settings.
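# Worked example of the DBDataSet index layout sketched above: idx enumerates
# len(common_indexes) * max_ens_size samples, with the member varying fastest.
max_ens_size = 21                                   # assumed ensemble size
common_indexes = ["2024-09-13 09:00", "2024-09-13 10:00"]

for idx in (0, 1, 20, 21):
    print(idx, common_indexes[idx // max_ens_size], idx % max_ens_size)
# 0  2024-09-13 09:00  0
# 1  2024-09-13 09:00  1
# 20 2024-09-13 09:00  20
# 21 2024-09-13 10:00  0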
- """ - print( - "THIS METHOD IS DEPRECATED, the LFU created tables, not sure how to map their constraints to the oracle_db library" - ) - - with self.con.cursor() as cursor: - table_list = [ - row[0] for row in cursor.execute("SELECT table_name FROM user_tables") - ] - - if "SENSOR_DATA" not in table_list: - logging.warning( - "Warning: Table SENSOR_DATA not in database, creating new empty table" - ) - self.create_sensor_table() - - if "PEGEL_FORECASTS" not in table_list: - logging.warning( - "Warning: Table PEGEL_FORECASTS not in database, creating new empty table" - ) - self.create_forecast_table() - - if "INPUT_FORECASTS" not in table_list: - logging.warning( - "Warning: Table INPUT_FORECASTS not in database, creating new empty table" - ) - self.create_external_forecast_table() - - def create_sensor_table(self) -> None: - """ - Creates the table SENSOR_DATA with columns tstamp, sensor_name, sensor_value. - """ - with self.con.cursor() as cursor: - query = """CREATE TABLE SENSOR_DATA ( - tstamp TIMESTAMP NOT NULL, - sensor_name varchar(256) NOT NULL, - sensor_value FLOAT, - PRIMARY KEY (tstamp, sensor_name))""" - cursor.execute(query) - self.con.commit() - - def create_forecast_table(self) -> None: - """ - Creates the table PEGEL_FORECASTS with columns tstamp, gauge, model, member, created and a value column for each of the 48 forecast hours. - """ - with self.con.cursor() as cursor: - query = ( - """CREATE TABLE PEGEL_FORECASTS ( - tstamp TIMESTAMP NOT NULL, - sensor_name varchar(256) NOT NULL, - model varchar(256) NOT NULL, - member INTEGER NOT NULL, - created TIMESTAMP NOT NULL, - """ - + "\n".join([f"h{i} FLOAT," for i in range(1, 49)]) - + """ - PRIMARY KEY (tstamp,sensor_name,model,member) - )""" - ) - cursor.execute(query) - self.con.commit() - - def create_external_forecast_table(self) -> None: - """ - Creates the table INPUT_FORECASTS with columns tstamp, sensor_name, member and a value column for each of the 48 forecast hours. - """ - with self.con.cursor() as cursor: - query = ( - """CREATE TABLE INPUT_FORECASTS ( - tstamp TIMESTAMP NOT NULL, - sensor_name varchar(255) NOT NULL, - member INTEGER NOT NULL, - """ - + "\n".join([f"h{i} FLOAT," for i in range(1, 49)]) - + """ - PRIMARY KEY (tstamp,sensor_name,member) - )""" - ) - cursor.execute(query) - self.con.commit() def get_member_list(self) -> List[int]: """Returns a list of member identifiers based on the main configuration. @@ -646,7 +670,7 @@ class OracleWaVoConnection: if "range" in self.main_config and self.main_config["range"]: return list( pd.date_range( - self.main_config["start"], self.main_config["end"], freq="H" + self.main_config["start"], self.main_config["end"], freq="h" ) ) return [pd.to_datetime(self.main_config["start"])] @@ -775,7 +799,38 @@ def pred_single_db(model, df_input: pd.DataFrame) -> torch.Tensor: ) # TODO values/df column names wrong/old x = torch.tensor(data=x, device=model.device, dtype=torch.float32).unsqueeze(dim=0) + y = model.predict_step((x, None), 0) + + return y[0] + + + +@torch.no_grad() +def pred_multiple_db(model, df_input: pd.DataFrame) -> torch.Tensor: + """ + Predicts the output for a single input data point. + + Args: + model (LightningModule): The model to use for prediction. + df_input (pandas.DataFrame): Input data point as a DataFrame. + + Returns: + torch.Tensor: Predicted output as a tensor. 
+ """ + with warnings.catch_warnings(): + warnings.filterwarnings(action="ignore", category=UserWarning) + if model.differencing == 1: + df_input["d1"] = df_input.iloc[:, model.gauge_idx].diff().fillna(0) + #TODO time embedding stuff + x = model.scaler.transform( + df_input.values + ) # TODO values/df column names wrong/old + x = torch.tensor(data=x, device=model.device, dtype=torch.float32).unsqueeze(dim=0) + # y = ut.inv_standard(model(x), model.scaler.mean_[model.target_idx], model.scaler.scale_[model.target_idx]) + + data_loader = DataLoader(dataset, batch_size=256,num_workers=8) + hp.get_pred(model, x,None) y = model.predict_step((x, None), 0) return y[0] diff --git a/src/utils/db_tools/orm_classes.py b/src/utils/db_tools/orm_classes.py index c871ace891cd6499a2a3057ccb71e978161faa85..eea22b84b4fbd502ee68e58e5fcd219bf12caa11 100644 --- a/src/utils/db_tools/orm_classes.py +++ b/src/utils/db_tools/orm_classes.py @@ -4,7 +4,7 @@ The alternative is to use table reflection from sqlalchemy. from typing import List, Optional import datetime -from sqlalchemy import Double, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint, TIMESTAMP, VARCHAR,Identity, String, UniqueConstraint +from sqlalchemy import Double, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint, TIMESTAMP, VARCHAR,Identity, String, UniqueConstraint, text from sqlalchemy.dialects.oracle import NUMBER from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship @@ -76,7 +76,7 @@ class Modell(Base): in_size: Mapped[Optional[int]] = mapped_column( NUMBER(38,0)) target_sensor_name: Mapped[Optional[str]] = mapped_column(VARCHAR(256)) - + metric: Mapped[List['Metric']] = relationship('Metric', back_populates='model') modell_sensor: Mapped[List['ModellSensor']] = relationship('ModellSensor', back_populates='modell') def __repr__(self) -> str: @@ -122,7 +122,6 @@ class TmpSensorData(Base): - class InputForecasts(Base): __tablename__ = 'input_forecasts' __table_args__ = ( @@ -187,6 +186,31 @@ class InputForecasts(Base): sensor: Mapped['Sensor'] = relationship('Sensor', back_populates='input_forecasts') + +class Metric(Base): + __tablename__ = 'metric' + __table_args__ = ( + ForeignKeyConstraint(['model_id'], ['modell.id'], name='metric_to_model'), + PrimaryKeyConstraint('id', name='sys_c008840'), + Index('metric_unique', 'model_id', 'metric_name', 'timestep', 'tag', unique=True) + ) + #TODO Constraints verbessern (Startzeitpunkt + Endzeitpunkt eindeutig) + #TODO the server_default will probably not work at the lfu + #id: Mapped[float] = mapped_column(NUMBER(19, 0, False), primary_key=True, server_default=text('"C##MSPILS"."METRICS_SEQ"."NEXTVAL"')) + id: Mapped[float] = mapped_column(NUMBER(19, 0, False), + Identity(always=True, on_null=False, start=1, increment=1, minvalue=1, maxvalue=9999999999999999999999999999, cycle=False, cache=20, order=False), + primary_key=True) + model_id: Mapped[float] = mapped_column(NUMBER(10, 0, False)) + metric_name: Mapped[str] = mapped_column(VARCHAR(100)) + timestep: Mapped[int] = mapped_column(Integer) + start_time: Mapped[datetime.datetime] = mapped_column(TIMESTAMP) + end_time: Mapped[datetime.datetime] = mapped_column(TIMESTAMP) + value: Mapped[float] = mapped_column(Double) + tag: Mapped[Optional[str]] = mapped_column(VARCHAR(100)) + + model: Mapped['Modell'] = relationship('Modell', back_populates='metric') + + class ModellSensor(Base): __tablename__ = 'modell_sensor' __table_args__ = ( diff --git a/src/utils/helpers.py b/src/utils/helpers.py index 
index 950f0424a9b789eacb3d491a014d712374c994a8..56bf836d999dab1fda8d0ed1c138982523dfc831 100644
--- a/src/utils/helpers.py
+++ b/src/utils/helpers.py
@@ -85,10 +85,11 @@ def get_pred(
     if move_model:
         model.cuda()
 
-    pred = np.concatenate(pred)
+    y_pred = np.concatenate(pred)
 
-    y_pred = np.concatenate([np.expand_dims(y_true, axis=1), pred], axis=1)
-    y_pred = pd.DataFrame(y_pred, index=y_true.index, columns=range(y_pred.shape[1]))
+    if y_true is not None:
+        y_pred = np.concatenate([np.expand_dims(y_true, axis=1), y_pred], axis=1)
+        y_pred = pd.DataFrame(y_pred, index=y_true.index, columns=range(y_pred.shape[1]))
 
     return y_pred
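# Demonstration of get_pred's two return shapes after this change, on synthetic
# data (pred stands for the list of per-batch outputs built inside get_pred):
import numpy as np
import pandas as pd

pred = [np.random.rand(4, 48), np.random.rand(4, 48)]
y_true = pd.Series(np.random.rand(8))

y_pred = np.concatenate(pred)  # y_true is None: plain ndarray of shape (8, 48)
with_obs = np.concatenate([np.expand_dims(y_true, axis=1), y_pred], axis=1)
df = pd.DataFrame(with_obs, index=y_true.index, columns=range(with_obs.shape[1]))
# y_true given: DataFrame with column 0 = observations, 1..48 = forecast horizons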