Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save CMCDragonkai/9db94abbad02dbdd285c587ee4eceb94 to your computer and use it in GitHub Desktop.
Save CMCDragonkai/9db94abbad02dbdd285c587ee4eceb94 to your computer and use it in GitHub Desktop.
Python: Multithreaded, Multiprocessed, Signal Handling, Serial Port to TCP Filter/Proxy

Multithreaded, Multiprocessed, Signal Handling, Serial Port to TCP Filter/Proxy

To run it, terminal 1:

socat -d -d pty,raw,b38400 pty,raw,b38400 &
./serial_to_tcp_filter.py /dev/pty1 38400 127.0.0.1 8888

Terminal 2:

picocom --baud 38400 --echo /dev/pty2
Device Ready!\n

Make sure to write \n, try Ctrl + J.

Terminal 3:

nc 127.0.0.1 8888
Get me Data!!

To exit picocom, enter Ctrl + A, Ctrl + X.

Todo:

#!/usr/bin/env python3
import sys
import signal
import serial
import threading
import socketserver
from copy import deepcopy
from multiprocessing import Pool
# globals
output_data_1 = 0
output_data_2 = 0.0
# TCP server will be in its own thread
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
# TCP request handling will spawn its own thread
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
print("Responding to new client: {}".format(self.request.getpeername()))
# setting the read timeout to 5 seconds
# if we get no reads, then close the connection
self.request.settimeout(5)
try:
while True:
if not (self.request.recv(1024)): break
self.request.sendall(bytes("{0}:{1}\n".format(output_data_1, output_data_2), 'ascii'))
print("Client: {} closed connection.".format(self.request.getpeername()))
except:
print("Timed out waiting for client, closing connection to client: {}".format(self.request.getpeername()))
def processing_function(sensor_message, request_ticker):
print("{}: Processing Sensor Message!".format(request_ticker))
sensor_message = sensor_message.decode("utf-8")
interesting_number = int(sensor_message) + 1
return (interesting_number, request_ticker)
def callback_function(processed_data):
# we need to declare these variables as global in order for the callback function to asynchronously mutate them
global output_data_1
global output_data_2
(interesting_number, request_ticker) = processed_data
output_data_1 = interesting_number + 1
output_data_2 = interesting_number + 1.0
print("{}: Processed Sensor Message!".format(request_ticker))
def cleanup_and_exit(pool, sensor, server, code):
print("Closing Orbit Detection Process Pool and TCP Server!")
pool.close() if pool is not None else None
sensor.close() if sensor is not None else None
server.shutdown() if server is not None else None
server.server_close() if server is not None else None
sys.exit(code)
if __name__ == "__main__":
pool = None
sensor = None
server = None
device_path = sys.argv[1]
baud_rate = int(sys.argv[2])
host = sys.argv[3]
port = int(sys.argv[4])
# the child process will inherit the signal handlers of the parent
# so we make sure to ignore the termination signals, then fork to a child process
# because the parent process will clean up the child process
exit_handler = lambda signum, frame: cleanup_and_exit(pool, sensor, server, 0)
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGQUIT, signal.SIG_IGN)
signal.signal(signal.SIGHUP, signal.SIG_IGN)
pool = Pool(processes=1)
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)
signal.signal(signal.SIGQUIT, exit_handler)
signal.signal(signal.SIGHUP, exit_handler)
server = ThreadedTCPServer((host, port), ThreadedTCPRequestHandler)
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
sensor = serial.Serial(device_path, baud_rate)
sensor.reset_input_buffer()
sensor.reset_output_buffer()
print("Waiting for device to be ready...")
sensor.timeout = None
if sensor.readline() != b"Device Ready!\n":
print("Device was not ready! Exiting!")
cleanup_and_exit(pool, sensor, server, 1)
sensor.timeout = 1
print("Device is ready!")
print("Host is ready!")
sensor.write(b"Host Ready!\n")
# we'll use this to keep track of a single request in a concurrent environment
request_ticker = 0
while(True):
print("{}: Reading from the sensor!".format(request_ticker))
sensor_message = sensor.readline()
if sensor_message == b"":
print("{}: Timed out reading from the sensor, skipping!".format(request_ticker))
request_ticker += 1
continue
# deep copy the data going into the child process
# it isn't needed on primitive data structures, but is needed for abstract data structures
# here it's not needed, but I put it here so that you remember about it!
pool.apply_async(processing_function, args=(deepcopy(sensor_message), request_ticker), callback=callback_function)
request_ticker += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment