Skip to content

Instantly share code, notes, and snippets.

@chiahaoliu
Last active March 21, 2017 02:10
Show Gist options
  • Save chiahaoliu/8e95a7a06882613a4e99396f6d6cdadf to your computer and use it in GitHub Desktop.
Save chiahaoliu/8e95a7a06882613a4e99396f6d6cdadf to your computer and use it in GitHub Desktop.
170320_XPDsupport
import os
import numpy as np
from itertools import product
from xpdacq.beamtime import _configure_area_det
import bluesky.plans as bp
from bluesky.plans import (scan, subs_wrapper, abs_set, count, list_scan,
adaptive_scan, reset_positions_wrapper)
from bluesky.callbacks import LiveTable, LivePlot
from bluesky.plan_tools import print_summary
def motor_plan(motor, motor_setpoint, liveplot_key=None, totExpTime=5, num_exp=1, delay=1):
"""
move motor to one desired position and execute "count" scan with desired total exposure time,
number of exposures and delay between exposures.
Parameters
----------
motor : ophyd.Device
python representation of motor
motor_setpoint : float
position of motor
liveplot_key : str, optional
e. g., liveplot_key = rga_mass1
data key for LivePlot. default is None, which means no LivePlot
totExpTime : float
total exposure time per frame in seconds. Dafault value is 5 sec
num_exp : int
number of images/exposure, default is 1
delay: float
delay time between exposures in sec
Example
-------
"""
## move motor to designed location
yield from abs_set(motor, motor_setpoint)
## configure the exposure time first
_configure_area_det(totExpTime) # 5 secs exposuretime
## ScanPlan you need
plan = bp.count([pe1c, motor], num=num_exp, delay= delay)
plan = bp.subs_wrapper(plan, LiveTable([xpd_configuration['area_det'], motor]))
if liveplot_key and isinstance(liveplot_key, str):
plan = bp.subs_wrapper(plan, LivePlot(liveplot_key))
yield from plan
def run_and_save_motorplan(motor, motor_scanplan_func, motor_pos_list,
sample_num_list, dryrun=True, *args):
"""helper function to execute xrun from combinations between elements from
a list of motor positions and elements from a list of sample number
motor_scanplan_func : func
a function that returns valid scanplan
motor_pos_list : list
a list of motor position you wish to drive to
sample_num_list : list
a list of sample index from ```bt.list()``.
dryrun : bool, optional
option of dry run. if it's True, only the (motor_pos, sample_num)
pairs will be printed. default to True
args :
additional parameters to motor_scanplan_func
Example
--------
For an experiment with sample numbers to run are [0,1,2,5,7],
at corresponding position of diff_x at [-7.5, -6.5, -5.0, 0.0, 2.0],
for 5 sec total exposure time, number of exposure = 3, delay= 1 sec
the syntax will be (inside Ipython):
In[]: %run -i /home/documents/scripts/***.py # this depends on the path to the script
In[]: run_and_save_motorplan(diff_x, motor_plan,
[-7.5, -6.5, -5.0, 0.0, 2.0],
[0, 1, 2, 5, 7], True, 5, 3, 1) # dryrun to check
In[]: run_and_save_motorplan(diff_x, motor_plan,
[-7.5, -6.5, -5.0, 0.0, 2.0],
[0, 1, 2, 5, 7], False, 5, 3, 1) # real scan
"""
data_dir = "/direct/XF28ID1/pe2_data/xpdUser/tiff_base/"
#combinations = list(product(motor_pos_list, sample_num_list))
combinations = list(zip(motor_pos_list, sample_num_list))
total_num = len(combinations)
for comb in combinations:
pos, sample_num = comb
if dryrun:
print("==== At motor position = {}, xrun on sample_num = {} ===="
.format(pos, sample_num))
else:
_plan = motor_scanplan_func(motor, pos, *args)
xrun(sample_num, _plan)
integrate_and_save_last()
### note: this code block gives you separate small tables on each scan
""""
file_name = data_dir + "sample_num_" + str(sample_num) + ".csv"
tb = db.get_table(db[-1])
tb.to_csv()
""""
### note: this code block gives you a huge tables on all of your scans
file_name = data_dir + "sample_num_list{}.csv".format(sample_num_list)
tb = db.get_table(db[-total_num:])
tb.to_csv()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment