Refactor: Move MQTT client logic to mqtt_client.c and cleanup main.c

This commit is contained in:
Eduard Iten 2025-07-17 08:54:55 +02:00
parent febaa06799
commit 85892deb2b
3 changed files with 171 additions and 195 deletions

View File

@ -8,52 +8,27 @@
LOG_MODULE_REGISTER(ha_mqtt_switch, CONFIG_LOG_DEFAULT_LEVEL);
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/random/random.h>
#include <zephyr/posix/poll.h>
#include <zephyr/net/net_ip.h>
#include <zephyr/settings/settings.h>
#include <zephyr/shell/shell.h>
#include <zephyr/data/json.h>
#include <zephyr/device.h>
#include <zephyr/drivers/gpio.h>
#include <zephyr/net/net_if.h>
#include <zephyr/net/net_mgmt.h>
#include <zephyr/drivers/hwinfo.h>
#include <zephyr/random/random.h> // For sys_rand32_get
#include <string.h> // For strlen, strncpy
#include "mqtt_client.h"
#include "mqtt_client_shell.h"
#define HA_DISCOVERY_PREFIX "homeassistant"
#define MQTT_CLIENTID "ha-switch-zephyr"
#define MQTT_BROKER_HOSTNAME CONFIG_HA_MQTT_BROKER_HOSTNAME
#define MQTT_BROKER_PORT CONFIG_HA_MQTT_BROKER_PORT
#define APP_CONNECT_TIMEOUT_MS 2000
#define APP_SLEEP_MSECS 500
#define APP_MQTT_BUFFER_SIZE 2048
#define SWITCH_NODE DT_ALIAS(switch0)
#define UUID_MAX_LEN 40
#define HA_MQTT_STR_MAX_LEN 48
// name, uuid, state, state_changed, mqtt_running, broker_host are now extern in mqtt_client.h
#define TOPIC_BUF_LEN 128
static const struct gpio_dt_spec sw = GPIO_DT_SPEC_GET_OR(SWITCH_NODE, gpios, {0});
static struct gpio_callback switch_cb_data;
static uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
static struct sockaddr_storage broker;
// connected is only used locally in main.c
static bool connected;
void switch_callback(const struct device *dev, struct gpio_callback *cb, uint32_t pins)
@ -286,104 +261,5 @@ int main(void)
}
}
// Im Main-Thread: MQTT-Start zentralisiert
err = ha_mqtt_start();
if (err) {
LOG_ERR("Failed to start MQTT client: %d", err);
return 0;
}
while (true) {
err = wait(APP_CONNECT_TIMEOUT_MS);
if (err) {
LOG_ERR("Failed to wait for MQTT client: %d", err);
err = mqtt_disconnect(&client_ctx, NULL);
if (err) {
LOG_ERR("Failed to disconnect MQTT client: %d", err);
}
clear_fds();
return 0;
}
err = mqtt_input(&client_ctx);
if (err) {
LOG_ERR("Failed to process MQTT input: %d", err);
continue;
}
if (!connected) {
continue;
}
err = ha_publish_discovery_document(&client_ctx);
if (err) {
LOG_ERR("Failed to publish discovery document: %d", err);
} else {
LOG_INF("Published discovery document");
}
char cmd_topic[TOPIC_BUF_LEN];
build_topic(cmd_topic, sizeof(cmd_topic), "zephyr/%s/switch/set");
err = subscribe(&client_ctx, cmd_topic);
if (err) {
LOG_ERR("Failed to subscribe to topic: %d", err);
} else {
LOG_INF("Subscribed to topic %s", cmd_topic);
}
// Break if MQTT client is stopped externally (e.g., via shell)
if (!mqtt_running) {
break;
}
break;
}
while (true) {
if (!mqtt_running) {
break;
}
err = wait(APP_SLEEP_MSECS);
if (err && err != -EAGAIN) {
LOG_ERR("Failed to wait for MQTT client: %d", err);
break;
}
err = mqtt_input(&client_ctx);
if (err) {
LOG_ERR("Failed to process MQTT input: %d", err);
continue;
}
err = mqtt_live(&client_ctx);
if (err && err != -EAGAIN) {
LOG_ERR("Failed to send MQTT keepalive: %d", err);
break;
} else if (err == 0) {
LOG_DBG("Sent MQTT keepalive (PINGREQ)");
}
if (state_changed) {
char state_topic[TOPIC_BUF_LEN];
snprintf(state_topic, sizeof(state_topic), HA_DISCOVERY_PREFIX "/switch/%s/state", uuid);
err = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE, state_topic, state ? "ON" : "OFF");
if (err) {
LOG_ERR("Failed to publish switch state: %d", err);
} else {
LOG_INF("Published switch state: %s", state ? "ON" : "OFF");
state_changed = false;
}
}
}
err = mqtt_disconnect(&client_ctx, NULL);
if (err) {
LOG_ERR("Failed to disconnect MQTT client: %d", err);
}
mqtt_running = false;
clear_fds();
return 0;
}

View File

@ -1,9 +1,3 @@
// ...existing includes...
#include <stdbool.h>
#include <zephyr/posix/poll.h>
#include <zephyr/net/socket.h>
#include <stdbool.h>
#include <zephyr/posix/poll.h>
#include <zephyr/net/socket.h>
@ -23,6 +17,8 @@
#include <zephyr/drivers/hwinfo.h>
LOG_MODULE_REGISTER(mqtt_client, LOG_LEVEL_DBG);
#define HA_DISCOVERY_PREFIX "homeassistant"
#define MQTT_CLIENTID "ha-switch-zephyr"
#define APP_CONNECT_TIMEOUT_MS 2000
#define APP_SLEEP_MSECS 500
@ -34,22 +30,20 @@ LOG_MODULE_REGISTER(mqtt_client, LOG_LEVEL_DBG);
static struct pollfd fds[1];
static int nfds;
static bool connected;
static struct mqtt_utf8 mqtt_username;
static struct mqtt_utf8 mqtt_password;
static uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
static uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
struct mqtt_client client_ctx;
static struct sockaddr_storage broker;
// Only one definition for each global variable
char broker_host[BROKER_HOST_MAX_LEN] = "127.0.0.1"; // Default broker host, replace as needed
bool mqtt_running = false;
void mqtt_evt_handler(struct mqtt_client *client, const struct mqtt_evt *evt) {
// TODO: Implement event handler logic or leave as stub if implemented elsewhere
}
bool state = false;
bool state_changed = false;
// ...existing includes...
char name[HA_MQTT_STR_MAX_LEN] = CONFIG_HA_MQTT_NAME;
char uuid[UUID_MAX_LEN] = {0};
static void prepare_fds(struct mqtt_client *client)
{
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
@ -103,48 +97,6 @@ void mqtt_auth_init(void)
mqtt_password.size = strlen(password_buf);
}
// ...existing code...
#include "mqtt_client.h"
#include <zephyr/logging/log.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/random/random.h>
#include <zephyr/posix/poll.h>
#include <zephyr/net/net_ip.h>
#include <zephyr/settings/settings.h>
#include <zephyr/data/json.h>
#include <zephyr/device.h>
#include <zephyr/drivers/gpio.h>
#include <zephyr/net/net_if.h>
#include <zephyr/net/net_mgmt.h>
#include <zephyr/drivers/hwinfo.h>
#include <string.h>
// ...existing code...
#define HA_DISCOVERY_PREFIX "homeassistant"
#define MQTT_CLIENTID "ha-switch-zephyr"
#define APP_CONNECT_TIMEOUT_MS 2000
#define APP_SLEEP_MSECS 500
#define APP_MQTT_BUFFER_SIZE 2048
#define UUID_MAX_LEN 40
#define HA_MQTT_STR_MAX_LEN 48
#define TOPIC_BUF_LEN 128
#define BROKER_HOST_MAX_LEN 64
char name[HA_MQTT_STR_MAX_LEN] = CONFIG_HA_MQTT_NAME;
char uuid[UUID_MAX_LEN] = {0};
// ...existing code...
// ...existing code...
int mqtt_client_init_all(void) {
// Placeholder for future expansion
return 0;
}
int client_init(void)
{
int err;
@ -206,7 +158,6 @@ int subscribe(struct mqtt_client *client, const char *topic)
return mqtt_subscribe(client, &sub_list);
}
void build_topic(char *buf, size_t buflen, const char *fmt)
{
int n = snprintf(buf, buflen, fmt, uuid);
@ -291,6 +242,157 @@ int ha_publish_discovery_document(struct mqtt_client *client)
return publish(client, MQTT_QOS_0_AT_MOST_ONCE, disc_topic, payload);
}
void mqtt_evt_handler(struct mqtt_client *client, const struct mqtt_evt *evt)
{
int err;
switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
LOG_ERR("MQTT connect failed: %d", evt->result);
mqtt_running = false;
break;
}
LOG_INF("MQTT client connected!");
// Publish discovery document and subscribe to command topic on successful connection
err = ha_publish_discovery_document(client);
if (err) {
LOG_ERR("Failed to publish discovery document: %d", err);
} else {
LOG_INF("Published discovery document");
}
char cmd_topic[TOPIC_BUF_LEN];
build_topic(cmd_topic, sizeof(cmd_topic), HA_DISCOVERY_PREFIX "/switch/%s/set");
err = subscribe(client, cmd_topic);
if (err) {
LOG_ERR("Failed to subscribe to topic: %d", err);
} else {
LOG_INF("Subscribed to topic %s", cmd_topic);
}
break;
case MQTT_EVT_DISCONNECT:
LOG_INF("MQTT client disconnected: %d", evt->result);
mqtt_running = false;
clear_fds();
break;
case MQTT_EVT_PUBLISH:
LOG_INF("MQTT PUBLISH event received!");
const struct mqtt_publish_param *p = &evt->param.publish;
char payload_str[APP_MQTT_BUFFER_SIZE]; // Use a fixed-size buffer
char topic_str[TOPIC_BUF_LEN]; // Use a fixed-size buffer
if (p->message.payload.len >= sizeof(payload_str)) {
LOG_ERR("Payload too long for buffer");
break;
}
memcpy(payload_str, p->message.payload.data, p->message.payload.len);
payload_str[p->message.payload.len] = '\0';
if (p->message.topic.topic.size >= sizeof(topic_str)) {
LOG_ERR("Topic too long for buffer");
break;
}
memcpy(topic_str, p->message.topic.topic.utf8, p->message.topic.topic.size);
topic_str[p->message.topic.topic.size] = '\0';
LOG_INF("Topic: %s, Payload: %s", topic_str, payload_str);
// Handle switch command
char cmd_topic_expected[TOPIC_BUF_LEN];
build_topic(cmd_topic_expected, sizeof(cmd_topic_expected), HA_DISCOVERY_PREFIX "/switch/%s/set");
if (strcmp(topic_str, cmd_topic_expected) == 0) {
if (strcmp(payload_str, "ON") == 0) {
state = true;
state_changed = true;
LOG_INF("Switch state set to ON (via MQTT)");
} else if (strcmp(payload_str, "OFF") == 0) {
state = false;
state_changed = true;
LOG_INF("Switch state set to OFF (via MQTT)");
} else {
LOG_WRN("Unknown command payload: %s", payload_str);
}
}
break;
case MQTT_EVT_SUBACK:
LOG_INF("MQTT SUBACK event received!");
break;
case MQTT_EVT_UNSUBACK:
LOG_INF("MQTT UNSUBACK event received!");
break;
case MQTT_EVT_PUBACK:
LOG_INF("MQTT PUBACK event received!");
break;
case MQTT_EVT_PUBREC:
LOG_INF("MQTT PUBREC event received!");
break;
case MQTT_EVT_PUBREL:
LOG_INF("MQTT PUBREL event received!");
break;
case MQTT_EVT_PUBCOMP:
LOG_INF("MQTT PUBCOMP event received!");
break;
default:
LOG_DBG("Unhandled MQTT event type: %d", evt->type);
break;
}
}
void mqtt_client_run_loop(void)
{
int err;
while (mqtt_running) {
err = wait(APP_SLEEP_MSECS);
if (err && err != -EAGAIN) {
LOG_ERR("Failed to wait for MQTT client: %d", err);
break;
}
err = mqtt_input(&client_ctx);
if (err) {
LOG_ERR("Failed to process MQTT input: %d", err);
// Continue to allow for potential recovery or disconnect event
}
err = mqtt_live(&client_ctx);
if (err && err != -EAGAIN) {
LOG_ERR("Failed to send MQTT keepalive: %d", err);
break;
} else if (err == 0) {
LOG_DBG("Sent MQTT keepalive (PINGREQ)");
}
if (state_changed) {
char state_topic[TOPIC_BUF_LEN];
snprintf(state_topic, sizeof(state_topic), HA_DISCOVERY_PREFIX "/switch/%s/state", uuid);
err = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE, state_topic, state ? "ON" : "OFF");
if (err) {
LOG_ERR("Failed to publish switch state: %d", err);
} else {
LOG_INF("Published switch state: %s", state ? "ON" : "OFF");
state_changed = false;
}
}
}
// Clean up after loop exits (e.g., mqtt_running becomes false)
err = mqtt_disconnect(&client_ctx, NULL);
if (err) {
LOG_ERR("Failed to disconnect MQTT client: %d", err);
}
clear_fds();
LOG_INF("MQTT client run loop terminated.");
}
int ha_mqtt_start(void)
{
int err = client_init();
@ -310,6 +412,12 @@ int ha_mqtt_start(void)
LOG_INF("Connected to MQTT broker at %s:%d", ip_str, ntohs(broker4->sin_port));
}
prepare_fds(&client_ctx);
// Start the MQTT client run loop in a separate thread or manage it here
// For now, we'll call it directly, assuming it's blocking until mqtt_running is false
// In a real application, you might want to k_thread_create this.
mqtt_client_run_loop();
return 0;
}
@ -319,13 +427,8 @@ int ha_mqtt_stop(void)
LOG_INF("MQTT client is not running.");
return 0;
}
int err = mqtt_disconnect(&client_ctx, NULL);
if (err) {
LOG_ERR("Failed to disconnect MQTT client: %d", err);
return err;
}
clear_fds();
// Setting mqtt_running to false will cause the mqtt_client_run_loop to exit
mqtt_running = false;
LOG_INF("MQTT client stopped.");
LOG_INF("MQTT client stop requested.");
return 0;
}

View File

@ -10,6 +10,9 @@ extern char broker_host[BROKER_HOST_MAX_LEN];
extern bool mqtt_running;
extern bool state;
extern bool state_changed;
extern char uuid[];
extern char name[];
extern char broker_host[];
#include <zephyr/net/mqtt.h>
#include <zephyr/kernel.h>
@ -24,19 +27,13 @@ int ha_mqtt_stop(void);
int ha_publish_discovery_document(struct mqtt_client *client);
int subscribe(struct mqtt_client *client, const char *topic);
void mqtt_evt_handler(struct mqtt_client *client, const struct mqtt_evt *evt);
void mqtt_client_run_loop(void);
void build_topic(char *buf, size_t buflen, const char *fmt);
void clear_fds(void);
int wait(int timeout);
int publish(struct mqtt_client *client, enum mqtt_qos qos, const char *topic, const char *payload);
extern bool mqtt_running;
extern bool state;
extern bool state_changed;
extern char uuid[];
extern char name[];
extern char broker_host[];
extern struct mqtt_client client_ctx;
#ifdef __cplusplus