#!/usr/bin/env python3
"""Interactive curses tool for a Modbus RTU valve node.

A background thread polls the node's input/holding registers and publishes a
snapshot in ``status_data``; the curses UI renders that snapshot and lets the
user send valve commands or change the max opening/closing times.
"""
import argparse
import curses
import sys
import threading
import time

from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException

# Register Definitions
REG_INPUT_VALVE_STATE_MOVEMENT = 0x0000
REG_INPUT_MOTOR_CURRENT_MA = 0x0001
REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR = 0x00F0
REG_INPUT_FIRMWARE_VERSION_PATCH = 0x00F1
REG_INPUT_UPTIME_SECONDS_LOW = 0x00F3
REG_INPUT_UPTIME_SECONDS_HIGH = 0x00F4
REG_HOLDING_VALVE_COMMAND = 0x0000
REG_HOLDING_MAX_OPENING_TIME_S = 0x0001
REG_HOLDING_MAX_CLOSING_TIME_S = 0x0002

# Global state
stop_event = threading.Event()
client = None
status_data = {}
status_lock = threading.Lock()

# UI constants
_ENTER_KEYS = (curses.KEY_ENTER, 10, 13)
_BACKSPACE_KEYS = (curses.KEY_BACKSPACE, 127, 8)
_ESCAPE_KEY = 27
_MESSAGE_DURATION_S = 2.0
_MAX_REGISTER_VALUE = 0xFFFF  # a Modbus register holds 16 bits

# Menu label -> (value written to REG_HOLDING_VALVE_COMMAND, confirmation text)
_VALVE_COMMANDS = {
    "Open Valve": (1, "-> Sent OPEN command"),
    "Close Valve": (2, "-> Sent CLOSE command"),
    "Stop Valve": (0, "-> Sent STOP command"),
}


