# core/connection.py import serial import time import os PROTOCOL_ERROR_MESSAGES = { 0x01: "Ungültiger Befehl.", 0x02: "Ungültige Parameter.", 0x03: "Befehl oder Parameter sind zu lang.", 0x10: "Datei oder Verzeichnis wurde nicht gefunden.", 0x11: "Ziel existiert bereits.", 0x12: "Pfad ist kein Verzeichnis.", 0x13: "Pfad ist ein Verzeichnis.", 0x14: "Zugriff verweigert.", 0x15: "Kein freier Speicher mehr vorhanden.", 0x16: "Datei ist zu groß.", 0x20: "Allgemeiner Ein-/Ausgabefehler auf dem Gerät.", 0x21: "Zeitüberschreitung auf dem Gerät.", 0x22: "CRC-Prüfung fehlgeschlagen (Daten beschädigt).", 0x23: "Übertragung wurde vom Gerät abgebrochen.", 0x30: "Befehl wird vom Gerät nicht unterstützt.", 0x31: "Gerät ist beschäftigt.", 0x32: "Interner Gerätefehler.", } class BuzzerError(Exception): pass class BuzzerConnection: def __init__(self, config): self.port = config.get("port") self.baudrate = config.get("baudrate", 115200) self.timeout = config.get("timeout", 5.0) self.serial = None def __enter__(self): if not self.port: raise ValueError("Kein serieller Port konfiguriert.") # write_timeout verhindert endloses Blockieren auf inaktiven Ports self.serial = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=self.timeout, write_timeout=self.timeout ) self.serial.reset_input_buffer() return self def __exit__(self, exc_type, exc_val, exc_tb): if self.serial and self.serial.is_open: self.serial.close() def _parse_controller_error(self, line: str) -> str: code_str = line.split(" ", 1)[1].strip() if " " in line else "" try: code = int(code_str, 10) except ValueError: return f"Controller meldet einen unbekannten Fehler: '{line}'" message = PROTOCOL_ERROR_MESSAGES.get(code, "Unbekannter Fehlercode vom Gerät.") return f"Controller-Fehler {code} (0x{code:02X}): {message}" def send_command(self, command: str, custom_timeout: float = None) -> list: eff_timeout = custom_timeout if custom_timeout is not None else self.timeout self.serial.reset_input_buffer() try: self.serial.write(f"{command}\n".encode('utf-8')) self.serial.flush() except serial.SerialTimeoutException: raise TimeoutError(f"Schreib-Timeout am Port {self.port}. Ist das Gerät blockiert?") lines = [] start_time = time.monotonic() while (time.monotonic() - start_time) < eff_timeout: if self.serial.in_waiting > 0: try: line = self.serial.readline().decode('utf-8', errors='ignore').strip() if not line: continue if line == "OK": return lines elif line.startswith("ERR"): raise BuzzerError(self._parse_controller_error(line)) else: lines.append(line) except BuzzerError: raise except Exception as e: raise BuzzerError(f"Fehler beim Lesen der Antwort: {e}") else: time.sleep(0.01) raise TimeoutError(f"Lese-Timeout ({eff_timeout}s) beim Warten auf Antwort für: '{command}'") def send_binary(self, filepath: str, chunk_size: int = 4096, timeout: float = 10.0, progress_callback=None): """ Überträgt eine Binärdatei in Chunks, nachdem das READY-Signal empfangen wurde. """ # 1. Warte auf die READY-Bestätigung vom Controller start_time = time.time() ready = False while (time.time() - start_time) < timeout: if self.serial.in_waiting > 0: line = self.serial.readline().decode('utf-8', errors='ignore').strip() if line == "READY": ready = True break elif line.startswith("ERR"): raise BuzzerError(f"Fehler vor Binärtransfer: {self._parse_controller_error(line)}") time.sleep(0.01) if not ready: raise TimeoutError("Kein READY-Signal vom Controller empfangen.") # 2. Sende die Datei in Blöcken file_size = os.path.getsize(filepath) bytes_sent = 0 with open(filepath, 'rb') as f: while bytes_sent < file_size: # 1. Nicht blockierende Fehlerprüfung vor jedem Chunk if self.serial.in_waiting > 0: line = self.serial.readline().decode('utf-8', errors='ignore').strip() if line.startswith("ERR"): raise BuzzerError(f"Controller hat Transfer abgebrochen: {self._parse_controller_error(line)}") # 2. Chunk lesen und schreiben chunk = f.read(chunk_size) if not chunk: break self.serial.write(chunk) # WICHTIG: self.serial.flush() hier entfernen. # Dies verhindert den Deadlock mit dem OS-USB-Puffer. bytes_sent += len(chunk) # 3. Callback für UI if progress_callback: progress_callback(len(chunk)) # 3. Warte auf das finale OK (oder ERR bei CRC/Schreib-Fehlern) start_time = time.time() while (time.time() - start_time) < timeout: if self.serial.in_waiting > 0: line = self.serial.readline().decode('utf-8', errors='ignore').strip() if line == "OK": return True elif line.startswith("ERR"): raise BuzzerError(f"Fehler beim Speichern der Binärdatei: {self._parse_controller_error(line)}") time.sleep(0.01) raise TimeoutError("Zeitüberschreitung nach Binärtransfer (kein OK empfangen).")