Files
buzzer/tool/buzz.py
2026-03-07 08:51:50 +01:00

274 lines
12 KiB
Python

import sys
import os
import argparse
# # Falls buzz.py tief in Unterordnern liegt, stellen wir sicher,
# # dass das Hauptverzeichnis im Pfad ist:
# sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def main():
try:
parser = argparse.ArgumentParser(description="Buzzer Serial Comm Tool")
# Allgemeine Parameter
parser.add_argument("-c", "--config", help="Pfad zur config.yaml (optional)", type=str)
parser.add_argument("-d", "--debug", help="Aktiviert detaillierte Hex-Logs", action="store_true")
# Verbindungsparameter (können auch in config.yaml definiert werden)
parser.add_argument("-p", "--port", help="Serieller Port", type=str)
parser.add_argument("-b", "--baud", help="Baudrate", type=int)
parser.add_argument("-t", "--timeout", help="Timeout in Sekunden", type=float)
# Subparser für Befehle
subparsers = parser.add_subparsers(dest="command", help="Verfügbare Befehle")
# Befehl: crc32
crc32_parser = subparsers.add_parser("crc32", help="CRC32-Checksumme einer Datei oder eines Verzeichnisses berechnen")
crc32_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
# Befehl: flash_info
flash_info_parser = subparsers.add_parser("flash_info", help="Informationen über den Flash-Speicher des Controllers abfragen")
# Befehl: fw_status
fw_status_parser = subparsers.add_parser("fw_status", help="Firmware- und Kernel-Status des Controllers abfragen")
# Befehl: get_file
get_file_parser = subparsers.add_parser("get_file", help="Datei vom Zielsystem herunterladen")
get_file_parser.add_argument("source_path", help="Pfad der Datei auf dem Zielsystem")
get_file_parser.add_argument("dest_path", help="Zielpfad auf dem lokalen System")
# Befehl: ls
ls_parser = subparsers.add_parser("ls", help="Listet Dateien/Ordner in einem Verzeichnis auf")
ls_parser.add_argument("path", help="Pfad auf dem Zielsystem")
ls_parser.add_argument("-r", "--recursive", help="Rekursiv durch die Verzeichnisse durchsuchen", action="store_true")
# Befehl: proto
proto_parser = subparsers.add_parser("proto", help="Protokollversion des Controllers abfragen")
# Befehl: put_file
put_file_parser = subparsers.add_parser("put_file", help="Datei auf das Zielsystem hochladen")
put_file_parser.add_argument("source_path", help="Pfad der Datei auf dem lokalen System")
put_file_parser.add_argument("dest_path", help="Zielpfad auf dem Zielsystem")
put_file_parser.add_argument("-t", "--tags", help="Optionale JSON Tags für den Upload", type=str)
# Befehl: get_tags
get_tags_parser = subparsers.add_parser("get_tags", help="Tags einer Datei anzeigen")
get_tags_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
# Befehl: put_tags
put_tags_parser = subparsers.add_parser("put_tags", help="Tags schreiben")
put_tags_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
put_tags_parser.add_argument("json", help="JSON String (z.B. '{\"json\": {\"t\": \"Titel\"}}')")
put_tags_parser.add_argument("-o", "--overwrite", help="Alle bestehenden JSON-Tags vorher löschen", action="store_true")
# Befehl: rename
rename_parser = subparsers.add_parser("rename", help="Benennen Sie eine Datei oder einen Ordner auf dem Zielsystem um")
rename_parser.add_argument("source_path", help="Aktueller Pfad der Datei/des Ordners auf dem Zielsystem")
rename_parser.add_argument("dest_path", help="Neuer Pfad der Datei/des Ordners auf dem Zielsystem")
# Befehl: rm
rm_parser = subparsers.add_parser("rm", help="Entfernt eine Datei oder einen Ordner auf dem Zielsystem")
rm_parser.add_argument("path", help="Pfad auf dem Zielsystem")
# Befehl: stat
stat_parser = subparsers.add_parser("stat", help="Informationen zu einer Datei/Ordner")
stat_parser.add_argument("path", help="Pfad auf dem Zielsystem")
# Befehl: put_fw
put_fw_parser = subparsers.add_parser("put_fw", help="Firmware-Image auf den Controller hochladen")
put_fw_parser.add_argument("file_path", help="Pfad zur Firmware-Datei auf dem lokalen System")
# Befehl: confirm_fw
confirm_fw_parser = subparsers.add_parser("confirm_fw", help="Bestätigt ein als 'Testing' markiertes Firmware-Image, damit es beim permanent wird")
# Befehl: reboot
reboot_parser = subparsers.add_parser("reboot", help="Neustart des Controllers")
# Befehl: play
play_parser = subparsers.add_parser("play", help="Startet die Wiedergabe einer Datei")
play_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
play_parser.add_argument("-i", "--interrupt", help="Sofortige Wiedergabe (Interrupt)", action="store_true")
# Befehl: stop
stop_parser = subparsers.add_parser("stop", help="Stoppt die aktuelle Wiedergabe")
# Befehl: set
set_parser = subparsers.add_parser("set", help="System-Einstellung setzen")
set_parser.add_argument("key", help="Schlüssel (z.B. audio/vol)")
set_parser.add_argument("value", help="Wert")
# Befehl: get
get_parser = subparsers.add_parser("get", help="System-Einstellung auslesen")
get_parser.add_argument("key", help="Schlüssel (z.B. audio/vol)")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(0)
from core.config import cfg
from core.utils import console, console_err
if args.config:
cfg.custom_path = args.config
if args.debug:
cfg.debug = True
console.print("[bold blue]Buzzer Tool v1.0[/bold blue]", justify="left")
settings = cfg.serial_settings
settings['debug'] = args.debug
# Überschreibe Einstellungen mit Kommandozeilenparametern, falls vorhanden
if args.port:
settings['port'] = args.port
if args.baud:
settings['baudrate'] = args.baud
if args.timeout:
settings['timeout'] = args.timeout
# Ausgabe der aktuellen Einstellungen
port = settings.get('port')
baud = settings.get('baudrate', 'N/A')
timeout = settings.get('timeout', 'N/A')
if not port:
console_err.print("[error]Fehler: Kein serieller Port angegeben.[/error]")
sys.exit(1)
if args.debug:
console.print(f" • Port: [info]{port}[/info]")
console.print(f" • Baud: [info]{baud}[/info]")
console.print(f" • Timeout: [info]{timeout:1.2f}s[/info]")
console.print("-" * 78)
from core.serial_conn import SerialBus
bus = SerialBus(settings)
try:
bus.open()
if args.command == "crc32":
from core.cmd.crc32 import crc32
cmd = crc32(bus)
result = cmd.get(args.path)
cmd.print(result, args.path)
elif args.command == "get_file":
from core.cmd.get_file import get_file
cmd = get_file(bus)
result = cmd.get(args.source_path, args.dest_path)
cmd.print(result)
elif args.command == "flash_info":
from core.cmd.flash_info import flash_info
cmd = flash_info(bus)
result = cmd.get()
cmd.print(result)
elif args.command == "fw_status":
from core.cmd.fw_status import fw_status
cmd = fw_status(bus)
result = cmd.get()
cmd.print(result)
elif args.command == "ls":
from core.cmd.list_dir import list_dir
cmd = list_dir(bus)
result = cmd.get(args.path, recursive=args.recursive)
cmd.print(result, args.path)
elif args.command == "proto":
from core.cmd.proto import proto
cmd = proto(bus)
result = cmd.get()
cmd.print(result)
elif args.command == "put_file":
from core.cmd.put_file import put_file
cmd = put_file(bus)
result = cmd.get(args.source_path, args.dest_path)
cmd.print(result)
elif args.command == "rename":
from core.cmd.rename import rename
cmd = rename(bus)
result = cmd.get(args.source_path, args.dest_path)
cmd.print(result)
elif args.command == "rm":
from core.cmd.rm import rm
cmd = rm(bus)
result = cmd.get(args.path)
cmd.print(result, args.path)
elif args.command == "stat":
from core.cmd.stat import stat
cmd = stat(bus)
result = cmd.get(args.path)
cmd.print(result, args.path)
elif args.command == "put_file":
from core.cmd.put_file import put_file
cmd = put_file(bus)
result = cmd.get(args.source_path, args.dest_path, cli_tags_json=args.tags)
cmd.print(result)
elif args.command == "get_tags":
from core.cmd.get_tags import get_tags
cmd = get_tags(bus)
result = cmd.get(args.path)
cmd.print(result, args.path)
elif args.command == "put_tags":
from core.cmd.put_tags import put_tags
cmd = put_tags(bus)
result = cmd.get(args.path, args.json, overwrite=args.overwrite)
cmd.print(result, args.path)
elif args.command == "confirm_fw":
from core.cmd.fw_confirm import fw_confirm
cmd = fw_confirm(bus)
result = cmd.get()
cmd.print(result)
elif args.command == "put_fw":
from core.cmd.put_fw import put_fw
cmd = put_fw(bus)
result = cmd.get(args.file_path)
cmd.print(result)
elif args.command == "reboot":
from core.cmd.reboot import reboot
cmd = reboot(bus)
result = cmd.get()
cmd.print(result)
elif args.command == "play":
from core.cmd.play import play
cmd = play(bus)
result = cmd.get(args.path, interrupt=args.interrupt)
cmd.print(result, args.path, interrupt=args.interrupt)
elif args.command == "stop":
from core.cmd.stop import stop
cmd = stop(bus)
result = cmd.get()
cmd.print(result)
elif args.command == "set":
from core.cmd.set_setting import set_setting
cmd = set_setting(bus)
result = cmd.get(args.key, args.value)
cmd.print(result, args.key, args.value)
elif args.command == "get":
from core.cmd.get_setting import get_setting
cmd = get_setting(bus)
result = cmd.get(args.key)
cmd.print(result, args.key)
finally:
bus.close()
except FileNotFoundError as e:
console_err.print(f"[error]Fehler: {e}[/error]")
sys.exit(1)
except (TimeoutError, IOError, ValueError) as e:
console_err.print(f"[bold red]KOMMUNIKATIONSFEHLER:[/bold red] [error_msg]{e}[/error_msg]")
sys.exit(1) # Beendet das Script mit Fehlercode 1 für Tests
except Exception as e:
# Hier fangen wir auch deinen neuen ControllerError ab
from core.serial_conn import ControllerError
if isinstance(e, ControllerError):
console_err.print(f"[bold red]CONTROLLER FEHLER:[/bold red] [error_msg]{e}[/error_msg]")
else:
console_err.print(f"[bold red]UNERWARTETER FEHLER:[/bold red] [error_msg]{e}[/error_msg]")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()