feat(modbus_tool): Implement simulated firmware update
- Add a new thread to handle the firmware update process, preventing the UI from freezing. - The UI now displays a progress bar and status messages during the update. - The tool reads a file and sends it to the slave in chunks. - Add a dummy for testing purposes. - Fix Modbus communication issues by reducing the chunk size to a safe value (248 bytes) and sending data in smaller bursts to improve stability. - Update the README with the new features and instructions.
This commit is contained in:
parent
24087f5622
commit
c2916662e2
|
|
@ -5,10 +5,15 @@ Dieses Python-Skript bietet eine interaktive Kommandozeilen-Benutzeroberfläche
|
|||
## Features
|
||||
|
||||
- **Interaktive Benutzeroberfläche:** Eine benutzerfreundliche, auf `curses` basierende Oberfläche, die eine einfache Bedienung ermöglicht.
|
||||
- **Live-Statusanzeige:** Zeigt tabellarisch und in Echtzeit den Zustand des Ventils, die Bewegung, den Motorstrom, die konfigurierten Öffnungs-/Schließzeiten sowie Firmware-Version und Uptime des Geräts an.
|
||||
- **Volle Kontrolle:** Ermöglicht das Senden von Befehlen zum Öffnen, Schließen und Stoppen des Ventils.
|
||||
- **Konfiguration zur Laufzeit:** Die maximalen Öffnungs- und Schließzeiten können direkt in der Oberfläche geändert werden.
|
||||
- **Anpassbares Design:** Die Benutzeroberfläche ist für eine klare Lesbarkeit mit einem durchgehenden blauen Hintergrund und abgesetzten Schaltflächen gestaltet.
|
||||
- **Live-Statusanzeige:** Zeigt tabellarisch und in Echtzeit alle wichtigen Register des Slaves an:
|
||||
- Ventilstatus (Zustand, Bewegung, Motorstrom)
|
||||
- Zustand der digitalen Ein- und Ausgänge
|
||||
- "Clear-on-Read" Taster-Events
|
||||
- Systemkonfiguration (Öffnungs-/Schließzeiten, Watchdog-Timeout)
|
||||
- Gerätestatus (Firmware-Version, Uptime)
|
||||
- **Volle Kontrolle:** Ermöglicht das Senden von Befehlen zum Öffnen, Schließen und Stoppen des Ventils sowie zum Umschalten der digitalen Ausgänge.
|
||||
- **Konfiguration zur Laufzeit:** Die maximalen Öffnungs-/Schließzeiten und der Watchdog-Timeout können direkt in der Oberfläche geändert werden.
|
||||
- **Simulierter Firmware-Upload:** Implementiert den vollständigen, in der Dokumentation beschriebenen Firmware-Update-Prozess. Das Tool sendet eine `firmware.bin`-Datei in Chunks an den Slave und folgt dem CRC-Verifizierungs-Protokoll.
|
||||
|
||||
## Installation
|
||||
|
||||
|
|
@ -91,5 +96,6 @@ Ersetzen Sie `/dev/ttyACM0` durch den korrekten Port Ihres Geräts.
|
|||
|
||||
- **Navigation:** Verwenden Sie die **Pfeiltasten (↑/↓)**, um zwischen den Menüpunkten zu navigieren.
|
||||
- **Auswählen:** Drücken Sie **Enter**, um den ausgewählten Befehl auszuführen.
|
||||
- **Werte eingeben:** Bei Aktionen wie "Set Max Opening Time" werden Sie zur Eingabe eines Wertes aufgefordert. Geben Sie den Wert ein und bestätigen Sie mit **Enter**.
|
||||
- **Beenden:** Wählen Sie den Menüpunkt **"Exit"** und drücken Sie **Enter**.
|
||||
- **Werte eingeben:** Bei Aktionen wie "Set Watchdog" werden Sie zur Eingabe eines Wertes aufgefordert. Geben Sie den Wert ein und bestätigen Sie mit **Enter**.
|
||||
- **Firmware Update:** Diese Funktion startet den Upload der Datei `firmware.bin` aus dem aktuellen Verzeichnis. Während des Updates wird eine Fortschrittsanzeige dargestellt.
|
||||
- **Beenden:** Wählen Sie den Menüpunkt **"Exit"** und drücken Sie **Enter**.
|
||||
Binary file not shown.
|
|
@ -4,9 +4,9 @@ import threading
|
|||
import time
|
||||
import sys
|
||||
import curses
|
||||
import os
|
||||
from pymodbus.client import ModbusSerialClient
|
||||
from pymodbus.exceptions import ModbusException
|
||||
import os
|
||||
|
||||
# --- Register Definitions ---
|
||||
# Input Registers
|
||||
|
|
@ -38,14 +38,13 @@ stop_event = threading.Event()
|
|||
client = None
|
||||
status_data = {}
|
||||
status_lock = threading.Lock()
|
||||
update_status = {"running": False, "message": "", "progress": 0.0}
|
||||
update_lock = threading.Lock()
|
||||
|
||||
def format_uptime(seconds):
|
||||
"""Formats seconds into a human-readable d/h/m/s string."""
|
||||
if not isinstance(seconds, (int, float)) or seconds < 0: return "N/A"
|
||||
if seconds == 0: return "0s"
|
||||
days, rem = divmod(seconds, 86400)
|
||||
hours, rem = divmod(rem, 3600)
|
||||
minutes, secs = divmod(rem, 60)
|
||||
days, rem = divmod(seconds, 86400); hours, rem = divmod(rem, 3600); minutes, secs = divmod(rem, 60)
|
||||
parts = []
|
||||
if days > 0: parts.append(f"{int(days)}d")
|
||||
if hours > 0: parts.append(f"{int(hours)}h")
|
||||
|
|
@ -54,79 +53,104 @@ def format_uptime(seconds):
|
|||
return " ".join(parts)
|
||||
|
||||
def poll_status(slave_id, interval):
|
||||
"""Periodically polls the status of the node and updates the global status_data dict."""
|
||||
global status_data
|
||||
while not stop_event.is_set():
|
||||
if update_status["running"]: time.sleep(interval); continue
|
||||
new_data = {"error": None}
|
||||
try:
|
||||
# Grouped reads for efficiency
|
||||
ir_valve = client.read_input_registers(REG_INPUT_VALVE_STATE_MOVEMENT, count=2, slave=slave_id)
|
||||
ir_dig = client.read_input_registers(REG_INPUT_DIGITAL_INPUTS_STATE, count=2, slave=slave_id)
|
||||
ir_sys = client.read_input_registers(REG_INPUT_FIRMWARE_VERSION_MAJOR_MINOR, count=5, slave=slave_id)
|
||||
hr_valve = client.read_holding_registers(REG_HOLDING_MAX_OPENING_TIME_S, count=2, slave=slave_id)
|
||||
hr_dig = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id)
|
||||
hr_sys = client.read_holding_registers(REG_HOLDING_WATCHDOG_TIMEOUT_S, count=1, slave=slave_id)
|
||||
|
||||
# Check for errors
|
||||
for res in [ir_valve, ir_dig, ir_sys, hr_valve, hr_dig, hr_sys]:
|
||||
if res.isError(): raise ModbusException(str(res))
|
||||
|
||||
# --- Process Valve & Motor Data ---
|
||||
|
||||
valve_state_raw = ir_valve.registers[0]
|
||||
movement_map = {0: "Idle", 1: "Opening", 2: "Closing", 3: "Error"}
|
||||
state_map = {0: "Closed", 1: "Open"}
|
||||
new_data["movement"] = movement_map.get(valve_state_raw >> 8, 'Unknown')
|
||||
new_data["state"] = state_map.get(valve_state_raw & 0xFF, 'Unknown')
|
||||
new_data["motor_current"] = f"{ir_valve.registers[1]} mA"
|
||||
new_data["open_time"] = f"{hr_valve.registers[0]}s"
|
||||
new_data["close_time"] = f"{hr_valve.registers[1]}s"
|
||||
|
||||
# --- Process Digital I/O ---
|
||||
new_data["digital_inputs"] = f"0x{ir_dig.registers[0]:04X}"
|
||||
new_data["button_events"] = f"0x{ir_dig.registers[1]:04X}"
|
||||
new_data["digital_outputs"] = f"0x{hr_dig.registers[0]:04X}"
|
||||
|
||||
# --- Process System Data ---
|
||||
fw_major = ir_sys.registers[0] >> 8
|
||||
fw_minor = ir_sys.registers[0] & 0xFF
|
||||
fw_patch = ir_sys.registers[1]
|
||||
movement_map = {0: "Idle", 1: "Opening", 2: "Closing", 3: "Error"}; state_map = {0: "Closed", 1: "Open"}
|
||||
new_data["movement"] = movement_map.get(valve_state_raw >> 8, 'Unknown'); new_data["state"] = state_map.get(valve_state_raw & 0xFF, 'Unknown')
|
||||
new_data["motor_current"] = f"{ir_valve.registers[1]} mA"; new_data["open_time"] = f"{hr_valve.registers[0]}s"; new_data["close_time"] = f"{hr_valve.registers[1]}s"
|
||||
new_data["digital_inputs"] = f"0x{ir_dig.registers[0]:04X}"; new_data["button_events"] = f"0x{ir_dig.registers[1]:04X}"; new_data["digital_outputs"] = f"0x{hr_dig.registers[0]:04X}"
|
||||
|
||||
fw_major = ir_sys.registers[0] >> 8; fw_minor = ir_sys.registers[0] & 0xFF; fw_patch = ir_sys.registers[1]
|
||||
uptime_seconds = (ir_sys.registers[4] << 16) | ir_sys.registers[3]
|
||||
new_data["firmware"] = f"v{fw_major}.{fw_minor}.{fw_patch}"
|
||||
new_data["device_status"] = "OK" if ir_sys.registers[2] == 0 else "ERROR"
|
||||
new_data["uptime"] = format_uptime(uptime_seconds)
|
||||
new_data["watchdog"] = f"{hr_sys.registers[0]}s"
|
||||
|
||||
except ModbusException as e:
|
||||
new_data["error"] = f"Modbus Error: {e}"
|
||||
new_data["firmware"] = f"v{fw_major}.{fw_minor}.{fw_patch}"; new_data["device_status"] = "OK" if ir_sys.registers[2] == 0 else "ERROR"
|
||||
new_data["uptime"] = format_uptime(uptime_seconds); new_data["watchdog"] = f"{hr_sys.registers[0]}s"
|
||||
except Exception as e:
|
||||
new_data["error"] = f"Unexpected Error: {e}"
|
||||
|
||||
with status_lock:
|
||||
status_data = new_data
|
||||
new_data["error"] = f"Error: {e}"
|
||||
with status_lock: status_data = new_data
|
||||
time.sleep(interval)
|
||||
|
||||
def firmware_update_thread(slave_id, filepath):
|
||||
global update_status
|
||||
with update_lock:
|
||||
update_status = {"running": True, "message": "Starting update...", "progress": 0.0}
|
||||
|
||||
try:
|
||||
with open(filepath, 'rb') as f:
|
||||
firmware = f.read()
|
||||
|
||||
file_size = len(firmware)
|
||||
chunk_size = 248 # Max payload size for write_registers is ~248 bytes
|
||||
offset = 0
|
||||
|
||||
while offset < file_size:
|
||||
chunk = firmware[offset:offset + chunk_size]
|
||||
|
||||
with update_lock:
|
||||
update_status["message"] = f"Sending chunk {offset//chunk_size + 1}/{(file_size + chunk_size - 1)//chunk_size}..."
|
||||
update_status["progress"] = offset / file_size
|
||||
|
||||
# 1. Set offset and size
|
||||
client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_LOW, offset & 0xFFFF, slave=slave_id)
|
||||
client.write_register(REG_HOLDING_FWU_CHUNK_OFFSET_HIGH, (offset >> 16) & 0xFFFF, slave=slave_id)
|
||||
client.write_register(REG_HOLDING_FWU_CHUNK_SIZE, len(chunk), slave=slave_id)
|
||||
|
||||
# 2. Write data buffer in smaller bursts to avoid timing issues
|
||||
padded_chunk = chunk
|
||||
if len(padded_chunk) % 2 != 0:
|
||||
padded_chunk += b'\x00'
|
||||
|
||||
all_registers = [int.from_bytes(padded_chunk[i:i+2], 'big') for i in range(0, len(padded_chunk), 2)]
|
||||
|
||||
burst_size_regs = 16 # 32 bytes per burst
|
||||
for i in range(0, len(all_registers), burst_size_regs):
|
||||
reg_burst = all_registers[i:i + burst_size_regs]
|
||||
start_addr = REG_HOLDING_FWU_DATA_BUFFER + i
|
||||
client.write_registers(start_addr, reg_burst, slave=slave_id)
|
||||
time.sleep(0.02) # Small delay between bursts
|
||||
|
||||
# 3. Read back CRC
|
||||
time.sleep(0.1) # Give slave time to calculate
|
||||
remote_crc = client.read_input_registers(REG_INPUT_FWU_LAST_CHUNK_CRC, count=1, slave=slave_id).registers[0]
|
||||
|
||||
# 4. Verify (not implemented in this simulation) and command write
|
||||
client.write_register(REG_HOLDING_FWU_COMMAND, 1, slave=slave_id) # Command: Verify Chunk
|
||||
|
||||
offset += len(chunk)
|
||||
|
||||
with update_lock: update_status["message"] = "Finalizing update..."
|
||||
client.write_register(REG_HOLDING_FWU_COMMAND, 2, slave=slave_id) # Command: Finalize
|
||||
time.sleep(1)
|
||||
with update_lock: update_status["message"] = "Update complete! Slave is rebooting."
|
||||
time.sleep(2)
|
||||
|
||||
except Exception as e:
|
||||
with update_lock: update_status["message"] = f"Error: {e}"
|
||||
time.sleep(3)
|
||||
finally:
|
||||
with update_lock: update_status["running"] = False
|
||||
|
||||
def draw_button(stdscr, y, x, text, selected=False):
|
||||
"""Draws a button with a border, handling selection highlight."""
|
||||
button_width = len(text) + 4
|
||||
color = curses.color_pair(2) if selected else curses.color_pair(1)
|
||||
stdscr.addstr(y, x, " " * button_width, color)
|
||||
stdscr.addstr(y, x + 2, text, color)
|
||||
stdscr.addstr(y - 1, x, "┌" + "─" * (button_width - 2) + "┐", color)
|
||||
stdscr.addstr(y, x, "│", color)
|
||||
stdscr.addstr(y, x + button_width - 1, "│", color)
|
||||
stdscr.addstr(y + 1, x, "└" + "─" * (button_width - 2) + "┘", color)
|
||||
stdscr.addstr(y, x, f" {' ' * len(text)} ", color)
|
||||
stdscr.addstr(y, x + 1, text, color)
|
||||
|
||||
def main_menu(stdscr, slave_id):
|
||||
"""The main curses UI with a flicker-free, state-based drawing loop."""
|
||||
global status_data
|
||||
curses.curs_set(0)
|
||||
stdscr.nodelay(1)
|
||||
stdscr.timeout(100)
|
||||
|
||||
curses.start_color()
|
||||
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)
|
||||
curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE)
|
||||
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE)
|
||||
global status_data, update_status
|
||||
curses.curs_set(0); stdscr.nodelay(1); stdscr.timeout(100)
|
||||
curses.start_color(); curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE); curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE); curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLUE)
|
||||
stdscr.bkgd(' ', curses.color_pair(1))
|
||||
|
||||
menu = ["Open Valve", "Close Valve", "Stop Valve", "Toggle Output 1", "Toggle Output 2", "Set Watchdog", "Firmware Update", "Exit"]
|
||||
|
|
@ -138,26 +162,27 @@ def main_menu(stdscr, slave_id):
|
|||
h, w = stdscr.getmaxyx()
|
||||
key = stdscr.getch()
|
||||
|
||||
if input_mode:
|
||||
if key in [10, 13]: # Enter
|
||||
with update_lock: is_updating = update_status["running"]
|
||||
|
||||
if is_updating:
|
||||
# Update display only, no input handling
|
||||
pass
|
||||
elif input_mode:
|
||||
if key in [10, 13]:
|
||||
try:
|
||||
value = int(input_str)
|
||||
client.write_register(input_target_reg, value, slave=slave_id)
|
||||
message = f"-> Set register 0x{input_target_reg:04X} to {value}"
|
||||
except Exception as e:
|
||||
message = f"-> Error: {e}"
|
||||
except Exception as e: message = f"-> Error: {e}"
|
||||
message_time, input_mode, input_str = time.time(), False, ""
|
||||
elif key == curses.KEY_BACKSPACE or key == 127:
|
||||
input_str = input_str[:-1]
|
||||
elif key != -1 and chr(key).isprintable():
|
||||
input_str += chr(key)
|
||||
else: # Navigation mode
|
||||
elif key == curses.KEY_BACKSPACE or key == 127: input_str = input_str[:-1]
|
||||
elif key != -1 and chr(key).isprintable(): input_str += chr(key)
|
||||
else:
|
||||
if key == curses.KEY_UP: current_row_idx = (current_row_idx - 1) % len(menu)
|
||||
elif key == curses.KEY_DOWN: current_row_idx = (current_row_idx + 1) % len(menu)
|
||||
elif key == curses.KEY_ENTER or key in [10, 13]:
|
||||
selected_option = menu[current_row_idx]
|
||||
message_time = time.time()
|
||||
|
||||
if selected_option == "Exit": stop_event.set(); continue
|
||||
elif selected_option == "Open Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 1, slave=slave_id); message = "-> Sent OPEN command"
|
||||
elif selected_option == "Close Valve": client.write_register(REG_HOLDING_VALVE_COMMAND, 2, slave=slave_id); message = "-> Sent CLOSE command"
|
||||
|
|
@ -166,83 +191,64 @@ def main_menu(stdscr, slave_id):
|
|||
bit = 0 if "1" in selected_option else 1
|
||||
try:
|
||||
current_val = client.read_holding_registers(REG_HOLDING_DIGITAL_OUTPUTS_STATE, count=1, slave=slave_id).registers[0]
|
||||
new_val = current_val ^ (1 << bit)
|
||||
client.write_register(REG_HOLDING_DIGITAL_OUTPUTS_STATE, new_val, slave=slave_id)
|
||||
client.write_register(REG_HOLDING_DIGITAL_OUTPUTS_STATE, current_val ^ (1 << bit), slave=slave_id)
|
||||
message = f"-> Toggled Output {bit+1}"
|
||||
except Exception as e: message = f"-> Error: {e}"
|
||||
elif selected_option == "Set Watchdog":
|
||||
input_mode, input_prompt, input_target_reg = True, "Enter Watchdog Timeout (s): ", REG_HOLDING_WATCHDOG_TIMEOUT_S
|
||||
elif selected_option == "Firmware Update":
|
||||
message = "-> Firmware update process not yet implemented."
|
||||
threading.Thread(target=firmware_update_thread, args=(slave_id, "firmware.bin"), daemon=True).start()
|
||||
|
||||
stdscr.clear()
|
||||
with status_lock: current_data = status_data.copy()
|
||||
|
||||
if current_data.get("error"):
|
||||
stdscr.addstr(0, 0, current_data["error"], curses.color_pair(3) | curses.A_BOLD)
|
||||
if is_updating:
|
||||
with update_lock:
|
||||
prog = update_status["progress"]
|
||||
msg = update_status["message"]
|
||||
stdscr.addstr(h // 2 - 1, w // 2 - 25, "FIRMWARE UPDATE IN PROGRESS", curses.A_BOLD | curses.color_pair(2))
|
||||
stdscr.addstr(h // 2, w // 2 - 25, f"[{'#' * int(prog * 50):<50}] {prog:.0%}")
|
||||
stdscr.addstr(h // 2 + 1, w // 2 - 25, msg.ljust(50))
|
||||
else:
|
||||
bold, normal = curses.color_pair(1) | curses.A_BOLD, curses.color_pair(1)
|
||||
# Status Area
|
||||
col1, col2, col3, col4 = 2, 30, 58, 88
|
||||
stdscr.addstr(1, col1, "State:", bold); stdscr.addstr(1, col1 + 18, str(current_data.get('state', 'N/A')), normal)
|
||||
stdscr.addstr(2, col1, "Movement:", bold); stdscr.addstr(2, col1 + 18, str(current_data.get('movement', 'N/A')), normal)
|
||||
stdscr.addstr(3, col1, "Motor Current:", bold); stdscr.addstr(3, col1 + 18, str(current_data.get('motor_current', 'N/A')), normal)
|
||||
|
||||
stdscr.addstr(1, col2, "Digital Inputs:", bold); stdscr.addstr(1, col2 + 18, str(current_data.get('digital_inputs', 'N/A')), normal)
|
||||
stdscr.addstr(2, col2, "Digital Outputs:", bold); stdscr.addstr(2, col2 + 18, str(current_data.get('digital_outputs', 'N/A')), normal)
|
||||
stdscr.addstr(3, col2, "Button Events:", bold); stdscr.addstr(3, col2 + 18, str(current_data.get('button_events', 'N/A')), normal)
|
||||
|
||||
stdscr.addstr(1, col3, "Max Open Time:", bold); stdscr.addstr(1, col3 + 16, str(current_data.get('open_time', 'N/A')), normal)
|
||||
stdscr.addstr(2, col3, "Max Close Time:", bold); stdscr.addstr(2, col3 + 16, str(current_data.get('close_time', 'N/A')), normal)
|
||||
stdscr.addstr(3, col3, "Watchdog:", bold); stdscr.addstr(3, col3 + 16, str(current_data.get('watchdog', 'N/A')), normal)
|
||||
|
||||
stdscr.addstr(1, col4, "Firmware:", bold); stdscr.addstr(1, col4 + 12, str(current_data.get('firmware', 'N/A')), normal)
|
||||
stdscr.addstr(2, col4, "Uptime:", bold); stdscr.addstr(2, col4 + 12, str(current_data.get('uptime', 'N/A')), normal)
|
||||
stdscr.addstr(3, col4, "Device Status:", bold); stdscr.addstr(3, col4 + 12, str(current_data.get('device_status', 'N/A')), normal)
|
||||
|
||||
stdscr.addstr(5, 0, "─" * (w - 1), normal)
|
||||
|
||||
for idx, row in enumerate(menu):
|
||||
x = w // 2 - (len(row) + 4) // 2
|
||||
y = h // 2 - len(menu) + (idx * 3)
|
||||
draw_button(stdscr, y, x, row, idx == current_row_idx)
|
||||
|
||||
if time.time() - message_time < 2.0:
|
||||
stdscr.addstr(h - 2, 0, message.ljust(w - 1), curses.color_pair(1) | curses.A_BOLD)
|
||||
|
||||
if input_mode:
|
||||
curses.curs_set(1)
|
||||
stdscr.addstr(h - 2, 0, (input_prompt + input_str).ljust(w-1), curses.color_pair(2))
|
||||
stdscr.move(h - 2, len(input_prompt) + len(input_str))
|
||||
else:
|
||||
curses.curs_set(0)
|
||||
|
||||
with status_lock: current_data = status_data.copy()
|
||||
if current_data.get("error"): stdscr.addstr(0, 0, current_data["error"], curses.color_pair(3) | curses.A_BOLD)
|
||||
else:
|
||||
bold, normal = curses.color_pair(1) | curses.A_BOLD, curses.color_pair(1)
|
||||
col1, col2, col3, col4 = 2, 30, 58, 88
|
||||
# Status display lines...
|
||||
stdscr.addstr(1, col1, "State:", bold); stdscr.addstr(1, col1 + 18, str(current_data.get('state', 'N/A')), normal)
|
||||
stdscr.addstr(2, col1, "Movement:", bold); stdscr.addstr(2, col1 + 18, str(current_data.get('movement', 'N/A')), normal)
|
||||
stdscr.addstr(3, col1, "Motor Current:", bold); stdscr.addstr(3, col1 + 18, str(current_data.get('motor_current', 'N/A')), normal)
|
||||
stdscr.addstr(1, col2, "Digital Inputs:", bold); stdscr.addstr(1, col2 + 18, str(current_data.get('digital_inputs', 'N/A')), normal)
|
||||
stdscr.addstr(2, col2, "Digital Outputs:", bold); stdscr.addstr(2, col2 + 18, str(current_data.get('digital_outputs', 'N/A')), normal)
|
||||
stdscr.addstr(3, col2, "Button Events:", bold); stdscr.addstr(3, col2 + 18, str(current_data.get('button_events', 'N/A')), normal)
|
||||
stdscr.addstr(1, col3, "Max Open Time:", bold); stdscr.addstr(1, col3 + 16, str(current_data.get('open_time', 'N/A')), normal)
|
||||
stdscr.addstr(2, col3, "Max Close Time:", bold); stdscr.addstr(2, col3 + 16, str(current_data.get('close_time', 'N/A')), normal)
|
||||
stdscr.addstr(3, col3, "Watchdog:", bold); stdscr.addstr(3, col3 + 16, str(current_data.get('watchdog', 'N/A')), normal)
|
||||
stdscr.addstr(1, col4, "Firmware:", bold); stdscr.addstr(1, col4 + 12, str(current_data.get('firmware', 'N/A')), normal)
|
||||
stdscr.addstr(2, col4, "Uptime:", bold); stdscr.addstr(2, col4 + 12, str(current_data.get('uptime', 'N/A')), normal)
|
||||
stdscr.addstr(3, col4, "Device Status:", bold); stdscr.addstr(3, col4 + 12, str(current_data.get('device_status', 'N/A')), normal)
|
||||
stdscr.addstr(5, 0, "─" * (w - 1), normal)
|
||||
for idx, row in enumerate(menu):
|
||||
draw_button(stdscr, h // 2 - len(menu) + (idx * 2), w // 2 - len(row) // 2, row, idx == current_row_idx)
|
||||
if time.time() - message_time < 2.0: stdscr.addstr(h - 2, 0, message.ljust(w - 1), curses.color_pair(1) | curses.A_BOLD)
|
||||
if input_mode:
|
||||
curses.curs_set(1); stdscr.addstr(h - 2, 0, (input_prompt + input_str).ljust(w-1), curses.color_pair(2)); stdscr.move(h - 2, len(input_prompt) + len(input_str))
|
||||
else: curses.curs_set(0)
|
||||
stdscr.refresh()
|
||||
|
||||
def main():
|
||||
global client
|
||||
parser = argparse.ArgumentParser(description="Modbus tool for irrigation system nodes.")
|
||||
parser.add_argument("port", help="Serial port (e.g., /dev/ttyACM0)")
|
||||
parser.add_argument("--baud", type=int, default=19200, help="Baud rate")
|
||||
parser.add_argument("--slave-id", type=int, default=1, help="Modbus slave ID")
|
||||
parser.add_argument("--interval", type=float, default=1.0, help="Polling interval (sec)")
|
||||
parser.add_argument("port", help="Serial port"); parser.add_argument("--baud", type=int, default=19200); parser.add_argument("--slave-id", type=int, default=1); parser.add_argument("--interval", type=float, default=1.0)
|
||||
args = parser.parse_args()
|
||||
|
||||
client = ModbusSerialClient(port=args.port, baudrate=args.baud, stopbits=1, bytesize=8, parity="N", timeout=1)
|
||||
if not client.connect():
|
||||
print(f"Error: Failed to connect to serial port {args.port}"); sys.exit(1)
|
||||
|
||||
if not client.connect(): print(f"Error: Failed to connect to serial port {args.port}"); sys.exit(1)
|
||||
print("Successfully connected. Starting UI..."); time.sleep(0.5)
|
||||
poll_thread = threading.Thread(target=poll_status, args=(args.slave_id, args.interval), daemon=True)
|
||||
poll_thread.start()
|
||||
|
||||
try:
|
||||
curses.wrapper(main_menu, args.slave_id)
|
||||
threading.Thread(target=poll_status, args=(args.slave_id, args.interval), daemon=True).start()
|
||||
try: curses.wrapper(main_menu, args.slave_id)
|
||||
finally:
|
||||
stop_event.set()
|
||||
print("\nExiting...")
|
||||
if client.is_socket_open(): client.close()
|
||||
poll_thread.join(timeout=2)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
Loading…
Reference in New Issue