Compare commits

..

No commits in common. "c2916662e29a907caeae1cd6ee4006f9ee2ca867" and "95fd88e93eebed5b42a1cc96617756abd3c84957" have entirely different histories.

4 changed files with 131 additions and 144 deletions

View File

@ -27,4 +27,3 @@ CONFIG_SETTINGS_LOG_LEVEL_DBG=y
CONFIG_UART_INTERRUPT_DRIVEN=y CONFIG_UART_INTERRUPT_DRIVEN=y
CONFIG_MODBUS=y CONFIG_MODBUS=y
CONFIG_MODBUS_ROLE_SERVER=y CONFIG_MODBUS_ROLE_SERVER=y
CONFIG_MODBUS_BUFFER_SIZE=256

View File

@ -5,15 +5,10 @@ Dieses Python-Skript bietet eine interaktive Kommandozeilen-Benutzeroberfläche
## Features ## Features
- **Interaktive Benutzeroberfläche:** Eine benutzerfreundliche, auf `curses` basierende Oberfläche, die eine einfache Bedienung ermöglicht. - **Interaktive Benutzeroberfläche:** Eine benutzerfreundliche, auf `curses` basierende Oberfläche, die eine einfache Bedienung ermöglicht.
- **Live-Statusanzeige:** Zeigt tabellarisch und in Echtzeit alle wichtigen Register des Slaves an: - **Live-Statusanzeige:** Zeigt tabellarisch und in Echtzeit den Zustand des Ventils, die Bewegung, den Motorstrom, die konfigurierten Öffnungs-/Schließzeiten sowie Firmware-Version und Uptime des Geräts an.
- Ventilstatus (Zustand, Bewegung, Motorstrom) - **Volle Kontrolle:** Ermöglicht das Senden von Befehlen zum Öffnen, Schließen und Stoppen des Ventils.
- Zustand der digitalen Ein- und Ausgänge - **Konfiguration zur Laufzeit:** Die maximalen Öffnungs- und Schließzeiten können direkt in der Oberfläche geändert werden.
- "Clear-on-Read" Taster-Events - **Anpassbares Design:** Die Benutzeroberfläche ist für eine klare Lesbarkeit mit einem durchgehenden blauen Hintergrund und abgesetzten Schaltflächen gestaltet.
- Systemkonfiguration (Öffnungs-/Schließzeiten, Watchdog-Timeout)
- Gerätestatus (Firmware-Version, Uptime)
- **Volle Kontrolle:** Ermöglicht das Senden von Befehlen zum Öffnen, Schließen und Stoppen des Ventils sowie zum Umschalten der digitalen Ausgänge.
- **Konfiguration zur Laufzeit:** Die maximalen Öffnungs-/Schließzeiten und der Watchdog-Timeout können direkt in der Oberfläche geändert werden.
- **Simulierter Firmware-Upload:** Implementiert den vollständigen, in der Dokumentation beschriebenen Firmware-Update-Prozess. Das Tool sendet eine `firmware.bin`-Datei in Chunks an den Slave und folgt dem CRC-Verifizierungs-Protokoll.
## Installation ## Installation
@ -96,6 +91,5 @@ Ersetzen Sie `/dev/ttyACM0` durch den korrekten Port Ihres Geräts.
- **Navigation:** Verwenden Sie die **Pfeiltasten (↑/↓)**, um zwischen den Menüpunkten zu navigieren. - **Navigation:** Verwenden Sie die **Pfeiltasten (↑/↓)**, um zwischen den Menüpunkten zu navigieren.
- **Auswählen:** Drücken Sie **Enter**, um den ausgewählten Befehl auszuführen. - **Auswählen:** Drücken Sie **Enter**, um den ausgewählten Befehl auszuführen.
- **Werte eingeben:** Bei Aktionen wie "Set Watchdog" werden Sie zur Eingabe eines Wertes aufgefordert. Geben Sie den Wert ein und bestätigen Sie mit **Enter**. - **Werte eingeben:** Bei Aktionen wie "Set Max Opening Time" werden Sie zur Eingabe eines Wertes aufgefordert. Geben Sie den Wert ein und bestätigen Sie mit **Enter**.
- **Firmware Update:** Diese Funktion startet den Upload der Datei `firmware.bin` aus dem aktuellen Verzeichnis. Während des Updates wird eine Fortschrittsanzeige dargestellt.
- **Beenden:** Wählen Sie den Menüpunkt **"Exit"** und drücken Sie **Enter**. - **Beenden:** Wählen Sie den Menüpunkt **"Exit"** und drücken Sie **Enter**.

View File

@ -4,9 +4,9 @@ import threading
import time import time
import sys import sys
import curses import curses
import os
from pymodbus.client import ModbusSerialClient from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException from pymodbus.exceptions import ModbusException
import os
# --- Register Definitions --- # --- Register Definitions ---
# Input Registers # Input Registers
@ -38,13 +38,14 @@ stop_event = threading.Event()
client = None client = None
status_data = {} status_data = {}
status_lock = threading.Lock() status_lock = threading.Lock()
update_status = {"running": False, "message": "", "progress": 0.0}
update_lock = threading.Lock()
def format_uptime(seconds): def format_uptime(seconds):
"""Formats seconds into a human-readable d/h/m/s string."""
if not isinstance(seconds, (int, float)) or seconds < 0: return "N/A" if not isinstance(seconds, (int, float)) or seconds < 0: return "N/A"
if seconds == 0: return "0s" if seconds == 0: return "0s"
days, rem = divmod(seconds, 86400); hours, rem = divmod(rem, 3600); minutes, secs = divmod(rem, 60) days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, secs = divmod(rem, 60)
parts = [] parts = []
if days > 0: parts.append(f"{int(days)}d") if days > 0: parts.append(f"{int(days)}d")
if hours > 0: parts.append(f"{int(hours)}h") if hours > 0: parts.append(f"{int(hours)}h")
@ -53,104 +54,79 @@ def format_uptime(seconds):
return " ".join(parts) return " ".join(parts)
def poll_status(slave_id, interval): def poll_status(slave_id, interval):
"""Periodically polls the status of the node and updates the global status_data dict."""
global status_data global status_data
while not stop_event.is_set(): while not stop_event.is_set():
if update_status["running"]: time.sleep(interval); continue
new_data = {"error": None} new_data = {"error": None}
try: try:
# Grouped reads for efficiency
ir_valve = client.read_input_registers(REG_INPUT_VALVE_STATE_MOVEMENT, count=2, slave=slave_id) ir_valve = client.read_input_registers(REG_INPUT_VALVE_STATE_MOVEMENT, count=2, slave=slave_id)
ir_dig = client.read_input_registers(REG_INPUT_DIGITAL_INPUTS_STATE, count=2, slave=slave_id) ir_dig = client.read_input_registers(REG_INPUT_DIGITAL_INPUTS_STATE, count=2, slave=slave_id)
ir_sys = client.read_input_registers(REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR, count=5, slave=slave_id) ir_sys = client.read_input_registers(REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR, count=5, slave=slave_id)
hr_valve = client.read_holding_registers(REG_HOLDING_MAX_OPENING_TIME_S, count=2, slave=slave_id) hr_valve = client.read_holding_registers(REG_HOLDING_MAX_OPENING_TIME_S, count=2, slave=slave_id)
hr_dig = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id) hr_dig = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id)
hr_sys = client.read_holding_registers(REG_HOLDING_WATCHDOG_TIMEOUT_S, count=1, slave=slave_id) hr_sys = client.read_holding_registers(REG_HOLDING_WATCHDOG_TIMEOUT_S, count=1, slave=slave_id)
# Check for errors
for res in [ir_valve, ir_dig, ir_sys, hr_valve, hr_dig, hr_sys]: for res in [ir_valve, ir_dig, ir_sys, hr_valve, hr_dig, hr_sys]:
if res.isError(): raise ModbusException(str(res)) if res.isError(): raise ModbusException(str(res))
# --- Process Valve & Motor Data ---
valve_state_raw = ir_valve.registers[0] valve_state_raw = ir_valve.registers[0]
movement_map = {0: "Idle", 1: "Opening", 2: "Closing", 3: "Error"}; state_map = {0: "Closed", 1: "Open"} movement_map = {0: "Idle", 1: "Opening", 2: "Closing", 3: "Error"}
new_data["movement"] = movement_map.get(valve_state_raw >> 8, 'Unknown'); new_data["state"] = state_map.get(valve_state_raw & 0xFF, 'Unknown') state_map = {0: "Closed", 1: "Open"}
new_data["motor_current"] = f"{ir_valve.registers[1]} mA"; new_data["open_time"] = f"{hr_valve.registers[0]}s"; new_data["close_time"] = f"{hr_valve.registers[1]}s" new_data["movement"] = movement_map.get(valve_state_raw >> 8, 'Unknown')
new_data["digital_inputs"] = f"0x{ir_dig.registers[0]:04X}"; new_data["button_events"] = f"0x{ir_dig.registers[1]:04X}"; new_data["digital_outputs"] = f"0x{hr_dig.registers[0]:04X}" new_data["state"] = state_map.get(valve_state_raw & 0xFF, 'Unknown')
new_data["motor_current"] = f"{ir_valve.registers[1]} mA"
new_data["open_time"] = f"{hr_valve.registers[0]}s"
new_data["close_time"] = f"{hr_valve.registers[1]}s"
fw_major = ir_sys.registers[0] >> 8; fw_minor = ir_sys.registers[0] & 0xFF; fw_patch = ir_sys.registers[1] # --- Process Digital I/O ---
new_data["digital_inputs"] = f"0x{ir_dig.registers[0]:04X}"
new_data["button_events"] = f"0x{ir_dig.registers[1]:04X}"
new_data["digital_outputs"] = f"0x{hr_dig.registers[0]:04X}"
# --- Process System Data ---
fw_major = ir_sys.registers[0] >> 8
fw_minor = ir_sys.registers[0] & 0xFF
fw_patch = ir_sys.registers[1]
uptime_seconds = (ir_sys.registers[4] << 16) | ir_sys.registers[3] uptime_seconds = (ir_sys.registers[4] << 16) | ir_sys.registers[3]
new_data["firmware"] = f"v{fw_major}.{fw_minor}.{fw_patch}"; new_data["device_status"] = "OK" if ir_sys.registers[2] == 0 else "ERROR" new_data["firmware"] = f"v{fw_major}.{fw_minor}.{fw_patch}"
new_data["uptime"] = format_uptime(uptime_seconds); new_data["watchdog"] = f"{hr_sys.registers[0]}s" new_data["device_status"] = "OK" if ir_sys.registers[2] == 0 else "ERROR"
new_data["uptime"] = format_uptime(uptime_seconds)
new_data["watchdog"] = f"{hr_sys.registers[0]}s"
except ModbusException as e:
new_data["error"] = f"Modbus Error: {e}"
except Exception as e: except Exception as e:
new_data["error"] = f"Error: {e}" new_data["error"] = f"Unexpected Error: {e}"
with status_lock: status_data = new_data
with status_lock:
status_data = new_data
time.sleep(interval) time.sleep(interval)
def firmware_update_thread(slave_id, filepath):
global update_status
with update_lock:
update_status = {"running": True, "message": "Starting update...", "progress": 0.0}
try:
with open(filepath, 'rb') as f:
firmware = f.read()
file_size = len(firmware)
chunk_size = 248 # Max payload size for write_registers is ~248 bytes
offset = 0
while offset < file_size:
chunk = firmware[offset:offset + chunk_size]
with update_lock:
update_status["message"] = f"Sending chunk {offset//chunk_size + 1}/{(file_size + chunk_size - 1)//chunk_size}..."
update_status["progress"] = offset / file_size
# 1. Set offset and size
client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_LOW, offset & 0xFFFF, slave=slave_id)
client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_HIGH, (offset >> 16) & 0xFFFF, slave=slave_id)
client.write_register(REG_HOLDING_FWU_CHUNK_SIZE, len(chunk), slave=slave_id)
# 2. Write data buffer in smaller bursts to avoid timing issues
padded_chunk = chunk
if len(padded_chunk) % 2 != 0:
padded_chunk += b'\x00'
all_registers = [int.from_bytes(padded_chunk[i:i+2], 'big') for i in range(0, len(padded_chunk), 2)]
burst_size_regs = 16 # 32 bytes per burst
for i in range(0, len(all_registers), burst_size_regs):
reg_burst = all_registers[i:i + burst_size_regs]
start_addr = REG_HOLDING_FWU_DATA_BUFFER + i
client.write_registers(start_addr, reg_burst, slave=slave_id)
time.sleep(0.02) # Small delay between bursts
# 3. Read back CRC
time.sleep(0.1) # Give slave time to calculate
remote_crc = client.read_input_registers(REG_INPUT_FWU_LAST_CHUNK_CRC, count=1, slave=slave_id).registers[0]
# 4. Verify (not implemented in this simulation) and command write
client.write_register(REG_HOLDING_FWU_COMMAND, 1, slave=slave_id) # Command: Verify Chunk
offset += len(chunk)
with update_lock: update_status["message"] = "Finalizing update..."
client.write_register(REG_HOLDING_FWU_COMMAND, 2, slave=slave_id) # Command: Finalize
time.sleep(1)
with update_lock: update_status["message"] = "Update complete! Slave is rebooting."
time.sleep(2)
except Exception as e:
with update_lock: update_status["message"] = f"Error: {e}"
time.sleep(3)
finally:
with update_lock: update_status["running"] = False
def draw_button(stdscr, y, x, text, selected=False): def draw_button(stdscr, y, x, text, selected=False):
"""Draws a button with a border, handling selection highlight."""
button_width = len(text) + 4
color = curses.color_pair(2) if selected else curses.color_pair(1) color = curses.color_pair(2) if selected else curses.color_pair(1)
stdscr.addstr(y, x, f" {' ' * len(text)} ", color) stdscr.addstr(y, x, " " * button_width, color)
stdscr.addstr(y, x + 1, text, color) stdscr.addstr(y, x + 2, text, color)
stdscr.addstr(y - 1, x, "" + "" * (button_width - 2) + "", color)
stdscr.addstr(y, x, "", color)
stdscr.addstr(y, x + button_width - 1, "", color)
stdscr.addstr(y + 1, x, "" + "" * (button_width - 2) + "", color)
def main_menu(stdscr, slave_id): def main_menu(stdscr, slave_id):
global status_data, update_status """The main curses UI with a flicker-free, state-based drawing loop."""
curses.curs_set(0); stdscr.nodelay(1); stdscr.timeout(100) global status_data
curses.start_color(); curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE); curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE); curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE) curses.curs_set(0)
stdscr.nodelay(1)
stdscr.timeout(100)
curses.start_color()
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)
curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE)
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE)
stdscr.bkgd(' ', curses.color_pair(1)) stdscr.bkgd(' ', curses.color_pair(1))
menu = ["Open Valve", "Close Valve", "Stop Valve", "Toggle Output 1", "Toggle Output 2", "Set Watchdog", "Firmware Update", "Exit"] menu = ["Open Valve", "Close Valve", "Stop Valve", "Toggle Output 1", "Toggle Output 2", "Set Watchdog", "Firmware Update", "Exit"]
@ -162,27 +138,26 @@ def main_menu(stdscr, slave_id):
h, w = stdscr.getmaxyx() h, w = stdscr.getmaxyx()
key = stdscr.getch() key = stdscr.getch()
with update_lock: is_updating = update_status["running"] if input_mode:
if key in [10, 13]: # Enter
if is_updating:
# Update display only, no input handling
pass
elif input_mode:
if key in [10, 13]:
try: try:
value = int(input_str) value = int(input_str)
client.write_register(input_target_reg, value, slave=slave_id) client.write_register(input_target_reg, value, slave=slave_id)
message = f"-> Set register 0x{input_target_reg:04X} to {value}" message = f"-> Set register 0x{input_target_reg:04X} to {value}"
except Exception as e: message = f"-> Error: {e}" except Exception as e:
message = f"-> Error: {e}"
message_time, input_mode, input_str = time.time(), False, "" message_time, input_mode, input_str = time.time(), False, ""
elif key == curses.KEY_BACKSPACE or key == 127: input_str = input_str[:-1] elif key == curses.KEY_BACKSPACE or key == 127:
elif key != -1 and chr(key).isprintable(): input_str += chr(key) input_str = input_str[:-1]
else: elif key != -1 and chr(key).isprintable():
input_str += chr(key)
else: # Navigation mode
if key == curses.KEY_UP: current_row_idx = (current_row_idx - 1) % len(menu) if key == curses.KEY_UP: current_row_idx = (current_row_idx - 1) % len(menu)
elif key == curses.KEY_DOWN: current_row_idx = (current_row_idx + 1) % len(menu) elif key == curses.KEY_DOWN: current_row_idx = (current_row_idx + 1) % len(menu)
elif key == curses.KEY_ENTER or key in [10, 13]: elif key == curses.KEY_ENTER or key in [10, 13]:
selected_option = menu[current_row_idx] selected_option = menu[current_row_idx]
message_time = time.time() message_time = time.time()
if selected_option == "Exit": stop_event.set(); continue if selected_option == "Exit": stop_event.set(); continue
elif selected_option == "Open Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 1, slave=slave_id); message = "-> Sent OPEN command" elif selected_option == "Open Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 1, slave=slave_id); message = "-> Sent OPEN command"
elif selected_option == "Close Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 2, slave=slave_id); message = "-> Sent CLOSE command" elif selected_option == "Close Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 2, slave=slave_id); message = "-> Sent CLOSE command"
@ -191,64 +166,83 @@ def main_menu(stdscr, slave_id):
bit = 0 if "1" in selected_option else 1 bit = 0 if "1" in selected_option else 1
try: try:
current_val = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id).registers[0] current_val = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id).registers[0]
client.write_register(REG_HOLDING_DIGITAL_OUTPUTS_STATE, current_val ^ (1 << bit), slave=slave_id) new_val = current_val ^ (1 << bit)
client.write_register(REG_HOLDING_DIGITAL_OUTPUTS_STATE, new_val, slave=slave_id)
message = f"-> Toggled Output {bit+1}" message = f"-> Toggled Output {bit+1}"
except Exception as e: message = f"-> Error: {e}" except Exception as e: message = f"-> Error: {e}"
elif selected_option == "Set Watchdog": elif selected_option == "Set Watchdog":
input_mode, input_prompt, input_target_reg = True, "Enter Watchdog Timeout (s): ", REG_HOLDING_WATCHDOG_TIMEOUT_S input_mode, input_prompt, input_target_reg = True, "Enter Watchdog Timeout (s): ", REG_HOLDING_WATCHDOG_TIMEOUT_S
elif selected_option == "Firmware Update": elif selected_option == "Firmware Update":
threading.Thread(target=firmware_update_thread, args=(slave_id, "firmware.bin"), daemon=True).start() message = "-> Firmware update process not yet implemented."
stdscr.clear() stdscr.clear()
if is_updating:
with update_lock:
prog = update_status["progress"]
msg = update_status["message"]
stdscr.addstr(h // 2 - 1, w // 2 - 25, "FIRMWARE UPDATE IN PROGRESS", curses.A_BOLD | curses.color_pair(2))
stdscr.addstr(h // 2, w // 2 - 25, f"[{'#' * int(prog * 50):<50}] {prog:.0%}")
stdscr.addstr(h // 2 + 1, w // 2 - 25, msg.ljust(50))
else:
with status_lock: current_data = status_data.copy() with status_lock: current_data = status_data.copy()
if current_data.get("error"): stdscr.addstr(0, 0, current_data["error"], curses.color_pair(3) | curses.A_BOLD)
if current_data.get("error"):
stdscr.addstr(0, 0, current_data["error"], curses.color_pair(3) | curses.A_BOLD)
else: else:
bold, normal = curses.color_pair(1) | curses.A_BOLD, curses.color_pair(1) bold, normal = curses.color_pair(1) | curses.A_BOLD, curses.color_pair(1)
# Status Area
col1, col2, col3, col4 = 2, 30, 58, 88 col1, col2, col3, col4 = 2, 30, 58, 88
# Status display lines...
stdscr.addstr(1, col1, "State:", bold); stdscr.addstr(1, col1 + 18, str(current_data.get('state', 'N/A')), normal) stdscr.addstr(1, col1, "State:", bold); stdscr.addstr(1, col1 + 18, str(current_data.get('state', 'N/A')), normal)
stdscr.addstr(2, col1, "Movement:", bold); stdscr.addstr(2, col1 + 18, str(current_data.get('movement', 'N/A')), normal) stdscr.addstr(2, col1, "Movement:", bold); stdscr.addstr(2, col1 + 18, str(current_data.get('movement', 'N/A')), normal)
stdscr.addstr(3, col1, "Motor Current:", bold); stdscr.addstr(3, col1 + 18, str(current_data.get('motor_current', 'N/A')), normal) stdscr.addstr(3, col1, "Motor Current:", bold); stdscr.addstr(3, col1 + 18, str(current_data.get('motor_current', 'N/A')), normal)
stdscr.addstr(1, col2, "Digital Inputs:", bold); stdscr.addstr(1, col2 + 18, str(current_data.get('digital_inputs', 'N/A')), normal) stdscr.addstr(1, col2, "Digital Inputs:", bold); stdscr.addstr(1, col2 + 18, str(current_data.get('digital_inputs', 'N/A')), normal)
stdscr.addstr(2, col2, "Digital Outputs:", bold); stdscr.addstr(2, col2 + 18, str(current_data.get('digital_outputs', 'N/A')), normal) stdscr.addstr(2, col2, "Digital Outputs:", bold); stdscr.addstr(2, col2 + 18, str(current_data.get('digital_outputs', 'N/A')), normal)
stdscr.addstr(3, col2, "Button Events:", bold); stdscr.addstr(3, col2 + 18, str(current_data.get('button_events', 'N/A')), normal) stdscr.addstr(3, col2, "Button Events:", bold); stdscr.addstr(3, col2 + 18, str(current_data.get('button_events', 'N/A')), normal)
stdscr.addstr(1, col3, "Max Open Time:", bold); stdscr.addstr(1, col3 + 16, str(current_data.get('open_time', 'N/A')), normal) stdscr.addstr(1, col3, "Max Open Time:", bold); stdscr.addstr(1, col3 + 16, str(current_data.get('open_time', 'N/A')), normal)
stdscr.addstr(2, col3, "Max Close Time:", bold); stdscr.addstr(2, col3 + 16, str(current_data.get('close_time', 'N/A')), normal) stdscr.addstr(2, col3, "Max Close Time:", bold); stdscr.addstr(2, col3 + 16, str(current_data.get('close_time', 'N/A')), normal)
stdscr.addstr(3, col3, "Watchdog:", bold); stdscr.addstr(3, col3 + 16, str(current_data.get('watchdog', 'N/A')), normal) stdscr.addstr(3, col3, "Watchdog:", bold); stdscr.addstr(3, col3 + 16, str(current_data.get('watchdog', 'N/A')), normal)
stdscr.addstr(1, col4, "Firmware:", bold); stdscr.addstr(1, col4 + 12, str(current_data.get('firmware', 'N/A')), normal) stdscr.addstr(1, col4, "Firmware:", bold); stdscr.addstr(1, col4 + 12, str(current_data.get('firmware', 'N/A')), normal)
stdscr.addstr(2, col4, "Uptime:", bold); stdscr.addstr(2, col4 + 12, str(current_data.get('uptime', 'N/A')), normal) stdscr.addstr(2, col4, "Uptime:", bold); stdscr.addstr(2, col4 + 12, str(current_data.get('uptime', 'N/A')), normal)
stdscr.addstr(3, col4, "Device Status:", bold); stdscr.addstr(3, col4 + 12, str(current_data.get('device_status', 'N/A')), normal) stdscr.addstr(3, col4, "Device Status:", bold); stdscr.addstr(3, col4 + 12, str(current_data.get('device_status', 'N/A')), normal)
stdscr.addstr(5, 0, "" * (w - 1), normal) stdscr.addstr(5, 0, "" * (w - 1), normal)
for idx, row in enumerate(menu): for idx, row in enumerate(menu):
draw_button(stdscr, h // 2 - len(menu) + (idx * 2), w // 2 - len(row) // 2, row, idx == current_row_idx) x = w // 2 - (len(row) + 4) // 2
if time.time() - message_time < 2.0: stdscr.addstr(h - 2, 0, message.ljust(w - 1), curses.color_pair(1) | curses.A_BOLD) y = h // 2 - len(menu) + (idx * 3)
draw_button(stdscr, y, x, row, idx == current_row_idx)
if time.time() - message_time < 2.0:
stdscr.addstr(h - 2, 0, message.ljust(w - 1), curses.color_pair(1) | curses.A_BOLD)
if input_mode: if input_mode:
curses.curs_set(1); stdscr.addstr(h - 2, 0, (input_prompt + input_str).ljust(w-1), curses.color_pair(2)); stdscr.move(h - 2, len(input_prompt) + len(input_str)) curses.curs_set(1)
else: curses.curs_set(0) stdscr.addstr(h - 2, 0, (input_prompt + input_str).ljust(w-1), curses.color_pair(2))
stdscr.move(h - 2, len(input_prompt) + len(input_str))
else:
curses.curs_set(0)
stdscr.refresh() stdscr.refresh()
def main(): def main():
global client global client
parser = argparse.ArgumentParser(description="Modbus tool for irrigation system nodes.") parser = argparse.ArgumentParser(description="Modbus tool for irrigation system nodes.")
parser.add_argument("port", help="Serial port"); parser.add_argument("--baud", type=int, default=19200); parser.add_argument("--slave-id", type=int, default=1); parser.add_argument("--interval", type=float, default=1.0) parser.add_argument("port", help="Serial port (e.g., /dev/ttyACM0)")
parser.add_argument("--baud", type=int, default=19200, help="Baud rate")
parser.add_argument("--slave-id", type=int, default=1, help="Modbus slave ID")
parser.add_argument("--interval", type=float, default=1.0, help="Polling interval (sec)")
args = parser.parse_args() args = parser.parse_args()
client = ModbusSerialClient(port=args.port, baudrate=args.baud, stopbits=1, bytesize=8, parity="N", timeout=1) client = ModbusSerialClient(port=args.port, baudrate=args.baud, stopbits=1, bytesize=8, parity="N", timeout=1)
if not client.connect(): print(f"Error: Failed to connect to serial port {args.port}"); sys.exit(1) if not client.connect():
print(f"Error: Failed to connect to serial port {args.port}"); sys.exit(1)
print("Successfully connected. Starting UI..."); time.sleep(0.5) print("Successfully connected. Starting UI..."); time.sleep(0.5)
threading.Thread(target=poll_status, args=(args.slave_id, args.interval), daemon=True).start() poll_thread = threading.Thread(target=poll_status, args=(args.slave_id, args.interval), daemon=True)
try: curses.wrapper(main_menu, args.slave_id) poll_thread.start()
try:
curses.wrapper(main_menu, args.slave_id)
finally: finally:
stop_event.set() stop_event.set()
print("\nExiting...") print("\nExiting...")
if client.is_socket_open(): client.close() if client.is_socket_open(): client.close()
poll_thread.join(timeout=2)
if __name__ == "__main__": if __name__ == "__main__":
main() main()