#!/usr/bin/env python3
"""Build a LittleFS Intel HEX image containing TTS prompts and imported WAVs.

Pipeline, as implemented below:

1. System prompts (built-in defaults or a YAML file) are synthesized with
   Kokoro and written to ``<staging>/lfs/sys/<id>`` as raw PCM.
2. Optional ``*.wav`` files are converted to ``<staging>/lfs/a/<stem>``.
3. The staging tree is packed into a LittleFS image, and every block that is
   not entirely ``0xFF`` is emitted into an Intel HEX file.
"""

import argparse
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path

import soundfile as sf
import yaml
from intelhex import IntelHex
from kokoro import KPipeline
from littlefs import LittleFS

DEFAULT_SAMPLE_RATE = 16000
DEFAULT_VOICE = "af_bella"
DEFAULT_BLOCK_SIZE = 4096
DEFAULT_BLOCK_COUNT = 2048
DEFAULT_READ_SIZE = 512
DEFAULT_LOOKAHEAD_SIZE = 256
# Sample rate used when writing the synthesized audio to the temporary WAV.
# NOTE(review): presumably Kokoro's native output rate - confirm against the
# Kokoro documentation if the model is ever changed.
TTS_SAMPLE_RATE = 24000
# ffmpeg audio filter chain applied to every asset before resampling.
DEFAULT_FILTERS = [
    "highpass=f=120",
    "lowpass=f=6000",
    "acompressor=threshold=-18dB:ratio=3:attack=5:release=80",
    "loudnorm=I=-16:TP=-1.0",
]
DEFAULT_SYS_PROMPTS = [
    {"id": "404", "text": "No sound sample was found on the device."},
    {"id": "update", "text": "Firmware updated. Awaiting confirmation."},
    {"id": "confirm", "text": "State confirmed."},
    {"id": "voltest", "text": "Volume test. This is a sample of the current volume level."},
]


def run_ffmpeg(cmd: list[str]) -> None:
    """Run an ffmpeg command line, raising ``CalledProcessError`` on failure.

    stdout is discarded. stderr is captured and echoed to this process's
    stderr only when the command fails, so a failed conversion can be
    diagnosed without flooding the console on success.
    """
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        detail = (err.stderr or b"").decode("utf-8", errors="replace").strip()
        if detail:
            print(detail, file=sys.stderr)
        raise


