Created
April 2, 2017 20:24
-
-
Save tiagocoutinho/0e989f4df9fcd2cdfb46c4f93c1fe170 to your computer and use it in GitHub Desktop.
python progress bar (tqdm) axes motion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import time | |
import collections | |
class Motor(object): | |
Motion = collections.namedtuple('Motion', 'ipos fpos start_time end_time') | |
def __init__(self, name='overture', unit='mm', speed=10.0): | |
self.name = name | |
self.__pos = 0. | |
self.__motion = None | |
self.speed = speed | |
self.config = dict(unit=unit) | |
def start_move(self, pos, start_time=None): | |
pos = float(pos) | |
if self.state: | |
raise RuntimeError('cannot start another motion while moving') | |
start_time = time.time() if start_time is None else start_time | |
delta_pos = pos - self.__pos | |
delta_time = delta_pos / self.speed | |
end_time = start_time + delta_time | |
self.__motion = Motor.Motion(self.__pos, pos, start_time, end_time) | |
def stop(self): | |
self.__pos = self.__update() | |
self.__motion = None | |
def __update(self): | |
if self.__motion is None: | |
return self.__pos | |
curr_time = time.time() | |
ipos, fpos, start_time, end_time = self.__motion | |
if curr_time >= end_time: | |
self.__pos = fpos | |
self.__motion = None | |
else: | |
self.__pos = ipos + self.speed * (curr_time - start_time) | |
return self.__pos | |
@property | |
def speed(self): | |
return self.__speed | |
@speed.setter | |
def speed(self, new_speed): | |
self.__speed = float(new_speed) | |
@property | |
def state(self): | |
self.__update() | |
return 1 if self.__motion else 0 | |
@property | |
def position(self): | |
return self.__update() | |
@position.setter | |
def position(self, new_pos): | |
if self.state: | |
raise RuntimeError('cannot set position while moving') | |
self.__pos = float(new_pos) | |
@property | |
def motion(self): | |
return self.__motion | |
import tqdm | |
class MotionBar(tqdm.tqdm): | |
monitor_interval = 0 | |
def __init__(self, motor, pos, label=None, **kwargs): | |
label = motor.name if label is None else label | |
ipos, tpos = motor.position, pos | |
unit = motor.config.get('unit') | |
unit = unit if unit else '' | |
delta_pos = abs(tpos - ipos) | |
self.__motor = motor | |
self.__initial_position = ipos | |
self.__target_position = tpos | |
self.__total_displacement = delta_pos | |
self.__unit = unit | |
l_bar = u'{desc}|' | |
r_bar = u'|{{percentage:=3.0f}}% \u0394p={{n:.02f}}/{{total:.02f}}{unit} \u0394t={{elapsed}} ETA:{{remaining}}, v:{{rate_fmt}}'.format(unit=unit) | |
bar_format = u'{l_bar}{{bar}}{r_bar}'.format(l_bar=l_bar, r_bar=r_bar) | |
kwargs['total'] = delta_pos | |
# kwargs['unit'] = unit | |
kwargs['bar_format'] = bar_format | |
super(MotionBar, self).__init__(**kwargs) | |
self.__desc_format = u'{label}={{pos_fmt}}'.format(label=label) | |
def motor_update(self, new_position=None, description=None): | |
if new_position is None: | |
new_position = self.__motor.position | |
if description is None: | |
pos_fmt = self.position_format(new_position) | |
description = self.__desc_format.format(pos_fmt=pos_fmt) | |
self.set_description(description) | |
displacement = abs(new_position - self.__initial_position) | |
self.update(displacement - self.n) | |
def run(self): | |
while self.__motor.state: | |
self.motor_update() | |
def position_format(self, pos = None): | |
if pos is None: | |
pos = self.__motor.position | |
return u'{0:.3f}{1}'.format(pos, self.__unit) | |
def move_simple_tqdm(motor, position): | |
with MotionBar(motor, position) as pb: | |
motor.start_move(position) | |
try: | |
pb.run() | |
except KeyboardInterrupt: | |
motor.stop() | |
pb.write('motion aborted') | |
import contextlib | |
def move_tqdm(*args): | |
motors, positions = args[::2], args[1::2] | |
motor_positions = zip(motors, positions) | |
bars = [MotionBar(m, p, position=i) | |
for i, (m, p) in enumerate(motor_positions)] | |
with contextlib.nested(*bars): | |
for motor, position in motor_positions: | |
motor.start_move(position) | |
try: | |
motion = True | |
while motion: | |
motion = False | |
for motor, bar in zip(motors, bars): | |
if not bar.disable: | |
bar.motor_update() | |
if motor.state: | |
motion = True | |
else: | |
bar.close() | |
time.sleep(0.1) | |
except KeyboardInterrupt: | |
for motor, bar in zip(motors, bars): | |
motor.stop() | |
finally: | |
for motor, bar in zip(motors, bars): | |
bar.motor_update() | |
def demo(): | |
th = Motor(' th', unit=u'deg') | |
tth = Motor('tth', unit=u'deg') | |
chi = Motor('chi', unit='deg') | |
phi = Motor('phi', unit='deg') | |
mu = Motor(' mu', unit='deg') | |
gam = Motor('gam', unit='deg') | |
th.position = 10. | |
th.speed = 9. | |
tth.position = 10. | |
tth.speed = 8. | |
chi.position = 10. | |
chi.speed = 7. | |
phi.position = 10. | |
phi.speed = 6. | |
mu.position = 10. | |
mu.speed = 5. | |
gam.position = 10. | |
gam.speed = 4. | |
move_tqdm(th, 360.5, tth, 45.5, chi, 55.2, phi, 35.5, mu, 27.9, gam, 12.40) | |
if __name__ == "__main__": | |
demo() | |
print('') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment