Created
January 24, 2019 01:23
-
-
Save ryanswanstrom/0dca9ee88e70ba52ba1ccb5c47cb724c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urequests as requests | |
#common functions | |
def mb_safe(func): | |
def _wrapper(*args, **kwargs): | |
try: | |
return func(*args, **kwargs) | |
except Exception as e: | |
print('===== mb_safe: ' + str(e)) | |
return '' | |
return _wrapper | |
#create iot module | |
import uos | |
uos.chdir('/flash/flash') | |
with open("iot.py", "w") as f: | |
f.write("") | |
import ujson | |
import iot | |
import time | |
import codey | |
import urequests | |
from codey_wlan_board import codey_wlan | |
# 云列表请求 域名 | |
iot_list_request_domain = '[iotCloudListDomain]' | |
iot_weather_request_domin = '[iotWeatherDomain]'; | |
iot_air_request_domain = '[iotAirDomain]'; | |
def __iot_get_request_header(): | |
return { | |
'content-type': 'application/json; charset=utf-8', | |
'uid': '[uId]', | |
'devicetype': '[deviceType]', | |
'deviceid': '[deviceId]' | |
} | |
def __iot_get(request_url): | |
if not codey.wifi_is_connected(): | |
return '' | |
res = urequests.request('GET', request_url, headers = __iot_get_request_header()).json() | |
return res['data'] | |
def __iot_post(request_url, post_data): | |
if not codey.wifi_is_connected(): | |
return '' | |
res = urequests.request('POST', request_url, headers = __iot_get_request_header(), data = post_data).json() | |
return res['data'] | |
# 添加数据项 | |
@mb_safe | |
def iot_list_add(name, data): | |
post_data = ujson.dumps({ "listName": name, "data": data}) | |
return __iot_post(iot_list_request_domain + 'meos/postcloudlist', post_data) | |
iot.list_add = iot_list_add | |
# 得到 index 指向数据项 | |
@mb_safe | |
def iot_list_index(name, index): | |
req_type = 'index' | |
req_index = 0 | |
if index == 'random': | |
req_type = 'random' | |
elif index == 'last': | |
req_type = 'last' | |
else: | |
index = int(index) | |
if index > 0: | |
req_index = index - 1 | |
else: | |
return '' | |
res = __iot_get(iot_list_request_domain + 'meos/getCloudListItemByIndex?listName=' + name + '&type=' + req_type + '&index=' + str(req_index)) | |
return res['itemData']['data'] | |
iot.list_index = iot_list_index | |
# 获取云列表长度 | |
@mb_safe | |
def iot_list_length(name): | |
res = __iot_get(iot_list_request_domain + 'meos/getcloudlistlen?listName=' + name) | |
return int(res['listLen']) | |
iot.list_length = iot_list_length | |
# 获取天气信息想 | |
# city_code: 城市编码 | |
# data_type: 获取数据类型 | |
@mb_safe | |
def iot_weather(city_code, data_type): | |
if not codey.wifi_is_connected(): | |
return '' | |
request_url = iot_weather_request_domin + 'getweather?woeid=' + str(city_code) + '&type=' + str(data_type) | |
res = urequests.request('GET', request_url) | |
text = res.text | |
if int(data_type) <= 3: | |
return int(text) | |
return text | |
iot.weather = iot_weather | |
# 获取空气信息 | |
# cid: 检查站id | |
# arg: 需要查询的信息(pm25, so2等) | |
@mb_safe | |
def iot_air(cid, arg): | |
if not codey.wifi_is_connected(): | |
return '' | |
post_data = ujson.dumps({ "cid": cid, "arg": arg}) | |
res = urequests.request('POST', iot_air_request_domain + 'air/getone', headers = __iot_get_request_header(), data = post_data) | |
text = res.text | |
return float(text) | |
iot.air = iot_air | |
import codey | |
import rocky | |
def on_button_callback2(): | |
codey.face( | |
'1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1' | |
'0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0' | |
'0 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0' | |
'0 1 0 0 0 1 1 1 0 0 0 0 0 1 1 1' | |
'0 1 1 1 0 0 1 0 0 1 1 1 0 1 0 1' | |
'0 0 0 1 0 0 1 0 0 1 0 1 0 1 1 1' | |
'0 1 1 1 0 0 1 0 0 1 1 1 0 1 0 0' | |
'1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1') | |
rocky.stop() | |
codey.wifi('GooF-phone', '12345678') | |
if codey.wifi_is_connected(): | |
codey.say('yeah.wav', True) | |
codey.show('yes', wait = False) | |
else: | |
codey.say('angry.wav', True) | |
codey.show('no', wait = False) | |
codey.on_button('C', on_button_callback2) | |
def on_button_callback(): | |
codey.face( | |
'1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1' | |
'0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0' | |
'0 1 1 1 0 0 0 0 0 0 0 1 1 1 0 0' | |
'0 1 0 1 0 1 1 1 1 1 0 1 0 1 0 0' | |
'0 1 1 1 0 0 0 1 0 0 0 1 1 1 0 0' | |
'0 1 0 0 0 0 0 1 0 0 0 1 1 0 0 0' | |
'0 1 0 0 0 0 0 1 0 0 0 1 0 1 0 0' | |
'1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1') | |
codey.say('annoyed.wav') | |
rocky.forward(60, 0.6) | |
rocky.backward(20, 0.5) | |
if codey.wifi_is_connected(): | |
res = requests.get(url='http://ryanstestfuncapp.azurewebsites.net/api/mbot?symbol=msft') | |
if res: | |
codey.show('Microsoft stock price is ' + res.text) | |
else: | |
codey.show('apis bad') | |
else: | |
codey.show('not con') | |
codey.on_button('A', on_button_callback) | |
def on_button_callback1(): | |
codey.face( | |
'1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1' | |
'0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0' | |
'0 0 1 1 1 0 0 1 0 0 1 1 1 0 0 0' | |
'0 0 1 0 1 0 0 0 0 0 1 0 1 0 0 0' | |
'0 0 1 1 1 0 0 1 0 0 1 1 1 0 0 0' | |
'0 0 1 1 0 0 0 1 0 0 1 0 0 0 0 0' | |
'0 0 1 0 1 0 0 1 0 0 1 0 0 0 0 0' | |
'1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1') | |
for count in range(2): | |
codey.play_note(60, 0.25) | |
rocky.turn_left(50, 0.5) | |
rocky.forward(50, 0.5) | |
rocky.turn_right(50, 0.6) | |
rocky.backward(50, 0.5) | |
codey.on_button('B', on_button_callback1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment