import sched
import time
from bluesky import RunEngine
from ophyd.sim import noisy_det
from bluesky.plans import count
# One RunEngine executes every plan; the scheduler is driven by the
# monotonic clock and sleeps with time.sleep between events.
RE = RunEngine()
s = sched.scheduler(timefunc=time.monotonic, delayfunc=time.sleep)

# Queue three identical count plans (10 readings, 0.5 s apart), each with
# zero delay and priority 1. Nothing runs until s.run() is called.
s.enter(
    delay=0,
    priority=1,
    action=lambda: print(RE(count([noisy_det], 10, delay=0.5))),
)
s.enter(
    delay=0,
    priority=1,
    action=lambda: print(RE(count([noisy_det], 10, delay=0.5))),
)
s.enter(
    delay=0,
    priority=1,
    action=lambda: print(RE(count([noisy_det], 10, delay=0.5))),
)
In [20]: s.queue
Out[20]:
[Event(time=8565.830196546, priority=1, action=<function <lambda> at 0x7fdb18e7cd30>, argument=(), kwargs={}),
Event(time=8566.797394388, priority=1, action=<function <lambda> at 0x7fdb1d3e3f70>, argument=(), kwargs={}),
Event(time=8567.447242847, priority=1, action=<function <lambda> at 0x7fdb1b0748b0>, argument=(), kwargs={})]
# Execute the events queued on the scheduler, in order.
s.run()
RunEngineInterrupted:
Your RunEngine is entering a paused state. These are your options for changing
the state of the RunEngine:
RE.resume() Resume the plan.
RE.abort() Perform cleanup, then kill plan. Mark exit_stats='aborted'.
RE.stop() Perform cleanup, then kill plan. Mark exit_status='success'.
RE.halt() Emergency Stop: Do not perform cleanup --- just stop.
In [20]: s.queue
Out[20]:
[Event(time=141076.866579366, priority=1, sequence=7, action=<function <lambda> at 0x7fc69fd09f30>, argument=(), kwargs={}),
Event(time=141077.939733281, priority=1, sequence=8, action=<function <lambda> at 0x7fc69fd0ad40>, argument=(), kwargs={})]
# Queue three more count plans on the scheduler (zero delay, priority 1) ...
s.enter(
    delay=0,
    priority=1,
    action=lambda: print(RE(count([noisy_det], 10, delay=0.5))),
)
s.enter(
    delay=0,
    priority=1,
    action=lambda: print(RE(count([noisy_det], 10, delay=0.5))),
)
s.enter(
    delay=0,
    priority=1,
    action=lambda: print(RE(count([noisy_det], 10, delay=0.5))),
)
# ... then resume the RunEngine's current plan.
RE.resume()
import random
def get_feedback(low=0, high=10):
    """Return a random integer standing in for feedback from an analysis step.

    ``random.randint`` requires both bounds; calling it without arguments
    raises ``TypeError``, so the bounds are explicit parameters here.

    Parameters
    ----------
    low : int, optional
        Smallest value that can be returned (inclusive). Defaults to 0.
    high : int, optional
        Largest value that can be returned (inclusive). Defaults to 10.
        NOTE(review): the default upper bound is an assumption; adjust it to
        whatever range the demo is meant to cover.

    Returns
    -------
    int
        A value ``n`` with ``low <= n <= high``.
    """
    return random.randint(low, high)
def example_plan(num):
    """Run a ``count`` plan on ``noisy_det`` through the RunEngine.

    ``num`` is forwarded as the second positional argument of ``count`` and
    the delay is fixed at 0.5. Whatever ``RE(...)`` returns is handed back to
    the caller unchanged.
    """
    return RE(count([noisy_det], num, delay=0.5))
def adaptive(plan):
    """Ask for feedback, report it, and run ``plan`` with that value.

    Parameters
    ----------
    plan : callable
        One-argument callable; it is invoked as ``plan(num)`` where ``num``
        is the value returned by ``get_feedback()``.
    """
    num = get_feedback()
    print(f"num = {num}")
    # Call the plan that was passed in rather than a hard-coded global, so
    # the argument actually selects what gets executed.
    plan(num)
# Queue two adaptive runs (zero delay, priority 1); each one fetches its own
# feedback value when the scheduler executes it.
s.enter(delay=0, priority=1, action=lambda: adaptive(example_plan))
s.enter(delay=0, priority=1, action=lambda: adaptive(example_plan))
In [39]: s.run()
num = 0
num = 4
def get_next_plan():
    """Return a zero-argument callable that runs and prints one count plan.

    The returned callable, when invoked, runs ``count([noisy_det], 2,
    delay=0.5)`` through ``RE`` and prints the result. Nothing is executed
    at the time ``get_next_plan`` itself is called.
    """

    def plan():
        print(RE(count([noisy_det], 2, delay=0.5)))

    return plan
Create a `continious` function, with any custom logic that you want, that gets the next plan and executes it.
def continious(s, get_next_plan):
    """Interactively run plans one at a time until the operator declines.

    Before each plan the operator is prompted. Answering "n" or "no" (any
    case, surrounding whitespace ignored) ends the loop; any other reply
    queues the next plan and runs it.

    Parameters
    ----------
    s : scheduler
        Object providing ``enter(delay, priority, action)`` and ``run()``,
        e.g. a ``sched.scheduler``.
    get_next_plan : callable
        Zero-argument factory; each call returns the zero-argument callable
        to be queued and executed next.
    """
    while True:
        answer = input("Do you want to continue? [y/n]")
        # Normalise the reply: an exact match on "n" would treat "N", " n"
        # or "no" as consent and run another plan.
        if answer.strip().lower() in ("n", "no"):
            break
        s.enter(0, 1, get_next_plan())
        s.run()
# Start the interactive loop: prompt, then queue and run one plan per pass.
continious(s, get_next_plan)
Do you want to continue? [y/n]y
('c5b9da08-844b-41a7-a254-e34134207bac',)
Do you want to continue? [y/n]y
('63fc6b4f-5f11-4f2b-86a8-484e6547eb59',)
Do you want to continue? [y/n]y
('15aa63c0-f515-4b33-bd8c-8a6cac98eeb9',)
Do you want to continue? [y/n]
You can connect to a remote kernel through IPython, Jupyter, Spyder IDE, or VSCode. See https://github.com/ipython/ipython/wiki/Cookbook:-Connecting-to-a-remote-kernel-via-ssh
This can be done by making use of the ipython KernelManager, KernelProvider, and IPython API.
We can write a custom KernelProvider to enforce only one bluesky kernel per beamline; see https://jupyter-kernel-mgmt.readthedocs.io/en/latest/kernel_providers.html#creating-a-kernel-provider