Skip to content

Instantly share code, notes, and snippets.

@branw
Created November 5, 2018 20:00
Show Gist options
  • Save branw/9ee4d989901441a3fc44fe76603d9b12 to your computer and use it in GitHub Desktop.
Save branw/9ee4d989901441a3fc44fe76603d9b12 to your computer and use it in GitHub Desktop.
def main():
try:
ptu = PTU(PTU_PORT)
except serial.serialutil.SerialException:
print(f'Could not connect to pan-tilt on port {PTU_PORT}')
return
# Open connection to device
ser = serial.Serial(DUE_PORT)
ser.setDTR(False)
ser.flushInput()
ser.setDTR(True)
print('Communicating over port {}'.format(ser.name))
# Send handshake to device
ser.write(bytearray([0, 0, 0, 0, 0]))
opcode, response_len = struct.unpack('<BI', ser.read(size=5))
if opcode != 0x80 or response_len != 2:
print('Unexpected packet! Reset the Due'
.format(opcode, response_len))
return
version = struct.unpack('<BB', ser.read(size=response_len))
print('Connected to device v{}.{}'.format(*version))
if version != (0, 2):
print('Invalid device version! Update the Due')
return
# Ask for name of output folder
print('Enter name of folder: ', end='')
folder_name = input()
# Create a subplot for each channel
f, (ax1, ax2) = plt.subplots(1, 2, sharey=True)
ax1.set_xlim([0, 10000])
ax1.set_ylim([0, 4096])
ax2.set_xlim([0, 10000])
ax2.set_ylim([0, 4096])
num_runs = 0
use_dynamic_motion = True
trial_start = datetime.now()
last_switch = time.time_ns()
runs_since_switch = 0
first_since_switch = True
print('Starting with {}'.format('dynamic' if use_dynamic_motion else 'static'))
while True:
try:
# Switch motion profile from static to dynamic every 10 seconds
if runs_since_switch == SWITCH_INTERVAL:
use_dynamic_motion ^= True
print('Switching to {}'.format('dynamic' if use_dynamic_motion else 'static'))
# Wait for some time if we're switching from dynamic to static
if not use_dynamic_motion:
while (time.time_ns() - last_switch) * 1e-6 < DELAY_AFTER_SWITCHING_FROM_DYNAMIC:
pass
num_runs_since_switch = 0
last_switch = time.time_ns()
runs_since_switch = 0
first_since_switch = True
run_start = time.time_ns()
# Initiate data collection
collection_opcode = (3 if runs_since_switch % DYNAMIC_REPEAT_INTERVAL == 0else 4) if use_dynamic_motion else 2
print(collection_opcode)
ser.write(bytearray([collection_opcode, 0, 0, 0, 0]))
opcode, response_len = struct.unpack('<BI', ser.read(size=5))
if opcode != (0x80 | collection_opcode) or response_len != 0:
print('Unexpected packet! Reset the Due'
.format(opcode, response_len))
return
first_since_switch = False
# Wait for data collection to finish
opcode, response_len = struct.unpack('<BI', ser.read(size=5))
if opcode != 0x82:
print('Unexpected packet! opcode=0x{:02x}, response_len={}'
.format(opcode, response_len))
return
run_end = time.time_ns()
# Create output folder and file
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S%f')[:-3]
output_folder = folder_name + '/' + ('dynamic' if use_dynamic_motion else 'static') + '/'
output_filename = timestamp + '.txt'
output_path = output_folder + output_filename
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# Transfer the raw data
raw_data = ser.read(size=response_len)
#while len(raw_data) < response_len:
# raw_data.extend(ser.read(size=4000))
# Process the raw data
joined_data = [(y << 8) | x for x, y in zip(raw_data[::2], raw_data[1::2])]
separated_data = joined_data[::2] + joined_data[1::2]
with open(output_path, 'w') as f:
for data in separated_data:
f.write('{}\n'.format(data))
num_runs += 1
runs_since_switch += 1
# Periodically plot an incoming signal
if num_runs % PLOT_INTERVAL == 0:
elapsed = datetime.now() - trial_start
# Leave a status message
ax1.set_title('{} runs - {}'.format(num_runs, str(elapsed)[:-7]))
ax2.set_title('{} runs/min'.format(int(num_runs/max(elapsed.seconds,1)*60)))
# Clear previous lines for speed
ax1.lines = ax2.lines = []
# Split the data into two channels
split_point = len(separated_data)//2
left, right = separated_data[:split_point], separated_data[split_point:]
ax1.plot(left)
ax2.plot(right)
# Show the plot without blocking (there's no separate UI
# thread)
plt.show(block=False)
plt.pause(0.001)
# Make sure that each run takes at least RUN_DURATION
while (time.time_ns() - run_start) * 1e-6 < RUN_DURATION:
pass
# Show timing info (this adds time!)
#processing_end = time.time_ns()
#print('collect: {:0.3f} ms, total: {:0.3f} ms'.format((run_end - run_start) * 1e-6, (processing_end - run_start) * 1e-6))
except KeyboardInterrupt:
break
print('Closing connection')
ser.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment