Created
May 20, 2019 14:27
-
-
Save David256/59c3fd698a7e7b5981352428b0dd9da4 to your computer and use it in GitHub Desktop.
This script defines http server with flask to read and show data from sensors of ultrasound and infrared.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| # -*- coding: utf-8 -*- | |
| # import the cool libray | |
| import RPi.GPIO as GPIO | |
| # others | |
| import time | |
| import argparse | |
| import threading | |
| import signal | |
| import flask | |
| # config the argument parser | |
| parser = argparse.ArgumentParser( | |
| description="IoT server.") | |
| # the flask server | |
| app = flask.Flask("IoTServer") | |
| # an argument | |
| parser.add_argument( | |
| "-U", | |
| "--ultrasound", | |
| action="store_const", | |
| dest="ultrasound", | |
| const=True, | |
| default=False, | |
| help="Enable ultrasound") | |
| # another argument | |
| parser.add_argument( | |
| "-I", | |
| "--infrared", | |
| action="store_const", | |
| dest="infrared", | |
| const=True, | |
| default=False, | |
| help="Enable infrared") | |
| #... | |
| parser.add_argument( | |
| "-M", | |
| "--methodport", | |
| dest="method", | |
| nargs="?", | |
| default="/basura", | |
| type=str, | |
| help="Method to request") | |
| parser.add_argument( | |
| "-P", | |
| "--port", | |
| dest="port", | |
| nargs="?", | |
| default=8080, | |
| type=int, | |
| help="Port to connection") | |
| # read the configuration | |
| config = parser.parse_args() | |
| # | |
| # the cool class, bro | |
| # | |
| class IoT(threading.Thread): | |
| """A class that manager the iot device.""" | |
| def __init__(self, activador, escuchador, visor, config): | |
| super(IoT, self).__init__() | |
| self.activador = activador | |
| self.escuchador = escuchador | |
| self.visor = visor | |
| self.config = config | |
| self.distancia = 3333333333333 | |
| self.lleno = False | |
| self.on = True | |
| # now, config the GPIO environment | |
| GPIO.setmode(GPIO.BOARD) | |
| # setup the pins | |
| if self.config.ultrasound: | |
| GPIO.setup(self.activador, GPIO.OUT) | |
| GPIO.setup(self.escuchador, GPIO.IN) | |
| if self.config.infrared: | |
| GPIO.setup(self.visor, GPIO.IN) | |
| def __del__(self): | |
| if self.config.infrared or self.config.ultrasound: | |
| print("Limpiando un poco...") | |
| GPIO.cleanup() | |
| def web(self): | |
| # normalize the data | |
| distancia = self.distancia if self.distancia < 30 else 30 | |
| distancia = 0 if self.distancia < 0 else self.distancia | |
| dicc = dict( | |
| lleno=self.lleno, | |
| porcentaje=(30 - distancia)/30, | |
| distancia=distancia) | |
| json = flask.json.dumps(dicc) | |
| response = flask.Response(json) | |
| response.headers['Content-Type'] = "application/json" | |
| return response | |
| def run(self): | |
| while self.on: | |
| if self.config.ultrasound: | |
| # wait... | |
| GPIO.output(self.activador, False) | |
| time.sleep(0.5) | |
| # shot! | |
| GPIO.output(self.activador, True) | |
| # wait! | |
| time.sleep(0.00001) | |
| # stop shotting! | |
| GPIO.output(self.activador, False) | |
| # now, see | |
| inicio = time.process_time() | |
| final = time.process_time() | |
| while GPIO.input(self.escuchador) == 0: | |
| inicio = time.process_time() | |
| while GPIO.input(self.escuchador) == 1: | |
| final = time.process_time() | |
| # calc the delta time | |
| delta = final - inicio | |
| # calc the distance | |
| distancia = delta*34000 | |
| distancia /= 2 | |
| self.distancia = distancia | |
| print( | |
| "D = %.1fcm" % distancia, | |
| end="", | |
| flush=True) | |
| if not self.config.infrared: | |
| print() | |
| if self.config.infrared: | |
| ciego = GPIO.input(self.visor) | |
| print(" C =", not ciego) | |
| self.lleno = not ciego | |
| if not self.config.ultrasound: | |
| time.sleep(0.5) | |
| if not self.config.infrared and not self.config.ultrasound: | |
| time.sleep(2) | |
| print("Nada que hacer aquí...") | |
| def close(self, sig, frame): | |
| print("Terminando primero el hilo...") | |
| print("Desactivando...") | |
| self.on = False | |
| print("Esperando por cierre...") | |
| self.join(0) | |
| print("Esperando un tantito...") | |
| time.sleep(1) | |
| print("Eliminando cuestiones...") | |
| self.__del__() | |
| print("Saliendo...") | |
| print("Chao :v") | |
| exit(0) | |
| # set the board number pins | |
| activador = 16 | |
| escuchador = 18 | |
| visor = 22 | |
| iot = IoT(activador, escuchador, visor, config) | |
| signal.signal(signal.SIGINT, iot.close) | |
| @app.route("/") | |
| def handler(): | |
| return "Por favor, conecte su tel&eoacute;fono usando <b><em>AppRecBasura</em></b>, a esta dirección con puerto 8080." | |
| app.logger.info("Vale") | |
| # set the handler to iot query | |
| app.route(config.method)(iot.web) | |
| # start that listening of sensors | |
| iot.start() | |
| # start the web server | |
| print("Iniciando el servidor.") | |
| app.run(host="0.0.0.0", port=config.port, debug=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment