Skip to content

Instantly share code, notes, and snippets.

@ProfAndreaPollini
Created June 8, 2024 04:52
Show Gist options
  • Save ProfAndreaPollini/bfd79fdf2d5bac85af36af766a2581d8 to your computer and use it in GitHub Desktop.
Save ProfAndreaPollini/bfd79fdf2d5bac85af36af766a2581d8 to your computer and use it in GitHub Desktop.
python multiprocessing with flask and TCP server communicating through a queue
from dataclasses import asdict, dataclass
import time
from paho.mqtt import client as mqtt_client
from multiprocessing import Process, Queue, log_to_stderr, Manager, Lock
import logging
import functools
import socket
import os
import json
logging.basicConfig(level=logging.INFO)
log = log_to_stderr()
def cmd_test():
log.debug("Starting test")
print("Hello World")
time.sleep(10)
########## COMMANDS ##########
@dataclass
class Command:
cmd: str
args: dict = None
wait_time: int = 0
def dict(self):
return {k: str(v) for k, v in asdict(self).items()}
CMD_START = functools.partial(Command, "START")()
CMD_STOP = functools.partial(Command, "STOP")()
CMD_READ_DATA = functools.partial(Command, "READ_DATA")()
def CMD_SELEZIONA_VALVOLA(x):
return Command("SELEZIONA_VALVOLA", {"valvola": x})
COMMANDS = {
"START": CMD_START,
"STOP": CMD_STOP,
"READ_DATA": CMD_READ_DATA,
"SELEZIONA_VALVOLA": CMD_SELEZIONA_VALVOLA,
}
########## COMMAND RETURN CODES and CLASSES ##########
CMD_RET_SUCCESS = 0x0
CMD_RET_ERROR = 0xFF
@dataclass
class CmdReturn:
code: int
cmd: str
message: str = ""
payload: str = ""
def dict(self):
return {k: str(v) for k, v in asdict(self).items()}
def server(queue: Queue, rqueue: Queue, d, l: Lock):
# serves only 5 request and dies
while True:
# Wait for next request from client
# message = socket.recv()
# print("Received request #%s: %s" % (1, message))
# socket.send_string("World from %s" % port)
try:
msg = queue.get()
log.info("Received message: %s" % msg)
match msg.cmd:
case "START":
with l:
d["active"] = True
rqueue.put(CmdReturn(CMD_RET_SUCCESS, "start", "Success"))
break
case "STOP":
with l:
d["active"] = False
break
case "READ_DATA":
with l:
d["busy"] = True
log.info("Reading data")
cmd_test()
log.info("Data read")
with l:
d["busy"] = False
rqueue.put(
CmdReturn(
CMD_RET_SUCCESS,
"start",
"Success",
payload={
"temperature": 25,
"pressure": 100,
"humidity": 50,
"valvola": 1,
},
),
block=False,
)
break
case "SELEZIONA_VALVOLA":
break
case _:
break
# with l:
# d["busy"] = True
# cmd_test()
# time.sleep(0.1)
# with l:
# d[msg] = msg
# d["busy"] = False
except Exception as e:
log.error(e)
rqueue.put(CmdReturn(CMD_RET_ERROR, "error", str(e)))
return
def web_server(port, queue: Queue, rqueue: Queue, d, l: Lock):
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello_world():
# Send a message to the MQTT server
print("Sending message to MQTT")
queue.put("Hello")
return "<p>Hello, World!</p>"
@app.route("/status")
def status():
return dict(d)
@app.route("/data")
def data():
if not d["active"]:
return CmdReturn(CMD_RET_ERROR, "data", "Device not active").dict()
queue.put(COMMANDS["READ_DATA"], block=False)
time.sleep(2)
msg = rqueue.get()
return msg.dict()
@app.route("/start")
def start():
try:
queue.put(COMMANDS["START"])
ret = rqueue.get()
return ret.dict()
except Exception as e:
log.error(e)
return CmdReturn(CMD_RET_ERROR, "start", str(e)).dict()
@app.route("/stop")
def stop():
try:
queue.put(COMMANDS["STOP"], block=False)
return CmdReturn(CMD_RET_SUCCESS, "stop", "Success").dict()
except Exception as e:
log.error(e)
return CmdReturn(CMD_RET_ERROR, "stop", str(e)).dict()
@app.route("/size")
def size():
return "<p>Size: %s</p>" % queue.qsize()
app.run(port=port)
def client(port, queue, d, l):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 12345))
sock.listen(0)
while True:
connection, address = sock.accept()
buf = connection.recv(1024).decode()
cmd = json.loads(buf)
log.info("Received command: %s" % cmd)
log.info(json.dumps(buf))
connection.send(json.dumps(buf).encode())
connection.close()
if __name__ == "__main__":
q = Queue()
ret_queue = Queue()
# Manager to create shared object.
with Manager() as manager:
# Create a global variable.
data = manager.dict()
data["busy"] = False
data["active"] = False
lock = manager.Lock()
p1 = Process(target=server, args=(q, ret_queue, data, lock))
p1.daemon = True
p1.start()
# Now we can connect a client to all these servers
p2 = Process(target=client, args=(8881, q, data, lock))
p2.daemon = True
p2.start()
p3 = Process(target=web_server, args=(5100, q, ret_queue, data, lock))
p3.daemon = True
p3.start()
workers = [p1, p2, p3]
try:
for w in workers:
w.join()
except KeyboardInterrupt:
log.error(data)
for w in workers:
w.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment