Last active
August 29, 2015 14:23
-
-
Save lpereira/31b2ace8acf07594e5e9 to your computer and use it in GitHub Desktop.
IoTomada
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import mraa | |
import time | |
import pyupm_i2clcd | |
import pyupm_grove | |
import SimpleXMLRPCServer | |
import netifaces as ni | |
import threading | |
import Queue | |
import os | |
import requests | |
class Background(threading.Thread):
    """Worker thread that executes queued callables one at a time, in order.

    Jobs are handed over with :meth:`enqueue` and run on this thread, not on
    the caller's thread. The constructor does not start the thread; whoever
    creates the instance (or a subclass ``__init__``) must call ``start()``.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        # FIFO of (callable, args-tuple) pairs waiting to be executed.
        self.queue = Queue.Queue()

    def run(self):
        """Consume jobs forever; a failing job is reported and skipped."""
        while True:
            job = self.queue.get(block=True)
            try:
                func, args = job
                func(*args)
            except Exception as e:
                # Best effort: one bad job must not kill the worker loop.
                print("Ignoring exception: %s" % (e,))
            finally:
                # Lets callers use self.queue.join() to wait for a drain.
                self.queue.task_done()

    def enqueue(self, func, *args):
        """Schedule ``func(*args)`` to run later on this worker thread."""
        self.queue.put((func, args))
class Dispatcher(object):
    """Route flat ``<proxy>_<action>`` method names to registered proxies.

    Each keyword argument given to the constructor registers a proxy object
    under that name. A call such as ``lcd_msg_color`` is split at the first
    underscore into proxy ``lcd`` and action ``msg_color``; the proxy's
    ``api_msg_color`` attribute is then queued through ``proxy.enqueue``.
    """

    def __init__(self, **proxies):
        self.proxies = proxies

    def _dispatch(self, method, params):
        """Queue the handler named by ``method`` with ``params``.

        Args:
            method: Name of the form ``<proxy>_<action>``.
            params: Sequence of positional arguments for the handler.

        Returns:
            True if a handler was found and queued; False if the name has no
            underscore, the proxy is not registered, or the proxy has no
            callable ``api_<action>`` attribute.
        """
        proxy_name, sep, action = method.partition('_')
        if not sep:
            return False
        # Unknown proxies are reported as a failed dispatch, not a KeyError.
        proxy = self.proxies.get(proxy_name)
        if proxy is None:
            return False
        handler = getattr(proxy, 'api_' + action, None)
        if not callable(handler):
            return False
        proxy.enqueue(handler, *params)
        return True
class Gpio(Background):
    """Background worker that drives one mraa GPIO pin set up as an output."""

    def __init__(self, pin):
        super(Gpio, self).__init__()
        gpio = mraa.Gpio(pin)
        gpio.dir(mraa.DIR_OUT)
        self.pin = gpio
        # The worker thread is launched as soon as the pin is configured.
        self.start()

    def api_liga(self):
        """Write 1 to the pin ("liga": switch on)."""
        self.pin.write(1)

    def api_desliga(self):
        """Write 0 to the pin ("desliga": switch off)."""
        self.pin.write(0)
def constrain(value, vmin, vmax):
    """Clamp ``value`` to the range bounded by ``vmin`` and ``vmax``.

    The lower bound is checked first: anything below ``vmin`` yields
    ``vmin``, otherwise anything above ``vmax`` yields ``vmax``, otherwise
    ``value`` is returned unchanged.
    """
    return vmin if value < vmin else (vmax if value > vmax else value)
class Sistema(Background):
    """Background worker exposing system-level commands (reboot, exit)."""

    def __init__(self):
        Background.__init__(self)
        # Start the worker thread; without it, enqueued commands would sit
        # in the queue forever and never execute.
        self.start()

    def api_reboot(self):
        """Run the ``reboot`` shell command."""
        os.system("reboot")

    def api_soft_reset(self):
        """Terminate the whole process immediately with exit status 0.

        ``os.exit`` does not exist, so ``os._exit`` is used. It ends the
        process even when called from this worker thread, whereas
        ``sys.exit`` would only stop the calling thread.
        NOTE(review): presumably an external supervisor restarts the script
        afterwards -- confirm.
        """
        os._exit(0)
class LCD(Background):
    """Background worker that serialises text and colour updates to the LCD."""

    def __init__(self, lcd):
        super(LCD, self).__init__()
        self.lcd = lcd
        self.start()

    def api_msg(self, msg):
        """Move the cursor to (1, 1) and write ``msg`` there.

        NOTE(review): presumably (row, column) on the display -- confirm
        against the UPM driver.
        """
        display = self.lcd
        display.setCursor(1, 1)
        display.write(msg)

    def api_msg_color(self, r, g, b):
        """Set the display colour, clamping each channel to 0..255 first."""
        channels = [constrain(channel, 0, 255) for channel in (r, g, b)]
        self.lcd.setColor(*channels)
class LCDScroller(threading.Thread):
    """Thread that polls a Grove button, mirrors it to the lamp, and scrolls.

    Every 0.2 seconds the button is sampled. Whenever the sample differs from
    the last applied state, the LCD colour and the lamp are updated. The
    display is scrolled on every pass regardless of changes.
    """

    def __init__(self, lcd, lampada):
        super(LCDScroller, self).__init__()
        self.lcd = lcd
        self.lampada = lampada
        # NOTE(review): 8 is presumably the board pin the button is wired to.
        self.button = pyupm_grove.GroveButton(8)

    def _apply(self, pressed):
        """Set the LCD colour and switch the lamp for the given button state."""
        if pressed:
            self.lcd.setColor(128, 64, 32)
            self.lampada.api_liga()
        else:
            self.lcd.setColor(64, 128, 32)
            self.lampada.api_desliga()

    def run(self):
        """Poll the button forever, applying changes and scrolling the LCD."""
        applied = self.button.value()
        # Start from the opposite of the first reading so that the first pass
        # always counts as a change and drives the outputs.
        sample = not applied
        while True:
            if sample != applied:
                applied = sample
                self._apply(applied)
            # NOTE(review): the boolean presumably selects scroll direction.
            self.lcd.scroll(bool(sample))
            time.sleep(0.2)
            sample = self.button.value()
class Talkback(threading.Thread):
    """Thread that polls a ThingSpeak TalkBack queue and dispatches commands.

    Every ``POLL_INTERVAL`` seconds the next pending command is requested.
    Dashes in the returned text are turned into underscores and the result
    is passed to ``dispatcher._dispatch`` with no arguments. The LCD colour
    is changed while a poll is in flight and restored afterwards.
    The thread starts itself on construction.
    """

    # Seconds to wait between polls.
    POLL_INTERVAL = 5
    # Seconds before an unresponsive HTTP request is abandoned; without a
    # timeout a stalled connection would block this thread indefinitely.
    HTTP_TIMEOUT = 10

    def __init__(self, dispatcher, lcd):
        threading.Thread.__init__(self)
        self.dispatcher = dispatcher
        self.lcd = lcd
        self.start()

    def run(self):
        """Poll forever; failures are reported and the loop keeps going."""
        while True:
            self.lcd.setColor(255, 128, 64)
            try:
                r = requests.get(
                    "https://api.thingspeak.com/talkbacks/XXXX/commands/execute",
                    params={'api_key': 'YYYY'},
                    timeout=self.HTTP_TIMEOUT)
                # Drop surrounding whitespace/newlines so they do not end up
                # in the method name.
                method = r.text.strip().replace('-', '_')
                if method:
                    self.dispatcher._dispatch(method, ())
            except Exception as e:
                # Best effort: network or dispatch errors must not stop the
                # poller, but they should be visible.
                print("Ignoring exception: %s" % (e,))
            finally:
                self.lcd.setColor(64, 255, 128)
            time.sleep(self.POLL_INTERVAL)
def get_ip_addr(iface):
    """Block until ``iface`` has an IPv4 address, then return it.

    The interface is queried every 0.1 seconds until an address appears.

    Args:
        iface: Network interface name, e.g. ``'wlan0'``.

    Returns:
        The first IPv4 address of the interface, as reported by netifaces.
    """
    while True:
        try:
            return ni.ifaddresses(iface)[ni.AF_INET][0]['addr']
        except (KeyError, IndexError, ValueError):
            # KeyError/IndexError: no IPv4 address assigned yet.
            # ValueError: netifaces does not know the interface (yet).
            # The original called bare sleep(), which is undefined here and
            # turned the first retry into a NameError.
            time.sleep(0.1)
def main():
    """Initialise the LCD and outputs, then serve XML-RPC requests forever."""
    print('Init LCD')
    lcd = pyupm_i2clcd.Jhd1313m1(6, 0x3e, 0x62)
    ip = get_ip_addr('wlan0')

    # First argument of setCursor is presumably the display row -- confirm.
    banner = (
        (0, "Endereco IP: " + ip),
        (1, "#intelmaker Sao Paulo 2015"),
    )
    for row, text in banner:
        lcd.setCursor(row, 1)
        lcd.write(text)

    lampada = Gpio(2)
    LCDScroller(lcd, lampada).start()

    print('Init XML-RPC server')
    server = SimpleXMLRPCServer.SimpleXMLRPCServer(('0.0.0.0', 8080))

    print('Register xml proxy for lampada')
    dispatcher = Dispatcher(lampada=lampada, tomada=Gpio(3), lcd=LCD(lcd), sistema=Sistema())
    # Talkback starts its own polling thread from its constructor.
    Talkback(dispatcher, lcd)
    server.register_instance(dispatcher)

    print('Serving')
    server.serve_forever()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment