added python tool inital version

This commit is contained in:
2026-02-25 10:09:17 +01:00
parent 288b1e45ef
commit 80c0e825a7
26 changed files with 369 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
# core/commands/info.py
from core.connection import BuzzerError
def execute(conn) -> dict:
"""Holt die Systeminformationen und gibt sie als strukturiertes Dictionary zurück."""
lines = conn.send_command("info")
if not lines:
raise BuzzerError("Keine Antwort auf 'info' empfangen.")
parts = lines[0].split(';')
if len(parts) != 5:
raise BuzzerError(f"Unerwartetes Info-Format: {lines[0]}")
protocol_version = int(parts[0])
if protocol_version != 1:
raise BuzzerError(f"Inkompatibles Protokoll: Gerät nutzt v{protocol_version}, Host erwartet v1.")
app_version = parts[1]
f_frsize = int(parts[2])
f_blocks = int(parts[3])
f_bfree = int(parts[4])
total_kb = (f_blocks * f_frsize) / 1024
free_kb = (f_bfree * f_frsize) / 1024
used_kb = total_kb - free_kb
percent_used = (used_kb / total_kb) * 100 if total_kb > 0 else 0
return {
"protocol_version": protocol_version,
"app_version": app_version,
"total_kb": total_kb,
"free_kb": free_kb,
"used_kb": used_kb,
"percent_used": percent_used
}

View File

@@ -0,0 +1,84 @@
# core/commands/ls.py
from core.connection import BuzzerError
def get_file_tree(conn, target_path="/", recursive=False) -> list:
"""
Liest das Dateisystem aus und gibt eine hierarchische Baumstruktur zurück.
"""
if not target_path.endswith('/'):
target_path += '/'
cmd_path = target_path.rstrip('/') if target_path != '/' else '/'
try:
lines = conn.send_command(f"ls {cmd_path}")
except BuzzerError as e:
return [{"type": "E", "name": f"Fehler beim Lesen: {e}", "path": target_path}]
nodes = []
if not lines:
return nodes
for line in lines:
parts = line.split(',', 2)
if len(parts) != 3:
continue
entry_type, entry_size, entry_name = parts
node = {
"type": entry_type,
"name": entry_name,
"path": f"{target_path}{entry_name}"
}
if entry_type == 'D':
if recursive:
# Rekursiver Aufruf auf dem Host für Unterverzeichnisse
node["children"] = get_file_tree(conn, f"{target_path}{entry_name}/", recursive=True)
else:
node["children"] = []
elif entry_type == 'F':
node["size"] = int(entry_size)
nodes.append(node)
return nodes
def print_tree(nodes, prefix="", path=""):
"""
Gibt die Baumstruktur optisch formatiert auf der Konsole aus.
"""
if path:
if path == "/":
display_path = "💾 "+"/ (Root)"
else:
display_path = "📁 " + path
print(f"{prefix}{display_path}")
for i, node in enumerate(nodes):
is_last = (i == len(nodes) - 1)
connector = " └─" if is_last else " ├─"
if node["type"] == 'D':
print(f"{prefix}{connector}📁 {node['name']}")
extension = " " if is_last else ""
if "children" in node and node["children"]:
print_tree(node["children"], prefix + extension)
elif node["type"] == 'F':
size_kb = node["size"] / 1024
# \033[90m macht den Text dunkelgrau, \033[0m setzt die Farbe zurück
print(f"{prefix}{connector}📄 {node['name']} \033[90m({size_kb:.1f} KB)\033[0m")
elif node["type"] == 'E':
print(f"{prefix}{connector}\033[31m{node['name']}\033[0m")
def get_flat_file_list(nodes) -> list:
"""
Wandelt die Baumstruktur in eine flache Liste von Dateipfaden um.
Wird von 'rm -r' benötigt, um nacheinander alle Dateien zu löschen.
"""
flat_list = []
for node in nodes:
if node["type"] == 'F':
flat_list.append(node)
elif node["type"] == 'D' and "children" in node:
flat_list.extend(get_flat_file_list(node["children"]))
return flat_list

View File

View File

@@ -0,0 +1,40 @@
# core/config.py
import os
import sys
import yaml
DEFAULT_CONFIG = {
"port": None,
"baudrate": 115200,
"timeout": 5.0
}
def load_config(cli_args=None):
config = DEFAULT_CONFIG.copy()
cwd_config = os.path.join(os.getcwd(), "config.yaml")
script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
script_config = os.path.join(script_dir, "config.yaml")
yaml_path = cwd_config if os.path.exists(cwd_config) else script_config if os.path.exists(script_config) else None
if yaml_path:
try:
with open(yaml_path, "r", encoding="utf-8") as f:
yaml_data = yaml.safe_load(f)
if yaml_data and "serial" in yaml_data:
config["port"] = yaml_data["serial"].get("port", config["port"])
config["baudrate"] = yaml_data["serial"].get("baudrate", config["baudrate"])
config["timeout"] = yaml_data["serial"].get("timeout", config["timeout"])
except Exception as e:
print(f"Fehler beim Laden der Konfigurationsdatei {yaml_path}: {e}")
if cli_args:
if getattr(cli_args, "port", None) is not None:
config["port"] = cli_args.port
if getattr(cli_args, "baudrate", None) is not None:
config["baudrate"] = cli_args.baudrate
if getattr(cli_args, "timeout", None) is not None:
config["timeout"] = cli_args.timeout
return config

View File

@@ -0,0 +1,111 @@
# core/connection.py
import serial
import time
class BuzzerError(Exception):
pass
class BuzzerConnection:
def __init__(self, config):
self.port = config.get("port")
self.baudrate = config.get("baudrate", 115200)
self.timeout = config.get("timeout", 5.0)
self.serial = None
def __enter__(self):
if not self.port:
raise ValueError("Kein serieller Port konfiguriert.")
# write_timeout verhindert endloses Blockieren auf inaktiven Ports
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=self.timeout,
write_timeout=self.timeout
)
self.serial.reset_input_buffer()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.serial and self.serial.is_open:
self.serial.close()
def send_command(self, command: str, custom_timeout: float = None) -> list:
eff_timeout = custom_timeout if custom_timeout is not None else self.timeout
self.serial.reset_input_buffer()
try:
self.serial.write(f"{command}\n".encode('utf-8'))
self.serial.flush()
except serial.SerialTimeoutException:
raise TimeoutError(f"Schreib-Timeout am Port {self.port}. Ist das Gerät blockiert?")
lines = []
start_time = time.monotonic()
while (time.monotonic() - start_time) < eff_timeout:
if self.serial.in_waiting > 0:
try:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if not line:
continue
if line == "OK":
return lines
elif line.startswith("ERR"):
err_code = line.split(" ")[1] if " " in line else "UNKNOWN"
raise BuzzerError(f"Controller meldet Fehlercode: {err_code}")
else:
lines.append(line)
except Exception as e:
raise BuzzerError(f"Fehler beim Lesen der Antwort: {e}")
else:
time.sleep(0.01)
raise TimeoutError(f"Lese-Timeout ({eff_timeout}s) beim Warten auf Antwort für: '{command}'")
def send_binary(self, filepath: str, chunk_size: int = 512, timeout: float = 10.0):
"""
Überträgt eine Binärdatei in Chunks, nachdem das READY-Signal empfangen wurde.
"""
# 1. Warte auf die READY-Bestätigung vom Controller
start_time = time.time()
ready = False
while (time.time() - start_time) < timeout:
if self.serial.in_waiting > 0:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line == "READY":
ready = True
break
elif line.startswith("ERR"):
raise BuzzerError(f"Fehler vor Binärtransfer: {line}")
time.sleep(0.01)
if not ready:
raise TimeoutError("Kein READY-Signal vom Controller empfangen.")
# 2. Sende die Datei in Blöcken
file_size = os.path.getsize(filepath)
bytes_sent = 0
with open(filepath, 'rb') as f:
while bytes_sent < file_size:
chunk = f.read(chunk_size)
if not chunk:
break
self.serial.write(chunk)
# Flush blockiert, bis die Daten an den OS-USB-Treiber übergeben wurden
self.serial.flush()
bytes_sent += len(chunk)
# 3. Warte auf das finale OK (oder ERR bei CRC/Schreib-Fehlern)
start_time = time.time()
while (time.time() - start_time) < timeout:
if self.serial.in_waiting > 0:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line == "OK":
return True
elif line.startswith("ERR"):
raise BuzzerError(f"Fehler beim Speichern der Binärdatei: {line}")
time.sleep(0.01)
raise TimeoutError("Zeitüberschreitung nach Binärtransfer (kein OK empfangen).")