Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ma2shita/831eedf63e759b8a05e257201275a373 to your computer and use it in GitHub Desktop.
Save ma2shita/831eedf63e759b8a05e257201275a373 to your computer and use it in GitHub Desktop.
AWS IoT Core's Device shadow implementation using SORACOM Beam on M5Stack Basic/Gray + 3G ext. board
/*
Example | AWS IoT Core's Device shadow implementation using SORACOM Beam on M5Stack Basic/Gray + 3G ext. board
Copyright (c) 2022 Kohei "Max" MATSUSHITA ([email protected])
Released under the MIT license
https://opensource.org/licenses/mit-license.php
*/
#define _VERSION_ "0.9"
#define THING_NAME "mcu1"
#define SHADOW_NAME "peripheral"
#include <M5Stack.h>
#define SerialMon Serial
#define SerialAT Serial2 // Serial2 is 3G ext. module
#define TINY_GSM_MODEM_UBLOX
#include <TinyGsmClient.h>
TinyGsm modem(SerialAT);
TinyGsmClient ctx(modem);
#include <ArduinoJson.h>
#include <PubSubClient.h>
PubSubClient MqttClient;
struct MqttParams {
char id[64];
char shadow_prefix[256];
char reported_topic[256];
char delta_topic[256];
char get_topic[256];
char accepted_topic[256];
};
struct MqttParams mqtt_params;
#define TIMER0_INTERVAL_TICK 720 // sec (original 720)
volatile bool timer0_trigger; // Initial values are defined in `setup()`
volatile uint32_t timer0_tick = 0;
hw_timer_t* timer0 = NULL;
void IRAM_ATTR timer0_callback() {
timer0_tick++;
timer0_trigger = (bool) (timer0_tick >= TIMER0_INTERVAL_TICK); // trigger is evaluated within `loop()`
}
#define TIMER1_INTERVAL_TICK 3600 // sec (original 3600)
volatile bool timer1_trigger; // Initial values are defined in `setup()`
volatile uint32_t timer1_tick = 0;
hw_timer_t* timer1 = NULL;
void IRAM_ATTR timer1_callback() {
timer1_tick++;
timer1_trigger = (bool) (timer1_tick >= TIMER1_INTERVAL_TICK); // trigger is evaluated within `loop()`
}
volatile int32_t example_counter = 0; // for example
void report_state() {
SerialMon.println(F("> report_state()"));
SerialMon.println("(e.g.) Implement report the state of peripherals.");
int32_t val = example_counter; // Aka. read sensor data;
DynamicJsonDocument doc(2048);
JsonObject state = doc.createNestedObject("state");
state["reported"]["example_counter"] = val;
state["reported"]["welcome"] = nullptr; // default value for IoT Core. Unnecessary, so turn it off.
state["desired"] = nullptr; // Tips.
char payload[2048];
serializeJson(doc, payload, sizeof(payload));
SerialMon.print(F("> Report JSON: "));
SerialMon.println(payload);
MqttClient.publish(mqtt_params.reported_topic, payload);
}
void operate_peripheral(JsonObject state) {
SerialMon.println(F("> operate_peripheral()"));
if (!state) return; // forced termination.
serializeJson(state, SerialMon); SerialMon.println();
SerialMon.println("(e.g.) Implement operations on peripherals.");
if (!state["example_counter"].isNull()) {
example_counter = state["example_counter"].as<signed long>(); // Aka. operate sensor;
M5.Lcd.clear();
M5.Lcd.setCursor(0, 0);
M5.Lcd.println(F("EXAMPLE:"));
M5.Lcd.println(F(" Device Shadow"));
M5.Lcd.println(F("BtnA -1 | BtnC +1"));
M5.Lcd.println(F("BtnB report_state"));
M5.Lcd.println();
M5.Lcd.println(example_counter);
}
}
// Required global instances: SerialMon
void mqtt_subscriber_callback(const char* topic, byte* payload, unsigned int length) {
String buf_t = String(topic);
SerialMon.print(F("> Incoming: ")); SerialMon.println(buf_t);
payload[length] = '\0'; // https://hawksnowlog.blogspot.com/2017/06/convert-byte-array-to-string.html
String buf_p = String((char*) payload); // convert to String from char*
SerialMon.print(F("> Payload: ")); SerialMon.println(buf_p);
DynamicJsonDocument doc(2048);
DeserializationError error = deserializeJson(doc, buf_p);
if (error) {
SerialMon.print(F("> deserializeJson() failed: ")); SerialMon.println(error.c_str());
return; // forced termination.
}
if (buf_t.endsWith("/get/accepted")) { // Restore by shadow.
operate_peripheral(doc["state"]["reported"]);
operate_peripheral(doc["state"]["delta"]);
report_state(); // Finally, report back to shadow.
}
if (buf_t.endsWith("/update/delta")) { // Desired from IoT Core
operate_peripheral(doc["state"]);
report_state(); // Finally, report back to shadow.
}
}
// Required global instances: NONE
template <typename T> void connect_to_cellular_network(TinyGsm* modem, T* serialMon) {
long s = millis();
serialMon->print(F("> modem.restart(): "));
modem->restart();
serialMon->println(F("done."));
serialMon->print(F("> getModemInfo(): ")); serialMon->println(modem->getModemInfo());
serialMon->print(F("> getIMEI(): ")); serialMon->println(modem->getIMEI());
serialMon->print(F("> waitForNetwork(): "));
while (!modem->waitForNetwork()) serialMon->print(F("."));
serialMon->println(F("Ok."));
serialMon->print(F("> gprsConnect(soracom.io): "));
modem->gprsConnect("soracom.io", "sora", "sora");
serialMon->println(F("done."));
serialMon->print(F("> isNetworkConnected(): "));
while (!modem->isNetworkConnected()) serialMon->print(F("."));
serialMon->println(F("Ok."));
serialMon->print(F("> localIP(): ")); serialMon->println(modem->localIP());
long e = millis();
serialMon->print(F("> connect_to_cellular_network() elapsed(ms): ")); serialMon->println(e - s);
}
// Required global instances: NONE
template <typename T> bool is_cellular_network_connected(TinyGsm* modem, T* serialMon) {
bool result = modem->isGprsConnected();
if (!result) serialMon->println(F("> isGprsConnected(): false"));
return result;
}
// Required global instances: MqttParams mqtt_params
template <typename T> void set_mqtt_id_and_topics(const char* thing_name, const char* shadow_name, T* serialMon) {
sprintf_P(mqtt_params.id, PSTR("%s"), thing_name);
serialMon->print(F("> MQTT_ID: ")); serialMon->println(mqtt_params.id);
sprintf_P(mqtt_params.shadow_prefix, PSTR("$aws/things/%s/shadow/name/%s"), thing_name, shadow_name);
serialMon->print(F("> shadow_prefix: ")); serialMon->println(mqtt_params.shadow_prefix);
sprintf_P(mqtt_params.reported_topic, PSTR("%s/update"), mqtt_params.shadow_prefix);
serialMon->print(F("> reported_topic: ")); serialMon->println(mqtt_params.reported_topic);
sprintf_P(mqtt_params.delta_topic, PSTR("%s/update/delta"), mqtt_params.shadow_prefix);
serialMon->print(F("> delta_topic: ")); serialMon->println(mqtt_params.delta_topic);
sprintf_P(mqtt_params.get_topic, PSTR("%s/get"), mqtt_params.shadow_prefix);
serialMon->print(F("> get_topic: ")); serialMon->println(mqtt_params.get_topic);
sprintf_P(mqtt_params.accepted_topic, PSTR("%s/get/accepted"), mqtt_params.shadow_prefix);
serialMon->print(F("> accepted_topic: ")); serialMon->println(mqtt_params.accepted_topic);
}
// Required global instances: PubSubClient MqttClient, MqttParams mqtt_params, Function `mqtt_subscriber_callback()`
template <typename T, typename U> void connect_to_mqtt_broker(T* ctx, U* serialMon) {
MqttClient.setServer("beam.soracom.io", 1883);
MqttClient.setClient(*ctx);
MqttClient.setBufferSize(1024);
MqttClient.setCallback(mqtt_subscriber_callback);
if (!MqttClient.connect(mqtt_params.id)) {
serialMon->println(F("> MqttClient.connect() failed."));
serialMon->print(F("> MqttClient.state(): ")); serialMon->println(MqttClient.state());
long e = millis();
return; // forced termination.
}
MqttClient.subscribe(mqtt_params.delta_topic);
MqttClient.subscribe(mqtt_params.accepted_topic);
}
// Required global instances: PubSubClient MqttClient
template <typename T> bool is_mqtt_broker_connected(T* serialMon) {
bool result = MqttClient.connected();
if (!result) serialMon->println(F("> MqttClient.connected(): false"));
return result;
}
// Required global instances: M5, SerialMon
void mcu_restart() {
SerialMon.println("> MCU_RESTART");
delay(3000); // waiting for flush of serial buffer
M5.Power.reset();
}
void setup() {
delay(1000);
M5.begin();
SerialMon.begin(115200);
SerialMon.println();
SerialMon.println(F("> --- START ---------------------------------------------------"));
SerialMon.println(_VERSION_);
M5.Lcd.wakeup();
M5.Lcd.setBrightness(40);
M5.Lcd.setTextSize(3);
M5.Lcd.fillScreen(TFT_BLACK);
M5.Lcd.clear();
M5.Lcd.setCursor(0, 0);
M5.Lcd.println(F("Connecting..."));
SerialAT.begin(115200, SERIAL_8N1, 16, 17); // 3G ext. module
connect_to_cellular_network(&modem, &SerialMon);
set_mqtt_id_and_topics(THING_NAME, SHADOW_NAME, &SerialMon);
connect_to_mqtt_broker(&ctx, &SerialMon);
timer0_trigger = false; // not fire with bootup
timer0 = timerBegin(0, 80, true);
timerAlarmWrite(timer0, 1000000, true);
timerAttachInterrupt(timer0, &timer0_callback, true);
timerAlarmEnable(timer0);
timer1_trigger = true; // fire with bootup
timer1 = timerBegin(1, 80, true);
timerAlarmWrite(timer1, 1000000, true);
timerAttachInterrupt(timer1, &timer1_callback, true);
timerAlarmEnable(timer1);
}
#define LOOP_INTERVAL_MS 50 // msec
void loop() {
M5.update();
if (!(is_cellular_network_connected(&modem, &SerialMon) && is_mqtt_broker_connected(&SerialMon))) {
connect_to_cellular_network(&modem, &SerialMon);
connect_to_mqtt_broker(&ctx, &SerialMon);
}
if (M5.BtnB.wasPressed()) timer0_trigger = true; // If you wanna actively send
if (timer0_trigger) {
SerialMon.println(F("> Periodically send the state for 'digital twin'."));
report_state();
timer0_trigger = false; // reset
timer0_tick = 0; // reset
}
if (timer1_trigger) {
SerialMon.println(F("> Periodically get the shadow for 'digital twin'."));
MqttClient.publish(mqtt_params.get_topic, "{}");
timer1_trigger = false; // reset
timer1_tick = 0; // reset
}
int32_t prev_example_counter = example_counter;
if (M5.BtnA.wasPressed()) example_counter--;
if (M5.BtnC.wasPressed()) example_counter++;
if (prev_example_counter != example_counter) { // is_modified ?
DynamicJsonDocument doc(128);
JsonObject state = doc.createNestedObject("state");
state["example_counter"] = example_counter;
operate_peripheral(state);
// report_state(); // If you wanna send at the same time as the change
}
unsigned long next = millis();
while (millis() < next + LOOP_INTERVAL_MS) MqttClient.loop();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment