Last active
October 8, 2021 06:48
-
-
Save sgsfak/e9b1636a2464316abfa1d8abcf872649 to your computer and use it in GitHub Desktop.
a python server accepting MLLP-wrapped requests for PANACEA IMP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import threading | |
import socketserver | |
import sys | |
import time | |
def send_command(command:str)->str: | |
HOST = '127.0.0.1' | |
PORT = 27016 | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.connect((HOST, PORT)) | |
s.sendall(command.encode('utf-8')) | |
data = [] | |
while True: | |
msg = s.recv(1024) | |
if not msg: | |
break | |
print(f"Got '{msg}'") | |
data.append(msg.decode("utf-8", "strict")) | |
return "\n".join(data) | |
def debug(msg:str): | |
print('\033[32m' + msg+ '\033[0m', file=sys.stderr) | |
tries = [min(2, i) for i in range(1, 8)] | |
def acquire_credentials(): | |
response = send_command('Launch capture') | |
if response == 'command received': | |
debug('Capture cmd submitted.. polling for result') | |
response = send_command('Matching result') | |
for k, t in enumerate(tries): | |
debug(f"Try {k+1}, got '{response}'") | |
if response != "0": | |
[username, password] = response.splitlines()[:2] | |
print(f"Username: {username} Password: '{password}'") | |
return (username, password) | |
debug(f"Sleeping for {t} seconds") | |
time.sleep(t) | |
response = send_command('Matching result') | |
else: | |
debug(f"Error, got '{response}'") | |
send_command("Stop capture") | |
return None | |
HOST = '' | |
PORT = 27017 | |
header = b'\x0b' | |
trailer = b'\x1c\x0d' | |
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): | |
def handle(self): | |
while True: | |
req = self.request.recv(1024) | |
if not req: | |
return | |
msg = req.removeprefix(header).removesuffix(trailer).decode() | |
print(f"Got {msg}") | |
creds = acquire_credentials() | |
if creds: | |
data = "\n".join(creds) | |
else: | |
data = "" | |
response = bytearray(header) + bytearray(data.encode("utf-8")) + bytearray(trailer) | |
self.request.sendall(response) | |
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): | |
pass | |
def client(port, message): | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | |
sock.connect(('127.0.0.1', port)) | |
sock.sendall(bytearray(header) + bytearray(message.encode("utf-8")) + bytearray(trailer)) | |
response = sock.recv(1024).removeprefix(header).removesuffix(trailer).decode() | |
print("Received: {}".format(response)) | |
if __name__ == "__main__": | |
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler) | |
with server: | |
ip, port = server.server_address | |
print(f"Server started in {ip}:{port}") | |
server.serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment