pre uart exchange

This commit is contained in:
2026-03-04 16:32:51 +01:00
parent b665cb5def
commit 4f3fbff258
46 changed files with 2820 additions and 3186 deletions

View File

@@ -0,0 +1,53 @@
# tool/core/cmd/flash_info.py
import struct
from core.utils import console, console_err
from core.protocol import COMMANDS, ERRORS
class flash_info:
def __init__(self, bus):
self.bus = bus
def get(self):
import struct
self.bus.send_request(COMMANDS['get_flash_info'])
data = self.bus.receive_response(length=21)
if not data or data.get('type') == 'error':
return None
payload = data['data']
ext_block_size = struct.unpack('<I', payload[0:4])[0]
ext_total_blocks = struct.unpack('<I', payload[4:8])[0]
ext_free_blocks = struct.unpack('<I', payload[8:12])[0]
int_slot_size = struct.unpack('<I', payload[12:16])[0]
ext_page_size = struct.unpack('<H', payload[16:18])[0]
int_page_size = struct.unpack('<H', payload[18:20])[0]
max_path_len = payload[20]
result = {
'ext_block_size': ext_block_size,
'ext_total_blocks': ext_total_blocks,
'ext_free_blocks': ext_free_blocks,
'int_slot_size': int_slot_size,
'ext_page_size': ext_page_size,
'int_page_size': int_page_size,
'max_path_len': max_path_len,
'ext_total_size': ext_block_size * ext_total_blocks,
'ext_free_size': ext_block_size * ext_free_blocks,
'ext_used_size': ext_block_size * (ext_total_blocks - ext_free_blocks)
}
return result
def print(self, result):
if not result:
return
console.print(f"[info]Flash-Informationen:[/info]")
console.print(f" • [info]Externer Flash:[/info] {result['ext_total_size']/1024/1024:.2f} MB ({result['ext_total_blocks']} Blöcke à {result['ext_block_size']} Bytes)")
console.print(f" - Belegt: {result['ext_used_size']/1024/1024:.2f} MB ({result['ext_total_blocks'] - result['ext_free_blocks']} Blöcke)")
console.print(f" - Frei: {result['ext_free_size']/1024/1024:.2f} MB ({result['ext_free_blocks']} Blöcke)")
console.print(f" • [info]FW Flash Slot:[/info] {result['int_slot_size']/1024:.2f} KB")
console.print(f" • [info]EXTFLASH Seitengröße:[/info] {result['ext_page_size']} Bytes")
console.print(f" • [info]INTFLASH Seitengröße:[/info] {result['int_page_size']} Bytes")
console.print(f" • [info]Maximale Pfadlänge:[/info] {result['max_path_len']} Zeichen")

View File

@@ -0,0 +1,53 @@
# tool/core/cmd/fw_status.py
import struct
from core.utils import console, console_err
from core.protocol import COMMANDS, ERRORS
class fw_status:
def __init__(self, bus):
self.bus = bus
def get(self):
import struct
self.bus.send_request(COMMANDS['get_firmware_status'])
data = self.bus.receive_response(length=10)
if not data or data.get('type') == 'error':
return None
header = data['data']
status = header[0]
app_version_raw = struct.unpack('<I', header[1:5])[0]
ker_version_raw = struct.unpack('<I', header[5:9])[0]
str_len = header[9]
fw_string_bytes = self.bus.connection.read(str_len)
fw_string = fw_string_bytes.decode('utf-8')
result = {
'status': status,
'fw_version_raw': hex(app_version_raw),
'kernel_version_raw': hex(ker_version_raw),
'fw_major': (app_version_raw >> 24) & 0xFF,
'fw_minor': (app_version_raw >> 16) & 0xFF,
'fw_patch': (app_version_raw >> 8)& 0xFF,
'kernel_major': (ker_version_raw >> 16) & 0xFF,
'kernel_minor': (ker_version_raw >> 8) & 0xFF,
'kernel_patch': ker_version_raw & 0xFF,
'fw_string': fw_string,
'kernel_string': f"{(ker_version_raw >> 16) & 0xFF}.{(ker_version_raw >> 8) & 0xFF}.{ker_version_raw & 0xFF}"
}
return result
def print(self, result):
if not result:
return
status = "UNKNOWN"
if result['status'] == 0x00: status = "CONFIRMED"
elif result['status'] == 0x01: status = "PENDING"
elif result['status'] == 0x02: status = "TESTING"
console.print(f"[info]Firmware Status[/info] des Controllers ist [info]{status}[/info]:")
console.print(f" • Firmware: [info]{result['fw_string']}[/info] ({result['fw_major']}.{result['fw_minor']}.{result['fw_patch']})")
console.print(f" • Kernel: [info]{result['kernel_string']}[/info] ({result['kernel_major']}.{result['kernel_minor']}.{result['kernel_patch']})")