def export_raw_pcm(input_path: Path, output_path: Path, sample_rate: int, filters: list[str]) -> None:
    """Convert ``input_path`` to mono signed 16-bit little-endian raw PCM.

    The parent directory of ``output_path`` is created if needed, ``filters``
    are joined into a single ``-af`` chain, and the result is resampled to
    ``sample_rate``. An existing output file is overwritten (``-y``).
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    filter_str = ",".join(filters)
    run_ffmpeg([
        "ffmpeg",
        "-y",
        "-i",
        str(input_path),
        "-af",
        filter_str,
        "-ar",
        str(sample_rate),
        "-ac",
        "1",
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        str(output_path),
    ])


def load_sys_prompts(yaml_path: Path | None) -> list[dict]:
    """Return the system prompts as a list of ``{"id": str, "text": str}``.

    With ``yaml_path`` set to ``None`` a copy of ``DEFAULT_SYS_PROMPTS`` is
    returned. Otherwise the YAML document may be either a list of entries or
    a mapping with an ``assets`` list. Entries that are not mappings, or whose
    ``id`` / ``text`` is missing or falsy, are skipped.
    """
    if yaml_path is None:
        # Copy so callers cannot mutate the module-level defaults.
        return [dict(prompt) for prompt in DEFAULT_SYS_PROMPTS]

    with open(yaml_path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f) or []

    if isinstance(data, dict):
        # ``assets: null`` yields None; treat it like an absent key.
        items = data.get("assets") or []
    else:
        items = data
    if not isinstance(items, list):
        items = []

    prompts: list[dict] = []
    for item in items:
        if isinstance(item, dict) and item.get("id") and item.get("text"):
            # YAML parses unquoted scalars such as ``404`` as int; ids are
            # later used as file names, so normalize both fields to str.
            prompts.append({"id": str(item["id"]), "text": str(item["text"])})
    return prompts


def build_sys_tts(prompts: list[dict], out_sys_dir: Path, sample_rate: int) -> int:
    """Synthesize each prompt and export it as raw PCM into ``out_sys_dir``.

    Every audio segment yielded by the pipeline for a prompt is appended to a
    single temporary WAV (so multi-segment prompts are not truncated), which
    is then converted with ``export_raw_pcm`` to ``out_sys_dir / <id>``.

    Returns the number of prompts generated.

    Raises:
        RuntimeError: if the pipeline yields no audio for a prompt.
    """
    pipeline = KPipeline(lang_code="a")
    generated = 0
    with tempfile.TemporaryDirectory(prefix="tts_tmp_") as tmp_dir_name:
        tmp_dir = Path(tmp_dir_name)
        for index, prompt in enumerate(prompts):
            asset_id = str(prompt["id"])
            text = prompt["text"]
            # Index-based name keeps the temp path valid whatever the id is.
            tmp_wav = tmp_dir / f"prompt_{index}.wav"

            segments = 0
            with sf.SoundFile(str(tmp_wav), mode="w", samplerate=TTS_SAMPLE_RATE, channels=1) as wav:
                for _, _, audio in pipeline(text, voice=DEFAULT_VOICE, speed=1.0):
                    if audio is None:
                        continue
                    wav.write(audio)
                    segments += 1
            if segments == 0:
                raise RuntimeError(f"TTS produced no audio for prompt '{asset_id}'")

            export_raw_pcm(tmp_wav, out_sys_dir / asset_id, sample_rate=sample_rate, filters=DEFAULT_FILTERS)
            generated += 1
    return generated


def import_wavs_to_a(wav_dir: Path | None, out_a_dir: Path, sample_rate: int) -> int:
    """Convert every ``*.wav`` in ``wav_dir`` to raw PCM in ``out_a_dir``.

    Output files are named after the WAV stem (no extension). Returns the
    number of files converted, or 0 when ``wav_dir`` is ``None``.

    Raises:
        FileNotFoundError: if ``wav_dir`` does not exist or is not a directory.
    """
    if wav_dir is None:
        return 0
    # is_dir() is already False for a missing path.
    if not wav_dir.is_dir():
        raise FileNotFoundError(f"WAV directory not found: {wav_dir}")

    count = 0
    for wav_file in sorted(wav_dir.glob("*.wav")):
        export_raw_pcm(wav_file, out_a_dir / wav_file.stem, sample_rate=sample_rate, filters=DEFAULT_FILTERS)
        count += 1
    return count


def add_files_recursive(fs: LittleFS, local_path: Path, lfs_path: str = "/") -> None:
    """Copy the contents of ``local_path`` into ``fs`` under ``lfs_path``.

    Entries are visited in sorted order; directories are created in the
    image and descended into. A missing ``local_path`` is silently ignored.
    """
    if not local_path.exists():
        return
    for item in sorted(local_path.iterdir()):
        target_path = f"{lfs_path.rstrip('/')}/{item.name}".replace("//", "/")
        if item.is_file():
            with fs.open(target_path, "wb") as lfs_file:
                lfs_file.write(item.read_bytes())
        elif item.is_dir():
            fs.mkdir(target_path)
            add_files_recursive(fs, item, target_path)


def _create_populated_fs(source_folder: Path, block_size: int, block_count: int) -> LittleFS:
    """Create a LittleFS instance and copy ``source_folder`` into it."""
    # NOTE(review): prog_size and cache_size are fixed while block_size is
    # configurable - verify they stay compatible for non-default block sizes.
    fs = LittleFS(
        block_size=block_size,
        block_count=block_count,
        read_size=DEFAULT_READ_SIZE,
        prog_size=256,
        lookahead_size=DEFAULT_LOOKAHEAD_SIZE,
        cache_size=4096,
    )
    add_files_recursive(fs, source_folder)
    return fs


def _iter_used_blocks(fs: LittleFS, block_size: int, block_count: int):
    """Yield ``(offset, data)`` for each block that is not entirely 0xFF."""
    lfs_buffer = fs.context.buffer
    for idx in range(block_count):
        offset = idx * block_size
        block_data = lfs_buffer[offset : offset + block_size]
        # One bytes comparison instead of a per-byte Python loop.
        if bytes(block_data) != b"\xff" * len(block_data):
            yield offset, block_data


def build_littlefs_image(source_folder: Path, block_size: int, block_count: int) -> tuple[LittleFS, int]:
    """Pack ``source_folder`` into a LittleFS image.

    Returns the filesystem object and the number of blocks containing at
    least one byte other than 0xFF.
    """
    fs = _create_populated_fs(source_folder, block_size, block_count)
    used_blocks = sum(1 for _ in _iter_used_blocks(fs, block_size, block_count))
    return fs, used_blocks


def build_littlefs_hex(
    source_folder: Path,
    output_hex: Path,
    block_size: int,
    block_count: int,
    start_addr: int,
) -> int:
    """Pack ``source_folder`` into LittleFS and write it as Intel HEX.

    Only blocks that are not entirely 0xFF are emitted, each placed at
    ``start_addr`` plus its offset within the image. The parent directory of
    ``output_hex`` is created if missing.

    Returns the number of blocks written.
    """
    fs = _create_populated_fs(source_folder, block_size, block_count)
    ih = IntelHex()
    used_blocks = 0
    for offset, block_data in _iter_used_blocks(fs, block_size, block_count):
        ih.frombytes(block_data, offset=start_addr + offset)
        used_blocks += 1
    output_hex.parent.mkdir(parents=True, exist_ok=True)
    ih.tofile(str(output_hex), format="hex")
    return used_blocks


def summarize_directory(directory: Path) -> tuple[int, int]:
    """Return ``(file_count, total_bytes)`` for all files under ``directory``.

    A missing directory yields ``(0, 0)``.
    """
    if not directory.exists():
        return 0, 0
    file_count = 0
    raw_bytes = 0
    for file_path in directory.rglob("*"):
        if file_path.is_file():
            file_count += 1
            raw_bytes += file_path.stat().st_size
    return file_count, raw_bytes


def format_bytes(size: int) -> str:
    """Format ``size`` using binary units (B, KiB, MiB, GiB).

    Byte values are printed as integers; larger units with one decimal.
    Values beyond the GiB range are still reported in GiB.
    """
    value = float(size)
    unit = "B"
    for next_unit in ("KiB", "MiB", "GiB"):
        if value < 1024.0:
            break
        value /= 1024.0
        unit = next_unit
    if unit == "B":
        return f"{int(value)} {unit}"
    return f"{value:.1f} {unit}"


def report_directory_usage(label: str, directory: Path, block_size: int, block_count: int) -> None:
    """Print file count, raw size and standalone LittleFS usage of a directory.

    The filesystem figure comes from packing ``directory`` alone into a fresh
    image, so it is an estimate rather than its share of the combined image.
    """
    file_count, raw_bytes = summarize_directory(directory)
    _, used_blocks = build_littlefs_image(directory, block_size, block_count)
    fs_bytes = used_blocks * block_size
    print(
        f"  {label}: files={file_count}, raw={format_bytes(raw_bytes)}, "
        f"fs={format_bytes(fs_bytes)} ({used_blocks} blocks, standalone estimate)"
    )


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the image builder."""
    parser = argparse.ArgumentParser(
        description=(
            "Generate Bella-only TTS in /lfs/sys, optionally import WAVs into /lfs/a, "
            "then create LittleFS HEX."
        )
    )
    parser.add_argument("--wav-dir", default=None, help="Optional directory with WAV files for /lfs/a")
    parser.add_argument("--sys-yaml", default=None, help="Optional YAML with system prompts (id/text)")
    parser.add_argument("--sample-rate", type=int, default=DEFAULT_SAMPLE_RATE)
    parser.add_argument("--block-size", type=int, default=DEFAULT_BLOCK_SIZE)
    parser.add_argument("--block-count", type=int, default=DEFAULT_BLOCK_COUNT)
    # int(x, 0) accepts decimal as well as 0x/0o/0b prefixed values.
    parser.add_argument("--start-addr", type=lambda x: int(x, 0), default=0x12000000)
    parser.add_argument(
        "--output-hex",
        default=str(Path(__file__).resolve().parent / "lfs_external_flash.hex"),
        help="Output path for generated HEX",
    )
    parser.add_argument(
        "--keep-staging",
        action="store_true",
        help="Keep temporary staging directory for inspection",
    )
    return parser.parse_args()


def main() -> None:
    """Build the staging tree, generate the HEX image and print a summary.

    The staging directory ``.tmp_lfs_build`` next to this script is recreated
    on every run and removed afterwards (also when a step fails) unless
    ``--keep-staging`` is given.

    Raises:
        RuntimeError: if no system prompts are available.
    """
    args = parse_args()

    fs_dir = Path(__file__).resolve().parent
    output_hex = Path(args.output_hex).resolve()
    wav_dir = Path(args.wav_dir).resolve() if args.wav_dir else None
    sys_yaml = Path(args.sys_yaml).resolve() if args.sys_yaml else None

    staging_root = fs_dir / ".tmp_lfs_build"
    if staging_root.exists():
        shutil.rmtree(staging_root)

    try:
        out_sys = staging_root / "lfs" / "sys"
        out_a = staging_root / "lfs" / "a"
        out_sys.mkdir(parents=True, exist_ok=True)
        out_a.mkdir(parents=True, exist_ok=True)

        prompts = load_sys_prompts(sys_yaml)
        if not prompts:
            raise RuntimeError("No system prompts available. Provide --sys-yaml or use defaults.")

        tts_count = build_sys_tts(prompts, out_sys, args.sample_rate)
        wav_count = import_wavs_to_a(wav_dir, out_a, args.sample_rate)

        total_used_blocks = build_littlefs_hex(
            source_folder=staging_root,
            output_hex=output_hex,
            block_size=args.block_size,
            block_count=args.block_count,
            start_addr=args.start_addr,
        )

        print(f"Done. TTS assets: {tts_count}, WAV imports: {wav_count}, HEX: {output_hex}")
        print("Usage summary:")
        report_directory_usage("sys", out_sys, args.block_size, args.block_count)
        report_directory_usage("audio", out_a, args.block_size, args.block_count)
        print(
            f"  total: raw={format_bytes(summarize_directory(staging_root)[1])}, "
            f"fs={format_bytes(total_used_blocks * args.block_size)} "
            f"({total_used_blocks} blocks in combined image)"
        )
    finally:
        # Clean up even when a step above raised, unless asked to keep it.
        if not args.keep_staging and staging_root.exists():
            shutil.rmtree(staging_root)


if __name__ == "__main__":
    main()