From 95fd88e93eebed5b42a1cc96617756abd3c84957 Mon Sep 17 00:00:00 2001 From: Eduard Iten Date: Tue, 1 Jul 2025 21:36:28 +0200 Subject: [PATCH] feat(modbus_tool): Adapt UI to full register map - Update the TUI to display all new registers from the slave, including digital I/O and system status. - Add new menu buttons to control digital outputs and set the watchdog timer. - Add a placeholder button for the firmware update process. - Fix various bugs, including incorrect argument passing in Modbus calls and a module import error. --- software/tools/modbus_tool/modbus_tool.py | 220 +++++++++++----------- 1 file changed, 107 insertions(+), 113 deletions(-) diff --git a/software/tools/modbus_tool/modbus_tool.py b/software/tools/modbus_tool/modbus_tool.py index 856cb49..2a3a16d 100755 --- a/software/tools/modbus_tool/modbus_tool.py +++ b/software/tools/modbus_tool/modbus_tool.py @@ -6,19 +6,34 @@ import sys import curses from pymodbus.client import ModbusSerialClient from pymodbus.exceptions import ModbusException +import os -# Register Definitions +# --- Register Definitions --- +# Input Registers REG_INPUT_VALVE_STATE_MOVEMENT = 0x0000 REG_INPUT_MOTOR_CURRENT_MA = 0x0001 +REG_INPUT_DIGITAL_INPUTS_STATE = 0x0020 +REG_INPUT_BUTTON_EVENTS = 0x0021 REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR = 0x00F0 REG_INPUT_FIRMWARE_VERSION_PATCH = 0x00F1 +REG_INPUT_DEVICE_STATUS = 0x00F2 REG_INPUT_UPTIME_SECONDS_LOW = 0x00F3 REG_INPUT_UPTIME_SECONDS_HIGH = 0x00F4 +REG_INPUT_FWU_LAST_CHUNK_CRC = 0x0100 + +# Holding Registers REG_HOLDING_VALVE_COMMAND = 0x0000 REG_HOLDING_MAX_OPENING_TIME_S = 0x0001 REG_HOLDING_MAX_CLOSING_TIME_S = 0x0002 +REG_HOLDING_DIGITAL_OUTPUTS_STATE = 0x0010 +REG_HOLDING_WATCHDOG_TIMEOUT_S = 0x00F0 +REG_HOLDING_FWU_COMMAND = 0x0100 +REG_HOLDING_FWU_CHUNK_OFFSET_LOW = 0x0101 +REG_HOLDING_FWU_CHUNK_OFFSET_HIGH = 0x0102 +REG_HOLDING_FWU_CHUNK_SIZE = 0x0103 +REG_HOLDING_FWU_DATA_BUFFER = 0x0180 -# Global state +# --- Global State --- stop_event = threading.Event() client = None status_data = {} @@ -26,26 +41,16 @@ status_lock = threading.Lock() def format_uptime(seconds): """Formats seconds into a human-readable d/h/m/s string.""" - if not isinstance(seconds, (int, float)) or seconds < 0: - return "N/A" - if seconds == 0: - return "0s" - - days, remainder = divmod(seconds, 86400) - hours, remainder = divmod(remainder, 3600) - minutes, secs = divmod(remainder, 60) - + if not isinstance(seconds, (int, float)) or seconds < 0: return "N/A" + if seconds == 0: return "0s" + days, rem = divmod(seconds, 86400) + hours, rem = divmod(rem, 3600) + minutes, secs = divmod(rem, 60) parts = [] - if days > 0: - parts.append(f"{int(days)}d") - if hours > 0: - parts.append(f"{int(hours)}h") - if minutes > 0: - parts.append(f"{int(minutes)}m") - # Always show seconds if it's the only unit or if other units are present - if secs > 0 or not parts: - parts.append(f"{int(secs)}s") - + if days > 0: parts.append(f"{int(days)}d") + if hours > 0: parts.append(f"{int(hours)}h") + if minutes > 0: parts.append(f"{int(minutes)}m") + if secs > 0 or not parts: parts.append(f"{int(secs)}s") return " ".join(parts) def poll_status(slave_id, interval): @@ -54,32 +59,42 @@ def poll_status(slave_id, interval): while not stop_event.is_set(): new_data = {"error": None} try: - # Read all registers in a few calls - rr = client.read_input_registers(REG_INPUT_VALVE_STATE_MOVEMENT, count=2, slave=slave_id) - hr = client.read_holding_registers(REG_HOLDING_MAX_OPENING_TIME_S, count=2, slave=slave_id) - rr_sys = client.read_input_registers(REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR, count=5, slave=slave_id) + # Grouped reads for efficiency + ir_valve = client.read_input_registers(REG_INPUT_VALVE_STATE_MOVEMENT, count=2, slave=slave_id) + ir_dig = client.read_input_registers(REG_INPUT_DIGITAL_INPUTS_STATE, count=2, slave=slave_id) + ir_sys = client.read_input_registers(REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR, count=5, slave=slave_id) + hr_valve = client.read_holding_registers(REG_HOLDING_MAX_OPENING_TIME_S, count=2, slave=slave_id) + hr_dig = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id) + hr_sys = client.read_holding_registers(REG_HOLDING_WATCHDOG_TIMEOUT_S, count=1, slave=slave_id) - if rr.isError(): raise ModbusException(f"reading valve status: {rr}") - if hr.isError(): raise ModbusException(f"reading holding registers: {hr}") - if rr_sys.isError(): raise ModbusException(f"reading system status: {rr_sys}") + # Check for errors + for res in [ir_valve, ir_dig, ir_sys, hr_valve, hr_dig, hr_sys]: + if res.isError(): raise ModbusException(str(res)) - valve_state_raw = rr.registers[0] + # --- Process Valve & Motor Data --- + valve_state_raw = ir_valve.registers[0] movement_map = {0: "Idle", 1: "Opening", 2: "Closing", 3: "Error"} state_map = {0: "Closed", 1: "Open"} new_data["movement"] = movement_map.get(valve_state_raw >> 8, 'Unknown') new_data["state"] = state_map.get(valve_state_raw & 0xFF, 'Unknown') - new_data["motor_current"] = f"{rr.registers[1]} mA" - new_data["open_time"] = f"{hr.registers[0]}s" - new_data["close_time"] = f"{hr.registers[1]}s" + new_data["motor_current"] = f"{ir_valve.registers[1]} mA" + new_data["open_time"] = f"{hr_valve.registers[0]}s" + new_data["close_time"] = f"{hr_valve.registers[1]}s" - fw_major = rr_sys.registers[0] >> 8 - fw_minor = rr_sys.registers[0] & 0xFF - fw_patch = rr_sys.registers[1] - uptime_low = rr_sys.registers[3] - uptime_high = rr_sys.registers[4] - uptime_seconds = (uptime_high << 16) | uptime_low + # --- Process Digital I/O --- + new_data["digital_inputs"] = f"0x{ir_dig.registers[0]:04X}" + new_data["button_events"] = f"0x{ir_dig.registers[1]:04X}" + new_data["digital_outputs"] = f"0x{hr_dig.registers[0]:04X}" + + # --- Process System Data --- + fw_major = ir_sys.registers[0] >> 8 + fw_minor = ir_sys.registers[0] & 0xFF + fw_patch = ir_sys.registers[1] + uptime_seconds = (ir_sys.registers[4] << 16) | ir_sys.registers[3] new_data["firmware"] = f"v{fw_major}.{fw_minor}.{fw_patch}" + new_data["device_status"] = "OK" if ir_sys.registers[2] == 0 else "ERROR" new_data["uptime"] = format_uptime(uptime_seconds) + new_data["watchdog"] = f"{hr_sys.registers[0]}s" except ModbusException as e: new_data["error"] = f"Modbus Error: {e}" @@ -94,7 +109,6 @@ def draw_button(stdscr, y, x, text, selected=False): """Draws a button with a border, handling selection highlight.""" button_width = len(text) + 4 color = curses.color_pair(2) if selected else curses.color_pair(1) - stdscr.addstr(y, x, " " * button_width, color) stdscr.addstr(y, x + 2, text, color) stdscr.addstr(y - 1, x, "┌" + "─" * (button_width - 2) + "┐", color) @@ -109,105 +123,90 @@ def main_menu(stdscr, slave_id): stdscr.nodelay(1) stdscr.timeout(100) - # --- Color Pairs --- curses.start_color() - curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) # Main: white on blue - curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE) # Selected: blue on white - curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE) # Error: red on blue + curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) + curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE) + curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE) stdscr.bkgd(' ', curses.color_pair(1)) - # --- UI State --- - menu = ["Open Valve", "Close Valve", "Stop Valve", "Set Max Opening Time", "Set Max Closing Time", "Exit"] + menu = ["Open Valve", "Close Valve", "Stop Valve", "Toggle Output 1", "Toggle Output 2", "Set Watchdog", "Firmware Update", "Exit"] current_row_idx = 0 - - # State for transient messages - message = "" - message_time = 0 - - # State for user input - input_mode = False - input_prompt = "" - input_str = "" - input_target_reg = 0 + message, message_time = "", 0 + input_mode, input_prompt, input_str, input_target_reg = False, "", "", 0 while not stop_event.is_set(): h, w = stdscr.getmaxyx() - - # --- Handle Input and State Changes --- key = stdscr.getch() if input_mode: if key in [10, 13]: # Enter try: - seconds = int(input_str) - client.write_register(input_target_reg, seconds, slave=slave_id) - message = f"-> Set time to {seconds}s" - except ValueError: - message = "-> Invalid input. Please enter a number." + value = int(input_str) + client.write_register(input_target_reg, value, slave=slave_id) + message = f"-> Set register 0x{input_target_reg:04X} to {value}" except Exception as e: message = f"-> Error: {e}" - message_time = time.time() - input_mode = False - input_str = "" + message_time, input_mode, input_str = time.time(), False, "" elif key == curses.KEY_BACKSPACE or key == 127: input_str = input_str[:-1] elif key != -1 and chr(key).isprintable(): input_str += chr(key) else: # Navigation mode - if key == curses.KEY_UP and current_row_idx > 0: - current_row_idx -= 1 - elif key == curses.KEY_DOWN and current_row_idx < len(menu) - 1: - current_row_idx += 1 + if key == curses.KEY_UP: current_row_idx = (current_row_idx - 1) % len(menu) + elif key == curses.KEY_DOWN: current_row_idx = (current_row_idx + 1) % len(menu) elif key == curses.KEY_ENTER or key in [10, 13]: selected_option = menu[current_row_idx] - message_time = time.time() # Set time for all actions + message_time = time.time() - if selected_option == "Exit": - stop_event.set() - continue - elif selected_option == "Open Valve": - client.write_register(REG_HOLDING_VALVE_COMMAND, 1, slave=slave_id) - message = "-> Sent OPEN command" - elif selected_option == "Close Valve": - client.write_register(REG_HOLDING_VALVE_COMMAND, 2, slave=slave_id) - message = "-> Sent CLOSE command" - elif selected_option == "Stop Valve": - client.write_register(REG_HOLDING_VALVE_COMMAND, 0, slave=slave_id) - message = "-> Sent STOP command" - elif "Set Max" in selected_option: - input_mode = True - input_prompt = f"Enter new value for '{selected_option}' (seconds): " - input_target_reg = REG_HOLDING_MAX_OPENING_TIME_S if "Opening" in selected_option else REG_HOLDING_MAX_CLOSING_TIME_S + if selected_option == "Exit": stop_event.set(); continue + elif selected_option == "Open Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 1, slave=slave_id); message = "-> Sent OPEN command" + elif selected_option == "Close Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 2, slave=slave_id); message = "-> Sent CLOSE command" + elif selected_option == "Stop Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 0, slave=slave_id); message = "-> Sent STOP command" + elif "Toggle Output" in selected_option: + bit = 0 if "1" in selected_option else 1 + try: + current_val = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id).registers[0] + new_val = current_val ^ (1 << bit) + client.write_register(REG_HOLDING_DIGITAL_OUTPUTS_STATE, new_val, slave=slave_id) + message = f"-> Toggled Output {bit+1}" + except Exception as e: message = f"-> Error: {e}" + elif selected_option == "Set Watchdog": + input_mode, input_prompt, input_target_reg = True, "Enter Watchdog Timeout (s): ", REG_HOLDING_WATCHDOG_TIMEOUT_S + elif selected_option == "Firmware Update": + message = "-> Firmware update process not yet implemented." - # --- Drawing Logic (Single Source of Truth) --- stdscr.clear() - - # 1. Draw Status Area - with status_lock: - current_data = status_data.copy() + with status_lock: current_data = status_data.copy() if current_data.get("error"): stdscr.addstr(0, 0, current_data["error"], curses.color_pair(3) | curses.A_BOLD) else: - bold = curses.color_pair(1) | curses.A_BOLD - normal = curses.color_pair(1) - col1_x, col2_x, col3_x = 2, 35, 70 - stdscr.addstr(1, col1_x, "Valve State:", bold); stdscr.addstr(1, col1_x + 14, str(current_data.get('state', 'N/A')), normal) - stdscr.addstr(2, col1_x, "Movement:", bold); stdscr.addstr(2, col1_x + 14, str(current_data.get('movement', 'N/A')), normal) - stdscr.addstr(3, col1_x, "Motor Current:", bold); stdscr.addstr(3, col1_x + 14, str(current_data.get('motor_current', 'N/A')), normal) - stdscr.addstr(1, col2_x, "Max Open Time:", bold); stdscr.addstr(1, col2_x + 16, str(current_data.get('open_time', 'N/A')), normal) - stdscr.addstr(2, col2_x, "Max Close Time:", bold); stdscr.addstr(2, col2_x + 16, str(current_data.get('close_time', 'N/A')), normal) - stdscr.addstr(1, col3_x, "Firmware:", bold); stdscr.addstr(1, col3_x + 11, str(current_data.get('firmware', 'N/A')), normal) - stdscr.addstr(2, col3_x, "Uptime:", bold); stdscr.addstr(2, col3_x + 11, str(current_data.get('uptime', 'N/A')), normal) + bold, normal = curses.color_pair(1) | curses.A_BOLD, curses.color_pair(1) + # Status Area + col1, col2, col3, col4 = 2, 30, 58, 88 + stdscr.addstr(1, col1, "State:", bold); stdscr.addstr(1, col1 + 18, str(current_data.get('state', 'N/A')), normal) + stdscr.addstr(2, col1, "Movement:", bold); stdscr.addstr(2, col1 + 18, str(current_data.get('movement', 'N/A')), normal) + stdscr.addstr(3, col1, "Motor Current:", bold); stdscr.addstr(3, col1 + 18, str(current_data.get('motor_current', 'N/A')), normal) + + stdscr.addstr(1, col2, "Digital Inputs:", bold); stdscr.addstr(1, col2 + 18, str(current_data.get('digital_inputs', 'N/A')), normal) + stdscr.addstr(2, col2, "Digital Outputs:", bold); stdscr.addstr(2, col2 + 18, str(current_data.get('digital_outputs', 'N/A')), normal) + stdscr.addstr(3, col2, "Button Events:", bold); stdscr.addstr(3, col2 + 18, str(current_data.get('button_events', 'N/A')), normal) + + stdscr.addstr(1, col3, "Max Open Time:", bold); stdscr.addstr(1, col3 + 16, str(current_data.get('open_time', 'N/A')), normal) + stdscr.addstr(2, col3, "Max Close Time:", bold); stdscr.addstr(2, col3 + 16, str(current_data.get('close_time', 'N/A')), normal) + stdscr.addstr(3, col3, "Watchdog:", bold); stdscr.addstr(3, col3 + 16, str(current_data.get('watchdog', 'N/A')), normal) + + stdscr.addstr(1, col4, "Firmware:", bold); stdscr.addstr(1, col4 + 12, str(current_data.get('firmware', 'N/A')), normal) + stdscr.addstr(2, col4, "Uptime:", bold); stdscr.addstr(2, col4 + 12, str(current_data.get('uptime', 'N/A')), normal) + stdscr.addstr(3, col4, "Device Status:", bold); stdscr.addstr(3, col4 + 12, str(current_data.get('device_status', 'N/A')), normal) + stdscr.addstr(5, 0, "─" * (w - 1), normal) - # 2. Draw Menu Buttons for idx, row in enumerate(menu): x = w // 2 - (len(row) + 4) // 2 y = h // 2 - len(menu) + (idx * 3) draw_button(stdscr, y, x, row, idx == current_row_idx) - # 3. Draw Transient Message or Input Prompt if time.time() - message_time < 2.0: stdscr.addstr(h - 2, 0, message.ljust(w - 1), curses.color_pair(1) | curses.A_BOLD) @@ -231,14 +230,10 @@ def main(): client = ModbusSerialClient(port=args.port, baudrate=args.baud, stopbits=1, bytesize=8, parity="N", timeout=1) if not client.connect(): - print(f"Error: Failed to connect to serial port {args.port}") - sys.exit(1) + print(f"Error: Failed to connect to serial port {args.port}"); sys.exit(1) - print(f"Successfully connected to {args.port}. Starting UI...") - time.sleep(0.5) - - poll_thread = threading.Thread(target=poll_status, args=(args.slave_id, args.interval)) - poll_thread.daemon = True + print("Successfully connected. Starting UI..."); time.sleep(0.5) + poll_thread = threading.Thread(target=poll_status, args=(args.slave_id, args.interval), daemon=True) poll_thread.start() try: @@ -246,9 +241,8 @@ def main(): finally: stop_event.set() print("\nExiting...") - if client.is_socket_open(): - client.close() + if client.is_socket_open(): client.close() poll_thread.join(timeout=2) if __name__ == "__main__": - main() \ No newline at end of file + main()