-
-
Save euphy/75366a70346689190bb5 to your computer and use it in GitHub Desktop.
#!/usr/bin/python | |
""" | |
MIT License | |
Copyright (c) 2022 Sandy Noble | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
Only python package required is pyserial: | |
pip install pyserial | |
Use this wee script to send a command queue to a Polargraph machine. | |
If the filename is "mycommandqueue.txt" and the machine is connected on COM7, then do: | |
python send.py mycommandqueue.txt COM7 | |
""" | |
import os | |
import serial | |
import time | |
import sys | |
BAUD_RATE = 57600 | |
class Polargraph(): | |
""" | |
This is a crude model of a drawing machine. It includes it's drawing state | |
as well as the state of the communications lines to the machine and the | |
queue of commands going to it. | |
""" | |
def __init__(self): | |
self.time_started = time.time() | |
serial_port = None | |
file = None | |
# State | |
ready = False | |
file_position = 0 | |
total_lines = 0 | |
def start_serial_comms(self, comm_port, baud_rate=57600): | |
""" | |
Attempts to connect this machine to it's comm_port. It starts two | |
threads, one for reading that is attached to the 'received_log' | |
list, and one thread for writing that is attached to the main outgoing | |
command queue. | |
:param comm_port: The name of the comm port | |
:param baud_rate: The speed to open the serial connection at | |
:return: | |
""" | |
try: | |
self.serial_port = serial.Serial(comm_port, baudrate=baud_rate) | |
print "Connected successfully to %s (%s)." % (COMM_PORT, serial) | |
return True | |
except Exception as e: | |
print("Oh there was an exception loading the port %s: %s" % (comm_port, e)) | |
print e.message | |
self.serial_port = None | |
return False | |
def load_file(self, filename): | |
""" | |
Checks that the file exists, opens it and counts the lines in it. | |
:param filename: | |
:return: | |
""" | |
if os.path.isfile(filename): | |
print "Found %s!" % os.path.abspath(filename) | |
self.file = open(filename, 'r') | |
# Lets have a quick review of this file | |
lines = 0 | |
for lines, l in enumerate(self.file): | |
pass | |
self.total_lines = lines + 1 | |
print "This file has %s lines." % self.total_lines | |
# reset the file position | |
self.file.seek(0) | |
else: | |
self.file = None | |
print "File %s couldn't be found!" % os.path.abspath(filename) | |
def read_line(self): | |
l = self.serial_port.readline().strip() | |
print "received: {:s}".format(l) | |
if l.startswith("READY"): | |
# if it's changing from not ready to ready, then it's just finished a command | |
if not self.ready and self.file_position > 0: # ie it's not the first one | |
time_ran = time.time() - self.time_started | |
time_per_command = time_ran / (self.file_position) | |
time_projected = self.total_lines * time_per_command | |
time_left = time_projected - time_ran | |
m, s = divmod(time_left, 60) | |
h, m = divmod(m, 60) | |
d, h = divmod(h, 24) | |
print "Ran {:d} commands in {:0.2f} seconds: " \ | |
"at {:0.2f} seconds per command, we'll finish in {:02d}:{:02d}:{:02d})".format(self.file_position, | |
time_ran, | |
time_per_command, | |
int(h), int(m), int(s)) | |
self.ready = True | |
def write_line(self): | |
if self.ready and self.file: | |
l = self.file.readline().strip() | |
self.file_position += 1 | |
print "Command {}/{}: {} ({:.0%})".format(self.file_position, self.total_lines, l, (float(self.file_position) / float(self.total_lines))) | |
self.serial_port.write(l + "\n") | |
self.ready = False | |
def commands_queued(self): | |
return self.total_lines - self.file_position | |
def close(self): | |
print "Finished sending {:d} commands in {:0.2f}".format(self.total_lines, time.time() - self.time_started) | |
self.file.close() | |
def main(): | |
# Assemble a Polargraph object, load a file, open the comms port. | |
polargraph = Polargraph() | |
polargraph.load_file(INPUT_FILENAME) | |
opened = polargraph.start_serial_comms(comm_port=COMM_PORT) | |
if not opened: | |
print "There was a problem opening the communications port. It should be entered exactly as you see it in" \ | |
"your operating system." | |
exit() | |
while polargraph.commands_queued(): | |
polargraph.write_line() | |
polargraph.read_line() | |
polargraph.close() | |
if __name__ == "__main__": | |
print len(sys.argv) | |
if len(sys.argv) == 3: | |
INPUT_FILENAME = sys.argv[1] | |
COMM_PORT = sys.argv[2] | |
else: | |
print "Supply filename and serial port as parameters." | |
print "eg: python send.py myqueue.txt COM7" | |
exit() | |
main() |
Found C:\Users\stephanie\mycommands.txt!
This file has 351 lines.
Connected successfully to COM3 (<module 'serial' from 'C:\Users\stephanie\AppData\Local\Programs\Python\Python37-32\lib\site-packages\serial\init.py'>).
received: {:s}
Traceback (most recent call last):
File "send.py", line 160, in
main()
File "send.py", line 145, in main
polargraph.read_line()
File "send.py", line 96, in read_line
print ("received: {:s}").format(l)
AttributeError: 'NoneType' object has no attribute 'format'
i keep getting this error please help i need an offline polargraph controller the polargraphSD isn't available on our country
Python3 edited:
#!/usr/bin/python3
"""
Only python package required is pyserial:
pip install pyserial
Use this wee script to send a command queue to a Polargraph machine.
If the filename is "mycommandqueue.txt" and the machine is connected on COM7, then do:
python send.py mycommandqueue.txt COM7
"""
import os
import serial
import time
import sys
BAUD_RATE = 57600
class Polargraph():
"""
This is a crude model of a drawing machine. It includes it's drawing state
as well as the state of the communications lines to the machine and the
queue of commands going to it.
"""
def __init__(self):
self.time_started = time.time()
serial_port = None
file = None
# State
ready = False
file_position = 0
total_lines = 0
def start_serial_comms(self, comm_port, baud_rate=57600):
"""
Attempts to connect this machine to it's comm_port. It starts two
threads, one for reading that is attached to the 'received_log'
list, and one thread for writing that is attached to the main outgoing
command queue.
:param comm_port: The name of the comm port
:param baud_rate: The speed to open the serial connection at
:return:
"""
try:
self.serial_port = serial.Serial(comm_port, baudrate=baud_rate)
print ("Connected successfully to %s (%s)." % (COMM_PORT, serial))
return True
except Exception as e:
print("Oh there was an exception loading the port %s: %s" % (comm_port, e))
print (e.message)
self.serial_port = None
return False
def load_file(self, filename):
"""
Checks that the file exists, opens it and counts the lines in it.
:param filename:
:return:
"""
if os.path.isfile(filename):
print ("Found %s!" % os.path.abspath(filename))
self.file = open(filename, 'r')
# Lets have a quick review of this file
lines = 0
for lines, l in enumerate(self.file):
pass
self.total_lines = lines + 1
print ("This file has %s lines." % self.total_lines)
# reset the file position
self.file.seek(0)
else:
self.file = None
print ("File %s couldn't be found!" % os.path.abspath(filename))
def read_line(self):
l = self.serial_port.readline().decode('ascii')
l.strip()
print("received: {:s}".format(l))
if l.startswith("READY"):
# if it's changing from not ready to ready, then it's just finished a command
if not self.ready and self.file_position > 0: # ie it's not the first one
time_ran = time.time() - self.time_started
time_per_command = time_ran / (self.file_position)
time_projected = self.total_lines * time_per_command
time_left = time_projected - time_ran
m, s = divmod(time_left, 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
print ("Ran {:d} commands in {:0.2f} seconds: " \
"at {:0.2f} seconds per command, we'll finish in {:02d}:{:02d}:{:02d})".format(self.file_position,
time_ran,
time_per_command,
int(h), int(m), int(s)))
self.ready = True
def write_line(self):
if self.ready and self.file:
l = self.file.readline().strip()
self.file_position += 1
print ("Command {}/{}: {} ({:.0%})".format(self.file_position, self.total_lines, l, (float(self.file_position) / float(self.total_lines))))
self.serial_port.write((l + "\n").encode())
self.ready = False
def commands_queued(self):
return self.total_lines - self.file_position
def close(self):
print ("Finished sending {:d} commands in {:0.2f}".format(self.total_lines, time.time() - self.time_started))
self.file.close()
def main():
# Assemble a Polargraph object, load a file, open the comms port.
polargraph = Polargraph()
polargraph.load_file(INPUT_FILENAME)
opened = polargraph.start_serial_comms(comm_port=COMM_PORT)
if not opened:
print ("There was a problem opening the communications port. It should be entered exactly as you see it in" \
"your operating system.")
exit()
while polargraph.commands_queued():
polargraph.write_line()
polargraph.read_line()
polargraph.close()
if __name__ == "__main__":
print (len(sys.argv))
if len(sys.argv) == 3:
INPUT_FILENAME = sys.argv[1]
COMM_PORT = sys.argv[2]
else:
print ("Supply filename and serial port as parameters.")
print ("eg: python send.py myqueue.txt COM7")
exit()
main()
Hi @euphy. Under what license is this script distributed? I'm thinking about redistributing a fork as a Python package via PyPI.
Thanks,
Maja
Hi Majabojarska, added MIT license - plenty permissive, good luck, would love to see the fork!
Thanks @euphy!
I was just tinkering with raspberry pi and this queue sender and I thought wouldn't it be awesome if it had a function to shutdown raspi at the end of the drawing.
I added little codey thingy just after this line
and little bit tweaking the beginning of it and added this import line
and blam !
now it shuts the pi down after the drawing has finished.
edit: shut down also disconnects arduino from pi so basically it deactivates steppers.