123 lines
5.8 KiB
Python
123 lines
5.8 KiB
Python
# buzzer.py
|
|
import argparse
|
|
import sys
|
|
from core.config import load_config
|
|
from core.connection import BuzzerConnection, BuzzerError
|
|
from core.commands import info, ls, put, mkdir, rm, confirm, reboot, play, check
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Edis Buzzer Host Tool")
|
|
|
|
# Globale Argumente (gelten für alle Befehle)
|
|
parser.add_argument("-p", "--port", type=str, help="Serielle Schnittstelle (z.B. COM15)")
|
|
parser.add_argument("-b", "--baudrate", type=int, help="Verbindungsgeschwindigkeit")
|
|
parser.add_argument("-t", "--timeout", type=float, help="Timeout in Sekunden (Standard: 5.0)")
|
|
|
|
# Subkommandos einrichten
|
|
subparsers = parser.add_subparsers(dest="command", help="Verfügbare Befehle")
|
|
|
|
# Befehl: info (expliziter Aufruf, obwohl es ohnehin immer angezeigt wird)
|
|
subparsers.add_parser("info", help="Zeigt nur die Systeminformationen an")
|
|
|
|
# Befehl: ls
|
|
ls_parser = subparsers.add_parser("ls", help="Listet Dateien und Verzeichnisse auf")
|
|
ls_parser.add_argument("path", nargs="?", default="/", help="Zielpfad (Standard: /)")
|
|
ls_parser.add_argument("-r", "--recursive", action="store_true", help="Rekursiv auflisten")
|
|
|
|
# Befehl: put
|
|
put_parser = subparsers.add_parser("put", help="Lädt eine oder mehrere Dateien auf den Controller hoch")
|
|
put_parser.add_argument("sources", nargs="+", help="Lokale Quelldatei(en) oder Wildcards (z.B. *.raw)")
|
|
put_parser.add_argument("target", type=str, help="Zielpfad auf dem Controller (Verzeichnis muss mit '/' enden)")
|
|
|
|
# Befehl: mkdir
|
|
mkdir_parser = subparsers.add_parser("mkdir", help="Erstellt ein neues Verzeichnis")
|
|
mkdir_parser.add_argument("path", type=str, help="Pfad des neuen Verzeichnisses (z.B. /lfs/a/neu)")
|
|
|
|
# Befehl: rm
|
|
rm_parser = subparsers.add_parser("rm", help="Löscht eine Datei oder ein Verzeichnis")
|
|
rm_parser.add_argument("path", type=str, help="Pfad der zu löschenden Datei/Ordner")
|
|
rm_parser.add_argument("-r", "--recursive", action="store_true", help="Ordnerinhalte rekursiv löschen")
|
|
|
|
# Befehl: play
|
|
play_parser = subparsers.add_parser("play", help="Spielt eine Datei auf dem Controller ab")
|
|
play_parser.add_argument("path", type=str, help="Pfad der abzuspielenden Datei (z.B. /lfs/a/neu)")
|
|
|
|
# Befehl: check
|
|
check_parser = subparsers.add_parser("check", help="Holt die CRC32 einer Datei und zeigt sie an")
|
|
check_parser.add_argument("path", type=str, help="Pfad der zu prüfenden Datei (z.B. /lfs/a/neu)")
|
|
|
|
# Befehl: confirm
|
|
confirm_parser = subparsers.add_parser("confirm", help="Bestätigt die aktuell laufende Firmware")
|
|
|
|
# Befehl: reboot
|
|
reboot_parser = subparsers.add_parser("reboot", help="Startet den Buzzer neu")
|
|
|
|
# Argumente parsen
|
|
args = parser.parse_args()
|
|
config = load_config(args)
|
|
|
|
print("--- Aktuelle Verbindungsparameter ---------------------")
|
|
print(f"Port: {config.get('port', 'Nicht definiert')}")
|
|
print(f"Baudrate: {config.get('baudrate')}")
|
|
print(f"Timeout: {config.get('timeout')}s")
|
|
print("-" * 55)
|
|
|
|
if not config.get("port"):
|
|
print("Abbruch: Es muss ein Port in der config.yaml oder via --port definiert werden.")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
with BuzzerConnection(config) as conn:
|
|
# 1. Immer die Info holen und anzeigen
|
|
sys_info = info.execute(conn)
|
|
|
|
# Neu: Status auslesen und Farbe zuweisen (Grün für CONFIRMED, Gelb für UNCONFIRMED)
|
|
status = sys_info.get("image_status", "UNKNOWN")
|
|
status_color = "\033[32m" if status == "CONFIRMED" else "\033[33m"
|
|
|
|
# Neu: Die print-Anweisung enthält nun den formatierten Status
|
|
print(f"Buzzer Firmware: v{sys_info['app_version']} [{status_color}{status}\033[0m] (Protokoll v{sys_info['protocol_version']})")
|
|
print(f"LittleFS Status: {sys_info['used_kb']:.1f} KB / {sys_info['total_kb']:.1f} KB belegt ({sys_info['percent_used']:.1f}%)")
|
|
print("-" * 55)
|
|
# 2. Spezifisches Kommando ausführen
|
|
if args.command == "ls":
|
|
print(f"Inhalt von '{args.path}':\n")
|
|
tree = ls.get_file_tree(conn, target_path=args.path, recursive=args.recursive)
|
|
if not tree:
|
|
print(" (Leer)")
|
|
else:
|
|
ls.print_tree(tree, path=args.path )
|
|
elif args.command == "put":
|
|
put.execute(conn, sources=args.sources, target=args.target)
|
|
elif args.command == "mkdir":
|
|
mkdir.execute(conn, path=args.path)
|
|
elif args.command == "rm":
|
|
rm.execute(conn, path=args.path, recursive=args.recursive)
|
|
elif args.command == "confirm":
|
|
confirm.execute(conn)
|
|
elif args.command == "reboot":
|
|
reboot.execute(conn)
|
|
elif args.command == "play":
|
|
play.execute(conn, path=args.path)
|
|
elif args.command == "check":
|
|
CRC32 = check.execute(conn, path=args.path)
|
|
if CRC32:
|
|
print(f"CRC32 von '{args.path}': 0x{CRC32['crc32']:08x}")
|
|
else:
|
|
print(f"Fehler: Keine CRC32-Information für '{args.path}' erhalten.")
|
|
elif args.command == "info" or args.command is None:
|
|
# Wurde kein Befehl oder explizit 'info' angegeben, sind wir hier schon fertig
|
|
pass
|
|
|
|
except TimeoutError as e:
|
|
print(f"Fehler: {e}")
|
|
sys.exit(1)
|
|
except BuzzerError as e:
|
|
print(f"Buzzer hat die Aktion abgelehnt: {e}")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
print(f"Verbindungsfehler auf {config.get('port')}: {e}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |