Skip to content

Instantly share code, notes, and snippets.

@aschmidt75
Created July 16, 2018 12:01
Show Gist options
  • Save aschmidt75/706fa7f5360868d6ed45d2bcd9cd0bad to your computer and use it in GitHub Desktop.
Save aschmidt75/706fa7f5360868d6ed45d2bcd9cd0bad to your computer and use it in GitHub Desktop.
Micropython main script for MQTT/WebThingAPI sample
from action import Action
from event import Event
from property import Property
from thing import Thing
from value import Value
import json
import time
import uuid
from machine import Pin
import pycom
import gc
from mqtt import mqtt
from ThingMQTTBinding import ThingMQTTBinding
# Connect to mqtt server
mqtt_server = "192.168.0.100"
mqtt_client = mqtt.MQTTClient("pycom", mqtt_server,user="wipy", password="your_api_key", port=1883)
resp = mqtt_client.connect()
if resp == 0:
print("Connected to MQTT broker")
else:
print("Error connecting to MQTT broker.")
# Instantiate Thing, create values, attach
# properties, actions, events
thing = Thing('PycomDemoThing', ['OnOffSwitch', 'Light'], '-')
# we're using the ws_href field as the name of the mqtt topic to subscribe/publish to
thing.set_ws_href('pycom')
## Properties
# properties are backed by Value objects
led_onoff_value = Value(True)
button_value = Value(False)
# Property objects connect values with names and metadata, and attach
# them to a thing.
led_onoff_property = Property(thing, 'on', led_onoff_value, {
'@type': 'OnOffProperty',
'label': 'Builtin LED On/Off',
'type': 'boolean',
'description': 'WiPy built-in color LED turned on of off',
})
button_property = Property(thing, 'S1', button_value, {
'@type': 'OnOffProperty',
'label': 'Builtin S1 button',
'type': 'boolean',
'description': 'Button S1 of Pycom Expansion board',
})
thing.add_property(led_onoff_property)
thing.add_property(button_property)
## Action
# Each action has a class that derives from Action.
class TestAction(Action):
def __init__(self, thing=None, input_=None):
Action.__init__(self, uuid.uuid4().hex, thing, 'test', input_=input_)
def perform_action(self):
print("in TestAction.perform_action()")
print(self.input)
# Add action class with name and meta data to thing
thing.add_available_action('test', {
'label': 'Test',
'properties': {
'duration': {
'type': 'number',
'unit': 'seconds',
'minimum': 0,
}
}
}, TestAction)
## Events
class TestEvent(Event):
def __init__(self,thing,data_):
Event.__init__(self,thing, 'test', data=data_)
thing.add_available_event('test', {
'description': 'something has happened.',
'type': 'string',
})
# to trigger this event, use:
# thing.add_event(TestEvent(thing, { "detail": "data"}))
## Binding
# ThingMQTTBinding is the intermediate between an MQTT client object
# and a Thing object. It handles all interactions such as `setProperty`
# or `event` update messages. It needs to be instantiated after complete
# setup of a Thing objects
binding = ThingMQTTBinding(mqtt_client, thing)
# Ex1: add user button interrupt handler, fire event
def button_released(arg):
button_value = arg.value() # update wot value
# fire event
thing.add_event(TestEvent(thing, 'button_pressed'))
button = Pin("G17", mode=Pin.IN)
button.callback(Pin.IRQ_RISING, button_released, None)
# Ex2: whenever led_onoff value changes, we want to know and
# set the internal LED accordingly.
def update_led(value):
print("set_led: {}", value)
if value == True:
pycom.rgbled(0xFFFFFF)
else:
pycom.rgbled(0x000000)
led_onoff_value.value_forwarder = update_led
gc.collect()
## main loop
binding.handle_announce()
counter = 0
while True:
mqtt_client.check_msg()
time.sleep(1)
# every 60s, send a "propertyStatus" message
counter += 1
if counter % 60 == 0:
binding.update_property_status()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment