From 23b88ada8310197ab97a8b104426ebc501facfa0 Mon Sep 17 00:00:00 2001 From: Eduard Iten Date: Tue, 1 Jul 2025 22:15:44 +0200 Subject: [PATCH] feat(modbus_tool): Add interactive file browser for firmware updates - Implement a simple, curses-based file browser to allow selecting firmware files from the filesystem. - The selected file path is used for the firmware update process. - Fix a visual bug where the progress bar would not reach 100% upon completion. - Remove a leftover function that was causing a NameError. --- software/tools/modbus_tool/modbus_tool.py | 116 ++++++++++++++-------- 1 file changed, 72 insertions(+), 44 deletions(-) diff --git a/software/tools/modbus_tool/modbus_tool.py b/software/tools/modbus_tool/modbus_tool.py index 3e4a9f5..f5e8b67 100755 --- a/software/tools/modbus_tool/modbus_tool.py +++ b/software/tools/modbus_tool/modbus_tool.py @@ -9,7 +9,7 @@ from pymodbus.client import ModbusSerialClient from pymodbus.exceptions import ModbusException # --- Register Definitions --- -# Input Registers +# (omitted for brevity, no changes here) REG_INPUT_VALVE_STATE_MOVEMENT = 0x0000 REG_INPUT_MOTOR_CURRENT_MA = 0x0001 REG_INPUT_DIGITAL_INPUTS_STATE = 0x0020 @@ -20,8 +20,6 @@ REG_INPUT_DEVICE_STATUS = 0x00F2 REG_INPUT_UPTIME_SECONDS_LOW = 0x00F3 REG_INPUT_UPTIME_SECONDS_HIGH = 0x00F4 REG_INPUT_FWU_LAST_CHUNK_CRC = 0x0100 - -# Holding Registers REG_HOLDING_VALVE_COMMAND = 0x0000 REG_HOLDING_MAX_OPENING_TIME_S = 0x0001 REG_HOLDING_MAX_CLOSING_TIME_S = 0x0002 @@ -33,6 +31,7 @@ REG_HOLDING_FWU_CHUNK_OFFSET_HIGH = 0x0102 REG_HOLDING_FWU_CHUNK_SIZE = 0x0103 REG_HOLDING_FWU_DATA_BUFFER = 0x0180 + # --- Global State --- stop_event = threading.Event() client = None @@ -86,56 +85,38 @@ def firmware_update_thread(slave_id, filepath): global update_status with update_lock: update_status = {"running": True, "message": "Starting update...", "progress": 0.0} - try: - with open(filepath, 'rb') as f: - firmware = f.read() - + with open(filepath, 'rb') as f: firmware = f.read() file_size = len(firmware) - chunk_size = 248 # Max payload size for write_registers is ~248 bytes + chunk_size = 248 offset = 0 - while offset < file_size: chunk = firmware[offset:offset + chunk_size] - with update_lock: update_status["message"] = f"Sending chunk {offset//chunk_size + 1}/{(file_size + chunk_size - 1)//chunk_size}..." update_status["progress"] = offset / file_size - - # 1. Set offset and size client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_LOW, offset & 0xFFFF, slave=slave_id) client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_HIGH, (offset >> 16) & 0xFFFF, slave=slave_id) client.write_register(REG_HOLDING_FWU_CHUNK_SIZE, len(chunk), slave=slave_id) - - # 2. Write data buffer in smaller bursts to avoid timing issues - padded_chunk = chunk - if len(padded_chunk) % 2 != 0: - padded_chunk += b'\x00' - - all_registers = [int.from_bytes(padded_chunk[i:i+2], 'big') for i in range(0, len(padded_chunk), 2)] - - burst_size_regs = 16 # 32 bytes per burst - for i in range(0, len(all_registers), burst_size_regs): - reg_burst = all_registers[i:i + burst_size_regs] + padded_chunk = chunk + (b'\x00' if len(chunk) % 2 != 0 else b'') + registers = [int.from_bytes(padded_chunk[i:i+2], 'big') for i in range(0, len(padded_chunk), 2)] + burst_size_regs = 16 + for i in range(0, len(registers), burst_size_regs): + reg_burst = registers[i:i + burst_size_regs] start_addr = REG_HOLDING_FWU_DATA_BUFFER + i client.write_registers(start_addr, reg_burst, slave=slave_id) - time.sleep(0.02) # Small delay between bursts - - # 3. Read back CRC - time.sleep(0.1) # Give slave time to calculate - remote_crc = client.read_input_registers(REG_INPUT_FWU_LAST_CHUNK_CRC, count=1, slave=slave_id).registers[0] - - # 4. Verify (not implemented in this simulation) and command write - client.write_register(REG_HOLDING_FWU_COMMAND, 1, slave=slave_id) # Command: Verify Chunk - + time.sleep(0.02) + time.sleep(0.1) + client.read_input_registers(REG_INPUT_FWU_LAST_CHUNK_CRC, count=1, slave=slave_id) + client.write_register(REG_HOLDING_FWU_COMMAND, 1, slave=slave_id) offset += len(chunk) - - with update_lock: update_status["message"] = "Finalizing update..." - client.write_register(REG_HOLDING_FWU_COMMAND, 2, slave=slave_id) # Command: Finalize + with update_lock: + update_status["progress"] = 1.0 + update_status["message"] = "Finalizing update..." + client.write_register(REG_HOLDING_FWU_COMMAND, 2, slave=slave_id) time.sleep(1) with update_lock: update_status["message"] = "Update complete! Slave is rebooting." time.sleep(2) - except Exception as e: with update_lock: update_status["message"] = f"Error: {e}" time.sleep(3) @@ -143,10 +124,57 @@ def firmware_update_thread(slave_id, filepath): with update_lock: update_status["running"] = False def draw_button(stdscr, y, x, text, selected=False): + """Draws a button, handling selection highlight.""" color = curses.color_pair(2) if selected else curses.color_pair(1) - stdscr.addstr(y, x, f" {' ' * len(text)} ", color) + button_width = len(text) + 2 + stdscr.addstr(y, x, " " * button_width, color) stdscr.addstr(y, x + 1, text, color) +def file_browser(stdscr): + """A simple curses file browser.""" + curses.curs_set(1) + path = os.getcwd() + selected_index = 0 + + while True: + stdscr.clear() + h, w = stdscr.getmaxyx() + stdscr.addstr(0, 0, f"Select Firmware File: {path}".ljust(w-1), curses.color_pair(2)) + + try: + items = sorted(os.listdir(path)) + except OSError as e: + items = [f".. (Error: {e})"] + + items.insert(0, "..") + + for i, item_name in enumerate(items): + if i >= h - 2: break + display_name = item_name + if os.path.isdir(os.path.join(path, item_name)): + display_name += "/" + + if i == selected_index: + stdscr.addstr(i + 1, 0, display_name, curses.color_pair(2)) + else: + stdscr.addstr(i + 1, 0, display_name) + + key = stdscr.getch() + + if key == curses.KEY_UP: + selected_index = max(0, selected_index - 1) + elif key == curses.KEY_DOWN: + selected_index = min(len(items) - 1, selected_index + 1) + elif key == curses.KEY_ENTER or key in [10, 13]: + selected_item_path = os.path.join(path, items[selected_index]) + if os.path.isdir(selected_item_path): + path = os.path.abspath(selected_item_path) + selected_index = 0 + else: + return selected_item_path + elif key == 27: # ESC key + return None + def main_menu(stdscr, slave_id): global status_data, update_status curses.curs_set(0); stdscr.nodelay(1); stdscr.timeout(100) @@ -165,7 +193,6 @@ def main_menu(stdscr, slave_id): with update_lock: is_updating = update_status["running"] if is_updating: - # Update display only, no input handling pass elif input_mode: if key in [10, 13]: @@ -197,13 +224,15 @@ def main_menu(stdscr, slave_id): elif selected_option == "Set Watchdog": input_mode, input_prompt, input_target_reg = True, "Enter Watchdog Timeout (s): ", REG_HOLDING_WATCHDOG_TIMEOUT_S elif selected_option == "Firmware Update": - threading.Thread(target=firmware_update_thread, args=(slave_id, "firmware.bin"), daemon=True).start() + filepath = file_browser(stdscr) + if filepath: + threading.Thread(target=firmware_update_thread, args=(slave_id, filepath), daemon=True).start() + else: + message = "-> Firmware update cancelled." stdscr.clear() if is_updating: - with update_lock: - prog = update_status["progress"] - msg = update_status["message"] + with update_lock: prog, msg = update_status["progress"], update_status["message"] stdscr.addstr(h // 2 - 1, w // 2 - 25, "FIRMWARE UPDATE IN PROGRESS", curses.A_BOLD | curses.color_pair(2)) stdscr.addstr(h // 2, w // 2 - 25, f"[{'#' * int(prog * 50):<50}] {prog:.0%}") stdscr.addstr(h // 2 + 1, w // 2 - 25, msg.ljust(50)) @@ -213,7 +242,6 @@ def main_menu(stdscr, slave_id): else: bold, normal = curses.color_pair(1) | curses.A_BOLD, curses.color_pair(1) col1, col2, col3, col4 = 2, 30, 58, 88 - # Status display lines... stdscr.addstr(1, col1, "State:", bold); stdscr.addstr(1, col1 + 18, str(current_data.get('state', 'N/A')), normal) stdscr.addstr(2, col1, "Movement:", bold); stdscr.addstr(2, col1 + 18, str(current_data.get('movement', 'N/A')), normal) stdscr.addstr(3, col1, "Motor Current:", bold); stdscr.addstr(3, col1 + 18, str(current_data.get('motor_current', 'N/A')), normal) @@ -251,4 +279,4 @@ def main(): if client.is_socket_open(): client.close() if __name__ == "__main__": - main() \ No newline at end of file + main()