#!/usr/bin/env python3
"""Curses front-end for monitoring and controlling a Modbus RTU valve node.

A background thread polls the node's input/holding registers and publishes a
formatted snapshot in ``status_data``; the curses UI renders that snapshot
and writes holding registers in response to menu selections.
"""
import argparse
import curses
import sys
import threading
import time

from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException

# Register Definitions
REG_INPUT_VALVE_STATE_MOVEMENT = 0x0000
REG_INPUT_MOTOR_CURRENT_MA = 0x0001
REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR = 0x00F0
REG_INPUT_FIRMWARE_VERSION_PATCH = 0x00F1
REG_INPUT_UPTIME_SECONDS_LOW = 0x00F3
REG_INPUT_UPTIME_SECONDS_HIGH = 0x00F4

REG_HOLDING_VALVE_COMMAND = 0x0000
REG_HOLDING_MAX_OPENING_TIME_S = 0x0001
REG_HOLDING_MAX_CLOSING_TIME_S = 0x0002

# Decoding tables for REG_INPUT_VALVE_STATE_MOVEMENT
# (high byte = movement, low byte = state).
MOVEMENT_NAMES = {0: "Idle", 1: "Opening", 2: "Closing", 3: "Error"}
STATE_NAMES = {0: "Closed", 1: "Open"}

# Menu label -> (value written to REG_HOLDING_VALVE_COMMAND, confirmation text).
VALVE_COMMANDS = {
    "Open Valve": (1, "-> Sent OPEN command"),
    "Close Valve": (2, "-> Sent CLOSE command"),
    "Stop Valve": (0, "-> Sent STOP command"),
}

# Largest value that fits in a single 16-bit Modbus register.
MAX_REGISTER_VALUE = 0xFFFF

# How long getch() waits for a key before the UI redraws, in milliseconds.
UI_KEY_TIMEOUT_MS = 100

# Global state
stop_event = threading.Event()
client = None
# The poll thread and the UI thread share one serial client; this lock keeps
# their request/response transactions from interleaving on the bus.
client_lock = threading.Lock()
status_data = {}
status_lock = threading.Lock()


def _read_checked(read_fn, address, count, slave_id, what):
    """Run one register read under ``client_lock`` and raise on an error reply.

    Args:
        read_fn: Bound client read method (input or holding registers).
        address: First register address to read.
        count: Number of consecutive registers to read.
        slave_id: Modbus slave/unit ID of the node.
        what: Short description used in the error message.

    Returns:
        The successful response object.

    Raises:
        ModbusException: If the response reports an error.
    """
    with client_lock:
        response = read_fn(address, count=count, slave=slave_id)
    if response.isError():
        raise ModbusException(f"{what}: {response}")
    return response


def _write_register(address, value, slave_id):
    """Write one holding register under ``client_lock``.

    Raises:
        ModbusException: If the node answers with an error response.
    """
    with client_lock:
        response = client.write_register(address, value, slave=slave_id)
    if response.isError():
        raise ModbusException(f"writing register 0x{address:04X}: {response}")


def _decode_status(valve_regs, holding_regs, system_regs):
    """Turn raw register lists into the display strings shown by the UI.

    Args:
        valve_regs: Two input registers starting at
            REG_INPUT_VALVE_STATE_MOVEMENT (state/movement, motor current).
        holding_regs: Two holding registers starting at
            REG_HOLDING_MAX_OPENING_TIME_S (max opening, max closing time).
        system_regs: Five input registers starting at
            REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR.

    Returns:
        dict mapping status field names to formatted strings.
    """
    valve_state_raw = valve_regs[0]
    fw_major = system_regs[0] >> 8
    fw_minor = system_regs[0] & 0xFF
    fw_patch = system_regs[1]
    # Offsets 3 and 4 are the uptime low/high words (0x00F3 / 0x00F4).
    uptime = (system_regs[4] << 16) | system_regs[3]
    return {
        "movement": MOVEMENT_NAMES.get(valve_state_raw >> 8, "Unknown"),
        "state": STATE_NAMES.get(valve_state_raw & 0xFF, "Unknown"),
        "motor_current": f"{valve_regs[1]} mA",
        "open_time": f"{holding_regs[0]}s",
        "close_time": f"{holding_regs[1]}s",
        "firmware": f"v{fw_major}.{fw_minor}.{fw_patch}",
        "uptime": f"{uptime}s",
    }


def poll_status(slave_id, interval):
    """Periodically poll the node and publish the result in ``status_data``.

    Runs until ``stop_event`` is set. Each cycle replaces ``status_data``
    (under ``status_lock``) with either the decoded fields or a dict whose
    ``"error"`` entry describes what went wrong.

    Args:
        slave_id: Modbus slave/unit ID of the node.
        interval: Seconds to wait between polling cycles.
    """
    global status_data
    while not stop_event.is_set():
        new_data = {"error": None}
        try:
            valve = _read_checked(
                client.read_input_registers,
                REG_INPUT_VALVE_STATE_MOVEMENT, 2, slave_id,
                "reading valve status",
            )
            holding = _read_checked(
                client.read_holding_registers,
                REG_HOLDING_MAX_OPENING_TIME_S, 2, slave_id,
                "reading holding registers",
            )
            system = _read_checked(
                client.read_input_registers,
                REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR, 5, slave_id,
                "reading system status",
            )
            new_data.update(
                _decode_status(valve.registers, holding.registers, system.registers)
            )
        except ModbusException as e:
            new_data["error"] = f"Modbus Error: {e}"
        except Exception as e:
            # Thread boundary: report anything unexpected in the UI instead of
            # letting the poller die silently.
            new_data["error"] = f"Unexpected Error: {e}"

        with status_lock:
            status_data = new_data

        # Unlike time.sleep(), this returns as soon as shutdown is requested.
        stop_event.wait(interval)


def _safe_addstr(win, y, x, text, attr=0):
    """Draw ``text`` clipped to the window; do nothing if it is off-screen.

    curses raises ``curses.error`` for any write outside the window, which
    would otherwise crash the UI when the terminal is small or resized.
    """
    height, width = win.getmaxyx()
    if y < 0 or y >= height or x < 0:
        return
    # Leave the last column free, matching the rest of the drawing code.
    room = width - x - 1
    if room <= 0:
        return
    try:
        win.addstr(y, x, text[:room], attr)
    except curses.error:
        # Deliberate best-effort: a failed draw must not take the UI down.
        pass


def _set_cursor(visibility):
    """Set cursor visibility, ignoring terminals that do not support it."""
    try:
        curses.curs_set(visibility)
    except curses.error:
        pass


def draw_button(stdscr, y, x, text, selected=False):
    """Draw a three-row bordered button whose middle row is at ``y``.

    Args:
        stdscr: Window to draw on.
        y: Row of the button label; the border uses ``y - 1`` and ``y + 1``.
        x: Column of the button's left border.
        text: Button label.
        selected: Use the highlight colour pair when True.
    """
    button_width = len(text) + 4
    if selected:
        color = curses.color_pair(2)  # Highlighted (blue on white)
    else:
        color = curses.color_pair(1)  # Normal (white on blue)

    # Draw the button background and text
    _safe_addstr(stdscr, y, x, " " * button_width, color)
    _safe_addstr(stdscr, y, x + 2, text, color)

    # Draw the border with the same color attribute
    horizontal = "─" * (button_width - 2)
    _safe_addstr(stdscr, y - 1, x, "┌" + horizontal + "┐", color)
    _safe_addstr(stdscr, y, x, "│", color)
    _safe_addstr(stdscr, y, x + button_width - 1, "│", color)
    _safe_addstr(stdscr, y + 1, x, "└" + horizontal + "┘", color)


def get_user_input(stdscr, h, w, prompt):
    """Prompt on the second-to-last screen row and return the typed text.

    Args:
        stdscr: Main window (used to draw the prompt).
        h: Screen height in rows.
        w: Screen width in columns.
        prompt: Text shown before the input field.

    Returns:
        The entered line, decoded as UTF-8 (undecodable bytes are replaced).
    """
    input_line_y = max(0, h - 2)
    # Keep some columns for the input field even on a narrow terminal.
    shown_prompt = prompt[: max(0, w - 12)]
    field_width = max(1, w - len(shown_prompt) - 1)

    _safe_addstr(stdscr, input_line_y, 0, " " * (w - 1), curses.color_pair(1))
    _safe_addstr(stdscr, input_line_y, 0, shown_prompt,
                 curses.color_pair(1) | curses.A_BOLD)
    stdscr.refresh()

    curses.echo()
    _set_cursor(1)
    try:
        # Input should be on a white background for readability.
        # The read happens on this separate (blocking) window, so stdscr's
        # key timeout is left untouched: resetting it with nodelay(1) would
        # turn the main loop's 100 ms wait into a busy spin.
        input_win = curses.newwin(1, field_width, input_line_y, len(shown_prompt))
        input_win.bkgd(' ', curses.color_pair(4))  # Black on white
        raw = input_win.getstr()
    finally:
        # Always restore the UI's no-echo / hidden-cursor state.
        curses.noecho()
        _set_cursor(0)
    return raw.decode('utf-8', errors='replace')


def show_message(stdscr, h, w, message, is_error=False, duration=1.5):
    """Show ``message`` on the second-to-last row, then pause.

    Args:
        stdscr: Window to draw on.
        h: Screen height in rows.
        w: Screen width in columns.
        message: Text to display (clipped to the screen width).
        is_error: Use the error colour pair when True.
        duration: Seconds to keep the message visible (blocks the UI).
    """
    msg_y = h - 2
    color = curses.color_pair(3) if is_error else curses.color_pair(1)
    _safe_addstr(stdscr, msg_y, 0, " " * (w - 1), color)
    _safe_addstr(stdscr, msg_y, 0, message, color | curses.A_BOLD)
    stdscr.refresh()
    time.sleep(duration)


def _draw_status(stdscr, w, current_data):
    """Render the status snapshot (or its error) at the top of the screen."""
    if current_data.get("error"):
        _safe_addstr(stdscr, 0, 0, current_data["error"],
                     curses.color_pair(3) | curses.A_BOLD)
        return

    # Use a single color pair, bold for labels
    bold = curses.color_pair(1) | curses.A_BOLD
    normal = curses.color_pair(1)
    col1_x, col2_x, col3_x = 2, 35, 70

    # (row, label column, label, value offset from label, status key)
    layout = [
        (1, col1_x, "Valve State:", 14, "state"),
        (2, col1_x, "Movement:", 14, "movement"),
        (3, col1_x, "Motor Current:", 14, "motor_current"),
        (1, col2_x, "Max Open Time:", 16, "open_time"),
        (2, col2_x, "Max Close Time:", 16, "close_time"),
        (1, col3_x, "Firmware:", 11, "firmware"),
        (2, col3_x, "Uptime:", 11, "uptime"),
    ]
    for row, col, label, value_offset, key in layout:
        _safe_addstr(stdscr, row, col, label, bold)
        _safe_addstr(stdscr, row, col + value_offset,
                     str(current_data.get(key, 'N/A')), normal)

    _safe_addstr(stdscr, 5, 0, "─" * (w - 1), normal)


def _handle_selection(stdscr, h, w, selected_option, slave_id):
    """Carry out a non-Exit menu choice and report the outcome on screen."""
    try:
        if selected_option in VALVE_COMMANDS:
            value, confirmation = VALVE_COMMANDS[selected_option]
            _write_register(REG_HOLDING_VALVE_COMMAND, value, slave_id)
            show_message(stdscr, h, w, confirmation)
        elif "Set Max" in selected_option:
            prompt = f"Enter new value for '{selected_option}' (seconds): "
            input_str = get_user_input(stdscr, h, w, prompt)
            try:
                seconds = int(input_str)
            except ValueError:
                show_message(stdscr, h, w,
                             "-> Invalid input. Please enter a number.",
                             is_error=True)
                return
            if not 0 <= seconds <= MAX_REGISTER_VALUE:
                show_message(
                    stdscr, h, w,
                    f"-> Value must be between 0 and {MAX_REGISTER_VALUE}.",
                    is_error=True,
                )
                return
            if "Opening" in selected_option:
                reg = REG_HOLDING_MAX_OPENING_TIME_S
            else:
                reg = REG_HOLDING_MAX_CLOSING_TIME_S
            _write_register(reg, seconds, slave_id)
            show_message(stdscr, h, w,
                         f"-> Set '{selected_option}' to {seconds}s")
    except Exception as e:
        # UI boundary: a failed write (timeout, error reply, port problem)
        # is shown to the operator instead of tearing down the interface.
        show_message(stdscr, h, w, f"-> Error: {e}", is_error=True)


def main_menu(stdscr, slave_id):
    """Run the curses UI loop until Exit is chosen or ``stop_event`` is set.

    Args:
        stdscr: Main window supplied by ``curses.wrapper``.
        slave_id: Modbus slave/unit ID that commands are sent to.
    """
    _set_cursor(0)
    # Wait up to UI_KEY_TIMEOUT_MS for a key so the status area keeps
    # refreshing without spinning the CPU.
    stdscr.timeout(UI_KEY_TIMEOUT_MS)

    # Initialize colors
    curses.start_color()
    curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)   # Main: white on blue
    curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE)   # Selected: blue on white
    curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE)     # Error: red on blue
    curses.init_pair(4, curses.COLOR_BLACK, curses.COLOR_WHITE)  # Input: black on white

    # Set the background for the whole screen
    stdscr.bkgd(' ', curses.color_pair(1))

    menu = ["Open Valve", "Close Valve", "Stop Valve",
            "Set Max Opening Time", "Set Max Closing Time", "Exit"]
    current_row_idx = 0

    while not stop_event.is_set():
        # erase() blanks the window with the background colour without
        # forcing a full-screen repaint on every tick the way clear() does.
        stdscr.erase()
        h, w = stdscr.getmaxyx()

        # --- Draw Status Area ---
        with status_lock:
            current_data = status_data.copy()
        _draw_status(stdscr, w, current_data)

        # --- Draw Menu Buttons ---
        for idx, row in enumerate(menu):
            x = w // 2 - (len(row) + 4) // 2
            y = h // 2 - (len(menu)) + (idx * 3)
            draw_button(stdscr, y, x, row, idx == current_row_idx)

        stdscr.refresh()

        key = stdscr.getch()

        if key == curses.KEY_UP and current_row_idx > 0:
            current_row_idx -= 1
        elif key == curses.KEY_DOWN and current_row_idx < len(menu) - 1:
            current_row_idx += 1
        elif key == curses.KEY_ENTER or key in [10, 13]:
            selected_option = menu[current_row_idx]
            if selected_option == "Exit":
                stop_event.set()
                break
            _handle_selection(stdscr, h, w, selected_option, slave_id)


def main():
    """Parse arguments, connect to the node, and run the poller and the UI."""
    global client
    parser = argparse.ArgumentParser(description="Modbus tool for irrigation system nodes.")
    parser.add_argument("port", help="Serial port (e.g., /dev/ttyACM0)")
    parser.add_argument("--baud", type=int, default=19200, help="Baud rate")
    parser.add_argument("--slave-id", type=int, default=1, help="Modbus slave ID")
    parser.add_argument("--interval", type=float, default=1.0, help="Polling interval (sec)")
    args = parser.parse_args()

    if args.interval <= 0:
        # A non-positive wait would make the poll loop spin without pausing.
        parser.error("--interval must be greater than 0")

    client = ModbusSerialClient(port=args.port, baudrate=args.baud, stopbits=1,
                                bytesize=8, parity="N", timeout=1)
    if not client.connect():
        print(f"Error: Failed to connect to serial port {args.port}")
        sys.exit(1)

    print(f"Successfully connected to {args.port}. Starting UI...")
    time.sleep(0.5)

    poll_thread = threading.Thread(target=poll_status,
                                   args=(args.slave_id, args.interval))
    poll_thread.daemon = True
    poll_thread.start()

    try:
        curses.wrapper(main_menu, args.slave_id)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to quit; curses.wrapper has already
        # restored the terminal.
        pass
    finally:
        stop_event.set()
        print("\nExiting...")
        # Let the poller finish its current transaction before the port is
        # closed underneath it.
        poll_thread.join(timeout=4)
        if client.is_socket_open():
            client.close()


if __name__ == "__main__":
    main()