76
tool/core/cmd/get_file.py Normal file
View File

@@ -0,0 +1,76 @@
# tool/core/cmd/get_file.py
import struct
import zlib
from pathlib import Path
from core.utils import console, console_err
from core.protocol import COMMANDS
class get_file:
def __init__(self, bus):
self.bus = bus
def get(self, source_path: str, dest_path: str):
try:
p = Path(dest_path)
p.parent.mkdir(parents=True, exist_ok=True)
with open(p, 'wb') as f:
pass
except Exception as e:
console_err.print(f"Fehler: Kann Zieldatei nicht anlegen: {e}")
return None
source_path_bytes = source_path.encode('utf-8')
payload = struct.pack('B', len(source_path_bytes)) + source_path_bytes
device_file_crc = None
try:
self.bus.send_request(COMMANDS['crc_32'], payload)
crc_resp = self.bus.receive_response(length=4)
if crc_resp and crc_resp.get('type') == 'response':
device_file_crc = struct.unpack('<I', crc_resp['data'])[0]
except Exception:
device_file_crc = None
self.bus.send_request(COMMANDS['get_file'], payload)
stream_res = self.bus.receive_stream()
if not stream_res or stream_res.get('type') == 'error':
return None
file_data = stream_res['data']
remote_crc = stream_res['crc32']
if local_crc == remote_crc:
with open(p, 'wb') as f:
f.write(file_data)
success = True
else:
with open(p, 'wb') as f:
f.write(file_data)
success = False
return {
'success': success,
'source_path': source_path,
'dest_path': dest_path,
'crc32_remote': remote_crc,
'crc32_local': local_crc,
'crc32_device_file': device_file_crc,
'size': len(file_data)
}
def print(self, result):
if not result:
return
if result['success']:
console.print(f"✓ Datei [info]{result['source_path']}[/info] erfolgreich heruntergeladen.")
console.print(f" • Größe: [info]{result['size'] / 1024:.2f} KB[/info]")
else:
console_err.print(f"❌ CRC-FEHLER: Datei [error]{result['source_path']}[/error] wurde nicht korrekt empfangen!")
console.print(f" • Remote CRC: [info]{result['crc32_remote']:08X}[/info]")
console.print(f" • Local CRC: [info]{result['crc32_local']:08X}[/info]")
if result.get('crc32_device_file') is not None:
console.print(f" • Device CRC: [info]{result['crc32_device_file']:08X}[/info]")
console.print(f" • Zielpfad: [info]{result['dest_path']}[/info]")

71
tool/core/cmd/list_dir.py Normal file
View File

@@ -0,0 +1,71 @@
# tool/core/cmd/list_dir.py
import struct
from core.utils import console
from core.protocol import COMMANDS
class list_dir:
def __init__(self, bus):
self.bus = bus
def get(self, path: str, recursive: bool = False):
# Wir stellen sicher, dass der Pfad nicht leer ist und normalisieren ihn leicht
clean_path = path if path == "/" else path.rstrip('/')
path_bytes = clean_path.encode('utf-8')
payload = struct.pack('B', len(path_bytes)) + path_bytes
self.bus.send_request(COMMANDS['list_dir'], payload)
chunks = self.bus.receive_list()
if chunks is None:
return None
entries = []
for chunk in chunks:
if len(chunk) < 6: # Typ(1) + Size(4) + min 1 char Name
continue
is_dir = chunk[0] == 1
size = struct.unpack('<I', chunk[1:5])[0] if not is_dir else None
name = chunk[5:].decode('utf-8').rstrip('\x00')
entry = {
'name': name,
'is_dir': is_dir,
'size': size
}
if recursive and is_dir:
# Rekursiver Aufruf: Pfad sauber zusammenfügen
sub_path = f"{clean_path}/{name}"
entry['children'] = self.get(sub_path, recursive=True)
entries.append(entry)
return entries
def print(self, entries, path: str, prefix: str = ""):
if prefix == "":
console.print(f"Inhalt von [info]{path}[/info]:")
if not entries:
return
# Sortierung: Verzeichnisse zuerst
entries.sort(key=lambda x: (not x['is_dir'], x['name'].lower()))
for i, entry in enumerate(entries):
# Prüfen, ob es das letzte Element auf dieser Ebene ist
is_last = (i == len(entries) - 1)
connector = "" if is_last else ""
icon = "📁" if entry['is_dir'] else "📄"
size_str = f" ({entry['size']/1024:.2f} KB)" if entry['size'] is not None else ""
# Ausgabe der aktuellen Zeile
console.print(f"{prefix}{connector}{icon} [info]{entry['name']}[/info]{size_str}")
# Wenn Kinder vorhanden sind, rekursiv weiter
if 'children' in entry and entry['children']:
# Für die Kinder-Ebene das Prefix anpassen
extension = " " if is_last else ""
self.print(entry['children'], "", prefix=prefix + extension)