def format_uptime(seconds):
    """Formats seconds into a human-readable d/h/m/s string.

    Returns "N/A" for non-numeric or negative input and "0s" for zero.
    Units whose value is zero are omitted (e.g. 3600 -> "1h").
    """
    if not isinstance(seconds, (int, float)) or seconds < 0:
        return "N/A"
    if seconds == 0:
        return "0s"

    days, remainder = divmod(seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, secs = divmod(remainder, 60)

    parts = []
    if days > 0:
        parts.append(f"{int(days)}d")
    if hours > 0:
        parts.append(f"{int(hours)}h")
    if minutes > 0:
        parts.append(f"{int(minutes)}m")
    # Show seconds when non-zero, or when nothing else was emitted
    # (sub-second float input would otherwise produce an empty string).
    if secs > 0 or not parts:
        parts.append(f"{int(secs)}s")
    return " ".join(parts)


def _read_status(slave_id):
    """Read one status snapshot from the node and return it as a dict.

    Raises ModbusException as soon as any read returns an error response, so
    an unreachable node costs one timeout per cycle instead of three.
    """
    rr = client.read_input_registers(
        REG_INPUT_VALVE_STATE_MOVEMENT, count=2, slave=slave_id)
    if rr.isError():
        raise ModbusException(f"reading valve status: {rr}")

    hr = client.read_holding_registers(
        REG_HOLDING_MAX_OPENING_TIME_S, count=2, slave=slave_id)
    if hr.isError():
        raise ModbusException(f"reading holding registers: {hr}")

    # One read spanning firmware version through uptime-high.
    sys_base = REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR
    sys_count = REG_INPUT_UPTIME_SECONDS_HIGH - sys_base + 1
    rr_sys = client.read_input_registers(sys_base, count=sys_count, slave=slave_id)
    if rr_sys.isError():
        raise ModbusException(f"reading system status: {rr_sys}")

    movement_map = {0: "Idle", 1: "Opening", 2: "Closing", 3: "Error"}
    state_map = {0: "Closed", 1: "Open"}

    # High byte carries the movement code, low byte the open/closed state.
    valve_state_raw = rr.registers[REG_INPUT_VALVE_STATE_MOVEMENT]
    fw_word = rr_sys.registers[0]
    fw_patch = rr_sys.registers[REG_INPUT_FIRMWARE_VERSION_PATCH - sys_base]
    uptime_low = rr_sys.registers[REG_INPUT_UPTIME_SECONDS_LOW - sys_base]
    uptime_high = rr_sys.registers[REG_INPUT_UPTIME_SECONDS_HIGH - sys_base]
    uptime_seconds = (uptime_high << 16) | uptime_low

    return {
        "error": None,
        "movement": movement_map.get(valve_state_raw >> 8, 'Unknown'),
        "state": state_map.get(valve_state_raw & 0xFF, 'Unknown'),
        "motor_current": f"{rr.registers[REG_INPUT_MOTOR_CURRENT_MA]} mA",
        "open_time": f"{hr.registers[0]}s",
        "close_time": f"{hr.registers[1]}s",
        "firmware": f"v{fw_word >> 8}.{fw_word & 0xFF}.{fw_patch}",
        "uptime": format_uptime(uptime_seconds),
    }


def poll_status(slave_id, interval):
    """Periodically polls the status of the node and updates the global status_data dict.

    Runs until ``stop_event`` is set. Any failure is reported through the
    "error" key of the published snapshot rather than raised, so the polling
    thread keeps running across transient faults.

    Args:
        slave_id: Modbus slave address passed to every read.
        interval: Seconds to wait between polling cycles.
    """
    global status_data
    while not stop_event.is_set():
        try:
            new_data = _read_status(slave_id)
        except ModbusException as e:
            new_data = {"error": f"Modbus Error: {e}"}
        except Exception as e:
            # Thread boundary: surface anything unexpected in the UI
            # instead of letting the poller die silently.
            new_data = {"error": f"Unexpected Error: {e}"}

        with status_lock:
            status_data = new_data

        # Event.wait returns as soon as stop_event is set, so shutdown does
        # not have to sit out the remainder of the interval.
        stop_event.wait(interval)


def _safe_addstr(win, y, x, text, attr=0):
    """Write text at (y, x), clipped to the window; never raises curses.error."""
    height, width = win.getmaxyx()
    if not (0 <= y < height and 0 <= x < width):
        return
    try:
        win.addstr(y, x, text[:width - x], attr)
    except curses.error:
        # curses reports an error after filling the bottom-right cell
        # because the cursor cannot advance; the text is already drawn.
        pass


def _set_cursor(visible):
    """Show or hide the cursor, ignoring terminals that cannot change it."""
    try:
        curses.curs_set(1 if visible else 0)
    except curses.error:
        pass


def draw_button(stdscr, y, x, text, selected=False):
    """Draws a button with a border, handling selection highlight.

    The label row is at ``y``; the border occupies rows ``y - 1`` and
    ``y + 1``. Parts that fall outside the window are skipped.
    """
    button_width = len(text) + 4
    color = curses.color_pair(2) if selected else curses.color_pair(1)
    horizontal = "─" * (button_width - 2)

    _safe_addstr(stdscr, y, x, " " * button_width, color)
    _safe_addstr(stdscr, y, x + 2, text, color)
    _safe_addstr(stdscr, y - 1, x, "┌" + horizontal + "┐", color)
    _safe_addstr(stdscr, y, x, "│", color)
    _safe_addstr(stdscr, y, x + button_width - 1, "│", color)
    _safe_addstr(stdscr, y + 1, x, "└" + horizontal + "┘", color)


def _write_register(slave_id, register, value, success_message):
    """Write one holding register; return the message to show the user."""
    try:
        response = client.write_register(register, value, slave=slave_id)
    except Exception as e:
        # UI boundary: a failed write must not tear down the curses screen.
        return f"-> Error: {e}"
    if response.isError():
        return f"-> Error: {response}"
    return success_message


def _apply_time_input(slave_id, register, text):
    """Validate the typed number of seconds and write it to ``register``."""
    try:
        seconds = int(text)
    except ValueError:
        return "-> Invalid input. Please enter a number."
    if not 0 <= seconds <= _MAX_REGISTER_VALUE:
        return f"-> Invalid input. Value must be 0-{_MAX_REGISTER_VALUE}."
    return _write_register(slave_id, register, seconds, f"-> Set time to {seconds}s")


def _draw_status(stdscr, data, width):
    """Render the status snapshot (or its error) in the top rows."""
    if data.get("error"):
        _safe_addstr(stdscr, 0, 0, data["error"],
                     curses.color_pair(3) | curses.A_BOLD)
        return

    bold = curses.color_pair(1) | curses.A_BOLD
    normal = curses.color_pair(1)
    col1_x, col2_x, col3_x = 2, 35, 70
    # (row, label column, value offset, label, snapshot key)
    fields = (
        (1, col1_x, 14, "Valve State:", "state"),
        (2, col1_x, 14, "Movement:", "movement"),
        (3, col1_x, 14, "Motor Current:", "motor_current"),
        (1, col2_x, 16, "Max Open Time:", "open_time"),
        (2, col2_x, 16, "Max Close Time:", "close_time"),
        (1, col3_x, 11, "Firmware:", "firmware"),
        (2, col3_x, 11, "Uptime:", "uptime"),
    )
    for row, col, offset, label, key in fields:
        _safe_addstr(stdscr, row, col, label, bold)
        _safe_addstr(stdscr, row, col + offset, str(data.get(key, 'N/A')), normal)
    _safe_addstr(stdscr, 5, 0, "─" * (width - 1), normal)


def main_menu(stdscr, slave_id):
    """The main curses UI with a flicker-free, state-based drawing loop.

    Each iteration handles at most one key press, then redraws the whole
    screen from the current state. Up/Down move the selection and Enter
    activates it. While entering a time, Enter submits, Esc cancels and
    Backspace deletes. The loop ends when ``stop_event`` is set ("Exit").

    Args:
        stdscr: The curses window supplied by ``curses.wrapper``.
        slave_id: Modbus slave address used for every write.
    """
    _set_cursor(False)
    # getch() blocks for at most 100 ms so the status area keeps refreshing.
    stdscr.timeout(100)

    # --- Color Pairs ---
    if curses.has_colors():
        curses.start_color()
        curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)  # Main: white on blue
        curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE)  # Selected: blue on white
        curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE)    # Error: red on blue
    stdscr.bkgd(' ', curses.color_pair(1))

    # --- UI State ---
    menu = ["Open Valve", "Close Valve", "Stop Valve",
            "Set Max Opening Time", "Set Max Closing Time", "Exit"]
    current_row_idx = 0

    # State for transient messages
    message = ""
    message_until = 0.0

    # State for user input
    input_mode = False
    input_prompt = ""
    input_str = ""
    input_target_reg = 0

    while not stop_event.is_set():
        # --- Handle Input and State Changes ---
        key = stdscr.getch()

        if input_mode:
            if key in _ENTER_KEYS:
                message = _apply_time_input(slave_id, input_target_reg, input_str)
                message_until = time.monotonic() + _MESSAGE_DURATION_S
                input_mode = False
                input_str = ""
            elif key == _ESCAPE_KEY:
                input_mode = False
                input_str = ""
            elif key in _BACKSPACE_KEYS:
                input_str = input_str[:-1]
            elif 32 <= key <= 126:
                # ASCII printable only: curses special keys (arrows, resize)
                # are integers > 255 and must not be appended as text.
                input_str += chr(key)
        else:  # Navigation mode
            if key == curses.KEY_UP and current_row_idx > 0:
                current_row_idx -= 1
            elif key == curses.KEY_DOWN and current_row_idx < len(menu) - 1:
                current_row_idx += 1
            elif key in _ENTER_KEYS:
                selected_option = menu[current_row_idx]
                if selected_option == "Exit":
                    stop_event.set()
                    continue
                if selected_option in _VALVE_COMMANDS:
                    command, confirmation = _VALVE_COMMANDS[selected_option]
                    message = _write_register(
                        slave_id, REG_HOLDING_VALVE_COMMAND, command, confirmation)
                    message_until = time.monotonic() + _MESSAGE_DURATION_S
                elif "Set Max" in selected_option:
                    input_mode = True
                    input_prompt = f"Enter new value for '{selected_option}' (seconds): "
                    input_target_reg = (
                        REG_HOLDING_MAX_OPENING_TIME_S
                        if "Opening" in selected_option
                        else REG_HOLDING_MAX_CLOSING_TIME_S
                    )

        # --- Drawing Logic (Single Source of Truth) ---
        # Query the size after getch() so a terminal resize is picked up.
        h, w = stdscr.getmaxyx()
        # erase() only blanks the buffer; clear() would also force a full
        # repaint on the next refresh, which is what causes flicker.
        stdscr.erase()

        # 1. Draw Status Area
        with status_lock:
            current_data = dict(status_data)
        _draw_status(stdscr, current_data, w)

        # 2. Draw Menu Buttons
        for idx, row in enumerate(menu):
            x = w // 2 - (len(row) + 4) // 2
            y = h // 2 - len(menu) + (idx * 3)
            draw_button(stdscr, y, x, row, idx == current_row_idx)

        # 3. Draw Transient Message or Input Prompt
        if input_mode:
            _set_cursor(True)
            _safe_addstr(stdscr, h - 2, 0,
                         (input_prompt + input_str).ljust(w - 1),
                         curses.color_pair(2))
            cursor_x = min(len(input_prompt) + len(input_str), max(w - 1, 0))
            if h >= 2:
                try:
                    stdscr.move(h - 2, cursor_x)
                except curses.error:
                    pass
        else:
            _set_cursor(False)
            if message and time.monotonic() < message_until:
                _safe_addstr(stdscr, h - 2, 0, message.ljust(w - 1),
                             curses.color_pair(1) | curses.A_BOLD)

        stdscr.refresh()


