Last active
July 9, 2025 07:37
-
-
Save cpq/ae11ec3073067325da31398fa7361d00 to your computer and use it in GitHub Desktop.
Mongoose Wizard - MQTT client with device management
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This MQTT client implements device management with
// a device dashboard, remote firmware update and device control over MQTT.
//
// Usage:
//   1. Visit https://mongoose.ws/wizard. Enable MQTT, generate project
//   2. Copy-paste this code into your main.c
//   3. After mongoose_init() call, add initialization:
//        struct mongoose_mqtt_handlers mqtt_handlers = {
//            my_mqtt_connect, my_mqtt_tls_init, my_mqtt_on_connect,
//            my_mqtt_on_message, my_mqtt_on_cmd,
//        };
//        mongoose_set_mqtt_handlers(&mqtt_handlers);
//   4. Build, flash the device and run serial console to see the logs
//   5. Visit https://mongoose.ws/mqtt-dashboard/
// Build-time settings.
#define TLS_CA ""                 // CA passed to mg_tls_init(); empty here
#define FIRMWARE_VERSION "1.0.0"  // Version string reported in status messages

// File-scope state shared by the handlers below.
static const char *s_topic_prefix = "mg_mqtt_dashboard";  // First topic level
static char s_device_id[40];         // Unique device ID, filled in lazily
static uint8_t s_qos = 1;            // MQTT QoS used for publishing
static struct mg_rpc *s_rpc = NULL;  // List of registered RPC methods
struct device_state { | |
bool led_status; | |
char firmware_version[20]; | |
}; | |
static struct device_state s_device_state = {false, FIRMWARE_VERSION}; | |
static char *make_topic_name(char *buf, size_t len, const char *suffix) { | |
if (s_device_id[0] == '\0') { | |
mg_snprintf(s_device_id, sizeof(s_device_id), "device_%llu", mg_now()); | |
} | |
mg_snprintf(buf, len, "%s/%s/%s", s_topic_prefix, s_device_id, suffix); | |
return buf; | |
} | |
static void publish_response(struct mg_connection *c, char *buf, size_t len) { | |
struct mg_mqtt_opts pub_opts; | |
char topic[100]; | |
memset(&pub_opts, 0, sizeof(pub_opts)); | |
pub_opts.topic = mg_str(make_topic_name(topic, sizeof(topic), "tx")); | |
pub_opts.message = mg_str_n(buf, len); | |
pub_opts.qos = s_qos; | |
mg_mqtt_pub(c, &pub_opts); | |
} | |
static void publish_status(struct mg_connection *c) { | |
char topic[100]; | |
struct mg_mqtt_opts pub_opts; | |
struct mg_iobuf io = {0, 0, 0, 256}; | |
// Print JSON notification into the io buffer | |
mg_xprintf( | |
mg_pfn_iobuf, &io, | |
"{%m:%m,%m:{%m:%m,%m:%s,%m:%m}}", // | |
MG_ESC("method"), MG_ESC("status.notify"), MG_ESC("params"), // | |
MG_ESC("status"), MG_ESC("online"), // | |
MG_ESC("led_status"), s_device_state.led_status ? "true" : "false", // | |
MG_ESC("firmware_version"), MG_ESC(s_device_state.firmware_version)); | |
memset(&pub_opts, 0, sizeof(pub_opts)); | |
pub_opts.topic = mg_str(make_topic_name(topic, sizeof(topic), "status")); | |
pub_opts.message = mg_str_n((char *) io.buf, io.len); | |
pub_opts.qos = s_qos; | |
pub_opts.retain = true; | |
mg_mqtt_pub(c, &pub_opts); | |
mg_iobuf_free(&io); | |
} | |
static void rpc_state_set(struct mg_rpc_req *r) { | |
mg_json_get_bool(r->frame, "$.params.led_status", &s_device_state.led_status); | |
mg_rpc_ok(r, "true"); | |
// Here, syncronise the s_device_state with the hardware. For example, | |
// gpio_set_level(LED_GPIO, s_device_state.led_status); | |
} | |
static void rpc_ota_upload(struct mg_rpc_req *r) { | |
long ofs = mg_json_get_long(r->frame, "$.params.offset", -1); | |
long tot = mg_json_get_long(r->frame, "$.params.total", -1); | |
int len = 0; | |
char *buf = mg_json_get_b64(r->frame, "$.params.chunk", &len); | |
if (buf == NULL) { | |
mg_rpc_err(r, 1, "Error processing the binary chunk."); | |
} else { | |
if (ofs < 0 || tot < 0) { | |
mg_rpc_err(r, 1, "offset and total not set"); | |
} else if (ofs == 0 && mg_ota_begin((size_t) tot) == false) { | |
mg_rpc_err(r, 1, "mg_ota_begin(%ld) failed\n", tot); | |
mg_ota_end(); | |
} else if (len > 0 && mg_ota_write(buf, len) == false) { | |
mg_rpc_err(r, 1, "mg_ota_write(%lu) @%ld failed\n", len, ofs); | |
mg_ota_end(); | |
} else if (ofs + len >= tot && mg_ota_end() == false) { | |
mg_rpc_err(r, 1, "mg_ota_end() failed\n", tot); | |
} else { | |
mg_rpc_ok(r, "%m", MG_ESC("ok")); | |
} | |
mg_free(buf); | |
} | |
} | |
void my_mqtt_tls_init(struct mg_connection *c) { | |
bool is_tls = mg_url_is_ssl(WIZARD_MQTT_URL); | |
MG_DEBUG(("%lu TLS enabled: %s", c->id, is_tls ? "yes" : "no")); | |
if (is_tls) { | |
struct mg_tls_opts opts; | |
memset(&opts, 0, sizeof(opts)); | |
opts.ca = mg_str(TLS_CA); | |
opts.name = mg_url_host(WIZARD_MQTT_URL); | |
mg_tls_init(c, &opts); | |
} | |
} | |
// Called when we connected to the MQTT server | |
void my_mqtt_on_connect(struct mg_connection *c, int code) { | |
char topic[100]; | |
struct mg_mqtt_opts opts; | |
memset(&opts, 0, sizeof(opts)); | |
opts.qos = 1; | |
opts.topic = mg_str(make_topic_name(topic, sizeof(topic), "rx")); | |
mg_mqtt_sub(c, &opts); | |
publish_status(c); | |
MG_DEBUG(("%lu code %d. Subscribing to [%.*s]", c->id, code, opts.topic.len, | |
opts.topic.buf)); | |
MG_INFO(("Visit https://mongoose.ws/mqtt-dashboard/")); | |
} | |
// This function gets called for every received MQTT message | |
void my_mqtt_on_message(struct mg_connection *c, struct mg_str topic, | |
struct mg_str data) { | |
// struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data; | |
struct mg_iobuf io = {0, 0, 0, 512}; | |
struct mg_rpc_req r = {&s_rpc, NULL, mg_pfn_iobuf, | |
&io, NULL, {data.buf, data.len}}; | |
size_t clipped_len = data.len > 512 ? 512 : data.len; | |
MG_INFO(("%lu RECEIVED %.*s <- %.*s", c->id, clipped_len, data.buf, topic.len, | |
topic.buf)); | |
mg_rpc_process(&r); | |
if (io.buf) { | |
publish_response(c, (char *) io.buf, io.len); | |
publish_status(c); | |
} | |
mg_iobuf_free(&io); | |
} | |
void my_mqtt_on_cmd(struct mg_connection *c, struct mg_mqtt_message *mm) { | |
MG_DEBUG(("%lu cmd %d qos %d", c->id, mm->cmd, mm->qos)); | |
} | |
// Connect hook: registers the RPC methods on first use and opens an MQTT
// connection to WIZARD_MQTT_URL with `fn` as the event handler.
// The connect options carry a retained "offline" status notification for
// this device's "status" topic (presumably used as the MQTT last will).
// Returns whatever mg_mqtt_connect() returns.
struct mg_connection *my_mqtt_connect(mg_event_handler_t fn) {
  char status_topic[100], offline_msg[100];
  struct mg_mqtt_opts opts;

  // Register RPC handlers only once, even if we reconnect
  if (s_rpc == NULL) {
    mg_rpc_add(&s_rpc, mg_str("state.set"), rpc_state_set, NULL);
    mg_rpc_add(&s_rpc, mg_str("ota.upload"), rpc_ota_upload, NULL);
  }

  mg_snprintf(offline_msg, sizeof(offline_msg), "{%m:%m,%m:{%m:%m}}",
              MG_ESC("method"), MG_ESC("status.notify"), MG_ESC("params"),
              MG_ESC("status"), MG_ESC("offline"));
  make_topic_name(status_topic, sizeof(status_topic), "status");

  // keepalive is left zeroed by the memset
  memset(&opts, 0, sizeof(opts));
  opts.version = 4;
  opts.clean = true;
  opts.retain = true;
  opts.qos = s_qos;
  opts.topic = mg_str(status_topic);
  opts.message = mg_str(offline_msg);

  return mg_mqtt_connect(&g_mgr, WIZARD_MQTT_URL, &opts, fn, NULL);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment