feat(modbus_tool): Add interactive file browser for firmware updates

- Implement a simple, curses-based file browser to allow selecting firmware files from the filesystem.
- The selected file path is used for the firmware update process.
- Fix a visual bug where the progress bar would not reach 100% upon completion.
- Remove a leftover  function that was causing a NameError.
This commit is contained in:
Eduard Iten 2025-07-01 22:15:44 +02:00
parent c2916662e2
commit 23b88ada83
1 changed files with 72 additions and 44 deletions

View File

@ -9,7 +9,7 @@ from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException
# --- Register Definitions ---
# Input Registers
# (omitted for brevity, no changes here)
REG_INPUT_VALVE_STATE_MOVEMENT = 0x0000
REG_INPUT_MOTOR_CURRENT_MA = 0x0001
REG_INPUT_DIGITAL_INPUTS_STATE = 0x0020
@ -20,8 +20,6 @@ REG_INPUT_DEVICE_STATUS = 0x00F2
REG_INPUT_UPTIME_SECONDS_LOW = 0x00F3
REG_INPUT_UPTIME_SECONDS_HIGH = 0x00F4
REG_INPUT_FWU_LAST_CHUNK_CRC = 0x0100
# Holding Registers
REG_HOLDING_VALVE_COMMAND = 0x0000
REG_HOLDING_MAX_OPENING_TIME_S = 0x0001
REG_HOLDING_MAX_CLOSING_TIME_S = 0x0002
@ -33,6 +31,7 @@ REG_HOLDING_FWU_CHUNK_OFFSET_HIGH = 0x0102
REG_HOLDING_FWU_CHUNK_SIZE = 0x0103
REG_HOLDING_FWU_DATA_BUFFER = 0x0180
# --- Global State ---
stop_event = threading.Event()
client = None
@ -86,56 +85,38 @@ def firmware_update_thread(slave_id, filepath):
global update_status
with update_lock:
update_status = {"running": True, "message": "Starting update...", "progress": 0.0}
try:
with open(filepath, 'rb') as f:
firmware = f.read()
with open(filepath, 'rb') as f: firmware = f.read()
file_size = len(firmware)
chunk_size = 248 # Max payload size for write_registers is ~248 bytes
chunk_size = 248
offset = 0
while offset < file_size:
chunk = firmware[offset:offset + chunk_size]
with update_lock:
update_status["message"] = f"Sending chunk {offset//chunk_size + 1}/{(file_size + chunk_size - 1)//chunk_size}..."
update_status["progress"] = offset / file_size
# 1. Set offset and size
client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_LOW, offset & 0xFFFF, slave=slave_id)
client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_HIGH, (offset >> 16) & 0xFFFF, slave=slave_id)
client.write_register(REG_HOLDING_FWU_CHUNK_SIZE, len(chunk), slave=slave_id)
# 2. Write data buffer in smaller bursts to avoid timing issues
padded_chunk = chunk
if len(padded_chunk) % 2 != 0:
padded_chunk += b'\x00'
all_registers = [int.from_bytes(padded_chunk[i:i+2], 'big') for i in range(0, len(padded_chunk), 2)]
burst_size_regs = 16 # 32 bytes per burst
for i in range(0, len(all_registers), burst_size_regs):
reg_burst = all_registers[i:i + burst_size_regs]
padded_chunk = chunk + (b'\x00' if len(chunk) % 2 != 0 else b'')
registers = [int.from_bytes(padded_chunk[i:i+2], 'big') for i in range(0, len(padded_chunk), 2)]
burst_size_regs = 16
for i in range(0, len(registers), burst_size_regs):
reg_burst = registers[i:i + burst_size_regs]
start_addr = REG_HOLDING_FWU_DATA_BUFFER + i
client.write_registers(start_addr, reg_burst, slave=slave_id)
time.sleep(0.02) # Small delay between bursts
# 3. Read back CRC
time.sleep(0.1) # Give slave time to calculate
remote_crc = client.read_input_registers(REG_INPUT_FWU_LAST_CHUNK_CRC, count=1, slave=slave_id).registers[0]
# 4. Verify (not implemented in this simulation) and command write
client.write_register(REG_HOLDING_FWU_COMMAND, 1, slave=slave_id) # Command: Verify Chunk
time.sleep(0.02)
time.sleep(0.1)
client.read_input_registers(REG_INPUT_FWU_LAST_CHUNK_CRC, count=1, slave=slave_id)
client.write_register(REG_HOLDING_FWU_COMMAND, 1, slave=slave_id)
offset += len(chunk)
with update_lock: update_status["message"] = "Finalizing update..."
client.write_register(REG_HOLDING_FWU_COMMAND, 2, slave=slave_id) # Command: Finalize
with update_lock:
update_status["progress"] = 1.0
update_status["message"] = "Finalizing update..."
client.write_register(REG_HOLDING_FWU_COMMAND, 2, slave=slave_id)
time.sleep(1)
with update_lock: update_status["message"] = "Update complete! Slave is rebooting."
time.sleep(2)
except Exception as e:
with update_lock: update_status["message"] = f"Error: {e}"
time.sleep(3)
@ -143,10 +124,57 @@ def firmware_update_thread(slave_id, filepath):
with update_lock: update_status["running"] = False
def draw_button(stdscr, y, x, text, selected=False):
"""Draws a button, handling selection highlight."""
color = curses.color_pair(2) if selected else curses.color_pair(1)
stdscr.addstr(y, x, f" {' ' * len(text)} ", color)
button_width = len(text) + 2
stdscr.addstr(y, x, " " * button_width, color)
stdscr.addstr(y, x + 1, text, color)
def file_browser(stdscr):
"""A simple curses file browser."""
curses.curs_set(1)
path = os.getcwd()
selected_index = 0
while True:
stdscr.clear()
h, w = stdscr.getmaxyx()
stdscr.addstr(0, 0, f"Select Firmware File: {path}".ljust(w-1), curses.color_pair(2))
try:
items = sorted(os.listdir(path))
except OSError as e:
items = [f".. (Error: {e})"]
items.insert(0, "..")
for i, item_name in enumerate(items):
if i >= h - 2: break
display_name = item_name
if os.path.isdir(os.path.join(path, item_name)):
display_name += "/"
if i == selected_index:
stdscr.addstr(i + 1, 0, display_name, curses.color_pair(2))
else:
stdscr.addstr(i + 1, 0, display_name)
key = stdscr.getch()
if key == curses.KEY_UP:
selected_index = max(0, selected_index - 1)
elif key == curses.KEY_DOWN:
selected_index = min(len(items) - 1, selected_index + 1)
elif key == curses.KEY_ENTER or key in [10, 13]:
selected_item_path = os.path.join(path, items[selected_index])
if os.path.isdir(selected_item_path):
path = os.path.abspath(selected_item_path)
selected_index = 0
else:
return selected_item_path
elif key == 27: # ESC key
return None
def main_menu(stdscr, slave_id):
global status_data, update_status
curses.curs_set(0); stdscr.nodelay(1); stdscr.timeout(100)
@ -165,7 +193,6 @@ def main_menu(stdscr, slave_id):
with update_lock: is_updating = update_status["running"]
if is_updating:
# Update display only, no input handling
pass
elif input_mode:
if key in [10, 13]:
@ -197,13 +224,15 @@ def main_menu(stdscr, slave_id):
elif selected_option == "Set Watchdog":
input_mode, input_prompt, input_target_reg = True, "Enter Watchdog Timeout (s): ", REG_HOLDING_WATCHDOG_TIMEOUT_S
elif selected_option == "Firmware Update":
threading.Thread(target=firmware_update_thread, args=(slave_id, "firmware.bin"), daemon=True).start()
filepath = file_browser(stdscr)
if filepath:
threading.Thread(target=firmware_update_thread, args=(slave_id, filepath), daemon=True).start()
else:
message = "-> Firmware update cancelled."
stdscr.clear()
if is_updating:
with update_lock:
prog = update_status["progress"]
msg = update_status["message"]
with update_lock: prog, msg = update_status["progress"], update_status["message"]
stdscr.addstr(h // 2 - 1, w // 2 - 25, "FIRMWARE UPDATE IN PROGRESS", curses.A_BOLD | curses.color_pair(2))
stdscr.addstr(h // 2, w // 2 - 25, f"[{'#' * int(prog * 50):<50}] {prog:.0%}")
stdscr.addstr(h // 2 + 1, w // 2 - 25, msg.ljust(50))
@ -213,7 +242,6 @@ def main_menu(stdscr, slave_id):
else:
bold, normal = curses.color_pair(1) | curses.A_BOLD, curses.color_pair(1)
col1, col2, col3, col4 = 2, 30, 58, 88
# Status display lines...
stdscr.addstr(1, col1, "State:", bold); stdscr.addstr(1, col1 + 18, str(current_data.get('state', 'N/A')), normal)
stdscr.addstr(2, col1, "Movement:", bold); stdscr.addstr(2, col1 + 18, str(current_data.get('movement', 'N/A')), normal)
stdscr.addstr(3, col1, "Motor Current:", bold); stdscr.addstr(3, col1 + 18, str(current_data.get('motor_current', 'N/A')), normal)
@ -251,4 +279,4 @@ def main():
if client.is_socket_open(): client.close()
if __name__ == "__main__":
main()
main()