Files
buzzer_2/firmware/fs/build_lfs_audio.py
2026-05-19 07:58:11 +02:00

285 lines
9.2 KiB
Python

#!/usr/bin/env python3
import argparse
import shutil
import subprocess
import tempfile
from pathlib import Path
import soundfile as sf
import yaml
from intelhex import IntelHex
from kokoro import KPipeline
from littlefs import LittleFS
DEFAULT_SAMPLE_RATE = 16000
DEFAULT_VOICE = "af_bella"
DEFAULT_BLOCK_SIZE = 4096
DEFAULT_BLOCK_COUNT = 2048
DEFAULT_READ_SIZE = 512
DEFAULT_LOOKAHEAD_SIZE = 256
DEFAULT_FILTERS = [
"highpass=f=120",
"lowpass=f=6000",
"acompressor=threshold=-18dB:ratio=3:attack=5:release=80",
"loudnorm=I=-16:TP=-1.0",
]
DEFAULT_SYS_PROMPTS = [
{"id": "404", "text": "No sound sample was found on the device."},
{"id": "update", "text": "Firmware updated. Awaiting confirmation."},
{"id": "confirm", "text": "State confirmed."},
{"id": "voltest", "text": "Volume test. This is a sample of the current volume level."},
]
def run_ffmpeg(cmd: list[str]) -> None:
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def export_raw_pcm(input_path: Path, output_path: Path, sample_rate: int, filters: list[str]) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
filter_str = ",".join(filters)
run_ffmpeg([
"ffmpeg",
"-y",
"-i",
str(input_path),
"-af",
filter_str,
"-ar",
str(sample_rate),
"-ac",
"1",
"-f",
"s16le",
"-acodec",
"pcm_s16le",
str(output_path),
])
def load_sys_prompts(yaml_path: Path | None) -> list[dict]:
if yaml_path is None:
return DEFAULT_SYS_PROMPTS
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or []
prompts: list[dict] = []
if isinstance(data, list):
for item in data:
if isinstance(item, dict) and item.get("id") and item.get("text"):
prompts.append({"id": item["id"], "text": item["text"]})
elif isinstance(data, dict):
for item in data.get("assets", []):
if isinstance(item, dict) and item.get("id") and item.get("text"):
prompts.append({"id": item["id"], "text": item["text"]})
return prompts
def build_sys_tts(prompts: list[dict], out_sys_dir: Path, sample_rate: int) -> int:
pipeline = KPipeline(lang_code="a")
generated = 0
with tempfile.TemporaryDirectory(prefix="tts_tmp_") as tmp_dir_name:
tmp_dir = Path(tmp_dir_name)
for prompt in prompts:
asset_id = prompt["id"]
text = prompt["text"]
tmp_wav = tmp_dir / f"{asset_id}.wav"
generator = pipeline(text, voice=DEFAULT_VOICE, speed=1.0)
for _, _, audio in generator:
sf.write(tmp_wav, audio, 24000)
break
out_file = out_sys_dir / asset_id
export_raw_pcm(tmp_wav, out_file, sample_rate=sample_rate, filters=DEFAULT_FILTERS)
generated += 1
return generated
def import_wavs_to_a(wav_dir: Path | None, out_a_dir: Path, sample_rate: int) -> int:
if wav_dir is None:
return 0
if not wav_dir.exists() or not wav_dir.is_dir():
raise FileNotFoundError(f"WAV directory not found: {wav_dir}")
count = 0
for wav_file in sorted(wav_dir.glob("*.wav")):
out_file = out_a_dir / wav_file.stem
export_raw_pcm(wav_file, out_file, sample_rate=sample_rate, filters=DEFAULT_FILTERS)
count += 1
return count
def add_files_recursive(fs: LittleFS, local_path: Path, lfs_path: str = "/") -> None:
if not local_path.exists():
return
for item in sorted(local_path.iterdir()):
target_path = f"{lfs_path.rstrip('/')}/{item.name}".replace("//", "/")
if item.is_file():
with open(item, "rb") as f:
with fs.open(target_path, "wb") as lfs_file:
lfs_file.write(f.read())
elif item.is_dir():
fs.mkdir(target_path)
add_files_recursive(fs, item, target_path)
def build_littlefs_image(source_folder: Path, block_size: int, block_count: int) -> tuple[LittleFS, int]:
fs = LittleFS(
block_size=block_size,
block_count=block_count,
read_size=DEFAULT_READ_SIZE,
prog_size=256,
lookahead_size=DEFAULT_LOOKAHEAD_SIZE,
cache_size=4096,
)
add_files_recursive(fs, source_folder)
used_blocks = 0
lfs_buffer = fs.context.buffer
for idx in range(block_count):
offset = idx * block_size
block_data = lfs_buffer[offset : offset + block_size]
if any(byte != 0xFF for byte in block_data):
used_blocks += 1
return fs, used_blocks
def build_littlefs_hex(source_folder: Path, output_hex: Path, block_size: int, block_count: int, start_addr: int) -> int:
fs, used_blocks = build_littlefs_image(source_folder, block_size, block_count)
ih = IntelHex()
lfs_buffer = fs.context.buffer
for idx in range(block_count):
offset = idx * block_size
block_data = lfs_buffer[offset : offset + block_size]
if any(byte != 0xFF for byte in block_data):
ih.frombytes(block_data, offset=start_addr + offset)
ih.tofile(str(output_hex), format="hex")
return used_blocks
def summarize_directory(directory: Path) -> tuple[int, int]:
if not directory.exists():
return 0, 0
file_count = 0
raw_bytes = 0
for file_path in directory.rglob("*"):
if file_path.is_file():
file_count += 1
raw_bytes += file_path.stat().st_size
return file_count, raw_bytes
def format_bytes(size: int) -> str:
units = ["B", "KiB", "MiB", "GiB"]
value = float(size)
for unit in units:
if value < 1024.0 or unit == units[-1]:
if unit == "B":
return f"{int(value)} {unit}"
return f"{value:.1f} {unit}"
value /= 1024.0
def report_directory_usage(label: str, directory: Path, block_size: int, block_count: int) -> None:
file_count, raw_bytes = summarize_directory(directory)
_, used_blocks = build_littlefs_image(directory, block_size, block_count)
fs_bytes = used_blocks * block_size
print(
f" {label}: files={file_count}, raw={format_bytes(raw_bytes)}, "
f"fs={format_bytes(fs_bytes)} ({used_blocks} blocks, standalone estimate)"
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Generate Bella-only TTS in /lfs/sys, optionally import WAVs into /lfs/a, "
"then create LittleFS HEX."
)
)
parser.add_argument("--wav-dir", default=None, help="Optional directory with WAV files for /lfs/a")
parser.add_argument("--sys-yaml", default=None, help="Optional YAML with system prompts (id/text)")
parser.add_argument("--sample-rate", type=int, default=DEFAULT_SAMPLE_RATE)
parser.add_argument("--block-size", type=int, default=DEFAULT_BLOCK_SIZE)
parser.add_argument("--block-count", type=int, default=DEFAULT_BLOCK_COUNT)
parser.add_argument("--start-addr", type=lambda x: int(x, 0), default=0x12000000)
parser.add_argument(
"--output-hex",
default=str(Path(__file__).resolve().parent / "lfs_external_flash.hex"),
help="Output path for generated HEX",
)
parser.add_argument(
"--keep-staging",
action="store_true",
help="Keep temporary staging directory for inspection",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
fs_dir = Path(__file__).resolve().parent
output_hex = Path(args.output_hex).resolve()
wav_dir = Path(args.wav_dir).resolve() if args.wav_dir else None
sys_yaml = Path(args.sys_yaml).resolve() if args.sys_yaml else None
staging_root = fs_dir / ".tmp_lfs_build"
if staging_root.exists():
shutil.rmtree(staging_root)
# The LittleFS partition is mounted at /lfs in firmware, so image root
# must contain /sys and /a directly (not /lfs/sys and /lfs/a).
out_sys = staging_root / "sys"
out_a = staging_root / "a"
out_sys.mkdir(parents=True, exist_ok=True)
out_a.mkdir(parents=True, exist_ok=True)
prompts = load_sys_prompts(sys_yaml)
if not prompts:
raise RuntimeError("No system prompts available. Provide --sys-yaml or use defaults.")
tts_count = build_sys_tts(prompts, out_sys, args.sample_rate)
wav_count = import_wavs_to_a(wav_dir, out_a, args.sample_rate)
total_used_blocks = build_littlefs_hex(
source_folder=staging_root,
output_hex=output_hex,
block_size=args.block_size,
block_count=args.block_count,
start_addr=args.start_addr,
)
print(f"Done. TTS assets: {tts_count}, WAV imports: {wav_count}, HEX: {output_hex}")
print("Usage summary:")
report_directory_usage("sys", out_sys, args.block_size, args.block_count)
report_directory_usage("audio", out_a, args.block_size, args.block_count)
print(
f" total: raw={format_bytes(summarize_directory(staging_root)[1])}, "
f"fs={format_bytes(total_used_blocks * args.block_size)} "
f"({total_used_blocks} blocks in combined image)"
)
if not args.keep_staging and staging_root.exists():
shutil.rmtree(staging_root)
if __name__ == "__main__":
main()