28
tool/core/cmd/proto.py Normal file
View File

@@ -0,0 +1,28 @@
# tool/core/cmd/proto.py
import struct
from core.utils import console, console_err
from core.protocol import COMMANDS, ERRORS
class proto:
def __init__(self, bus):
self.bus = bus
def get(self):
self.bus.send_request(COMMANDS['get_protocol_version'], None)
data = self.bus.receive_response(length=1)
if not data or data.get('type') == 'error':
return None
payload = data['data']
result = {
'protocol_version': payload[0]
}
return result
def print(self, result):
if not result:
return
protocol_version = result['protocol_version']
console.print(f"[title]Protokoll Version[/info] des Controllers ist [info]{protocol_version}[/info]:")

37
tool/core/cmd/rename.py Normal file
View File

@@ -0,0 +1,37 @@
# tool/core/cmd/rename.py
import struct
from core.utils import console, console_err
from core.protocol import COMMANDS, ERRORS
class rename:
def __init__(self, bus):
self.bus = bus
def get(self, source_path: str, dest_path: str):
source_path_bytes = source_path.encode('utf-8')
dest_path_bytes = dest_path.encode('utf-8')
payload = struct.pack('B', len(source_path_bytes)) + source_path_bytes
payload += struct.pack('B', len(dest_path_bytes)) + dest_path_bytes
self.bus.send_request(COMMANDS['rename'], payload)
data = self.bus.receive_ack()
if not data or data.get('type') == 'error':
return None
return {
'success': data.get('type') == 'ack',
'source_path': source_path,
'dest_path': dest_path
}
def print(self, result):
if not result or not result.get('success'):
return
console.print(
f"Pfad [info]{result['source_path']}[/info] wurde erfolgreich in "
f"[info]{result['dest_path']}[/info] umbenannt."
)

32
tool/core/cmd/rm.py Normal file
View File

@@ -0,0 +1,32 @@
# tool/core/cmd/rm.py
import struct
from core.utils import console, console_err
from core.protocol import COMMANDS, ERRORS
class rm:
def __init__(self, bus):
self.bus = bus
def get(self, path: str):
path_bytes = path.encode('utf-8')
payload = struct.pack('B', len(path_bytes)) + path_bytes
self.bus.send_request(COMMANDS['rm'], payload)
# 1 Byte Type + 4 Byte Size = 5
data = self.bus.receive_ack()
if not data or data.get('type') == 'error':
return None
if data.get('type') == 'ack':
return True
return False
def print(self, result, path: str):
if result is None:
console_err.print(f"Fehler: Pfad [error]{path}[/error] konnte nicht entfernt werden.")
elif result is False:
console_err.print(f"Fehler: Pfad [error]{path}[/error] existiert nicht oder konnte nicht entfernt werden.")
else:
console.print(f"Pfad [info]{path}[/info] wurde erfolgreich entfernt.")

36
tool/core/cmd/stat.py Normal file
View File

@@ -0,0 +1,36 @@
# tool/core/cmd/stat.py
import struct
from core.utils import console, console_err
from core.protocol import COMMANDS, ERRORS
class stat:
def __init__(self, bus):
self.bus = bus
def get(self, path: str):
path_bytes = path.encode('utf-8')
payload = struct.pack('B', len(path_bytes)) + path_bytes
self.bus.send_request(COMMANDS['stat'], payload)
# 1 Byte Type + 4 Byte Size = 5
data = self.bus.receive_response(length=5)
if not data or data.get('type') == 'error':
return None
payload = data['data']
result = {
'is_directory': payload[0] == 1,
'size': struct.unpack('<I', payload[1:5])[0]
}
return result
def print(self, result, path: str):
if not result:
return
t_name = "📁 Verzeichnis" if result['is_directory'] else "📄 Datei"
console.print(f"[info_title]Stat[/info_title] für [info]{path}[/info]:")
console.print(f" • Typ: [info]{t_name}[/info]")
if not result['is_directory']:
console.print(f" • Grösse: [info]{result['size']/1024:.2f} KB[/info] ({result['size']} Bytes)")