def main():
    """Parse arguments, connect to the serial port, and run the UI.

    Exits with status 1 if the serial connection cannot be opened. On the
    way out the polling thread is stopped and joined before the client is
    closed, so the port is not closed underneath an in-flight read.
    """
    global client
    parser = argparse.ArgumentParser(description="Modbus tool for irrigation system nodes.")
    parser.add_argument("port", help="Serial port (e.g., /dev/ttyACM0)")
    parser.add_argument("--baud", type=int, default=19200, help="Baud rate")
    parser.add_argument("--slave-id", type=int, default=1, help="Modbus slave ID")
    parser.add_argument("--interval", type=float, default=1.0, help="Polling interval (sec)")
    args = parser.parse_args()
    if args.interval < 0:
        parser.error("--interval must not be negative")

    client = ModbusSerialClient(port=args.port, baudrate=args.baud, stopbits=1,
                                bytesize=8, parity="N", timeout=1)
    if not client.connect():
        print(f"Error: Failed to connect to serial port {args.port}")
        sys.exit(1)

    print(f"Successfully connected to {args.port}. Starting UI...")
    time.sleep(0.5)

    poll_thread = threading.Thread(
        target=poll_status, args=(args.slave_id, args.interval), daemon=True)
    poll_thread.start()

    try:
        curses.wrapper(main_menu, args.slave_id)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to leave; skip the traceback.
        pass
    finally:
        stop_event.set()
        print("\nExiting...")
        poll_thread.join(timeout=2)
        if client.is_socket_open():
            client.close()


if __name__ == "__main__":
    main()