Here is the source code for my simple alarm system:
boot.py
main.py
httpclient.py
express.py
onboard.py
On the raspberry pi, you simply bring up the server by running as root:
sudo python onboard.py 3000
led_on.sh
led_off.sh
Here is the source code for my simple alarm system:
boot.py
main.py
httpclient.py
express.py
onboard.py
On the raspberry pi, you simply bring up the server by running as root:
sudo python onboard.py 3000
led_on.sh
led_off.sh
from machine import Pin | |
import network | |
import time | |
# Boot-time Wi-Fi bring-up: connect as a station and show progress on an LED.
HIGH = 1
LOW = 0

sta = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
wifi = Pin(16, Pin.OUT)

# Take both interfaces down and darken the status LED before reconnecting.
ap.active(False)
sta.active(False)
wifi.off()
time.sleep(2)

sta.active(True)
sta.connect('SSID', 'password')  # change settings here

# Blink the status LED (0.3 s on, 0.3 s off) until the station is connected.
while not sta.isconnected():
    for toggle in (wifi.on, wifi.off):
        toggle()
        time.sleep(0.3)

print('Connected: ', sta.isconnected())
# Leave the LED reflecting the final link state.
if sta.isconnected():
    wifi.on()
else:
    wifi.off()

# Alarm switch input and alarm LED output; the LED starts switched off.
switch = Pin(15, Pin.IN)
led = Pin(14, Pin.OUT)
led.off()
print('switch active')
#!/usr/bin/env python | |
# """ | |
# This code is based on a minimal http server by bradmontgomery | |
# https://gist.github.com/bradmontgomery/2219997 | |
# | |
# Some part of it was inspired by express (expressjs.com) | |
# """ | |
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer | |
import SocketServer | |
class ExpressServerFactory:
    """Collect path -> callable routes and build a request-handler class.

    Each registered callable is invoked as ``fn(handler)``, where ``handler``
    is the ``BaseHTTPRequestHandler`` instance serving the request.  All
    routes live in one table that is consulted for both GET and POST, so
    ``add``, ``get`` and ``post`` register a route in exactly the same way.
    """

    def __init__(self):
        # Maps the request path, exactly as received, to its callable.
        self.cache = {}

    def add(self, path, fn):
        """Register ``fn`` as the callable for ``path``."""
        self.cache[path] = fn

    def get(self, path, fn):
        """Register ``fn`` for ``path`` (same table as ``add``)."""
        self.add(path, fn)

    def post(self, path, fn):
        """Register ``fn`` for ``path`` (same table as ``add``)."""
        self.add(path, fn)

    def createClass(self):
        """Return a ``BaseHTTPRequestHandler`` subclass bound to the routes.

        The returned class keeps a reference to the route table, so routes
        added after this call are still served.
        """
        cache = self.cache

        def fn404(handler):
            # Interpolate the requested path into the reason phrase and
            # finish the header section so the client gets a full response.
            handler.send_response(
                404, 'path: "%s" is not defined...' % handler.path)
            handler.end_headers()

        class B(BaseHTTPRequestHandler):
            def do_GET(self):
                cache.get(self.path, fn404)(self)

            def do_POST(self):
                cache.get(self.path, fn404)(self)

        return B
def noop(*args, **kwargs):
    """Accept any arguments and do nothing."""
    return None
def run(server_class=HTTPServer, handler_class=None, port=80, stop_handler=noop):
    """Serve HTTP on every interface at ``port`` until Ctrl-C is pressed.

    ``server_class`` is instantiated with ``('', port)`` and
    ``handler_class``.  When ``serve_forever`` is interrupted by
    ``KeyboardInterrupt``, ``stop_handler`` is called with the server
    instance.
    """
    httpd = server_class(('', port), handler_class)
    print('Starting httpd...')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        stop_handler(httpd)
if __name__ == "__main__":
    # Demo: serve a single POST route that logs the request body.
    from sys import argv

    # The factory class in this module is ExpressServerFactory; the
    # previously referenced name MyHttpServer is not defined anywhere.
    srv = ExpressServerFactory()

    def foo(self):
        length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(length)
        print('%s %s' % (type(post_data), post_data))
        # Answer the request so the client is not left with an empty reply.
        self.send_response(200, 'Ok')
        self.end_headers()

    srv.post('/foo', foo)
    handlerClass = srv.createClass()
    # An optional single command-line argument overrides the default port.
    if len(argv) == 2:
        run(port=int(argv[1]), handler_class=handlerClass)
    else:
        run(handler_class=handlerClass)
import usocket | |
import ujson | |
try: | |
import ussl | |
SUPPORT_SSL = True | |
except ImportError: | |
ussl = None | |
SUPPORT_SSL = False | |
import time | |
SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout') | |
CONTENT_TYPE_JSON = 'application/json' | |
class Response(object):
    """HTTP response whose body is read lazily from ``raw``.

    ``raw`` is the stream positioned at the start of the body.  It is closed
    and dropped either after the body has been read through ``content`` or
    when ``close`` is called.
    """

    def __init__(self, status_code, raw):
        self.status_code = status_code
        self.raw = raw
        # False means "body not read yet"; None means "closed unread".
        self._content = False
        self.encoding = 'utf-8'

    @property
    def content(self):
        """Body as returned by ``raw.read()``; read once, then cached."""
        if self._content is not False:
            return self._content
        self._content = self.raw.read()
        self.raw.close()
        self.raw = None
        return self._content

    @property
    def text(self):
        """Body decoded with ``encoding``; ``''`` when there is no body."""
        body = self.content
        if not body:
            return ''
        return str(body, self.encoding)

    def close(self):
        """Close the underlying stream without reading the body."""
        if self.raw is None:
            return
        self._content = None
        self.raw.close()
        self.raw = None

    def json(self):
        """Parse the body text as JSON."""
        return ujson.loads(self.text)

    def raise_for_status(self):
        """Raise ``OSError`` for a 4xx or 5xx status code."""
        code = self.status_code
        if 400 <= code < 500:
            raise OSError('Client error: %s' % code)
        if 500 <= code < 600:
            raise OSError('Server error: %s' % code)
# Adapted from upip | |
def request(method, url, json=None, timeout=None, headers=None):
    """Send an HTTP/1.0 request and return a ``Response``.

    method  -- HTTP verb written verbatim into the request line.
    url     -- ``http://host[:port]/path`` or ``https://...``.
    json    -- optional object; serialised with ``ujson`` and sent as the
               body with a JSON content type.
    timeout -- optional socket timeout, passed to ``settimeout``.
    headers -- optional dict of extra request headers.

    The returned ``Response`` wraps the still-open socket: read its content
    or call ``close()`` to release it.

    Raises ``ValueError`` for a URL without a ``scheme://host`` part and
    ``OSError`` for an unsupported scheme or a malformed status line.  If
    anything fails after the socket is created, the socket is closed before
    the error propagates.
    """
    urlparts = url.split('/', 3)
    if len(urlparts) < 3:
        raise ValueError('Invalid URL: %s' % url)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = urlparts[3] if len(urlparts) > 3 else ''
    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])
    if ':' in host:
        host, port = host.split(':')
        port = int(port)
    if json is not None:
        content = ujson.dumps(json)
        content_type = CONTENT_TYPE_JSON
    else:
        content = None
    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    sock = usocket.socket()
    try:
        if timeout is not None:
            assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
            sock.settimeout(timeout)
        sock.connect(addr)
        if proto == 'https:':
            assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
            sock = ussl.wrap_socket(sock)
        sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))
        if headers is not None:
            for header in headers.items():
                sock.write('%s: %s\r\n' % header)
        if content is not None:
            sock.write('content-length: %s\r\n' % len(content))
            sock.write('content-type: %s\r\n' % content_type)
            sock.write('\r\n')
            sock.write(content)
        else:
            sock.write('\r\n')
        # The reason phrase is optional, so only the first two fields of
        # the status line are required.
        status_fields = sock.readline().split(None, 2)
        if len(status_fields) < 2:
            raise OSError('Invalid HTTP response')
        status = int(status_fields[1])
        # Skip headers; an empty read means the peer closed the connection,
        # which must end the loop instead of spinning forever.
        while True:
            line = sock.readline()
            if not line or line == b'\r\n':
                break
    except BaseException:
        # Do not leak the socket when the exchange fails part-way.
        sock.close()
        raise
    return Response(status, sock)
def get(url, **kwargs):
    """Issue a GET to ``url``; keyword arguments are passed to ``request``."""
    response = request('GET', url, **kwargs)
    return response
def post(url, **kwargs):
    """Issue a POST to ``url``; keyword arguments are passed to ``request``."""
    response = request('POST', url, **kwargs)
    return response
if __name__ == '__main__':
    # Manual smoke test: read the status page, then post value 1 and,
    # five seconds later, value 0.
    #resp = get('http://192.168.0.7:8080/sample.txt')
    resp = get('http://192.168.0.4:3000/status')
    print('Response:', resp.text)

    on_reply = post('http://192.168.0.4:3000/post', json={ "value": 1 })
    on_reply.close()
    time.sleep(5)
    off_reply = post('http://192.168.0.4:3000/post', json={ "value": 0 })
    off_reply.close()
#!/bin/sh
# Post {"value" : 0} to the alarm server, then print a newline after
# whatever curl wrote.
curl -H 'Content-Type:application/json' \
     -d '{"value" : 0}' \
     http://192.168.0.4:3000/post
echo
#!/bin/sh
# Post {"value" : 1} to the alarm server, then print a newline after
# whatever curl wrote.
curl -H 'Content-Type:application/json' \
     -d '{"value" : 1}' \
     http://192.168.0.4:3000/post
echo
from machine import Pin | |
import time | |
import httpclient as http | |
import network | |
# Station interface, polled by the main loop for link state.
station = network.WLAN(network.STA_IF)


class L:
    """Bare namespace used to hold mutable module-level state."""


# Shared state: the most recently observed Wi-Fi connection status.
p = L()
p.WIFI_CONNECTED = station.isconnected()

# Endpoint that receives the switch state.
URL = 'http://192.168.0.4:3000/post'

# Pins: switch input, Wi-Fi status LED, alarm LED.
switch = Pin(15, Pin.IN)
wifi_led = Pin(16, Pin.OUT)
alarm_led = Pin(14, Pin.OUT)
def set_state(value):
    """Show ``value`` on the alarm LED and report it to the server.

    The LED is always updated.  The HTTP post is attempted only while the
    cached Wi-Fi state says the station is connected.  That flag is only
    refreshed periodically, so the post can still fail; the failure is
    logged rather than propagated out of the switch callback.
    """
    print('value:', value)
    alarm_led.value(value)
    if not p.WIFI_CONNECTED:
        return
    try:
        http.post(URL, json={'value': value}).close()
    except OSError as err:
        # Reporting is best effort; the local LED already shows the state.
        print('post failed:', err)
def debounce(fn, p, interval=20):
    """Wrap ``fn`` so it only fires once pin ``p`` has settled.

    The returned callback ignores its own arguments.  It reads ``p.value()``
    once, then re-reads it ``interval`` more times with a 1 ms pause after
    each read.  ``fn`` is called with the first reading only if every
    re-read matched it.
    """
    def cb(*args, **kwargs):
        initial = p.value()
        stable_reads = 0
        remaining = interval
        while remaining > 0:
            if p.value() == initial:
                stable_reads += 1
            remaining -= 1
            time.sleep(0.001)
        if stable_reads == interval:
            fn(initial)
    return cb
print('setting switch trigger...')
# Trigger on both edges and pass each event through the debouncer.
both_edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
switch.irq(trigger=both_edges, handler=debounce(set_state, switch, 20))
print('running infinitely...')
def do_wifi_check():
    """Refresh the cached Wi-Fi state and mirror it on the Wi-Fi LED."""
    connected = station.isconnected()
    wifi_led.value(connected)
    p.WIFI_CONNECTED = connected
# Main loop: re-check the Wi-Fi link every half second.
while True:
    do_wifi_check()
    time.sleep(0.5)
# raspberry pi on board led controlling: | |
# https://raspberrypi.stackexchange.com/a/1504/74294 | |
import os | |
from express import run, ExpressServerFactory as Factory | |
from BaseHTTPServer import HTTPServer | |
import json | |
class CBServer(HTTPServer):
    """HTTPServer that notifies its handler class on activate and close.

    The handler class must provide ``post_start()`` and ``post_stop()``
    callable on the class itself.
    """

    def server_activate(self):
        """Activate the server, then call the handler class's start hook."""
        HTTPServer.server_activate(self)
        hooks = self.RequestHandlerClass
        hooks.post_start()

    def server_close(self):
        """Close the server, then call the handler class's stop hook."""
        HTTPServer.server_close(self)
        hooks = self.RequestHandlerClass
        hooks.post_stop()
sfact = Factory() | |
def post_start(self):
    """Print a notice that the server is running."""
    print('server is running...')
def process(self):
    """Handle a POST whose JSON body carries an integer ``value``.

    A valid request is answered with 200 and the request body echoed back.
    A non-zero integer ``value`` switches the LED on and zero switches it
    off; a ``value`` of any other type is acknowledged but ignored.

    The body is validated before any response is sent.  A missing or
    invalid Content-Length, malformed JSON, or a body without a ``value``
    key is answered with 400 instead of raising after a 200 has gone out.
    """
    try:
        length = int(self.headers.get('Content-Length', 0))
        if length < 0:
            raise ValueError('negative Content-Length')
        data = self.rfile.read(length)
        req = json.loads(data)
        val = req['value']
    except (ValueError, KeyError, TypeError):
        self.send_response(400, 'Bad Request')
        self.end_headers()
        return
    self.send_response(200, 'Ok')
    self.end_headers()
    self.wfile.write(data)
    # bool is a subclass of int; keep ignoring JSON true/false as before.
    if isinstance(val, int) and not isinstance(val, bool):
        if val:
            brightness_on()
        else:
            brightness_off()
def stat(self):
    """Answer with 200 and the fixed body ``hello world``."""
    self.send_response(200, 'Ok')
    # Use the handler's own API to end the header section instead of
    # writing the blank line to the socket by hand.
    self.end_headers()
    self.wfile.write('hello world')
# Register the routes, then build the request-handler class from them.
sfact.get('/status', stat)
sfact.post('/post', process)
sclass = sfact.createClass()
def brightness_on():
    """Write 1 to led0's sysfs brightness file through the shell."""
    command = "echo 1 > /sys/class/leds/led0/brightness"
    os.system(command)
def brightness_off():
    """Write 0 to led0's sysfs brightness file through the shell."""
    command = "echo 0 > /sys/class/leds/led0/brightness"
    os.system(command)
class Handler(sclass):
    """Routed request handler that also switches led0's sysfs trigger."""

    @classmethod
    def post_start(cls):
        """Set led0's trigger to ``none``."""
        print('setting led trigger...')
        os.system('echo none > /sys/class/leds/led0/trigger')
        print('done')

    @classmethod
    def post_stop(cls):
        """Set led0's trigger back to ``mmc0``."""
        print('defaulting led trigger...')
        os.system('echo mmc0 > /sys/class/leds/led0/trigger')
        print('done')
def on_stop(self):
    """Close the given server, printing a notice before and after.

    ``self`` is the server instance; this is a plain function.
    """
    print('server closing...')
    self.server_close()
    print('server closed')
if __name__ == '__main__':
    import sys

    # Options common to both invocations; a single command-line argument
    # supplies the port, otherwise run() falls back to its own default.
    options = {
        'handler_class': Handler,
        'server_class': CBServer,
        'stop_handler': on_stop,
    }
    if len(sys.argv) == 2:
        options['port'] = int(sys.argv[1])
    run(**options)