"""LittleFS write benchmark driver that talks to a device over a serial (CDC) port.

The script sends ``BENCH <block> <count> <sync_every>`` commands and reads
back ``BENCH;...`` result records followed by an ``OK`` / ``ERR`` status line.
"""
import argparse
import sys
import time
from typing import Dict, List, Optional, Tuple

try:
    import serial
except ImportError:
    # Deferred so that --help and argument validation still work; the error
    # is raised by _require_serial() as soon as a port is actually needed.
    serial = None

# (block_size, count, sync_every) triples used by --sweep when no --case is given.
DEFAULT_SWEEP_CASES = [
    (4096, 100, 100),
    (4096, 100, 1),
    (512, 400, 400),
    (1024, 200, 200),
    (2048, 100, 100),
    (4096, 500, 500),
]

# Names of the integer fields that follow the "BENCH" tag, in wire order.
_BENCH_FIELDS = (
    "block",
    "count",
    "sync_every",
    "total_bytes",
    "sync_ms",
    "total_ms",
    "kib_s",
)


def _require_serial() -> None:
    """Raise ImportError with an actionable message if pyserial is missing."""
    if serial is None:
        raise ImportError(
            "Das Python-Paket 'pyserial' fehlt (Installation: pip install pyserial)"
        )


def wait_line(serial_port, timeout_s):
    """Return the next non-empty text line from *serial_port*.

    Args:
        serial_port: Object with a ``readline()`` method returning ``bytes``
            (an open ``serial.Serial`` in normal use).
        timeout_s: Maximum time to wait, in seconds.

    Returns:
        The decoded, stripped line. If the deadline passes first, whatever
        unterminated text was received so far (possibly ``""``).
    """
    deadline = time.monotonic() + timeout_s
    pending = b""
    while time.monotonic() < deadline:
        chunk = serial_port.readline()
        if not chunk:
            continue
        pending += chunk
        # readline() hands back a fragment when the port's own read timeout
        # expires mid-line; keep collecting until the terminator shows up so
        # a record is never split in two.
        if not pending.endswith(b"\n"):
            continue
        line = pending.decode("utf-8", errors="ignore").strip()
        pending = b""
        if line:
            return line
    # Deadline reached: return an unterminated remainder instead of dropping it.
    return pending.decode("utf-8", errors="ignore").strip()


def parse_bench_result(line: str) -> Optional[Dict[str, int]]:
    """Parse a ``BENCH;block;count;sync_every;total_bytes;sync_ms;total_ms;kib_s`` record.

    Returns:
        A dict mapping the field names to ints, or ``None`` when the line is
        not a BENCH record, has fewer than eight fields, or contains a
        non-integer value. Fields beyond the eighth are ignored.
    """
    parts = line.split(";")
    if len(parts) < 8 or parts[0] != "BENCH":
        return None
    try:
        return {name: int(text) for name, text in zip(_BENCH_FIELDS, parts[1:8])}
    except ValueError:
        return None


def run_bench(
    port,
    baudrate,
    block_size,
    count,
    sync_every,
    timeout_s,
    verbose=True,
) -> Tuple[int, Optional[Dict[str, int]]]:
    """Run one BENCH command on the device and collect its result.

    Args:
        port: Serial port name passed to ``serial.Serial``.
        baudrate: Baud rate passed to ``serial.Serial``.
        block_size: Block size sent as first BENCH argument.
        count: Number of writes sent as second BENCH argument.
        sync_every: Sync interval sent as third BENCH argument.
        timeout_s: Overall time budget for the device's answer, in seconds.
        verbose: Print progress, foreign lines and a summary when true.

    Returns:
        ``(status, result)``. ``status`` is 1 when no ``OK`` was received
        (``ERR`` or timeout), otherwise 0. ``result`` is the parsed BENCH
        record matching the requested parameters, or ``None`` if there was none.

    Raises:
        ImportError: If pyserial is not installed.
    """
    _require_serial()
    command = f"BENCH {block_size} {count} {sync_every}\n"
    requested = (block_size, count, sync_every)
    bench_result = None
    final_status = ""

    with serial.Serial(port, baudrate, timeout=0.2, write_timeout=None) as serial_port:
        serial_port.dtr = True
        time.sleep(0.1)
        # Discard anything buffered before the command is sent.
        serial_port.reset_input_buffer()

        if verbose:
            print(f"Verbindung: {port} @ {baudrate}")
            print(f"Sende Befehl: {command.strip()}")

        serial_port.write(command.encode("utf-8"))
        serial_port.flush()

        start = time.monotonic()
        while time.monotonic() - start < timeout_s:
            line = wait_line(serial_port, 0.5)
            if not line:
                continue

            if line.startswith("BENCH;"):
                candidate = parse_bench_result(line)
                if candidate is None:
                    continue
                received = (
                    candidate["block"],
                    candidate["count"],
                    candidate["sync_every"],
                )
                if received == requested:
                    # A later matching record replaces an earlier one.
                    bench_result = candidate
                    if verbose:
                        print(f"Ergebnis: {line}")
                elif verbose:
                    print(f"Info: Fremdes BENCH-Ergebnis ignoriert: {line}")
                continue

            if line in {"OK", "ERR"}:
                final_status = line
                break

            if verbose:
                print(f"Info: {line}")

    if final_status != "OK":
        if verbose:
            print(f"Fehler: Kein OK erhalten (Status: '{final_status or 'timeout'}')")
        return 1, None

    if bench_result is None:
        if verbose:
            print("Hinweis: Kein BENCH-Datensatz empfangen.")
        return 0, None

    if verbose:
        print(
            "Zusammenfassung: "
            f"block={bench_result['block']}, runs={bench_result['count']}, "
            f"sync_every={bench_result['sync_every']}, "
            f"bytes={bench_result['total_bytes']}, sync_ms={bench_result['sync_ms']}, "
            f"total_ms={bench_result['total_ms']}, speed={bench_result['kib_s']} KiB/s"
        )
    return 0, bench_result


def parse_case_string(case_text: str) -> Tuple[int, int, int]:
    """Parse a sweep case such as ``"4096,100,1"`` into three integers.

    Spaces are ignored, matching is case-insensitive, and ``x``, ``:``, ``,``
    and ``;`` are all accepted as separators. Each value is clamped to at
    least 1.

    Returns:
        ``(block_size, count, sync_every)``.

    Raises:
        ValueError: If there are not exactly three fields or a field is not
            an integer.
    """
    cleaned = case_text.lower().replace(" ", "")
    for separator in ("x", ":", ","):
        cleaned = cleaned.replace(separator, ";")

    parts = cleaned.split(";")
    if len(parts) != 3:
        raise ValueError(f"Ungültiges Case-Format: '{case_text}'")

    try:
        block_size, count, sync_every = (max(int(part), 1) for part in parts)
    except ValueError as exc:
        # Report non-numeric fields with the same message as a wrong field count.
        raise ValueError(f"Ungültiges Case-Format: '{case_text}'") from exc
    return block_size, count, sync_every


def run_sweep(port, baudrate, timeout_s, case_strings) -> int:
    """Run several BENCH cases in sequence and print a result table.

    Args:
        port: Serial port name.
        baudrate: Baud rate.
        timeout_s: Time budget per case, in seconds.
        case_strings: Case descriptions for ``parse_case_string``; when empty,
            ``DEFAULT_SWEEP_CASES`` is used.

    Returns:
        0 if at least one case produced a result, otherwise 1.

    Raises:
        ValueError: If a case string is malformed.
        ImportError: If pyserial is not installed.
    """
    if case_strings:
        cases = [parse_case_string(text) for text in case_strings]
    else:
        cases = DEFAULT_SWEEP_CASES

    print(f"Starte Sweep mit {len(cases)} Fällen ...")
    rows = []
    for index, (block_size, count, sync_every) in enumerate(cases, start=1):
        print(f"[{index}/{len(cases)}] BENCH {block_size} {count} {sync_every}")
        try:
            status, result = run_bench(
                port,
                baudrate,
                block_size,
                count,
                sync_every,
                timeout_s,
                verbose=False,
            )
        except OSError as exc:
            # pyserial's SerialException derives from OSError; a port error in
            # one case is recorded as a failed row instead of aborting the sweep.
            print(f"Fehler: {exc}")
            status, result = 1, None

        if status != 0 or result is None:
            rows.append({
                "block": block_size,
                "count": count,
                "sync_every": sync_every,
                "kib_s": "FAIL",
                "total_ms": "-",
                "sync_ms": "-",
            })
            continue

        rows.append({
            "block": result["block"],
            "count": result["count"],
            "sync_every": result["sync_every"],
            "kib_s": result["kib_s"],
            "total_ms": result["total_ms"],
            "sync_ms": result["sync_ms"],
        })

    print("\nErgebnis-Tabelle")
    print("block count sync_every speed(KiB/s) total_ms sync_ms")
    print("----- ----- ---------- ------------ -------- -------")
    for row in rows:
        print(
            f"{row['block']:>5} "
            f"{row['count']:>5} "
            f"{row['sync_every']:>10} "
            f"{row['kib_s']:>12} "
            f"{row['total_ms']:>8} "
            f"{row['sync_ms']:>7}"
        )

    # Failed rows carry the string "FAIL" as speed, so only int speeds count.
    numeric_rows = [row for row in rows if isinstance(row["kib_s"], int)]
    if numeric_rows:
        best = max(numeric_rows, key=lambda row: row["kib_s"])
        print(
            "\nBestes Ergebnis: "
            f"block={best['block']}, count={best['count']}, sync_every={best['sync_every']}, "
            f"speed={best['kib_s']} KiB/s"
        )
    return 0 if numeric_rows else 1


def main(argv: Optional[List[str]] = None) -> int:
    """Parse the command line and run a single benchmark or a sweep.

    Args:
        argv: Argument list; ``None`` means ``sys.argv[1:]``.

    Returns:
        The process exit status (0 on success, 1 on failure). Malformed
        arguments terminate via ``parser.error`` (exit status 2).
    """
    parser = argparse.ArgumentParser(description="LittleFS write benchmark über CDC-Protokoll")
    parser.add_argument("-p", "--port", required=True, help="Serieller Port (z.B. COM12)")
    parser.add_argument("-b", "--baud", type=int, default=2500000, help="Baudrate (Standard: 2500000)")
    parser.add_argument("--block", type=int, default=4096, help="Blockgröße in Bytes (Standard: 4096)")
    parser.add_argument("--count", type=int, default=100, help="Anzahl Writes (Standard: 100)")
    parser.add_argument(
        "--sync-every",
        type=int,
        default=100,
        help="fs_sync Intervall in Writes (Standard: 100, also nur am Ende)",
    )
    parser.add_argument("--timeout", type=float, default=45.0, help="Timeout in Sekunden (Standard: 45)")
    parser.add_argument(
        "--sweep",
        action="store_true",
        help="Führt mehrere vordefinierte BENCH-Fälle nacheinander aus",
    )
    parser.add_argument(
        "--case",
        action="append",
        default=[],
        help="Eigener Sweep-Fall als block,count,sync_every (mehrfach möglich)",
    )
    args = parser.parse_args(argv)

    if args.sweep:
        # Validate up front so a typo yields a usage error, not a traceback.
        try:
            for text in args.case:
                parse_case_string(text)
        except ValueError as exc:
            parser.error(str(exc))
        return run_sweep(args.port, args.baud, args.timeout, args.case)

    status, _ = run_bench(
        args.port,
        args.baud,
        max(args.block, 1),
        max(args.count, 1),
        max(args.sync_every, 1),
        args.timeout,
        verbose=True,
    )
    return status


if __name__ == "__main__":
    # sys.exit instead of the site-provided exit(), which is not always defined.
    sys.exit(main())