|
#!/usr/bin/env python3 |
|
|
|
import sys |
|
import signal |
|
import serial |
|
import threading |
|
import socketserver |
|
from copy import deepcopy |
|
from multiprocessing import Pool |
|
|
|
# globals |
|
output_data_1 = 0 |
|
output_data_2 = 0.0 |
|
|
|
# TCP server will be in its own thread |
|
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): |
|
pass |
|
|
|
# TCP request handling will spawn its own thread |
|
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): |
|
|
|
def handle(self): |
|
|
|
print("Responding to new client: {}".format(self.request.getpeername())) |
|
# setting the read timeout to 5 seconds |
|
# if we get no reads, then close the connection |
|
self.request.settimeout(5) |
|
try: |
|
while True: |
|
if not (self.request.recv(1024)): break |
|
self.request.sendall(bytes("{0}:{1}\n".format(output_data_1, output_data_2), 'ascii')) |
|
print("Client: {} closed connection.".format(self.request.getpeername())) |
|
except: |
|
print("Timed out waiting for client, closing connection to client: {}".format(self.request.getpeername())) |
|
|
|
def processing_function(sensor_message, request_ticker): |
|
|
|
print("{}: Processing Sensor Message!".format(request_ticker)) |
|
sensor_message = sensor_message.decode("utf-8") |
|
interesting_number = int(sensor_message) + 1 |
|
return (interesting_number, request_ticker) |
|
|
|
def callback_function(processed_data): |
|
|
|
# we need to declare these variables as global in order for the callback function to asynchronously mutate them |
|
global output_data_1 |
|
global output_data_2 |
|
(interesting_number, request_ticker) = processed_data |
|
output_data_1 = interesting_number + 1 |
|
output_data_2 = interesting_number + 1.0 |
|
print("{}: Processed Sensor Message!".format(request_ticker)) |
|
|
|
def cleanup_and_exit(pool, sensor, server, code): |
|
|
|
print("Closing Orbit Detection Process Pool and TCP Server!") |
|
pool.close() if pool is not None else None |
|
sensor.close() if sensor is not None else None |
|
server.shutdown() if server is not None else None |
|
server.server_close() if server is not None else None |
|
sys.exit(code) |
|
|
|
if __name__ == "__main__": |
|
|
|
pool = None |
|
sensor = None |
|
server = None |
|
|
|
device_path = sys.argv[1] |
|
baud_rate = int(sys.argv[2]) |
|
host = sys.argv[3] |
|
port = int(sys.argv[4]) |
|
|
|
# the child process will inherit the signal handlers of the parent |
|
# so we make sure to ignore the termination signals, then fork to a child process |
|
# because the parent process will clean up the child process |
|
exit_handler = lambda signum, frame: cleanup_and_exit(pool, sensor, server, 0) |
|
signal.signal(signal.SIGINT, signal.SIG_IGN) |
|
signal.signal(signal.SIGTERM, signal.SIG_IGN) |
|
signal.signal(signal.SIGQUIT, signal.SIG_IGN) |
|
signal.signal(signal.SIGHUP, signal.SIG_IGN) |
|
pool = Pool(processes=1) |
|
signal.signal(signal.SIGINT, exit_handler) |
|
signal.signal(signal.SIGTERM, exit_handler) |
|
signal.signal(signal.SIGQUIT, exit_handler) |
|
signal.signal(signal.SIGHUP, exit_handler) |
|
|
|
server = ThreadedTCPServer((host, port), ThreadedTCPRequestHandler) |
|
server_thread = threading.Thread(target=server.serve_forever) |
|
server_thread.daemon = True |
|
server_thread.start() |
|
|
|
sensor = serial.Serial(device_path, baud_rate) |
|
sensor.reset_input_buffer() |
|
sensor.reset_output_buffer() |
|
|
|
print("Waiting for device to be ready...") |
|
sensor.timeout = None |
|
if sensor.readline() != b"Device Ready!\n": |
|
print("Device was not ready! Exiting!") |
|
cleanup_and_exit(pool, sensor, server, 1) |
|
sensor.timeout = 1 |
|
print("Device is ready!") |
|
|
|
print("Host is ready!") |
|
sensor.write(b"Host Ready!\n") |
|
|
|
# we'll use this to keep track of a single request in a concurrent environment |
|
request_ticker = 0 |
|
|
|
while(True): |
|
|
|
print("{}: Reading from the sensor!".format(request_ticker)) |
|
|
|
sensor_message = sensor.readline() |
|
|
|
if sensor_message == b"": |
|
print("{}: Timed out reading from the sensor, skipping!".format(request_ticker)) |
|
request_ticker += 1 |
|
continue |
|
|
|
# deep copy the data going into the child process |
|
# it isn't needed on primitive data structures, but is needed for abstract data structures |
|
# here it's not needed, but I put it here so that you remember about it! |
|
pool.apply_async(processing_function, args=(deepcopy(sensor_message), request_ticker), callback=callback_function) |
|
|
|
request_ticker += 1 |