Created
June 8, 2024 04:52
-
-
Save ProfAndreaPollini/bfd79fdf2d5bac85af36af766a2581d8 to your computer and use it in GitHub Desktop.
python multiprocessing with flask and TCP server communicating through a queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import asdict, dataclass | |
import time | |
from paho.mqtt import client as mqtt_client | |
from multiprocessing import Process, Queue, log_to_stderr, Manager, Lock | |
import logging | |
import functools | |
import socket | |
import os | |
import json | |
logging.basicConfig(level=logging.INFO) | |
log = log_to_stderr() | |
def cmd_test(): | |
log.debug("Starting test") | |
print("Hello World") | |
time.sleep(10) | |
########## COMMANDS ########## | |
@dataclass | |
class Command: | |
cmd: str | |
args: dict = None | |
wait_time: int = 0 | |
def dict(self): | |
return {k: str(v) for k, v in asdict(self).items()} | |
CMD_START = functools.partial(Command, "START")() | |
CMD_STOP = functools.partial(Command, "STOP")() | |
CMD_READ_DATA = functools.partial(Command, "READ_DATA")() | |
def CMD_SELEZIONA_VALVOLA(x): | |
return Command("SELEZIONA_VALVOLA", {"valvola": x}) | |
COMMANDS = { | |
"START": CMD_START, | |
"STOP": CMD_STOP, | |
"READ_DATA": CMD_READ_DATA, | |
"SELEZIONA_VALVOLA": CMD_SELEZIONA_VALVOLA, | |
} | |
########## COMMAND RETURN CODES and CLASSES ########## | |
CMD_RET_SUCCESS = 0x0 | |
CMD_RET_ERROR = 0xFF | |
@dataclass | |
class CmdReturn: | |
code: int | |
cmd: str | |
message: str = "" | |
payload: str = "" | |
def dict(self): | |
return {k: str(v) for k, v in asdict(self).items()} | |
def server(queue: Queue, rqueue: Queue, d, l: Lock): | |
# serves only 5 request and dies | |
while True: | |
# Wait for next request from client | |
# message = socket.recv() | |
# print("Received request #%s: %s" % (1, message)) | |
# socket.send_string("World from %s" % port) | |
try: | |
msg = queue.get() | |
log.info("Received message: %s" % msg) | |
match msg.cmd: | |
case "START": | |
with l: | |
d["active"] = True | |
rqueue.put(CmdReturn(CMD_RET_SUCCESS, "start", "Success")) | |
break | |
case "STOP": | |
with l: | |
d["active"] = False | |
break | |
case "READ_DATA": | |
with l: | |
d["busy"] = True | |
log.info("Reading data") | |
cmd_test() | |
log.info("Data read") | |
with l: | |
d["busy"] = False | |
rqueue.put( | |
CmdReturn( | |
CMD_RET_SUCCESS, | |
"start", | |
"Success", | |
payload={ | |
"temperature": 25, | |
"pressure": 100, | |
"humidity": 50, | |
"valvola": 1, | |
}, | |
), | |
block=False, | |
) | |
break | |
case "SELEZIONA_VALVOLA": | |
break | |
case _: | |
break | |
# with l: | |
# d["busy"] = True | |
# cmd_test() | |
# time.sleep(0.1) | |
# with l: | |
# d[msg] = msg | |
# d["busy"] = False | |
except Exception as e: | |
log.error(e) | |
rqueue.put(CmdReturn(CMD_RET_ERROR, "error", str(e))) | |
return | |
def web_server(port, queue: Queue, rqueue: Queue, d, l: Lock): | |
from flask import Flask | |
app = Flask(__name__) | |
@app.route("/") | |
def hello_world(): | |
# Send a message to the MQTT server | |
print("Sending message to MQTT") | |
queue.put("Hello") | |
return "<p>Hello, World!</p>" | |
@app.route("/status") | |
def status(): | |
return dict(d) | |
@app.route("/data") | |
def data(): | |
if not d["active"]: | |
return CmdReturn(CMD_RET_ERROR, "data", "Device not active").dict() | |
queue.put(COMMANDS["READ_DATA"], block=False) | |
time.sleep(2) | |
msg = rqueue.get() | |
return msg.dict() | |
@app.route("/start") | |
def start(): | |
try: | |
queue.put(COMMANDS["START"]) | |
ret = rqueue.get() | |
return ret.dict() | |
except Exception as e: | |
log.error(e) | |
return CmdReturn(CMD_RET_ERROR, "start", str(e)).dict() | |
@app.route("/stop") | |
def stop(): | |
try: | |
queue.put(COMMANDS["STOP"], block=False) | |
return CmdReturn(CMD_RET_SUCCESS, "stop", "Success").dict() | |
except Exception as e: | |
log.error(e) | |
return CmdReturn(CMD_RET_ERROR, "stop", str(e)).dict() | |
@app.route("/size") | |
def size(): | |
return "<p>Size: %s</p>" % queue.qsize() | |
app.run(port=port) | |
def client(port, queue, d, l): | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.bind(("127.0.0.1", 12345)) | |
sock.listen(0) | |
while True: | |
connection, address = sock.accept() | |
buf = connection.recv(1024).decode() | |
cmd = json.loads(buf) | |
log.info("Received command: %s" % cmd) | |
log.info(json.dumps(buf)) | |
connection.send(json.dumps(buf).encode()) | |
connection.close() | |
if __name__ == "__main__": | |
q = Queue() | |
ret_queue = Queue() | |
# Manager to create shared object. | |
with Manager() as manager: | |
# Create a global variable. | |
data = manager.dict() | |
data["busy"] = False | |
data["active"] = False | |
lock = manager.Lock() | |
p1 = Process(target=server, args=(q, ret_queue, data, lock)) | |
p1.daemon = True | |
p1.start() | |
# Now we can connect a client to all these servers | |
p2 = Process(target=client, args=(8881, q, data, lock)) | |
p2.daemon = True | |
p2.start() | |
p3 = Process(target=web_server, args=(5100, q, ret_queue, data, lock)) | |
p3.daemon = True | |
p3.start() | |
workers = [p1, p2, p3] | |
try: | |
for w in workers: | |
w.join() | |
except KeyboardInterrupt: | |
log.error(data) | |
for w in workers: | |
w.terminate() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment