Skip to content

Instantly share code, notes, and snippets.

@chiahaoliu
Forked from CJ-Wright/nd_per_step.py
Last active July 24, 2018 22:20
Show Gist options
  • Save chiahaoliu/5f0cc63d69d398146066c6408e52b856 to your computer and use it in GitHub Desktop.
Save chiahaoliu/5f0cc63d69d398146066c6408e52b856 to your computer and use it in GitHub Desktop.
import time
import numpy as np
import bluesky.plans as bp
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky.utils import Msg, short_uid as _short_uid
from bluesky.preprocessors import subs_wrapper
from bluesky.simulators import summarize_plan
from bluesky.callbacks import LiveTable
from xpdacq.xpdacq_conf import xpd_configuration
def configure_area_det_expo(expo):
    """
    Plan stub that sets the area detector frame count for an exposure.

    Reads the detector's current per-frame acquire time, rounds the
    requested exposure up to a whole number of frames and moves
    ``images_per_set`` on the detector to that frame count.

    Parameters
    ----------
    expo : float
        requested total exposure time; presumably in the same units as
        the detector's ``cam.acquire_time`` (the two are divided below)

    Yields
    ------
    Msg
        the messages produced by ``bps.mv`` on ``images_per_set``
    """
    area_det = xpd_configuration['area_det']
    # compute number of frames
    acq_time = area_det.cam.acquire_time.get()
    # NOTE(review): _check_mini_expo is neither defined nor imported in the
    # visible part of this file; presumably it comes from xpdacq or from the
    # interactive session namespace -- confirm before running standalone.
    _check_mini_expo(expo, acq_time)
    # Round up so the collected exposure is never shorter than requested.
    num_frame = np.ceil(expo / acq_time)
    computed_exposure = num_frame * acq_time
    yield from bps.mv(area_det.images_per_set, num_frame)
    # print exposure time
    print("INFO: requested exposure time = {} - > computed exposure time"
          "= {}".format(expo, computed_exposure))
def light_dark_nd_step(detectors, step, pos_cache):
    """
    Inner loop of an N-dimensional step scan taking a dark and a light frame.

    For one step this plan: closes the shutter, moves the motors, selects
    the exposure/wait settings matching the current sample position,
    configures the detector exposure, reads everything with the shutter
    closed (passed ``name='dark'``), opens the shutter, reads everything
    again, closes the shutter and finally sleeps for the selected wait.

    The names ``shutter``, ``sh_open``, ``sh_close``, ``sample_motor`` and
    ``sub_sample_dict`` are not defined in this function; they must be
    available as globals when the plan runs.

    Parameters
    ----------
    detectors : iterable
        devices to read
    step : dict
        mapping motors to positions in this step
    pos_cache : dict
        mapping motors to their last-set positions
    """
    def move():
        yield Msg('checkpoint')
        grp = _short_uid('set')
        for motor, pos in step.items():
            if pos == pos_cache[motor]:
                # This step does not move this motor.
                continue
            yield Msg('set', motor, pos, group=grp)
            pos_cache[motor] = pos
        yield Msg('wait', None, group=grp)

    # Build the read list once: ``detectors`` is only promised to be an
    # iterable, so materialising it separately for the dark and the light
    # reading would leave the second reading without detectors if a
    # one-shot iterator were passed in.
    readables = list(detectors) + list(step) + [shutter]

    # Move with the shutter closed.
    yield from bps.abs_set(shutter, sh_close, wait=True)
    yield from move()

    # Pick the settings whose key lies within 1.5 of the sample motor
    # readback; presumably the keys of sub_sample_dict are sample positions
    # and the values are dicts with 'exposure' and 'wait' -- confirm.
    for k, v in sub_sample_dict.items():
        if np.abs(sample_motor.get().user_readback - k) < 1.5:
            inner_dict = v
            break
    else:
        # No entry matched the current position: fall back to defaults.
        inner_dict = {'exposure': .1, 'wait': 1}
    print(inner_dict)
    yield from configure_area_det_expo(inner_dict['exposure'])

    # Dark reading: shutter is still closed here.
    yield from bps.trigger_and_read(readables, name='dark')

    # Light reading: open the shutter and time the acquisition.
    yield from bps.abs_set(shutter, sh_open, wait=True)
    # monotonic clock: the interval is unaffected by wall-clock adjustments
    t0 = time.monotonic()
    yield from bps.trigger_and_read(readables)
    t1 = time.monotonic()
    print("INFO: exposure time (plus minor message overhead) = {:.2f}".format(t1-t0))
    yield from bps.abs_set(shutter, sh_close, wait=True)
    yield from bps.sleep(inner_dict['wait'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment