Skip to content

Instantly share code, notes, and snippets.

@jim-mckeown
Last active July 2, 2023 06:21
Show Gist options
  • Save jim-mckeown/53f797fd6596bc559e92a68ed47ece82 to your computer and use it in GitHub Desktop.
Save jim-mckeown/53f797fd6596bc559e92a68ed47ece82 to your computer and use it in GitHub Desktop.
Text command line processor
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 29 14:56:17 2023
@author: jmckeown
Generate a random number at a regular interval in a background thread.
Expose a method to get the latest random number.
"""
import random
import time
import threading
class generator:
    """Produce a fresh random number about once per second on a worker thread.

    Constructing an instance immediately starts a non-daemon thread that
    repeatedly stores ``random.random()`` in ``generatedNum``.  Because the
    thread is not a daemon, the interpreter will not exit until the worker
    has been stopped with ``setOkToRun(False)``.
    """

    def __init__(self):
        #self.s = s
        random.seed()
        self.okToRun = True
        self.generatedNum = 0.0
        # Set by setOkToRun(False) so the worker wakes up at once instead of
        # finishing a full one-second sleep before noticing the stop request.
        self._wake = threading.Event()
        # Keep the handle so callers can join the worker if they need to.
        self._thread = threading.Thread(target=self.run, args=())
        self._thread.daemon = False  # Daemonize thread or not
        self._thread.start()  # Start the execution

    def run(self):
        """Worker loop: refresh ``generatedNum`` until ``okToRun`` is falsy."""
        while self.okToRun:
            self.generatedNum = random.random()
            # Interruptible one-second pause between updates.
            self._wake.wait(1)
        print("ending " + __name__)

    def getRandom(self):
        """Return the most recently generated number (0.0 before the first update)."""
        return self.generatedNum

    def setOkToRun(self, state):
        """Set the run flag; a falsy ``state`` asks the worker to stop promptly.

        Passing a truthy value after the worker has already exited does not
        restart it.
        """
        self.okToRun = state
        if state:
            # Re-arm the pause so a still-running worker keeps its 1 s cadence.
            self._wake.clear()
        else:
            self._wake.set()

    def getOkToRun(self):
        """Return the current value of the run flag."""
        return self.okToRun
if __name__ == '__main__':
    # Small demo: start a generator, print its current value, then stop it.
    g = generator()
    try:
        print(g.generatedNum)
    finally:
        # The worker is a non-daemon thread; unless it is told to stop, the
        # interpreter would never exit after the print above.
        g.setOkToRun(False)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 29 14:56:17 2023
@author: jmckeown
"""
import socket
import generator # external custom class
commands = [["cmd0", "cmd1", "cmd2"],
            ["arg0", "arg1", "arg2"],
            ["arg0", "arg1", "arg2"]]
HOST = "127.0.0.1"  # Standard loopback interface address (localhost)
PORT = 5000  # Port to listen on (non-privileged ports are > 1023)
_TERMINATOR = b"\r\n"  # every command line and every reply ends with CR LF


def _build_reply(fields, random_source):
    """Return the reply bytes for one parsed command line, or None.

    fields is the received line split on commas; fields[0] is the command
    name and an optional fields[1] is its single parameter.  random_source
    must provide getRandom(), which is only called for command 1 with one
    parameter.  Unknown commands and commands with more than one parameter
    yield None, meaning nothing is sent back.
    """
    name = fields[0]
    if name == commands[0][0]:  # process command 0
        if len(fields) == 1:
            return b'Received command 0 with no paramaters\r\n'
        if len(fields) == 2:
            return (b'Received cmmand 0 with paramater: '
                    + fields[1].encode('UTF-8') + b'\r\n')
    elif name == commands[0][1]:  # process command 1
        if len(fields) == 1:
            return b'Received command 1 with no paramaters\r\n'
        if len(fields) == 2:
            return (b'Received cmmand 1 with paramater: '
                    + fields[1].encode('UTF-8') + b' '
                    + str(random_source.getRandom()).encode('UTF-8') + b'\r\n')
    return None


g = generator.generator()
try:
    # Socket setup lives inside the try so that a failure here (e.g. the port
    # is already in use) still stops the generator's non-daemon thread below.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        print("Waiting for connection...")
        s.listen()
        conn, addr = s.accept()
        # The with-blocks close both the accepted and the listening socket.
        with conn:
            print(f"Connected by {addr}")
            buffer = b''
            while True:
                data = conn.recv(1024)
                if not data:  # peer closed the connection
                    break
                buffer += data
                # One recv may carry several complete lines, or only part of
                # one; handle every complete line and keep the remainder.
                while _TERMINATOR in buffer:
                    line, _, buffer = buffer.partition(_TERMINATOR)
                    # Decode only complete lines, so a multi-byte character
                    # split across packets cannot raise mid-sequence.
                    # Invalid bytes are replaced rather than ending the server.
                    fields = line.decode('UTF-8', errors='replace').split(",")
                    reply = _build_reply(fields, g)
                    if reply is not None:
                        conn.sendall(reply)
finally:  # runs on normal exit and on any error
    print("ending " + __name__)
    g.setOkToRun(False)
    g = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment