Created
May 8, 2017 12:01
-
-
Save StuartLittlefair/81b27f5cfb7d257d1a9394368edf6b68 to your computer and use it in GitHub Desktop.
Autoscheduling for ultracam runs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from trm import observing | |
from astropy.coordinates import SkyCoord, EarthLocation | |
from astropy import units as u | |
from astropy.time import Time | |
from astroplan import Observer, ObservingBlock, FixedTarget | |
from astroplan.scheduling import (Schedule, Transitioner, TransitionBlock, | |
PriorityScheduler, SequentialScheduler) | |
from astroplan.constraints import (AirmassConstraint, AtNightConstraint, Constraint, | |
MoonSeparationConstraint, MoonIlluminationConstraint) | |
# make a phase constraint | |
class PhaseConstraint(Constraint): | |
def __init__(self, t0, P, kind='barycentric', | |
unit='mjd', | |
min=None, max=None, | |
boolean_constraint=True): | |
self.min = min if min is not None else 0.0 | |
self.max = max if max is not None else 1.0 | |
# recast on range 0 to 1 | |
self.min -= np.floor(self.min).astype(int) | |
self.max -= np.floor(self.max).astype(int) | |
self.kind = kind | |
self.unit = unit | |
self.t0 = t0 | |
self.P = P | |
self.boolean_constraint = boolean_constraint | |
def compute_constraint(self, times, observer, targets): | |
if self.kind not in ['barycentric', 'heliocentric', 'utc']: | |
raise ValueError('Unrecognised timescale') | |
if self.kind == 'barycentric': | |
time_in_right_scale = times.tdb + times.light_travel_time( | |
targets, kind='barycentric', location=observer.location | |
) | |
elif self.kind == 'heliocentric': | |
time_in_right_scale = times + times.light_travel_time( | |
targets, kind='heliocentric', location=observer.location | |
) | |
else: | |
time_in_right_scale = times | |
values = (time_in_right_scale.jd if self.unit == 'jd' else | |
time_in_right_scale.mjd) | |
phase = (values - self.t0)/self.P | |
frac_phase = phase - np.floor(phase).astype(int) | |
mask = np.where(self.max > self.min, | |
(frac_phase > self.min) & (frac_phase < self.max), | |
(frac_phase > self.min) | (frac_phase < self.max)) | |
return mask | |
def read_trm_files(target_file, info_file, range_file): | |
peinfo = {} | |
count = 0 | |
name = None | |
with open(target_file) as tf: | |
for line in tf: | |
if not line.startswith('#') and not line.isspace(): | |
count += 1 | |
if count == 1: | |
name = line.strip() | |
elif count == 2: | |
rastr, decstr = line.split() | |
elif count == 3: | |
try: | |
eph = observing.Ephemeris(line) | |
peinfo[name] = {'ra': rastr, 'dec': decstr, | |
'ephemeris': eph} | |
except: | |
peinfo[name] = {'ra': rastr, 'dec': decstr, | |
'ephemeris': None} | |
finally: | |
count = 0 | |
count = 0 | |
with open(info_file) as inf: | |
for line in inf: | |
if not line.startswith('#') and not line.isspace(): | |
count += 1 | |
if count == 1: | |
name = line.strip() | |
elif count == 2: | |
fields = line.split() | |
priority, lf, dur, nvisits = ( | |
int(fields[0])+1, float(fields[1]), | |
float(fields[2]), int(fields[3]) | |
) | |
peinfo[name].update( | |
{'moon': lf, 'duration': dur, 'visits': nvisits, 'pri': priority} | |
) | |
count = 0 | |
prinfo = {} | |
count = 0 | |
with open(range_file) as rf: | |
for line in rf: | |
if line.startswith('#') or line.isspace(): | |
if count: | |
if name in peinfo: | |
prinfo[name] = pr | |
else: | |
print(name, 'not found in position/ephemeris file and will be skipped.') | |
count = 0 | |
else: | |
count += 1 | |
if count == 1: | |
name = line.strip() | |
pr = observing.Prange(name) | |
elif count > 1: | |
pr.add(line) | |
return peinfo, prinfo | |
def create_observing_blocks(target_file, info_file, range_file): | |
peinfo, prinfo = read_trm_files(target_file, info_file, range_file) | |
blocks = [] | |
for name in peinfo: | |
if 'duration' not in peinfo[name]: | |
print('{} not found in info file and will be skipped'.format(name)) | |
continue | |
try: | |
coo = SkyCoord( | |
peinfo[name]['ra'], | |
peinfo[name]['dec'], | |
unit=(u.hour, u.deg) | |
) | |
target = FixedTarget(coo, name=name) | |
# add moon constraint | |
mic = MoonIlluminationConstraint(max=peinfo[name]['moon']) | |
# if there's an ephemeris and no specified, duration use that | |
if ('ephemeris' in peinfo[name] and peinfo[name]['ephemeris'] is not None and | |
peinfo[name]['duration'] == 0.0): | |
# add OBs for the each phase constraint | |
for rng in prinfo[name].prange: | |
# rng = prinfo[name].prange[0] | |
minval, maxval, _, _, _ = rng | |
eph = peinfo[name]['ephemeris'] | |
t0, p = eph.coeff | |
assert eph.time in ['BMJD', 'HMJD', 'HJD', | |
'BJD'] | |
if eph.time == 'BMJD': | |
kind = 'barycentric' | |
unit = 'mjd' | |
elif eph.time == 'HMJD': | |
kind = 'heliocentric' | |
unit = 'mjd' | |
elif eph.time == 'HJD': | |
kind = 'heliocentric' | |
unit = 'jd' | |
else: | |
kind = 'barycentric' | |
unit = 'jd' | |
pc = PhaseConstraint(t0, p, | |
min=minval, | |
max=maxval, | |
kind=kind, | |
unit=unit) | |
# phase constraint defines the allowed | |
# observable phases. if this is equal to the duration | |
# we have numerical precision issues and the block | |
# is probably never scheduled. Here we shorten | |
# the duration by a minute to allow scheduling | |
block = ObservingBlock( | |
target, | |
p*(maxval-minval)*u.d - 1*u.minute, | |
peinfo[name]['pri'], | |
{'acquisition': name}, | |
[mic, pc] | |
) | |
blocks.extend([block]*peinfo[name]['visits']) | |
else: | |
# add OBs with no phase constraint | |
# add moon constraint | |
block = ObservingBlock( | |
target, | |
peinfo[name]['duration']*u.min, | |
peinfo[name]['pri'], | |
{'acquisition': name}, | |
[mic]) | |
blocks.extend([block]*peinfo[name]['visits']) | |
except Exception as err: | |
print(name) | |
print(peinfo[name]) | |
print(prinfo) | |
print(str(err)) | |
raise | |
return blocks | |
def write_switch_file(schedule): | |
obs_blocks = [b for b in schedule.scheduled_blocks if isinstance(b, ObservingBlock)] | |
transitions = [b for b in schedule.scheduled_blocks if isinstance(b, TransitionBlock)] | |
switch_times = [transition.start_time for transition in transitions] | |
switch_times.insert(0, obs_blocks[0].start_time) | |
switch_times.append(obs_blocks[-1].end_time) | |
names = [b.target.name for b in obs_blocks] | |
names.append('None') | |
durations = [transition.duration.to(u.min).value for transition in transitions] | |
durations.insert(0, 0) | |
durations.append(10) | |
for n, t, d in zip(names, switch_times, durations): | |
tstring = t.iso.split()[1] | |
tstring_fields = tstring.split(':') | |
tstring = ':'.join(tstring_fields[:-1]) | |
print('{} | {} | {}'.format( | |
n, tstring, int(d) | |
)) | |
if __name__ == "__main__": | |
import argparse | |
import datetime as dt | |
TNT = EarthLocation(lat=18.574, lon=98.482, height=2449.) | |
WHT = EarthLocation(lat=28.76063889, lon=-17.88163889, height=2332.) | |
VLT = EarthLocation(lat=-24.6250000, lon=-70.40275000, height=2635.) | |
NTT = EarthLocation(lat=-29.2589167, lon=-70.73375000, height=2375.) | |
scopes = dict(TNT=TNT, WHT=WHT, VLT=VLT, NTT=NTT) | |
parser = argparse.ArgumentParser( | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter | |
) | |
# positional | |
parser.add_argument( | |
'stardata', | |
help='general file containing positions and ephemerides. It should have the same format of my C++ ephemeris file.') | |
parser.add_argument('telescope', help='Telescope name, e.g. WHT, VLT') | |
parser.add_argument('info', | |
help='general info on stars (priority, lunar limits, etc') | |
parser.add_argument( | |
'pranges', | |
help='file of stars to include and phase ranges of interest. Each entry should start with the name of the ' + | |
'star on its own on a line, and then follow it with the phase ranges in the form:\np1 p2 col lw\nwhere ' + | |
'p1 p2 is the range of phase to indicate, col is the pgplot colour index (2 = red, etc), lw is the ' + | |
'pgplot line width. It is OK to have p1 > p2; the programme will map it into a positive range.') | |
parser.add_argument('date', help='date at start of night, e.g. "01 Aug 2012"') | |
parser.add_argument('-s', '--scheduler', default='priority', | |
help='Scheduler to use. Options are sequential or priority. ' + | |
'Sequential scheduling runs through the night, scheduling the best OB at any time. ' + | |
'Priority scheduling schedules the OBs in priority order') | |
args = parser.parse_args() | |
if args.scheduler not in ['priority', 'sequential']: | |
raise ValueError('Unrecognised scheduler') | |
# get noon before and after | |
midnight_on_night_starting = Time(dt.datetime.strptime(args.date, "%d %b %Y")) | |
noon_before = midnight_on_night_starting + 12*u.hour | |
noon_after = midnight_on_night_starting + 36*u.hour | |
global_constraints = [AirmassConstraint(max=2.2, boolean_constraint=False), | |
AtNightConstraint.twilight_nautical(), | |
MoonSeparationConstraint(30*u.deg)] | |
# create observing blocks for each target (xN if multiple visits OK) | |
blocks = create_observing_blocks(args.stardata, args.info, args.pranges) | |
# create a transitioner | |
slew_rate = 1.0*u.deg/u.second | |
transition_components = dict( | |
acquisition=dict(default=10*u.minute), | |
filter=dict(default=10*u.minute) | |
) | |
transitioner = Transitioner(slew_rate, transition_components) | |
observer = Observer(location=scopes[args.telescope]) | |
pscheduler = PriorityScheduler(constraints=global_constraints, | |
observer=observer, | |
transitioner=transitioner) | |
sscheduler = SequentialScheduler(constraints=global_constraints, | |
observer=observer, | |
transitioner=transitioner) | |
schedule = Schedule(noon_before, noon_after) | |
if args.scheduler == 'priority': | |
pscheduler(blocks, schedule) | |
elif args.scheduler == 'sequential': | |
sscheduler(blocks, schedule) | |
print('') | |
write_switch_file(schedule) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment