Skip to content

Instantly share code, notes, and snippets.

@David256
Created May 20, 2019 14:27
Show Gist options
  • Select an option

  • Save David256/59c3fd698a7e7b5981352428b0dd9da4 to your computer and use it in GitHub Desktop.

Select an option

Save David256/59c3fd698a7e7b5981352428b0dd9da4 to your computer and use it in GitHub Desktop.
This script defines an HTTP server with Flask that reads and serves data from ultrasound and infrared sensors.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# import the cool library
import RPi.GPIO as GPIO
# others
import time
import argparse
import threading
import signal
import flask
# Command-line interface: selects which sensors are polled and where the
# readings are served.
parser = argparse.ArgumentParser(description="IoT server.")

# The Flask application that serves the readings.
app = flask.Flask("IoTServer")

# -U / --ultrasound: boolean switch, off unless given.
parser.add_argument(
    "-U",
    "--ultrasound",
    action="store_true",
    dest="ultrasound",
    help="Enable ultrasound")

# -I / --infrared: boolean switch, off unless given.
parser.add_argument(
    "-I",
    "--infrared",
    action="store_true",
    dest="infrared",
    help="Enable infrared")

# -M / --methodport: URL rule under which the sensor data is published.
# `const` mirrors `default` so that a bare "-M" (allowed by nargs="?")
# still yields a usable route instead of None.
parser.add_argument(
    "-M",
    "--methodport",
    dest="method",
    nargs="?",
    const="/basura",
    default="/basura",
    type=str,
    help="Method to request")

# -P / --port: TCP port of the web server. As above, `const` keeps a bare
# "-P" from producing None.
parser.add_argument(
    "-P",
    "--port",
    dest="port",
    nargs="?",
    const=8080,
    default=8080,
    type=int,
    help="Port to connection")

# Read the configuration from the command line.
config = parser.parse_args()
#
# the cool class, bro
#
class IoT(threading.Thread):
    """Background thread that polls the ultrasound and infrared sensors.

    The latest readings are kept in ``distancia`` (centimetres) and
    ``lleno`` and are published as JSON by :meth:`web`.
    """

    # Upper bound, in cm, that web() clamps the distance to; it is also the
    # reference length used to compute the fill ratio.
    DISTANCIA_MAXIMA = 30
    # Seconds to wait for each edge of the echo pulse before giving up, so a
    # silent or disconnected sensor cannot hang the thread forever.
    TIEMPO_MAXIMO_ECO = 0.1

    def __init__(self, activador, escuchador, visor, config):
        """Store the pin numbers and set up the GPIO pins that are enabled.

        Args:
            activador: pin driven as output to trigger the ultrasound pulse.
            escuchador: pin read as input for the ultrasound echo.
            visor: pin read as input for the infrared sensor.
            config: object with boolean ``ultrasound`` and ``infrared``
                attributes (the parsed command-line arguments).
        """
        super().__init__()
        self.activador = activador
        self.escuchador = escuchador
        self.visor = visor
        self.config = config
        # Huge sentinel meaning "nothing measured yet"; web() clamps it.
        self.distancia = 3333333333333
        self.lleno = False
        self.on = True
        # Guards against releasing the GPIO pins more than once.
        self._gpio_limpio = False
        # Pins are addressed by their physical position on the board header.
        GPIO.setmode(GPIO.BOARD)
        if self.config.ultrasound:
            GPIO.setup(self.activador, GPIO.OUT)
            GPIO.setup(self.escuchador, GPIO.IN)
        if self.config.infrared:
            GPIO.setup(self.visor, GPIO.IN)

    def __del__(self):
        """Release the GPIO pins when the object is collected."""
        self._limpiar()

    def _limpiar(self):
        """Release the GPIO pins once; later calls are no-ops."""
        # getattr keeps this safe on a partially-initialised instance.
        config = getattr(self, "config", None)
        if config is None or getattr(self, "_gpio_limpio", False):
            return
        if config.infrared or config.ultrasound:
            print("Limpiando un poco...")
            GPIO.cleanup()
        self._gpio_limpio = True

    def _estado(self):
        """Return the current readings as a plain dict.

        ``distancia`` is clamped to ``[0, DISTANCIA_MAXIMA]`` and
        ``porcentaje`` is the fill ratio derived from it, in ``[0, 1]``.
        """
        distancia = max(0, min(self.DISTANCIA_MAXIMA, self.distancia))
        return dict(
            lleno=self.lleno,
            porcentaje=(self.DISTANCIA_MAXIMA - distancia) / self.DISTANCIA_MAXIMA,
            distancia=distancia)

    def web(self):
        """Flask view: return the current readings as a JSON response."""
        cuerpo = flask.json.dumps(self._estado())
        return flask.Response(cuerpo, mimetype="application/json")

    def _medir_distancia(self):
        """Trigger one ultrasound pulse and return the distance in cm.

        Returns ``None`` when either edge of the echo pulse does not arrive
        within ``TIEMPO_MAXIMO_ECO`` seconds.
        """
        # Let the trigger line settle low, then fire a ~10 microsecond pulse.
        GPIO.output(self.activador, False)
        time.sleep(0.5)
        GPIO.output(self.activador, True)
        time.sleep(0.00001)
        GPIO.output(self.activador, False)

        # The echo width is elapsed real time, so it needs a wall clock
        # (perf_counter); a CPU-time clock would not measure it reliably.
        inicio = time.perf_counter()
        limite = inicio + self.TIEMPO_MAXIMO_ECO
        while GPIO.input(self.escuchador) == 0:
            inicio = time.perf_counter()
            if inicio > limite:
                return None

        final = inicio
        limite = inicio + self.TIEMPO_MAXIMO_ECO
        while GPIO.input(self.escuchador) == 1:
            final = time.perf_counter()
            if final > limite:
                return None

        # 34000 cm/s is the speed of sound; halve it because the pulse
        # travels to the obstacle and back.
        return (final - inicio) * 34000 / 2

    def run(self):
        """Poll the enabled sensors until ``self.on`` becomes false."""
        while self.on:
            if self.config.ultrasound:
                distancia = self._medir_distancia()
                if distancia is None:
                    # No echo: keep the previous reading.
                    print("D = sin eco", end="", flush=True)
                else:
                    self.distancia = distancia
                    print(
                        "D = %.1fcm" % distancia,
                        end="",
                        flush=True)
                if not self.config.infrared:
                    # Nothing else will finish the output line.
                    print()
            if self.config.infrared:
                ciego = GPIO.input(self.visor)
                print(" C =", not ciego)
                self.lleno = not ciego
                if not self.config.ultrasound:
                    # The ultrasound branch already paces the loop; without
                    # it, pause here instead.
                    time.sleep(0.5)
            if not self.config.infrared and not self.config.ultrasound:
                time.sleep(2)
                print("Nada que hacer aquí...")

    def close(self, sig, frame):
        """Signal handler: stop the thread, release the GPIO and exit.

        Args:
            sig: signal number (unused).
            frame: interrupted stack frame (unused).

        Raises:
            SystemExit: always, with status 0.
        """
        print("Terminando primero el hilo...")
        print("Desactivando...")
        self.on = False
        print("Esperando por cierre...")
        print("Esperando un tantito...")
        # Wait up to one second for the polling loop to notice the flag;
        # joining a thread that was never started would raise.
        if self.is_alive():
            self.join(1)
        print("Eliminando cuestiones...")
        self._limpiar()
        print("Saliendo...")
        print("Chao :v")
        # Same effect as exit(0) without relying on the `site` helper.
        raise SystemExit(0)
# Pin numbers handed to the IoT thread: ultrasound trigger, ultrasound echo
# and infrared sensor.
activador = 16
escuchador = 18
visor = 22
iot = IoT(activador, escuchador, visor, config)

# Ctrl+C stops the sensor thread and releases the GPIO before exiting.
signal.signal(signal.SIGINT, iot.close)


@app.route("/")
def handler():
    """Landing page telling the user how to connect to this server."""
    # NOTE(review): this call followed the `return` in the original and so
    # never ran; it is assumed to belong to this handler - confirm.
    app.logger.info("Vale")
    # Report the port the server actually listens on rather than a fixed one.
    return (
        "Por favor, conecte su tel&eacute;fono usando "
        "<b><em>AppRecBasura</em></b>, a esta direcci&oacute;n "
        "con puerto {}.".format(config.port))


# Publish the sensor readings under the configured URL rule.
app.route(config.method)(iot.web)

# Start polling the sensors in the background.
iot.start()

# Start the web server (blocks until the process exits).
print("Iniciando el servidor.")
app.run(host="0.0.0.0", port=config.port, debug=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment