# helferscript mit klassen für den URI und adressen update # schuldatenbank heißt in der DB "schulen" import requests import psycopg2 import json class DBTools: """Tools für das Arbeiten mit der DB. Hier sind die select Funktionen zum selektieren und updaten der Daten in der Datenbank. """ def __init__(self, conn: str) -> None: """Aufbauen der Verbindung zu der Datenbank Args: data: Connection String mit allen Informationen für die Verbindung zur Datenbank """ self.db_connection = psycopg2.connect(conn) self.cursor = self.db_connection.cursor() def select_adress(self, table: str) -> "list[tuple[str, ... ]]": """Selektieren der Adressdaten Funktion zum selektieren der Adressdaten der Schulen aus der Datenbank. Die folgenden Daten werden aus der Datenbank ausgelesen: * ID der Schule in der Datenbank * Hausnummer * Straßenname * PLZ * Ortsname Args: table: Name der Tabelle aus welcher die Adressen selektiert werden sollen Returns: Liste mit mehreren Tupeln welche die Adressdaten der Schulen enthalten. """ select_sql = f""" select lokaleID, adressealt_hnr, adressealt_strasse, adressealt_plz, adressealt_ort from {table};""" self.cursor.execute(select_sql) query_result = self.cursor.fetchall() return query_result def select_uri(self, table: str) -> "list[tuple[str, ... ]]": """Selektieren der AREG URI Funktion zum selektieren der URI aus dem Datensatz um diese mit aktuellen AREG Adressdaten anzureichern. Args: table: Name der Tabelle aus welcher die Adressen selektiert werden sollen Returns: Liste mit meheren Tupeln welche die AREG URI enthalten. Es wird sichergestellt, dass keine NULL Werte vorkommen. """ select_sql = f""" select distinct(areg_uri) from {table} where areg_uri like 'https://%';""" self.cursor.execute(select_sql) query_data = self.cursor.fetchall() return query_data def update_uri(self, param_dbid: str, param_uri: str, table: str) -> None: """Update der URI in der DB Funktion zum Update der Datenbank mit den neuen URI aus dem Geocoder. Args: param_dbid: ID der Schule aus der Datenbank (nicht AREG URI) param_uri: AREG URI table: Name der Tabelle aus welcher die Adressen selektiert werden sollen """ sql_alter = f""" alter table {table} add column if not exists areg_uri varchar; """ sql_update = f""" update {table} set areg_uri='{param_uri}' where lokaleID='{param_dbid}'; """ self.cursor.execute(sql_alter) self.cursor.execute(sql_update) def update_adress(self, data: "dict[str,str]", param_uri: str, dienstart: str, table: str) -> None: # noqa """Updaten der Adressen mit AREG URI Funktion zum Update der Datenbank mit den neuen Adressdaten aus dem AREG anhand der AREG URI. Args: data: Dictionary mit neuen Adressdaten aus der AREG API param_uri: AREG URI zum zuordnen der Adressdaten table: Name der Tabelle aus welcher die Adressen selektiert werden sollen """ geometry = str(data['geometry']) geometry = geometry.replace("'", '"') sql_alter = f""" alter table {table} add column if not exists hnr varchar, add column if not exists stn varchar, add column if not exists plz varchar, add column if not exists ort varchar, add column if not exists dienstart varchar, add column if not exists geom geometry(Point, 4326); """ sql_update = f""" update {table} set hnr='{data['hnr']}', stn='{data['stn']}', plz='{data['plz']}', ort='{data['ort']}', dienstart='{dienstart}', geom=ST_GeomFromGeoJSON('{geometry}') where areg_uri='{param_uri}'; """ self.cursor.execute(sql_alter) self.cursor.execute(sql_update) def closecursor(self) -> None: """Commit und Schließen der DB-Connection Funktion zum Commit und Schießen der Datenbankverbindung nachdem alle Änderungen vorgenommen wurden. """ # schließt den cursor self.db_connection.commit() self.cursor.close() self.db_connection.close() class APITools: """Tools für das Arbeiten mit der pygeoAPI. Hier sind die Funktionen zum aufrufen des Geocoder Prozesses und zum Finden der Adresse anhand der URI """ def __init__(self, param_url): """Initialisiert Instanz der APITools. Festlegen der Basis URL der API für die weitere Verwendung in den Funktionen der Klasse. Args: param_url: Basis URL der API """ self.base_url = param_url def geocode(self, data: "dict[str,str]") -> "dict[str,str]": """Aufrufen des API Geocoders Funktion zum Aufrufen des Geocoder Prozesses in der API. Bereinigung der Adressdaten finden nicht in dieser Funktion oder diesem Skript sondern im Geocoder statt! Args: data: Dictionary mit den alten Adressdaten Returns: Dictionary entweder mit einem `uri` Objekt oder einem `message` Objekt. Wenn `uri` kommt ist alles gut, wenn `message` kommt ist ein Fehler aufgetreten. Der genaue Fehler ist in `message` enthalten. """ url = self.base_url + r"processes/geocoder/execution" result = {"message": "you should not see this"} request_body = { "inputs": { "hnr": data["hnr"], "ort": data["ort"], "plz": data["plz"], "stn": data["stn"], "uri": True } } try: api_result = requests.post(url, json.dumps(request_body)) api_result.raise_for_status() result = api_result.json() except requests.exceptions.ConnectionError as e: result = {"message": str(e)} except requests.exceptions.Timeout as e: result = {"message": str(e)} except requests.exceptions.HTTPError as e: result = {"message": str(e)} return result def findadresse(self, param_uri: str) -> "dict[str,str]": """Findet Adresse anhand von URI Funktion zum finden der Adresse im AREG API anhand der vergebenen AREG URI. Args: param_uri: URL zur API mit AREG URI der Adresse. Returns: Adressdaten des jeweiligen Items das der AREG URI entspricht """ url = param_uri + r"?f=json" result = {} response = requests.get(url) response_json = response.json() result['hnr'] = response_json['properties']['hnr'] result['stn'] = response_json['properties']['stn'] result['plz'] = response_json['properties']['plz'] result['ort'] = response_json['properties']['ort'] result['geometry'] = response_json['geometry'] return result