Last active
July 2, 2023 06:21
-
-
Save jim-mckeown/53f797fd6596bc559e92a68ed47ece82 to your computer and use it in GitHub Desktop.
Text command line processor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Created on Thu Jun 29 14:56:17 2023 | |
| @author: jmckeown | |
| Generate a random number at regular interval in a thread | |
| Expose method to get the latest random number | |
| """ | |
| import random | |
| import time | |
| import threading | |
| class generator: | |
| def __init__(self): | |
| #self.s = s | |
| random.seed() | |
| self.okToRun = True | |
| self.generatedNum = 0.0 | |
| thread = threading.Thread(target=self.run, args=()) | |
| thread.daemon = False # Daemonize thread or not | |
| thread.start() # Start the execution | |
| def run(self): | |
| while self.okToRun: | |
| self.generatedNum = random.random() | |
| time.sleep(1) | |
| print("ending " + __name__) | |
| def getRandom(self): | |
| return self.generatedNum | |
| def setOkToRun(self, state): | |
| self.okToRun = state | |
| def getOkToRun(self): | |
| return self.okToRun | |
| if __name__ == '__main__': | |
| g = generator() | |
| print(g.generatedNum) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Created on Thu Jun 29 14:56:17 2023 | |
| @author: jmckeown | |
| """ | |
| import socket | |
| import generator # external custom class | |
| commands = [["cmd0", "cmd1", "cmd2"], | |
| ["arg0", "arg1", "arg2"], | |
| ["arg0", "arg1", "arg2"]] | |
| HOST = "127.0.0.1" # Standard loopback interface address (localhost) | |
| PORT = 5000 # Port to listen on (non-privileged ports are > 1023) | |
| g = generator.generator() | |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| s.bind((HOST, PORT)) | |
| print("Waiting for connection...") | |
| s.listen() | |
| conn, addr = s.accept() | |
| print(f"Connected by {addr}") | |
| buffer = b'' | |
| try: | |
| while True: | |
| data = conn.recv(1) | |
| if not data: | |
| break | |
| buffer += data | |
| if buffer.decode().endswith("\r\n"): | |
| # parse buffer | |
| strBuffer = buffer.decode() # convert received bytes to a string | |
| strBuffer = strBuffer.removesuffix("\r\n") # strip off terminating cr, lf | |
| aryBuffer = strBuffer.split(",") # parse received string into string array | |
| if len(aryBuffer) > 0: # check for zero length array | |
| if aryBuffer[0] == commands[0][0]: # process command 0 | |
| if len(aryBuffer) == 1: | |
| # process command 0 with no parameters: | |
| buffer = b'Received command 0 with no paramaters\r\n' | |
| conn.sendall(buffer) | |
| if len(aryBuffer) == 2: | |
| # process command 0 with 1 parameter | |
| buffer = b'Received cmmand 0 with paramater: ' + bytes(aryBuffer[1], 'UTF-8') + b'\r\n' | |
| conn.sendall(buffer) | |
| if aryBuffer[0] == commands[0][1]: # process command 1 | |
| if len(aryBuffer) == 1: | |
| # process command 1 with no parameters | |
| buffer = b'Received command 1 with no paramaters\r\n' | |
| conn.sendall(buffer) | |
| if len(aryBuffer) == 2: | |
| # process command 1 with 1 parameter | |
| buffer = b'Received cmmand 1 with paramater: ' + bytes(aryBuffer[1], 'UTF-8') + b' ' + bytes(str(g.getRandom()), 'UTF-8') + b'\r\n' | |
| conn.sendall(buffer) | |
| buffer = b'' #clear buffer after command is processed | |
| finally: # close socket if any error is raised | |
| print("ending " + __name__) | |
| s.shutdown(socket.SHUT_RDWR) | |
| s.close() | |
| g.setOkToRun(False) | |
| g = None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment