274 lines
12 KiB
Python
274 lines
12 KiB
Python
import sys
|
|
import os
|
|
import argparse
|
|
|
|
# # Falls buzz.py tief in Unterordnern liegt, stellen wir sicher,
|
|
# # dass das Hauptverzeichnis im Pfad ist:
|
|
# sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
def main():
|
|
try:
|
|
parser = argparse.ArgumentParser(description="Buzzer Serial Comm Tool")
|
|
|
|
# Allgemeine Parameter
|
|
parser.add_argument("-c", "--config", help="Pfad zur config.yaml (optional)", type=str)
|
|
parser.add_argument("-d", "--debug", help="Aktiviert detaillierte Hex-Logs", action="store_true")
|
|
|
|
# Verbindungsparameter (können auch in config.yaml definiert werden)
|
|
parser.add_argument("-p", "--port", help="Serieller Port", type=str)
|
|
parser.add_argument("-b", "--baud", help="Baudrate", type=int)
|
|
parser.add_argument("-t", "--timeout", help="Timeout in Sekunden", type=float)
|
|
|
|
# Subparser für Befehle
|
|
subparsers = parser.add_subparsers(dest="command", help="Verfügbare Befehle")
|
|
|
|
# Befehl: crc32
|
|
crc32_parser = subparsers.add_parser("crc32", help="CRC32-Checksumme einer Datei oder eines Verzeichnisses berechnen")
|
|
crc32_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
|
|
|
|
# Befehl: flash_info
|
|
flash_info_parser = subparsers.add_parser("flash_info", help="Informationen über den Flash-Speicher des Controllers abfragen")
|
|
|
|
# Befehl: fw_status
|
|
fw_status_parser = subparsers.add_parser("fw_status", help="Firmware- und Kernel-Status des Controllers abfragen")
|
|
|
|
# Befehl: get_file
|
|
get_file_parser = subparsers.add_parser("get_file", help="Datei vom Zielsystem herunterladen")
|
|
get_file_parser.add_argument("source_path", help="Pfad der Datei auf dem Zielsystem")
|
|
get_file_parser.add_argument("dest_path", help="Zielpfad auf dem lokalen System")
|
|
|
|
# Befehl: ls
|
|
ls_parser = subparsers.add_parser("ls", help="Listet Dateien/Ordner in einem Verzeichnis auf")
|
|
ls_parser.add_argument("path", help="Pfad auf dem Zielsystem")
|
|
ls_parser.add_argument("-r", "--recursive", help="Rekursiv durch die Verzeichnisse durchsuchen", action="store_true")
|
|
|
|
# Befehl: proto
|
|
proto_parser = subparsers.add_parser("proto", help="Protokollversion des Controllers abfragen")
|
|
|
|
# Befehl: put_file
|
|
put_file_parser = subparsers.add_parser("put_file", help="Datei auf das Zielsystem hochladen")
|
|
put_file_parser.add_argument("source_path", help="Pfad der Datei auf dem lokalen System")
|
|
put_file_parser.add_argument("dest_path", help="Zielpfad auf dem Zielsystem")
|
|
put_file_parser.add_argument("-t", "--tags", help="Optionale JSON Tags für den Upload", type=str)
|
|
|
|
# Befehl: get_tags
|
|
get_tags_parser = subparsers.add_parser("get_tags", help="Tags einer Datei anzeigen")
|
|
get_tags_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
|
|
|
|
# Befehl: put_tags
|
|
put_tags_parser = subparsers.add_parser("put_tags", help="Tags schreiben")
|
|
put_tags_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
|
|
put_tags_parser.add_argument("json", help="JSON String (z.B. '{\"json\": {\"t\": \"Titel\"}}')")
|
|
put_tags_parser.add_argument("-o", "--overwrite", help="Alle bestehenden JSON-Tags vorher löschen", action="store_true")
|
|
# Befehl: rename
|
|
rename_parser = subparsers.add_parser("rename", help="Benennen Sie eine Datei oder einen Ordner auf dem Zielsystem um")
|
|
rename_parser.add_argument("source_path", help="Aktueller Pfad der Datei/des Ordners auf dem Zielsystem")
|
|
rename_parser.add_argument("dest_path", help="Neuer Pfad der Datei/des Ordners auf dem Zielsystem")
|
|
|
|
# Befehl: rm
|
|
rm_parser = subparsers.add_parser("rm", help="Entfernt eine Datei oder einen Ordner auf dem Zielsystem")
|
|
rm_parser.add_argument("path", help="Pfad auf dem Zielsystem")
|
|
|
|
# Befehl: stat
|
|
stat_parser = subparsers.add_parser("stat", help="Informationen zu einer Datei/Ordner")
|
|
stat_parser.add_argument("path", help="Pfad auf dem Zielsystem")
|
|
|
|
# Befehl: put_fw
|
|
put_fw_parser = subparsers.add_parser("put_fw", help="Firmware-Image auf den Controller hochladen")
|
|
put_fw_parser.add_argument("file_path", help="Pfad zur Firmware-Datei auf dem lokalen System")
|
|
|
|
# Befehl: confirm_fw
|
|
confirm_fw_parser = subparsers.add_parser("confirm_fw", help="Bestätigt ein als 'Testing' markiertes Firmware-Image, damit es beim permanent wird")
|
|
|
|
# Befehl: reboot
|
|
reboot_parser = subparsers.add_parser("reboot", help="Neustart des Controllers")
|
|
|
|
# Befehl: play
|
|
play_parser = subparsers.add_parser("play", help="Startet die Wiedergabe einer Datei")
|
|
play_parser.add_argument("path", help="Pfad der Datei auf dem Zielsystem")
|
|
play_parser.add_argument("-i", "--interrupt", help="Sofortige Wiedergabe (Interrupt)", action="store_true")
|
|
|
|
# Befehl: stop
|
|
stop_parser = subparsers.add_parser("stop", help="Stoppt die aktuelle Wiedergabe")
|
|
|
|
# Befehl: set
|
|
set_parser = subparsers.add_parser("set", help="System-Einstellung setzen")
|
|
set_parser.add_argument("key", help="Schlüssel (z.B. audio/vol)")
|
|
set_parser.add_argument("value", help="Wert")
|
|
|
|
# Befehl: get
|
|
get_parser = subparsers.add_parser("get", help="System-Einstellung auslesen")
|
|
get_parser.add_argument("key", help="Schlüssel (z.B. audio/vol)")
|
|
args = parser.parse_args()
|
|
|
|
if not args.command:
|
|
parser.print_help()
|
|
sys.exit(0)
|
|
|
|
from core.config import cfg
|
|
from core.utils import console, console_err
|
|
|
|
if args.config:
|
|
cfg.custom_path = args.config
|
|
|
|
if args.debug:
|
|
cfg.debug = True
|
|
|
|
console.print("[bold blue]Buzzer Tool v1.0[/bold blue]", justify="left")
|
|
|
|
settings = cfg.serial_settings
|
|
|
|
settings['debug'] = args.debug
|
|
|
|
# Überschreibe Einstellungen mit Kommandozeilenparametern, falls vorhanden
|
|
if args.port:
|
|
settings['port'] = args.port
|
|
if args.baud:
|
|
settings['baudrate'] = args.baud
|
|
if args.timeout:
|
|
settings['timeout'] = args.timeout
|
|
|
|
# Ausgabe der aktuellen Einstellungen
|
|
port = settings.get('port')
|
|
baud = settings.get('baudrate', 'N/A')
|
|
timeout = settings.get('timeout', 'N/A')
|
|
|
|
if not port:
|
|
console_err.print("[error]Fehler: Kein serieller Port angegeben.[/error]")
|
|
sys.exit(1)
|
|
|
|
if args.debug:
|
|
console.print(f" • Port: [info]{port}[/info]")
|
|
console.print(f" • Baud: [info]{baud}[/info]")
|
|
console.print(f" • Timeout: [info]{timeout:1.2f}s[/info]")
|
|
console.print("-" * 78)
|
|
|
|
from core.serial_conn import SerialBus
|
|
bus = SerialBus(settings)
|
|
|
|
try:
|
|
bus.open()
|
|
if args.command == "crc32":
|
|
from core.cmd.crc32 import crc32
|
|
cmd = crc32(bus)
|
|
result = cmd.get(args.path)
|
|
cmd.print(result, args.path)
|
|
elif args.command == "get_file":
|
|
from core.cmd.get_file import get_file
|
|
cmd = get_file(bus)
|
|
result = cmd.get(args.source_path, args.dest_path)
|
|
cmd.print(result)
|
|
elif args.command == "flash_info":
|
|
from core.cmd.flash_info import flash_info
|
|
cmd = flash_info(bus)
|
|
result = cmd.get()
|
|
cmd.print(result)
|
|
elif args.command == "fw_status":
|
|
from core.cmd.fw_status import fw_status
|
|
cmd = fw_status(bus)
|
|
result = cmd.get()
|
|
cmd.print(result)
|
|
elif args.command == "ls":
|
|
from core.cmd.list_dir import list_dir
|
|
cmd = list_dir(bus)
|
|
result = cmd.get(args.path, recursive=args.recursive)
|
|
cmd.print(result, args.path)
|
|
elif args.command == "proto":
|
|
from core.cmd.proto import proto
|
|
cmd = proto(bus)
|
|
result = cmd.get()
|
|
cmd.print(result)
|
|
elif args.command == "put_file":
|
|
from core.cmd.put_file import put_file
|
|
cmd = put_file(bus)
|
|
result = cmd.get(args.source_path, args.dest_path)
|
|
cmd.print(result)
|
|
elif args.command == "rename":
|
|
from core.cmd.rename import rename
|
|
cmd = rename(bus)
|
|
result = cmd.get(args.source_path, args.dest_path)
|
|
cmd.print(result)
|
|
elif args.command == "rm":
|
|
from core.cmd.rm import rm
|
|
cmd = rm(bus)
|
|
result = cmd.get(args.path)
|
|
cmd.print(result, args.path)
|
|
elif args.command == "stat":
|
|
from core.cmd.stat import stat
|
|
cmd = stat(bus)
|
|
result = cmd.get(args.path)
|
|
cmd.print(result, args.path)
|
|
elif args.command == "put_file":
|
|
from core.cmd.put_file import put_file
|
|
cmd = put_file(bus)
|
|
result = cmd.get(args.source_path, args.dest_path, cli_tags_json=args.tags)
|
|
cmd.print(result)
|
|
elif args.command == "get_tags":
|
|
from core.cmd.get_tags import get_tags
|
|
cmd = get_tags(bus)
|
|
result = cmd.get(args.path)
|
|
cmd.print(result, args.path)
|
|
elif args.command == "put_tags":
|
|
from core.cmd.put_tags import put_tags
|
|
cmd = put_tags(bus)
|
|
result = cmd.get(args.path, args.json, overwrite=args.overwrite)
|
|
cmd.print(result, args.path)
|
|
elif args.command == "confirm_fw":
|
|
from core.cmd.fw_confirm import fw_confirm
|
|
cmd = fw_confirm(bus)
|
|
result = cmd.get()
|
|
cmd.print(result)
|
|
elif args.command == "put_fw":
|
|
from core.cmd.put_fw import put_fw
|
|
cmd = put_fw(bus)
|
|
result = cmd.get(args.file_path)
|
|
cmd.print(result)
|
|
elif args.command == "reboot":
|
|
from core.cmd.reboot import reboot
|
|
cmd = reboot(bus)
|
|
result = cmd.get()
|
|
cmd.print(result)
|
|
elif args.command == "play":
|
|
from core.cmd.play import play
|
|
cmd = play(bus)
|
|
result = cmd.get(args.path, interrupt=args.interrupt)
|
|
cmd.print(result, args.path, interrupt=args.interrupt)
|
|
elif args.command == "stop":
|
|
from core.cmd.stop import stop
|
|
cmd = stop(bus)
|
|
result = cmd.get()
|
|
cmd.print(result)
|
|
elif args.command == "set":
|
|
from core.cmd.set_setting import set_setting
|
|
cmd = set_setting(bus)
|
|
result = cmd.get(args.key, args.value)
|
|
cmd.print(result, args.key, args.value)
|
|
elif args.command == "get":
|
|
from core.cmd.get_setting import get_setting
|
|
cmd = get_setting(bus)
|
|
result = cmd.get(args.key)
|
|
cmd.print(result, args.key)
|
|
finally:
|
|
bus.close()
|
|
|
|
except FileNotFoundError as e:
|
|
console_err.print(f"[error]Fehler: {e}[/error]")
|
|
sys.exit(1)
|
|
except (TimeoutError, IOError, ValueError) as e:
|
|
console_err.print(f"[bold red]KOMMUNIKATIONSFEHLER:[/bold red] [error_msg]{e}[/error_msg]")
|
|
sys.exit(1) # Beendet das Script mit Fehlercode 1 für Tests
|
|
except Exception as e:
|
|
# Hier fangen wir auch deinen neuen ControllerError ab
|
|
from core.serial_conn import ControllerError
|
|
if isinstance(e, ControllerError):
|
|
console_err.print(f"[bold red]CONTROLLER FEHLER:[/bold red] [error_msg]{e}[/error_msg]")
|
|
else:
|
|
console_err.print(f"[bold red]UNERWARTETER FEHLER:[/bold red] [error_msg]{e}[/error_msg]")
|
|
|
|
if args.debug:
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |