Skip to content

Instantly share code, notes, and snippets.

@ma2shita
Created November 4, 2019 03:29
Show Gist options
  • Save ma2shita/be6be16f544c4ba8caf0309a5a79e6b1 to your computer and use it in GitHub Desktop.
Save ma2shita/be6be16f544c4ba8caf0309a5a79e6b1 to your computer and use it in GitHub Desktop.
"""
[Unit]
Description=SwitchBot worker using MQTT with SORACOM Beam
After=network-online.target
[Service]
Type=simple
WorkingDirectory=/home/pi/switchbot
ExecStart=/home/pi/switchbot/bin/python3 -B switchbot_worker_with_soracom_beam.py
ExecStop=/bin/kill -INT ${MAINPID}
Restart=always
RestartSec=5s
StartLimitBurst=10
[Install]
WantedBy=default.target
"""
import time
import paho.mqtt.client as mqtt # paho-mqtt
import json
import switchbot
ENDPOINT = {"host": "beam.soracom.io", "port": 1883}
THING_NAME = "GarageSwitchBot1"
OPENER_SWITCHBOT_MACADDRESS = "YOUR_SWITCHBOT_MAC_ADDRESS"
CLOSER_SWITCHBOT_MACADDRESS = "YOUR_SWITCHBOT_MAC_ADDRESS"
class Controller:
def open(self):
switchbot.SwitchBot(OPENER_SWITCHBOT_MACADDRESS).press()
def close(self):
switchbot.SwitchBot(CLOSER_SWITCHBOT_MACADDRESS).press()
class GeneralMqtt:
def delta_topic(thing_name):
return "$aws/things/{}/shadow/update/delta".format(thing_name) # for Shadow of AWS IoT Core
def extract(obj):
return obj['state']['garage'] # Likes shadow doc of AWS IoT Core
def on_connect(client, userdata, flag, rc):
delta_topic = GeneralMqtt.delta_topic(THING_NAME)
client.subscribe(delta_topic)
def on_message(client, userdata, msg):
print(msg.payload, flush=True)
obj = json.loads(msg.payload)
method_name = GeneralMqtt.extract(obj)
getattr(Controller(), method_name)()
def on_disconnect(client, userdata, rc):
print("on_disconnect({})".format(rc))
if rc != 0:
quit(rc) # and expect respawn by Supervisor (e.g. systemd)
def on_log(client, obj, level, string):
print(string, flush=True)
def main():
client = mqtt.Client()
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.on_log = on_log
# client.tls_set(ca_certs=rCA, certfile=CRT, keyfile=KEY, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
print("Connect to {}".format(ENDPOINT["host"]))
client.connect(ENDPOINT["host"], port=ENDPOINT["port"], keepalive=600)
client.loop_forever()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment