From 6dcb11ae0c78984f4996d2b61eedfa1a3d83197e Mon Sep 17 00:00:00 2001 From: Eduard Iten Date: Tue, 1 Jul 2025 21:17:30 +0200 Subject: [PATCH] fix(modbus_tool): Improve UI stability and readability - Refactor the curses drawing loop to be state-based, eliminating screen flicker after user input. - Add a helper function to format uptime from seconds into a human-readable string (d/h/m/s). --- software/tools/modbus_tool/modbus_tool.py | 207 ++++++++++++---------- 1 file changed, 111 insertions(+), 96 deletions(-) diff --git a/software/tools/modbus_tool/modbus_tool.py b/software/tools/modbus_tool/modbus_tool.py index 64c9473..856cb49 100755 --- a/software/tools/modbus_tool/modbus_tool.py +++ b/software/tools/modbus_tool/modbus_tool.py @@ -24,6 +24,30 @@ client = None status_data = {} status_lock = threading.Lock() +def format_uptime(seconds): + """Formats seconds into a human-readable d/h/m/s string.""" + if not isinstance(seconds, (int, float)) or seconds < 0: + return "N/A" + if seconds == 0: + return "0s" + + days, remainder = divmod(seconds, 86400) + hours, remainder = divmod(remainder, 3600) + minutes, secs = divmod(remainder, 60) + + parts = [] + if days > 0: + parts.append(f"{int(days)}d") + if hours > 0: + parts.append(f"{int(hours)}h") + if minutes > 0: + parts.append(f"{int(minutes)}m") + # Always show seconds if it's the only unit or if other units are present + if secs > 0 or not parts: + parts.append(f"{int(secs)}s") + + return " ".join(parts) + def poll_status(slave_id, interval): """Periodically polls the status of the node and updates the global status_data dict.""" global status_data @@ -53,9 +77,9 @@ def poll_status(slave_id, interval): fw_patch = rr_sys.registers[1] uptime_low = rr_sys.registers[3] uptime_high = rr_sys.registers[4] - uptime = (uptime_high << 16) | uptime_low + uptime_seconds = (uptime_high << 16) | uptime_low new_data["firmware"] = f"v{fw_major}.{fw_minor}.{fw_patch}" - new_data["uptime"] = f"{uptime}s" + new_data["uptime"] = format_uptime(uptime_seconds) except ModbusException as e: new_data["error"] = f"Modbus Error: {e}" @@ -69,141 +93,132 @@ def poll_status(slave_id, interval): def draw_button(stdscr, y, x, text, selected=False): """Draws a button with a border, handling selection highlight.""" button_width = len(text) + 4 - if selected: - color = curses.color_pair(2) # Highlighted (blue on white) - else: - color = curses.color_pair(1) # Normal (white on blue) + color = curses.color_pair(2) if selected else curses.color_pair(1) - # Draw the button background and text stdscr.addstr(y, x, " " * button_width, color) stdscr.addstr(y, x + 2, text, color) - # Draw the border with the same color attribute stdscr.addstr(y - 1, x, "┌" + "─" * (button_width - 2) + "┐", color) stdscr.addstr(y, x, "│", color) stdscr.addstr(y, x + button_width - 1, "│", color) stdscr.addstr(y + 1, x, "└" + "─" * (button_width - 2) + "┘", color) -def get_user_input(stdscr, h, w, prompt): - """Safely get user input in a curses window.""" - curses.echo() - curses.curs_set(1) - stdscr.nodelay(0) - input_line_y = h - 2 - stdscr.addstr(input_line_y, 0, " " * (w - 1), curses.color_pair(1)) - stdscr.addstr(input_line_y, 0, prompt, curses.color_pair(1) | curses.A_BOLD) - stdscr.refresh() - # Input should be on a white background for readability - input_win = curses.newwin(1, w - len(prompt) - 1, input_line_y, len(prompt)) - input_win.bkgd(' ', curses.color_pair(4)) # Black on white - input_str = input_win.getstr().decode('utf-8') - - stdscr.nodelay(1) - curses.noecho() - curses.curs_set(0) - return input_str - -def show_message(stdscr, h, w, message, is_error=False, duration=1.5): - """Display a message for a short duration.""" - msg_y = h - 2 - color = curses.color_pair(3) if is_error else curses.color_pair(1) - stdscr.addstr(msg_y, 0, " " * (w - 1), color) - stdscr.addstr(msg_y, 0, message, color | curses.A_BOLD) - stdscr.refresh() - time.sleep(duration) - def main_menu(stdscr, slave_id): - """The main curses UI with a full blue background.""" + """The main curses UI with a flicker-free, state-based drawing loop.""" global status_data curses.curs_set(0) stdscr.nodelay(1) stdscr.timeout(100) - # Initialize colors + # --- Color Pairs --- curses.start_color() curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) # Main: white on blue curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE) # Selected: blue on white curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE) # Error: red on blue - curses.init_pair(4, curses.COLOR_BLACK, curses.COLOR_WHITE) # Input: black on white - - # Set the background for the whole screen stdscr.bkgd(' ', curses.color_pair(1)) + # --- UI State --- menu = ["Open Valve", "Close Valve", "Stop Valve", "Set Max Opening Time", "Set Max Closing Time", "Exit"] current_row_idx = 0 + + # State for transient messages + message = "" + message_time = 0 + + # State for user input + input_mode = False + input_prompt = "" + input_str = "" + input_target_reg = 0 while not stop_event.is_set(): - stdscr.clear() # Clear screen with the background color h, w = stdscr.getmaxyx() + + # --- Handle Input and State Changes --- + key = stdscr.getch() - # --- Draw Status Area --- + if input_mode: + if key in [10, 13]: # Enter + try: + seconds = int(input_str) + client.write_register(input_target_reg, seconds, slave=slave_id) + message = f"-> Set time to {seconds}s" + except ValueError: + message = "-> Invalid input. Please enter a number." + except Exception as e: + message = f"-> Error: {e}" + message_time = time.time() + input_mode = False + input_str = "" + elif key == curses.KEY_BACKSPACE or key == 127: + input_str = input_str[:-1] + elif key != -1 and chr(key).isprintable(): + input_str += chr(key) + else: # Navigation mode + if key == curses.KEY_UP and current_row_idx > 0: + current_row_idx -= 1 + elif key == curses.KEY_DOWN and current_row_idx < len(menu) - 1: + current_row_idx += 1 + elif key == curses.KEY_ENTER or key in [10, 13]: + selected_option = menu[current_row_idx] + message_time = time.time() # Set time for all actions + + if selected_option == "Exit": + stop_event.set() + continue + elif selected_option == "Open Valve": + client.write_register(REG_HOLDING_VALVE_COMMAND, 1, slave=slave_id) + message = "-> Sent OPEN command" + elif selected_option == "Close Valve": + client.write_register(REG_HOLDING_VALVE_COMMAND, 2, slave=slave_id) + message = "-> Sent CLOSE command" + elif selected_option == "Stop Valve": + client.write_register(REG_HOLDING_VALVE_COMMAND, 0, slave=slave_id) + message = "-> Sent STOP command" + elif "Set Max" in selected_option: + input_mode = True + input_prompt = f"Enter new value for '{selected_option}' (seconds): " + input_target_reg = REG_HOLDING_MAX_OPENING_TIME_S if "Opening" in selected_option else REG_HOLDING_MAX_CLOSING_TIME_S + + # --- Drawing Logic (Single Source of Truth) --- + stdscr.clear() + + # 1. Draw Status Area with status_lock: current_data = status_data.copy() - + if current_data.get("error"): stdscr.addstr(0, 0, current_data["error"], curses.color_pair(3) | curses.A_BOLD) else: - # Use a single color pair, bold for labels bold = curses.color_pair(1) | curses.A_BOLD normal = curses.color_pair(1) col1_x, col2_x, col3_x = 2, 35, 70 - stdscr.addstr(1, col1_x, "Valve State:", bold) - stdscr.addstr(1, col1_x + 14, str(current_data.get('state', 'N/A')), normal) - stdscr.addstr(2, col1_x, "Movement:", bold) - stdscr.addstr(2, col1_x + 14, str(current_data.get('movement', 'N/A')), normal) - stdscr.addstr(3, col1_x, "Motor Current:", bold) - stdscr.addstr(3, col1_x + 14, str(current_data.get('motor_current', 'N/A')), normal) - - stdscr.addstr(1, col2_x, "Max Open Time:", bold) - stdscr.addstr(1, col2_x + 16, str(current_data.get('open_time', 'N/A')), normal) - stdscr.addstr(2, col2_x, "Max Close Time:", bold) - stdscr.addstr(2, col2_x + 16, str(current_data.get('close_time', 'N/A')), normal) - - stdscr.addstr(1, col3_x, "Firmware:", bold) - stdscr.addstr(1, col3_x + 11, str(current_data.get('firmware', 'N/A')), normal) - stdscr.addstr(2, col3_x, "Uptime:", bold) - stdscr.addstr(2, col3_x + 11, str(current_data.get('uptime', 'N/A')), normal) + stdscr.addstr(1, col1_x, "Valve State:", bold); stdscr.addstr(1, col1_x + 14, str(current_data.get('state', 'N/A')), normal) + stdscr.addstr(2, col1_x, "Movement:", bold); stdscr.addstr(2, col1_x + 14, str(current_data.get('movement', 'N/A')), normal) + stdscr.addstr(3, col1_x, "Motor Current:", bold); stdscr.addstr(3, col1_x + 14, str(current_data.get('motor_current', 'N/A')), normal) + stdscr.addstr(1, col2_x, "Max Open Time:", bold); stdscr.addstr(1, col2_x + 16, str(current_data.get('open_time', 'N/A')), normal) + stdscr.addstr(2, col2_x, "Max Close Time:", bold); stdscr.addstr(2, col2_x + 16, str(current_data.get('close_time', 'N/A')), normal) + stdscr.addstr(1, col3_x, "Firmware:", bold); stdscr.addstr(1, col3_x + 11, str(current_data.get('firmware', 'N/A')), normal) + stdscr.addstr(2, col3_x, "Uptime:", bold); stdscr.addstr(2, col3_x + 11, str(current_data.get('uptime', 'N/A')), normal) stdscr.addstr(5, 0, "─" * (w - 1), normal) - # --- Draw Menu Buttons --- + # 2. Draw Menu Buttons for idx, row in enumerate(menu): x = w // 2 - (len(row) + 4) // 2 - y = h // 2 - (len(menu)) + (idx * 3) + y = h // 2 - len(menu) + (idx * 3) draw_button(stdscr, y, x, row, idx == current_row_idx) + # 3. Draw Transient Message or Input Prompt + if time.time() - message_time < 2.0: + stdscr.addstr(h - 2, 0, message.ljust(w - 1), curses.color_pair(1) | curses.A_BOLD) + + if input_mode: + curses.curs_set(1) + stdscr.addstr(h - 2, 0, (input_prompt + input_str).ljust(w-1), curses.color_pair(2)) + stdscr.move(h - 2, len(input_prompt) + len(input_str)) + else: + curses.curs_set(0) + stdscr.refresh() - key = stdscr.getch() - - if key == curses.KEY_UP and current_row_idx > 0: - current_row_idx -= 1 - elif key == curses.KEY_DOWN and current_row_idx < len(menu) - 1: - current_row_idx += 1 - elif key == curses.KEY_ENTER or key in [10, 13]: - selected_option = menu[current_row_idx] - - if selected_option == "Exit": - stop_event.set() - break - elif selected_option == "Open Valve": - client.write_register(REG_HOLDING_VALVE_COMMAND, 1, slave=slave_id) - show_message(stdscr, h, w, "-> Sent OPEN command") - elif selected_option == "Close Valve": - client.write_register(REG_HOLDING_VALVE_COMMAND, 2, slave=slave_id) - show_message(stdscr, h, w, "-> Sent CLOSE command") - elif selected_option == "Stop Valve": - client.write_register(REG_HOLDING_VALVE_COMMAND, 0, slave=slave_id) - show_message(stdscr, h, w, "-> Sent STOP command") - elif "Set Max" in selected_option: - prompt = f"Enter new value for '{selected_option}' (seconds): " - input_str = get_user_input(stdscr, h, w, prompt) - try: - seconds = int(input_str) - reg = REG_HOLDING_MAX_OPENING_TIME_S if "Opening" in selected_option else REG_HOLDING_MAX_CLOSING_TIME_S - client.write_register(reg, seconds, slave=slave_id) - show_message(stdscr, h, w, f"-> Set '{selected_option}' to {seconds}s") - except ValueError: - show_message(stdscr, h, w, "-> Invalid input. Please enter a number.", is_error=True) - except Exception as e: - show_message(stdscr, h, w, f"-> Error: {e}", is_error=True) def main(): global client