Last active
March 21, 2017 02:10
-
-
Save chiahaoliu/8e95a7a06882613a4e99396f6d6cdadf to your computer and use it in GitHub Desktop.
170320_XPDsupport
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import numpy as np | |
from itertools import product | |
from xpdacq.beamtime import _configure_area_det | |
import bluesky.plans as bp | |
from bluesky.plans import (scan, subs_wrapper, abs_set, count, list_scan, | |
adaptive_scan, reset_positions_wrapper) | |
from bluesky.callbacks import LiveTable, LivePlot | |
from bluesky.plan_tools import print_summary | |
def motor_plan(motor, motor_setpoint, liveplot_key=None, totExpTime=5, num_exp=1, delay=1): | |
""" | |
move motor to one desired position and execute "count" scan with desired total exposure time, | |
number of exposures and delay between exposures. | |
Parameters | |
---------- | |
motor : ophyd.Device | |
python representation of motor | |
motor_setpoint : float | |
position of motor | |
liveplot_key : str, optional | |
e. g., liveplot_key = rga_mass1 | |
data key for LivePlot. default is None, which means no LivePlot | |
totExpTime : float | |
total exposure time per frame in seconds. Dafault value is 5 sec | |
num_exp : int | |
number of images/exposure, default is 1 | |
delay: float | |
delay time between exposures in sec | |
Example | |
------- | |
""" | |
## move motor to designed location | |
yield from abs_set(motor, motor_setpoint) | |
## configure the exposure time first | |
_configure_area_det(totExpTime) # 5 secs exposuretime | |
## ScanPlan you need | |
plan = bp.count([pe1c, motor], num=num_exp, delay= delay) | |
plan = bp.subs_wrapper(plan, LiveTable([xpd_configuration['area_det'], motor])) | |
if liveplot_key and isinstance(liveplot_key, str): | |
plan = bp.subs_wrapper(plan, LivePlot(liveplot_key)) | |
yield from plan | |
def run_and_save_motorplan(motor, motor_scanplan_func, motor_pos_list, | |
sample_num_list, dryrun=True, *args): | |
"""helper function to execute xrun from combinations between elements from | |
a list of motor positions and elements from a list of sample number | |
motor_scanplan_func : func | |
a function that returns valid scanplan | |
motor_pos_list : list | |
a list of motor position you wish to drive to | |
sample_num_list : list | |
a list of sample index from ```bt.list()``. | |
dryrun : bool, optional | |
option of dry run. if it's True, only the (motor_pos, sample_num) | |
pairs will be printed. default to True | |
args : | |
additional parameters to motor_scanplan_func | |
Example | |
-------- | |
For an experiment with sample numbers to run are [0,1,2,5,7], | |
at corresponding position of diff_x at [-7.5, -6.5, -5.0, 0.0, 2.0], | |
for 5 sec total exposure time, number of exposure = 3, delay= 1 sec | |
the syntax will be (inside Ipython): | |
In[]: %run -i /home/documents/scripts/***.py # this depends on the path to the script | |
In[]: run_and_save_motorplan(diff_x, motor_plan, | |
[-7.5, -6.5, -5.0, 0.0, 2.0], | |
[0, 1, 2, 5, 7], True, 5, 3, 1) # dryrun to check | |
In[]: run_and_save_motorplan(diff_x, motor_plan, | |
[-7.5, -6.5, -5.0, 0.0, 2.0], | |
[0, 1, 2, 5, 7], False, 5, 3, 1) # real scan | |
""" | |
data_dir = "/direct/XF28ID1/pe2_data/xpdUser/tiff_base/" | |
#combinations = list(product(motor_pos_list, sample_num_list)) | |
combinations = list(zip(motor_pos_list, sample_num_list)) | |
total_num = len(combinations) | |
for comb in combinations: | |
pos, sample_num = comb | |
if dryrun: | |
print("==== At motor position = {}, xrun on sample_num = {} ====" | |
.format(pos, sample_num)) | |
else: | |
_plan = motor_scanplan_func(motor, pos, *args) | |
xrun(sample_num, _plan) | |
integrate_and_save_last() | |
### note: this code block gives you separate small tables on each scan | |
"""" | |
file_name = data_dir + "sample_num_" + str(sample_num) + ".csv" | |
tb = db.get_table(db[-1]) | |
tb.to_csv() | |
"""" | |
### note: this code block gives you a huge tables on all of your scans | |
file_name = data_dir + "sample_num_list{}.csv".format(sample_num_list) | |
tb = db.get_table(db[-total_num:]) | |
tb.to_csv() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment