217 lines
11 KiB
Python
217 lines
11 KiB
Python
# buzzer.py
|
|
import argparse
|
|
import sys
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Edis Buzzer Host Tool")
|
|
|
|
# Globale Argumente (gelten für alle Befehle)
|
|
parser.add_argument("-p", "--port", type=str, help="Serielle Schnittstelle (z.B. COM15)")
|
|
parser.add_argument("-b", "--baudrate", type=int, help="Verbindungsgeschwindigkeit")
|
|
parser.add_argument("-t", "--timeout", type=float, help="Timeout in Sekunden (Standard: 5.0)")
|
|
parser.add_argument("--no-auto-info", action="store_true", help="Überspringt den automatischen Info-Call beim Start")
|
|
|
|
# Subkommandos einrichten
|
|
subparsers = parser.add_subparsers(dest="command", help="Verfügbare Befehle")
|
|
|
|
# Befehl: info (expliziter Aufruf, obwohl es ohnehin immer angezeigt wird)
|
|
subparsers.add_parser("info", help="Zeigt nur die Systeminformationen an")
|
|
|
|
# Befehl: ls
|
|
ls_parser = subparsers.add_parser("ls", help="Listet Dateien und Verzeichnisse auf")
|
|
ls_parser.add_argument("path", nargs="?", default="/", help="Zielpfad (Standard: /)")
|
|
ls_parser.add_argument("-r", "--recursive", action="store_true", help="Rekursiv auflisten")
|
|
|
|
# Befehl: put
|
|
put_parser = subparsers.add_parser("put", help="Lädt eine oder mehrere Dateien auf den Controller hoch")
|
|
put_parser.add_argument("sources", nargs="+", help="Lokale Quelldatei(en), Verzeichnisse oder Wildcards (z.B. *.raw)")
|
|
put_parser.add_argument("target", type=str, help="Zielpfad auf dem Controller (Verzeichnis muss mit '/' enden)")
|
|
put_parser.add_argument("-r", "--recursive", action="store_true", help="Verzeichnisse rekursiv hochladen")
|
|
|
|
# Befehl: put_many
|
|
put_many_parser = subparsers.add_parser("put_many", help="Lädt mehrere Dateien/Verzeichnisse (typisch rekursiv) hoch")
|
|
put_many_parser.add_argument("sources", nargs="+", help="Lokale Quelldatei(en), Verzeichnisse oder Wildcards")
|
|
put_many_parser.add_argument("target", type=str, help="Zielpfad auf dem Controller")
|
|
put_many_parser.add_argument("-r", "--recursive", action="store_true", help="Verzeichnisse rekursiv hochladen (Standard für put_many)")
|
|
|
|
# Befehl: fw_put
|
|
fw_put_parser = subparsers.add_parser("fw_put", help="Lädt eine Firmware in den Secondary Slot (Test-Upgrade)")
|
|
fw_put_parser.add_argument("source", type=str, help="Lokale Firmware-Datei (.bin)")
|
|
|
|
# Befehl: mkdir
|
|
mkdir_parser = subparsers.add_parser("mkdir", help="Erstellt ein neues Verzeichnis")
|
|
mkdir_parser.add_argument("path", type=str, help="Pfad des neuen Verzeichnisses (z.B. /lfs/a/neu)")
|
|
|
|
# Befehl: rm
|
|
rm_parser = subparsers.add_parser("rm", help="Löscht eine Datei oder ein Verzeichnis")
|
|
rm_parser.add_argument("path", type=str, help="Pfad der zu löschenden Datei/Ordner")
|
|
rm_parser.add_argument("-r", "--recursive", action="store_true", help="Ordnerinhalte rekursiv löschen")
|
|
|
|
# Befehl: stat
|
|
stat_parser = subparsers.add_parser("stat", help="Zeigt Typ und Größe einer Datei/eines Verzeichnisses")
|
|
stat_parser.add_argument("path", type=str, help="Pfad der Datei/des Verzeichnisses")
|
|
|
|
# Befehl: mv
|
|
mv_parser = subparsers.add_parser("mv", help="Benennt eine Datei/ein Verzeichnis um oder verschiebt es")
|
|
mv_parser.add_argument("source", type=str, help="Alter Pfad")
|
|
mv_parser.add_argument("target", type=str, help="Neuer Pfad")
|
|
|
|
# Befehl: pull
|
|
pull_parser = subparsers.add_parser("pull", help="Lädt eine Datei vom Controller herunter")
|
|
pull_parser.add_argument("source", type=str, help="Quellpfad auf dem Controller")
|
|
pull_parser.add_argument("target", nargs="?", default=None, help="Optionaler lokaler Zielpfad")
|
|
|
|
# Alias: get_file
|
|
get_file_parser = subparsers.add_parser("get_file", help="Alias für pull")
|
|
get_file_parser.add_argument("source", type=str, help="Quellpfad auf dem Controller")
|
|
get_file_parser.add_argument("target", nargs="?", default=None, help="Optionaler lokaler Zielpfad")
|
|
|
|
# Befehl: play
|
|
play_parser = subparsers.add_parser("play", help="Spielt eine Datei auf dem Controller ab")
|
|
play_parser.add_argument("path", type=str, help="Pfad der abzuspielenden Datei (z.B. /lfs/a/neu)")
|
|
|
|
# Befehl: check
|
|
check_parser = subparsers.add_parser("check", help="Holt die CRC32 einer Datei und zeigt sie an")
|
|
check_parser.add_argument("path", type=str, help="Pfad der zu prüfenden Datei (z.B. /lfs/a/neu)")
|
|
|
|
# Befehl: confirm
|
|
confirm_parser = subparsers.add_parser("confirm", help="Bestätigt die aktuell laufende Firmware")
|
|
|
|
# Befehl: reboot
|
|
reboot_parser = subparsers.add_parser("reboot", help="Startet den Buzzer neu")
|
|
|
|
# Befehl: get_tags (neuer Blob/TLV-Parser)
|
|
get_tags_parser = subparsers.add_parser("get_tags", help="Holt alle Tags einer Datei als JSON")
|
|
get_tags_parser.add_argument("path", type=str, help="Pfad der Datei")
|
|
|
|
# Befehl: write_tags (merge/replace per JSON)
|
|
write_tags_parser = subparsers.add_parser("write_tags", help="Fügt Tags ein/ersetzt bestehende Tags per JSON")
|
|
write_tags_parser.add_argument("path", type=str, help="Pfad der Datei")
|
|
write_tags_parser.add_argument("json", type=str, help="JSON-Objekt oder @datei.json")
|
|
|
|
# Befehl: remove_tag
|
|
remove_tag_parser = subparsers.add_parser("remove_tag", help="Entfernt einen Tag und schreibt den Rest zurück")
|
|
remove_tag_parser.add_argument("path", type=str, help="Pfad der Datei")
|
|
remove_tag_parser.add_argument("key", type=str, choices=["description", "author", "crc32", "fileformat"], help="Zu entfernender Tag-Key")
|
|
|
|
# Argumente parsen
|
|
args = parser.parse_args()
|
|
from core.config import load_config
|
|
config = load_config(args)
|
|
|
|
print("--- Aktuelle Verbindungsparameter -------------------------------")
|
|
print(f"Port: {config.get('port', 'Nicht definiert')}")
|
|
print(f"Baudrate: {config.get('baudrate')}")
|
|
print(f"Timeout: {config.get('timeout')}s")
|
|
print("-" * 65)
|
|
|
|
if not config.get("port"):
|
|
print("Abbruch: Es muss ein Port in der config.yaml oder via --port definiert werden.")
|
|
sys.exit(1)
|
|
|
|
from core.connection import BuzzerConnection, BuzzerError
|
|
|
|
try:
|
|
with BuzzerConnection(config) as conn:
|
|
if not args.no_auto_info:
|
|
from core.commands import info
|
|
sys_info = info.execute(conn)
|
|
|
|
status = sys_info.get("image_status", "UNKNOWN")
|
|
status_colors = {
|
|
"CONFIRMED": "\033[32m",
|
|
"TESTING": "\033[33m",
|
|
"PENDING": "\033[36m",
|
|
}
|
|
status_color = status_colors.get(status, "\033[37m")
|
|
|
|
print(f"Buzzer Firmware: v{sys_info['app_version']} [{status_color}{status}\033[0m] (Protokoll v{sys_info['protocol_version']})")
|
|
print(f"LittleFS Status: {sys_info['used_kb']:.1f} KB / {sys_info['total_kb']:.1f} KB belegt ({sys_info['percent_used']:.1f}%)")
|
|
print("-" * 65)
|
|
|
|
# 2. Spezifisches Kommando ausführen
|
|
if args.command == "ls":
|
|
from core.commands import ls
|
|
print(f"Inhalt von '{args.path}':\n")
|
|
tree = ls.get_file_tree(conn, target_path=args.path, recursive=args.recursive)
|
|
if not tree:
|
|
print(" (Leer)")
|
|
else:
|
|
ls.print_tree(tree, path=args.path )
|
|
elif args.command == "put":
|
|
from core.commands import put
|
|
put.execute(conn, sources=args.sources, target=args.target, recursive=args.recursive)
|
|
elif args.command == "put_many":
|
|
from core.commands import put
|
|
recursive = True if not args.recursive else args.recursive
|
|
put.execute(conn, sources=args.sources, target=args.target, recursive=recursive)
|
|
elif args.command == "fw_put":
|
|
from core.commands import fw_put
|
|
fw_put.execute(conn, source=args.source)
|
|
elif args.command == "mkdir":
|
|
from core.commands import mkdir
|
|
mkdir.execute(conn, path=args.path)
|
|
elif args.command == "rm":
|
|
from core.commands import rm
|
|
rm.execute(conn, path=args.path, recursive=args.recursive)
|
|
elif args.command == "stat":
|
|
from core.commands import stat
|
|
stat.execute(conn, path=args.path)
|
|
elif args.command == "mv":
|
|
from core.commands import mv
|
|
mv.execute(conn, source=args.source, target=args.target)
|
|
elif args.command == "pull" or args.command == "get_file":
|
|
from core.commands import pull
|
|
pull.execute(conn, source=args.source, target=args.target)
|
|
elif args.command == "confirm":
|
|
from core.commands import confirm
|
|
confirm.execute(conn)
|
|
elif args.command == "reboot":
|
|
from core.commands import reboot
|
|
reboot.execute(conn)
|
|
elif args.command == "play":
|
|
from core.commands import play
|
|
play.execute(conn, path=args.path)
|
|
elif args.command == "check":
|
|
from core.commands import check
|
|
CRC32 = check.execute(conn, path=args.path)
|
|
if CRC32:
|
|
size_bytes = CRC32.get("size_bytes")
|
|
if isinstance(size_bytes, int) and size_bytes >= 0:
|
|
size_kb = size_bytes / 1024.0
|
|
print(f"CRC32 von '{args.path}': 0x{CRC32['crc32']:08x} (Größe: {size_bytes} B / {size_kb:.1f} KB)")
|
|
else:
|
|
print(f"CRC32 von '{args.path}': 0x{CRC32['crc32']:08x}")
|
|
else:
|
|
print(f"Fehler: Keine CRC32-Information für '{args.path}' erhalten.")
|
|
elif args.command == "get_tags":
|
|
from core.commands import tags
|
|
tag_map = tags.get_tags(conn, args.path)
|
|
print(tag_map)
|
|
elif args.command == "write_tags":
|
|
from core.commands import tags
|
|
updates = tags.parse_tags_json_input(args.json)
|
|
result = tags.write_tags(conn, args.path, updates)
|
|
print("Aktuelle Tags:")
|
|
print(result)
|
|
elif args.command == "remove_tag":
|
|
from core.commands import tags
|
|
result = tags.remove_tag(conn, args.path, args.key)
|
|
print("Aktuelle Tags:")
|
|
print(result)
|
|
elif args.command == "info" or args.command is None:
|
|
# Wurde kein Befehl oder explizit 'info' angegeben, sind wir hier schon fertig
|
|
pass
|
|
|
|
except TimeoutError as e:
|
|
print(f"Fehler: {e}")
|
|
sys.exit(1)
|
|
except BuzzerError as e:
|
|
print(f"Buzzer hat die Aktion abgelehnt: {e}")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
print(f"Verbindungsfehler auf {config.get('port')}: {e}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |