Created
August 8, 2016 11:32
-
-
Save zakx/c4cef15846e289e7676a29ab9db66c4c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### Category: Comms | |
### Author: zakx <[email protected]> | |
### License: MIT | |
### Appname: Message Snoop | |
### Description: Have no friends? Receive other people's messages | |
import wifi | |
import mqtt | |
import ugfx | |
import pyb | |
import math | |
import time | |
import json | |
def callback(topic, msg):
    """Handle one incoming MQTT publication.

    The payload is parsed as JSON. Only objects whose ``type`` field equals
    ``'message'`` are shown on screen; everything else is ignored.

    topic -- topic the publication arrived on (passed through to printmsg)
    msg   -- raw JSON payload with at least a ``type`` key; message payloads
             must also carry ``sender``, ``payload`` and ``ts``
    """
    parsed = json.loads(msg)
    if parsed['type'] != 'message':
        # Not a chat message: nothing to display.
        return
    printmsg(topic, parsed['sender'], parsed['payload'], parsed['ts'])
# taken from nexmo's messages app. cheers! | |
def zf(d):
    """Left-pad the string ``d`` with a single "0" if it is shorter than two characters.

    Strings of length two or more are returned unchanged. Note that exactly
    one "0" is prepended, so an empty string becomes "0".
    """
    width = 2
    return d if len(d) >= width else "0" + d
# taken from nexmo's messages app. cheers! | |
def timestring(tt):
    """Format the timestamp ``tt`` as ``HH:MM:SS D/M/YYYY``.

    A fixed offset of 946681200 seconds is subtracted before the value is
    handed to ``time.localtime``. Hours, minutes and seconds are zero-padded
    via ``zf``; day, month and year are printed as-is.
    """
    # NOTE(review): 946681200 is 3600 s less than 946684800 (2000-01-01 00:00 UTC).
    # Presumably this maps Unix time onto the badge's 2000-based epoch plus a
    # one-hour local-time shift -- confirm.
    offset = 946681200
    fields = time.localtime(tt - offset)
    year, month, day = fields[0], fields[1], fields[2]
    clock = ":".join(zf(str(part)) for part in (fields[3], fields[4], fields[5]))
    date = "/".join(str(part) for part in (day, month, year))
    return clock + " " + date
# taken from nexmo's messages app. cheers! | |
def printmsg(topic, sender, text, ts):
    """Draw one message on the badge screen.

    The screen is cleared to white, then three blue header lines are drawn
    (sender, topic, formatted timestamp), followed by the message body in
    black, wrapped into rows of at most 40 characters.

    topic  -- bytes topic name; decoded as ASCII for display
    sender -- sender name (string)
    text   -- message body (string)
    ts     -- timestamp, formatted through timestring()
    """
    ugfx.clear(ugfx.WHITE)
    ugfx.set_default_font(ugfx.FONT_SMALL)

    # Header block: one line every 15 pixels.
    ugfx.text(0, 0, "From: " + sender, ugfx.BLUE)
    ugfx.text(0, 15, "To: " + topic.decode("ascii"), ugfx.BLUE)
    ugfx.text(0, 30, "Sent at " + timestring(ts), ugfx.BLUE)

    # Body: fixed-width wrap, rows start at y=50 and are 25 pixels apart.
    chars_per_row = 40
    for row, first in enumerate(range(0, len(text), chars_per_row)):
        y = row * 25 + 50
        ugfx.text(10, y, text[first:first + chars_per_row], ugfx.BLACK)
def waiting_screen():
    """Draw the idle screen: a bold "waiting" headline and an exit hint."""
    ink = ugfx.BLACK

    ugfx.clear(ugfx.WHITE)
    # Fill the whole display area with 0xFFFF (presumably white in the
    # display's 16-bit colour format -- confirm).
    screen_w = ugfx.width()
    screen_h = ugfx.height()
    ugfx.area(0, 0, screen_w, screen_h, 0xFFFF)

    # Headline near the top of the screen.
    ugfx.set_default_font(ugfx.FONT_MEDIUM_BOLD)
    ugfx.text(0, 10, "Waiting for messages.", ink)

    # Small hint at y=225.
    ugfx.set_default_font(ugfx.FONT_SMALL)
    ugfx.text(0, 225, "Press MENU to exit.", ink)
# Module-level handle to the current MQTT client; None until a client has
# been created.
c = None

# Initialise the display before any drawing call is made.
ugfx.init()
def connect_mqtt():
    """Create an MQTT client, connect it, and subscribe to all badge topics.

    A fresh ``mqtt.MQTTClient`` is stored in the module-level ``c`` on every
    attempt. If ``connect()`` raises ``OSError`` the function waits 200 ms and
    tries again, indefinitely. Once connected, ``callback`` is registered as
    the message handler and the client subscribes to ``/badge/#`` with QoS 0.

    Retries are done in a loop rather than by recursion, so a long broker
    outage cannot exhaust the interpreter's call stack.
    """
    global c
    while True:
        # if they don't like it, they may block our user agent
        c = mqtt.MQTTClient('snoop/2', 'badge.emf.camp')
        try:
            c.connect()
        except OSError:
            # Broker unreachable: back off briefly and start over with a new client.
            pyb.delay(200)
            continue
        break
    c.set_callback(callback)
    c.subscribe('/badge/#', qos=0)
def disconnect_mqtt():
    """Best-effort disconnect of the module-level MQTT client ``c``.

    Does nothing when no client is set. Errors raised by ``c.disconnect()``
    are deliberately ignored because the connection may already be dead;
    only ``Exception`` subclasses are swallowed, so KeyboardInterrupt and
    SystemExit still propagate.
    """
    if not c:
        return
    try:
        c.disconnect()
    except Exception:
        # The link is being torn down anyway; a failed goodbye is harmless.
        pass
def connect(reconnect=False):
    """Bring up Wi-Fi and then the MQTT connection, showing progress on screen.

    reconnect -- when True, the existing MQTT client and the Wi-Fi link are
                 torn down first so that both are re-established from scratch.

    The function blocks until Wi-Fi reports connected and ``connect_mqtt()``
    has returned.
    """
    global c
    ugfx.clear(ugfx.RED)
    ugfx.set_default_font(ugfx.FONT_SMALL)
    ugfx.text(0, 10, "Connecting to Wifi...", ugfx.WHITE)

    if reconnect:
        disconnect_mqtt()
        wifi.nic().disconnect()

    # Keep trying until the link is up. (The original condition was inverted:
    # it looped while *connected*, so a disconnected badge never joined Wi-Fi
    # and a connected one never left the loop.)
    while not wifi.is_connected():
        try:
            wifi.connect(wait=True, timeout=15)
        except Exception:
            # Connection attempt failed or timed out: pause, then retry.
            pyb.delay(200)

    ugfx.text(0, 22, "Connecting to MQTT...", ugfx.WHITE)
    # Drop any stale client reference before a new one is created.
    c = None
    connect_mqtt()
# --- Application entry: connect, show the idle screen, then poll forever. ---
connect()
waiting_screen()

# Time (in pyb.millis() ticks) at which the next keep-alive round is due;
# 0 makes the first round run immediately.
next_tick = 0
while True:
    if pyb.millis() > next_tick:
        # Periodic maintenance, roughly every 30 seconds.
        if not wifi.is_connected():
            connect(reconnect=True)
        try:
            c.check_msg()
        except Exception:
            # Errors here are ignored; the ping below decides whether to reconnect.
            pass
        try:
            c.sock.send(b"\xc0\0")  # ping
        except Exception:
            # Socket is unusable: rebuild the connection and restore the idle screen.
            connect(reconnect=True)
            waiting_screen()
        next_tick = pyb.millis() + 30000
    try:
        c.check_msg()
    except Exception:
        # Polling failed; wait a little before the next attempt.
        # Only Exception is caught so Ctrl-C can still stop the loop.
        pyb.delay(200)

# NOTE(review): the loop above has no break, so this line is not reached in
# normal operation.
ugfx.clear()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment