Last active
March 18, 2022 15:45
-
-
Save ma2shita/831eedf63e759b8a05e257201275a373 to your computer and use it in GitHub Desktop.
AWS IoT Core's Device shadow implementation using SORACOM Beam on M5Stack Basic/Gray + 3G ext. board
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Example | AWS IoT Core's Device shadow implementation using SORACOM Beam on M5Stack Basic/Gray + 3G ext. board | |
Copyright (c) 2022 Kohei "Max" MATSUSHITA ([email protected]) | |
Released under the MIT license | |
https://opensource.org/licenses/mit-license.php | |
*/ | |
#define _VERSION_ "0.9" | |
#define THING_NAME "mcu1" | |
#define SHADOW_NAME "peripheral" | |
#include <M5Stack.h> | |
#define SerialMon Serial | |
#define SerialAT Serial2 // Serial2 is 3G ext. module | |
#define TINY_GSM_MODEM_UBLOX | |
#include <TinyGsmClient.h> | |
TinyGsm modem(SerialAT); | |
TinyGsmClient ctx(modem); | |
#include <ArduinoJson.h> | |
#include <PubSubClient.h> | |
PubSubClient MqttClient; | |
struct MqttParams { | |
char id[64]; | |
char shadow_prefix[256]; | |
char reported_topic[256]; | |
char delta_topic[256]; | |
char get_topic[256]; | |
char accepted_topic[256]; | |
}; | |
struct MqttParams mqtt_params; | |
#define TIMER0_INTERVAL_TICK 720 // sec (original 720) | |
volatile bool timer0_trigger; // Initial values are defined in `setup()` | |
volatile uint32_t timer0_tick = 0; | |
hw_timer_t* timer0 = NULL; | |
void IRAM_ATTR timer0_callback() { | |
timer0_tick++; | |
timer0_trigger = (bool) (timer0_tick >= TIMER0_INTERVAL_TICK); // trigger is evaluated within `loop()` | |
} | |
#define TIMER1_INTERVAL_TICK 3600 // sec (original 3600) | |
volatile bool timer1_trigger; // Initial values are defined in `setup()` | |
volatile uint32_t timer1_tick = 0; | |
hw_timer_t* timer1 = NULL; | |
void IRAM_ATTR timer1_callback() { | |
timer1_tick++; | |
timer1_trigger = (bool) (timer1_tick >= TIMER1_INTERVAL_TICK); // trigger is evaluated within `loop()` | |
} | |
volatile int32_t example_counter = 0; // for example | |
void report_state() { | |
SerialMon.println(F("> report_state()")); | |
SerialMon.println("(e.g.) Implement report the state of peripherals."); | |
int32_t val = example_counter; // Aka. read sensor data; | |
DynamicJsonDocument doc(2048); | |
JsonObject state = doc.createNestedObject("state"); | |
state["reported"]["example_counter"] = val; | |
state["reported"]["welcome"] = nullptr; // default value for IoT Core. Unnecessary, so turn it off. | |
state["desired"] = nullptr; // Tips. | |
char payload[2048]; | |
serializeJson(doc, payload, sizeof(payload)); | |
SerialMon.print(F("> Report JSON: ")); | |
SerialMon.println(payload); | |
MqttClient.publish(mqtt_params.reported_topic, payload); | |
} | |
void operate_peripheral(JsonObject state) { | |
SerialMon.println(F("> operate_peripheral()")); | |
if (!state) return; // forced termination. | |
serializeJson(state, SerialMon); SerialMon.println(); | |
SerialMon.println("(e.g.) Implement operations on peripherals."); | |
if (!state["example_counter"].isNull()) { | |
example_counter = state["example_counter"].as<signed long>(); // Aka. operate sensor; | |
M5.Lcd.clear(); | |
M5.Lcd.setCursor(0, 0); | |
M5.Lcd.println(F("EXAMPLE:")); | |
M5.Lcd.println(F(" Device Shadow")); | |
M5.Lcd.println(F("BtnA -1 | BtnC +1")); | |
M5.Lcd.println(F("BtnB report_state")); | |
M5.Lcd.println(); | |
M5.Lcd.println(example_counter); | |
} | |
} | |
// Required global instances: SerialMon | |
void mqtt_subscriber_callback(const char* topic, byte* payload, unsigned int length) { | |
String buf_t = String(topic); | |
SerialMon.print(F("> Incoming: ")); SerialMon.println(buf_t); | |
payload[length] = '\0'; // https://hawksnowlog.blogspot.com/2017/06/convert-byte-array-to-string.html | |
String buf_p = String((char*) payload); // convert to String from char* | |
SerialMon.print(F("> Payload: ")); SerialMon.println(buf_p); | |
DynamicJsonDocument doc(2048); | |
DeserializationError error = deserializeJson(doc, buf_p); | |
if (error) { | |
SerialMon.print(F("> deserializeJson() failed: ")); SerialMon.println(error.c_str()); | |
return; // forced termination. | |
} | |
if (buf_t.endsWith("/get/accepted")) { // Restore by shadow. | |
operate_peripheral(doc["state"]["reported"]); | |
operate_peripheral(doc["state"]["delta"]); | |
report_state(); // Finally, report back to shadow. | |
} | |
if (buf_t.endsWith("/update/delta")) { // Desired from IoT Core | |
operate_peripheral(doc["state"]); | |
report_state(); // Finally, report back to shadow. | |
} | |
} | |
// Required global instances: NONE | |
template <typename T> void connect_to_cellular_network(TinyGsm* modem, T* serialMon) { | |
long s = millis(); | |
serialMon->print(F("> modem.restart(): ")); | |
modem->restart(); | |
serialMon->println(F("done.")); | |
serialMon->print(F("> getModemInfo(): ")); serialMon->println(modem->getModemInfo()); | |
serialMon->print(F("> getIMEI(): ")); serialMon->println(modem->getIMEI()); | |
serialMon->print(F("> waitForNetwork(): ")); | |
while (!modem->waitForNetwork()) serialMon->print(F(".")); | |
serialMon->println(F("Ok.")); | |
serialMon->print(F("> gprsConnect(soracom.io): ")); | |
modem->gprsConnect("soracom.io", "sora", "sora"); | |
serialMon->println(F("done.")); | |
serialMon->print(F("> isNetworkConnected(): ")); | |
while (!modem->isNetworkConnected()) serialMon->print(F(".")); | |
serialMon->println(F("Ok.")); | |
serialMon->print(F("> localIP(): ")); serialMon->println(modem->localIP()); | |
long e = millis(); | |
serialMon->print(F("> connect_to_cellular_network() elapsed(ms): ")); serialMon->println(e - s); | |
} | |
// Required global instances: NONE | |
template <typename T> bool is_cellular_network_connected(TinyGsm* modem, T* serialMon) { | |
bool result = modem->isGprsConnected(); | |
if (!result) serialMon->println(F("> isGprsConnected(): false")); | |
return result; | |
} | |
// Required global instances: MqttParams mqtt_params | |
template <typename T> void set_mqtt_id_and_topics(const char* thing_name, const char* shadow_name, T* serialMon) { | |
sprintf_P(mqtt_params.id, PSTR("%s"), thing_name); | |
serialMon->print(F("> MQTT_ID: ")); serialMon->println(mqtt_params.id); | |
sprintf_P(mqtt_params.shadow_prefix, PSTR("$aws/things/%s/shadow/name/%s"), thing_name, shadow_name); | |
serialMon->print(F("> shadow_prefix: ")); serialMon->println(mqtt_params.shadow_prefix); | |
sprintf_P(mqtt_params.reported_topic, PSTR("%s/update"), mqtt_params.shadow_prefix); | |
serialMon->print(F("> reported_topic: ")); serialMon->println(mqtt_params.reported_topic); | |
sprintf_P(mqtt_params.delta_topic, PSTR("%s/update/delta"), mqtt_params.shadow_prefix); | |
serialMon->print(F("> delta_topic: ")); serialMon->println(mqtt_params.delta_topic); | |
sprintf_P(mqtt_params.get_topic, PSTR("%s/get"), mqtt_params.shadow_prefix); | |
serialMon->print(F("> get_topic: ")); serialMon->println(mqtt_params.get_topic); | |
sprintf_P(mqtt_params.accepted_topic, PSTR("%s/get/accepted"), mqtt_params.shadow_prefix); | |
serialMon->print(F("> accepted_topic: ")); serialMon->println(mqtt_params.accepted_topic); | |
} | |
// Required global instances: PubSubClient MqttClient, MqttParams mqtt_params, Function `mqtt_subscriber_callback()` | |
template <typename T, typename U> void connect_to_mqtt_broker(T* ctx, U* serialMon) { | |
MqttClient.setServer("beam.soracom.io", 1883); | |
MqttClient.setClient(*ctx); | |
MqttClient.setBufferSize(1024); | |
MqttClient.setCallback(mqtt_subscriber_callback); | |
if (!MqttClient.connect(mqtt_params.id)) { | |
serialMon->println(F("> MqttClient.connect() failed.")); | |
serialMon->print(F("> MqttClient.state(): ")); serialMon->println(MqttClient.state()); | |
long e = millis(); | |
return; // forced termination. | |
} | |
MqttClient.subscribe(mqtt_params.delta_topic); | |
MqttClient.subscribe(mqtt_params.accepted_topic); | |
} | |
// Required global instances: PubSubClient MqttClient | |
template <typename T> bool is_mqtt_broker_connected(T* serialMon) { | |
bool result = MqttClient.connected(); | |
if (!result) serialMon->println(F("> MqttClient.connected(): false")); | |
return result; | |
} | |
// Required global instances: M5, SerialMon | |
void mcu_restart() { | |
SerialMon.println("> MCU_RESTART"); | |
delay(3000); // waiting for flush of serial buffer | |
M5.Power.reset(); | |
} | |
void setup() { | |
delay(1000); | |
M5.begin(); | |
SerialMon.begin(115200); | |
SerialMon.println(); | |
SerialMon.println(F("> --- START ---------------------------------------------------")); | |
SerialMon.println(_VERSION_); | |
M5.Lcd.wakeup(); | |
M5.Lcd.setBrightness(40); | |
M5.Lcd.setTextSize(3); | |
M5.Lcd.fillScreen(TFT_BLACK); | |
M5.Lcd.clear(); | |
M5.Lcd.setCursor(0, 0); | |
M5.Lcd.println(F("Connecting...")); | |
SerialAT.begin(115200, SERIAL_8N1, 16, 17); // 3G ext. module | |
connect_to_cellular_network(&modem, &SerialMon); | |
set_mqtt_id_and_topics(THING_NAME, SHADOW_NAME, &SerialMon); | |
connect_to_mqtt_broker(&ctx, &SerialMon); | |
timer0_trigger = false; // not fire with bootup | |
timer0 = timerBegin(0, 80, true); | |
timerAlarmWrite(timer0, 1000000, true); | |
timerAttachInterrupt(timer0, &timer0_callback, true); | |
timerAlarmEnable(timer0); | |
timer1_trigger = true; // fire with bootup | |
timer1 = timerBegin(1, 80, true); | |
timerAlarmWrite(timer1, 1000000, true); | |
timerAttachInterrupt(timer1, &timer1_callback, true); | |
timerAlarmEnable(timer1); | |
} | |
#define LOOP_INTERVAL_MS 50 // msec | |
void loop() { | |
M5.update(); | |
if (!(is_cellular_network_connected(&modem, &SerialMon) && is_mqtt_broker_connected(&SerialMon))) { | |
connect_to_cellular_network(&modem, &SerialMon); | |
connect_to_mqtt_broker(&ctx, &SerialMon); | |
} | |
if (M5.BtnB.wasPressed()) timer0_trigger = true; // If you wanna actively send | |
if (timer0_trigger) { | |
SerialMon.println(F("> Periodically send the state for 'digital twin'.")); | |
report_state(); | |
timer0_trigger = false; // reset | |
timer0_tick = 0; // reset | |
} | |
if (timer1_trigger) { | |
SerialMon.println(F("> Periodically get the shadow for 'digital twin'.")); | |
MqttClient.publish(mqtt_params.get_topic, "{}"); | |
timer1_trigger = false; // reset | |
timer1_tick = 0; // reset | |
} | |
int32_t prev_example_counter = example_counter; | |
if (M5.BtnA.wasPressed()) example_counter--; | |
if (M5.BtnC.wasPressed()) example_counter++; | |
if (prev_example_counter != example_counter) { // is_modified ? | |
DynamicJsonDocument doc(128); | |
JsonObject state = doc.createNestedObject("state"); | |
state["example_counter"] = example_counter; | |
operate_peripheral(state); | |
// report_state(); // If you wanna send at the same time as the change | |
} | |
unsigned long next = millis(); | |
while (millis() < next + LOOP_INTERVAL_MS) MqttClient.loop(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment