Last active
May 6, 2022 18:56
-
-
Save euphy/75366a70346689190bb5 to your computer and use it in GitHub Desktop.
Polargraph queue sender (low resource)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
MIT License | |
Copyright (c) 2022 Sandy Noble | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
Only python package required is pyserial: | |
pip install pyserial | |
Use this wee script to send a command queue to a Polargraph machine. | |
If the filename is "mycommandqueue.txt" and the machine is connected on COM7, then do: | |
python send.py mycommandqueue.txt COM7 | |
""" | |
import os | |
import serial | |
import time | |
import sys | |
BAUD_RATE = 57600 | |
class Polargraph(object):
    """
    This is a crude model of a drawing machine. It includes its drawing state
    as well as the state of the communications line to the machine and the
    queue of commands going to it.

    The command queue is a text file with one command per line. A command is
    only written to the serial port after the machine has sent a line that
    starts with "READY"; sending a command clears the ready flag again.

    The code is written to run unchanged on Python 2.7 and Python 3.
    """

    # Class-level defaults; each one is shadowed by an instance attribute
    # the first time a method assigns to it.
    serial_port = None  # serial.Serial once start_serial_comms() succeeds
    file = None         # open command file once load_file() succeeds

    # State
    ready = False       # True between a READY line and the next command sent
    file_position = 0   # number of commands read from the file so far
    total_lines = 0     # number of lines counted in the loaded file

    def __init__(self):
        # Reference point for the elapsed-time and ETA figures.
        self.time_started = time.time()

    def start_serial_comms(self, comm_port, baud_rate=57600):
        """
        Attempts to connect this machine to its comm_port.

        :param comm_port: The name of the comm port
        :param baud_rate: The speed to open the serial connection at
        :return: True if the port was opened, False otherwise
        """
        try:
            self.serial_port = serial.Serial(comm_port, baudrate=baud_rate)
        except Exception as e:
            # Best-effort boundary: any failure to open the port is reported
            # to the caller through the return value rather than raised.
            print("Oh there was an exception loading the port %s: %s" % (comm_port, e))
            self.serial_port = None
            return False

        # Kept outside the try so that a problem while reporting success can
        # never discard (and leak) a port that was opened correctly.
        print("Connected successfully to %s (%s)." % (comm_port, self.serial_port))
        return True

    def load_file(self, filename):
        """
        Checks that the file exists, opens it and counts the lines in it.

        Any previously loaded file is closed and the progress counters are
        reset first. If the file cannot be found, self.file is left as None
        and total_lines as 0.

        :param filename: Path of the command queue file
        :return:
        """
        if self.file is not None:
            self.file.close()
        self.file = None
        self.file_position = 0
        self.total_lines = 0

        if not os.path.isfile(filename):
            print("File %s couldn't be found!" % os.path.abspath(filename))
            return

        print("Found %s!" % os.path.abspath(filename))
        self.file = open(filename, 'r')

        # Lets have a quick review of this file. Counting this way gives 0
        # for an empty file (enumerate()-based counting reported 1).
        self.total_lines = sum(1 for _ in self.file)
        print("This file has %s lines." % self.total_lines)

        # reset the file position
        self.file.seek(0)

    @staticmethod
    def _format_duration(seconds):
        """Return a non-negative duration as HH:MM:SS; hours are not wrapped at 24."""
        minutes, secs = divmod(int(max(0, seconds)), 60)
        hours, minutes = divmod(minutes, 60)
        return "{:02d}:{:02d}:{:02d}".format(hours, minutes, secs)

    def read_line(self):
        """
        Reads one line from the machine and prints it.

        If the line starts with "READY" the ready flag is set. When that is a
        change from not-ready and at least one command has been sent, a
        progress line with a projected time to finish is printed too.

        :return:
        """
        raw = self.serial_port.readline()
        if not isinstance(raw, str):
            # Python 3: the serial port hands back bytes, decode once here.
            raw = raw.decode("utf-8", "replace")
        line = raw.strip()
        print("received: {}".format(line))

        if line.startswith("READY"):
            # if it's changing from not ready to ready, then it's just finished a command
            if not self.ready and self.file_position > 0:  # ie it's not the first one
                time_ran = time.time() - self.time_started
                time_per_command = time_ran / self.file_position
                time_left = self.total_lines * time_per_command - time_ran
                print("Ran {:d} commands in {:0.2f} seconds: "
                      "at {:0.2f} seconds per command, we'll finish in {}".format(
                          self.file_position,
                          time_ran,
                          time_per_command,
                          self._format_duration(time_left)))
            self.ready = True

    def write_line(self):
        """
        Sends the next command from the file, if the machine is ready.

        Does nothing when the machine is not ready, no file is loaded, or
        every line of the file has already been sent.

        :return:
        """
        if not (self.ready and self.file):
            return
        if self.commands_queued() <= 0:
            # Nothing left to send; this also keeps the percentage below
            # from dividing by zero on an empty file.
            return

        line = self.file.readline().strip()
        self.file_position += 1
        print("Command {}/{}: {} ({:.0%})".format(
            self.file_position,
            self.total_lines,
            line,
            float(self.file_position) / float(self.total_lines)))

        data = line + "\n"
        if not isinstance(data, bytes):
            # Python 3: the serial port wants bytes, encode once here.
            data = data.encode("utf-8")
        self.serial_port.write(data)
        self.ready = False

    def commands_queued(self):
        """
        :return: The number of lines of the file that have not been sent yet
        """
        return self.total_lines - self.file_position

    def close(self):
        """
        Prints a summary, then closes the command file and the serial port.

        Safe to call when either of them was never opened.

        :return:
        """
        print("Finished sending {:d} commands in {:0.2f}".format(
            self.total_lines, time.time() - self.time_started))
        if self.file is not None:
            self.file.close()
            self.file = None
        if self.serial_port is not None:
            self.serial_port.close()
            self.serial_port = None
def main(input_filename=None, comm_port=None):
    """
    Send a command queue file to a Polargraph machine, one command at a time.

    Exits the process with status 1 if the file cannot be loaded or the
    serial port cannot be opened.

    :param input_filename: Path of the command queue file. Defaults to the
        module-level INPUT_FILENAME set by the command-line entry point.
    :param comm_port: Name of the serial port. Defaults to the module-level
        COMM_PORT set by the command-line entry point.
    :return:
    """
    if input_filename is None:
        input_filename = INPUT_FILENAME
    if comm_port is None:
        comm_port = COMM_PORT

    # Assemble a Polargraph object, load a file, open the comms port.
    polargraph = Polargraph()
    polargraph.load_file(input_filename)
    if polargraph.file is None:
        # load_file() has already reported the missing file. Stop here rather
        # than opening the port and failing later with nothing to send.
        sys.exit(1)

    opened = polargraph.start_serial_comms(comm_port=comm_port)
    if not opened:
        print("There was a problem opening the communications port. It should be entered exactly as you see it in "
              "your operating system.")
        polargraph.file.close()
        sys.exit(1)

    try:
        # Alternate between offering the next command and listening for the
        # machine's reply; write_line() only sends once READY has been seen.
        while polargraph.commands_queued():
            polargraph.write_line()
            polargraph.read_line()
    finally:
        # Release the file (and port) even if the run is interrupted.
        polargraph.close()
# Command-line entry point: python send.py <command queue file> <serial port>
if __name__ == "__main__":
    if len(sys.argv) == 3:
        # main() reads these two module-level names.
        INPUT_FILENAME = sys.argv[1]
        COMM_PORT = sys.argv[2]
    else:
        print("Supply filename and serial port as parameters.")
        print("eg: python send.py myqueue.txt COM7")
        # Non-zero status so calling scripts can tell the usage was wrong.
        sys.exit(1)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks @euphy!