"""Plan future DESI observations.
"""
from __future__ import print_function, division
import datetime
import os.path
import numpy as np
import astropy.table
import astropy.io.fits
import desiutil.log
import desisurvey.utils
import desisurvey.tiles
import desisurvey.ephem
[docs]def load_design_hourangle():
"""Load design hour-angle assignments from disk.
If hour angles are present in the tile file, defaults to those.
Otherwise reads column 'DESIGNHA' from file saved by the
``surveyinit`` script. Contents must row-match the tile file.
Parameters
----------
name : str
Name of the ecsv file to read. A relative path is assumed to
refer to the output path specified in the configuration.
Returns
-------
array
1D array of design hour angles in degrees, with indexing
that matches :class:`desisurvey.tiles.Tiles`.
"""
tiles = desisurvey.tiles.get_tiles()
if tiles.designha is not None:
return tiles.designha
else:
from astropy.table import Table
tf = os.path.join(os.environ['DESISURVEY_OUTPUT'],
os.path.basename(tiles.tiles_file))
design = Table.read(tf)
if not (np.all(tiles.tileID == design['TILEID']) and
np.allclose(tiles.tileRA['RA'], design['RA']) and
np.allclose(tiles['DEC'], design['DEC']) and
np.all(tiles['PROGRAM'] == design['PROGRAM'])):
raise ValueError('Plan HA does match tile file.')
return design['HA'].data.copy()
def get_fiber_assign_dir():
fiber_assign_dirs = []
fiber_assign_dir = os.environ.get('FIBER_ASSIGN_DIR', None)
if fiber_assign_dir is None:
config = desisurvey.config.Configuration()
fiber_assign_dir = getattr(config, 'fiber_assign_dir', None)
if fiber_assign_dir is not None:
fiber_assign_dir = fiber_assign_dir()
if fiber_assign_dir is not None:
fiber_assign_dirs.append(fiber_assign_dir)
fa_holding_pen = os.environ.get('FA_HOLDING_PEN', None)
if fa_holding_pen is None:
config = desisurvey.config.Configuration()
fa_holding_pen = getattr(config, 'fa_holding_pen', None)
if fa_holding_pen is not None:
fa_holding_pen = fa_holding_pen()
if fa_holding_pen is not None:
fiber_assign_dirs.append(fa_holding_pen)
return fiber_assign_dirs
[docs]def load_weather(start_date=None, stop_date=None, name='surveyinit.fits'):
"""Load dome-open fraction expected during each night of the survey.
Reads Image HDU 'WEATHER'. This is the format saved by the
``surveyinit`` script, but any FITS file following the same
convention can be used.
Parameters
----------
name : str
Name of the FITS file to read. A relative path is assumed to
refer to the output path specified in the configuration.
start_date : date or None
First night to include or use the first date of the survey. Must
be convertible to a date using :func:`desisurvey.utils.get_date`.
stop_date : date or None
First night to include or use the last date of the survey. Must
be convertible to a date using :func:`desisurvey.utils.get_date`.
Returns
-------
array
1D array of length equal to the span between stop_date and
start_date. Values are between 0 (dome closed all night) and
1 (dome open all night).
"""
config = desisurvey.config.Configuration()
if start_date is None:
start_date = config.first_day()
else:
start_date = desisurvey.utils.get_date(start_date)
if stop_date is None:
stop_date = config.last_day()
else:
stop_date = desisurvey.utils.get_date(stop_date)
if stop_date <= start_date:
raise ValueError('Expected start_date < stop_date.')
with astropy.io.fits.open(config.get_path(name), memmap=False) as hdus:
weather = hdus['WEATHER'].data
num_nights = len(weather)
first = desisurvey.utils.get_date(hdus['WEATHER'].header['FIRST'])
last = first + datetime.timedelta(num_nights)
if start_date < first:
raise ValueError('Weather not available before {}.'.format(
first.isoformat()))
num_nights = (stop_date - start_date).days
if last < stop_date:
raise ValueError('Weather not available after {}.'.format(
last.isoformat()))
ilo, ihi = (start_date - first).days, (stop_date - first).days
return weather[ilo:ihi]
[docs]class Planner(object):
"""Coordinate afternoon planning activities.
Parameters
----------
rules : object or None
Object with an ``apply`` method that is used to implement survey
strategy by updating tile priorities each afternoon. When None, all
tiles have equal priority.
restore : str or None
Restore internal state from the snapshot saved to this filename,
or initialize a new planner when None. Use :meth:`save` to
save a snapshot to be restored later. Filename is relative to
the configured output path unless an absolute path is
provided. Raise a RuntimeError if the saved tile IDs do not
match the current tiles_file values.
simulate : bool
If True, simulate fiber assignment process.
log : log object or None
logging object to use; None for desiutil default.
"""
def __init__(self, rules=None, restore=None, simulate=False, log=None):
if log is None:
self.log = desiutil.log.get_logger()
else:
self.log = log
self.rules = rules
config = desisurvey.config.Configuration()
self.min_snr2_fraction = config.min_snr2_fraction()
self.tiles_lowpass = config.tiles_lowpass()
self.simulate = simulate
if self.simulate:
self.fiberassign_cadence = config.fiber_assignment_cadence()
if ((self.fiberassign_cadence not in ('daily', 'monthly')) and
(not isinstance(self.fiberassign_cadence, int))):
raise ValueError('Invalid fiberassign_cadence: "{}".'.format(
self.fiberassign_cadence))
nogray = getattr(config, 'tiles_nogray', False)
if not isinstance(nogray, bool):
nogray = nogray()
self.nogray = nogray
fa_on_the_fly = getattr(config, 'fa_on_the_fly', False)
if not isinstance(fa_on_the_fly, bool):
fa_on_the_fly = fa_on_the_fly()
self.fa_on_the_fly = fa_on_the_fly
self.tiles = desisurvey.tiles.get_tiles()
self.ephem = desisurvey.ephem.get_ephem()
if restore is not None:
# Restore the plan for a survey in progress.
fullname = config.get_path(restore)
if not os.path.exists(fullname):
raise RuntimeError('Cannot restore planner from non-existent '
'"{}".'.format(fullname))
t = astropy.table.Table.read(fullname)
if ((len(t) != self.tiles.ntiles) or
np.any(self.tiles.tileID != t['TILEID']) or
np.any(self.tiles.tileRA != t['RA']) or
np.any(self.tiles.tileDEC != t['DEC']) or
np.any(self.tiles.tileprogram != t['PROGRAM'])):
raise ValueError('mismatch between tiles and status file; '
'should not be possible!')
if self.simulate:
if 'FIRST' in t.meta:
self.first_night = desisurvey.utils.get_date(
t.meta['FIRST'])
self.last_night = desisurvey.utils.get_date(
t.meta['LAST'])
else:
self.first_night = self.last_night = None
self.tile_countdown = np.zeros(self.tiles.ntiles, dtype='i4')
if 'COUNTDOWN' in t.keys():
self.tile_countdown = t['COUNTDOWN'].data.copy()
else:
self.tile_countdown[:] = (
self.tiles.fiberassign_delay.copy())
if ('CADENCE' in t.meta) and (
t.meta['CADENCE'] != self.fiberassign_cadence):
raise ValueError('Fiberassign cadence mismatch.')
self.tile_status = np.zeros(self.tiles.ntiles, dtype='U20')
self.tile_status[:] = 'unobs'
self.tile_status[:] = [s.strip().lower() for s in t['STATUS']]
self.tile_available = self.tiles.in_desi.copy()
self.tile_priority = t['PRIORITY'].data.copy()
self.donefrac = t['DONEFRAC'].data.copy()
self.designha = t['DESIGNHA'].data.copy()
if 'AVAILABLE' in t.dtype.names:
self.tile_available[:] &= t['AVAILABLE'].data.copy()
self.log.debug(('Restored plan with {} unobserved, {} pending, '
'and {} completed tiles from {}.').format(
np.sum(self.donefrac <= 0),
np.sum((self.donefrac > 0) &
(self.tile_status != 'done')),
np.sum(self.tile_status == 'done'),
fullname))
else:
# Initialize the plan for a a new survey.
self.tile_available = self.tiles.in_desi.copy()
self.donefrac = np.zeros(self.tiles.ntiles, 'f4')
self.designha = load_design_hourangle()
# Initialize per-tile arrays.
self.tile_status = np.zeros(self.tiles.ntiles, dtype='U12')
self.tile_status[:] = 'unobs'
if self.simulate:
self.first_night = self.last_night = None
# Initailize the delay countdown for each tile.
self.tile_countdown = self.tiles.fiberassign_delay.copy()
# Initialize priorities.
if self.rules is not None:
none_started = np.zeros(self.tiles.ntiles, 'f4')
self.tile_priority = self.rules.apply(none_started)
else:
self.tile_priority = np.ones(self.tiles.ntiles, float)
if not np.any(self.tile_priority > 0):
raise RuntimeError('All tile priorities are all <= 0.')
[docs] def add_pending_tile(self, tileid):
"""Add a newly observed, now-pending tile to the pending tile list.
Updates tile availability so that this tile's neighbors will not
be observed until this tile is completed.
Parameters
----------
tileid : int
"""
idx, mask = self.tiles.index(tileid, return_mask=True)
if not mask:
self.log.error(
('Invalid tileid {} passed to '
'add_pending_tile; ignoring.').format(tileid))
return
overlapping = self.tiles.overlapping[idx]
# if this tile's LyA decisions have already been made,
# that's a problem!
assert not self.tile_status[idx] == 'done'
self.tile_available[overlapping] = 0
[docs] def set_donefrac(self, tileid, donefrac=None, status=None,
ignore_pending=False, nobs=None):
"""Update planner with new tile donefrac.
Parameters
----------
tileid : array
1D array of integer tileIDs to update
donefrac : array
1D array of completion fractions for tiles, matching tileid,
optional
status : array
1D array of tile status to update; optional
ignore_pending: bool
do not mark newly started files as pending
nobs : array
1D array of number of observations of each tile
A tile with at least 1 observation will never be considered
unobserved, regardless of whether it has zero donefrac.
"""
tileid = np.atleast_1d(tileid)
if donefrac is not None:
donefrac = np.atleast_1d(donefrac)
if status is not None:
status = np.atleast_1d(status)
tileind, mask = self.tiles.index(tileid, return_mask=True)
if np.any(~mask):
self.log.debug('Some tiles with unknown IDs; ignoring')
tileind = tileind[mask]
if donefrac is not None:
donefrac = donefrac[mask]
if status is not None:
status = status[mask]
if nobs is not None:
nobs = nobs[mask]
if donefrac is not None:
self.donefrac[tileind] = donefrac
if status is not None:
self.tile_status[tileind] = status
if nobs is not None:
m = (nobs > 0) & (self.tile_status[tileind] == 'unobs')
self.tile_status[tileind[m]] = 'obsstart'
if not ignore_pending and (donefrac is not None):
for tileid0, donefrac0 in zip(np.array(tileid)[mask], donefrac):
if donefrac0 > 0:
self.add_pending_tile(tileid0)
def obsend(self):
return ((self.tile_status == 'obsend') |
(self.tile_status == 'done') |
(self.donefrac >= self.min_snr2_fraction))
def obsend_by_program(self):
tiledone = self.obsend()
tiledone_by_program = np.zeros(len(self.tiles.programs), 'i4')
for program in self.tiles.programs:
progidx = self.tiles.program_index[program]
m = self.tiles.program_mask[program]
tiledone_by_program[progidx] = np.sum(tiledone[m])
return tiledone_by_program
[docs] def survey_completed(self):
"""Test if all tiles have been completed.
"""
return np.sum(self.obsend()) == np.sum(self.tiles.in_desi)
[docs] def save(self, name):
"""Save a snapshot of our current state that can be restored.
The output file has a binary table (extname PLAN) with columns
TILEID, CENTERID, PASS, RA, DEC, PROGRAM, IN_DESI, EBV_MED, DESIGNHA,
PRIORITY, STATUS, and DONEFRAC.
Simulations also include COUNTDOWN and header keywords CADENCE,
FIRST, LAST.
Parameters
----------
name : str
Name of FITS file where the snapshot will be saved. The file will
be saved under our configuration's output path unless name is
already an absolute path. Pass the same name to the constructor's
``restore`` argument to restore this snapshot.
"""
config = desisurvey.config.Configuration()
fullname = config.get_path(name)
meta = dict(EXTNAME='TILES')
if self.simulate:
meta['CADENCE'] = self.fiberassign_cadence
meta['FIRST'] = self.first_night.isoformat()
meta['LAST'] = self.last_night.isoformat()
t = self.tiles.read_tiles_table()
t.meta = meta
t['DESIGNHA'] = self.designha
t['DESIGNHA'].format = '%7.2f'
t['DESIGNHA'].description = 'Design hour angles'
t['PRIORITY'] = self.tile_priority
t['PRIORITY'].format = '%10.3e'
t['PRIORITY'].description = 'Tile observation priority'
t['STATUS'] = self.tile_status
t['STATUS'].description = 'unobs, obsstart, obsend, done'
t['DONEFRAC'] = self.donefrac
t['DONEFRAC'].format = '%7.4f'
t['DONEFRAC'].description = 'Tile completeness fraction'
t['AVAILABLE'] = self.tile_available
t['AVAILABLE'].description = 'Fiberassign file is available'
if self.simulate:
t['COUNTDOWN'] = self.tile_countdown
t.write(fullname+'.tmp', overwrite=True, format='ascii.ecsv')
os.rename(fullname+'.tmp', fullname)
self.log.debug(
('Saved plan with {} unobserved, {} pending, and {} '
'completed tiles from {}.').format(
np.sum(self.tile_status == 'unobs'),
np.sum(self.tile_status == 'obsstart'),
np.sum(self.tile_status == 'done'),
fullname))
[docs] def fiberassign_simulate(self, night):
"""Update fiber assignments.
"""
# Calculate the number of elapsed nights in the survey.
pending = self.tile_status == 'obsend'
run_now = pending & (self.tile_countdown <= 0)
self.tile_status[run_now] = 'done'
delayed = pending & (self.tile_countdown > 0)
self.tile_countdown[delayed] -= 1
self.tile_available = self.tiles.in_desi.copy()
self.log.info('Completed {} tiles with {} delayed on {}.'
.format(np.count_nonzero(run_now),
np.count_nonzero(delayed), night))
[docs] def fiberassign(self, dirnames):
r"""Update list of tiles available for spectroscopy.
Scans given directory looking for fiberassign file and populates Plan
object accordingly.
Parameters
----------
dirnames : list
list of directory names where fiberassign files are to be found
This directory is recursively scanned for all files with names
matching tile-(\d+)\.fits. TILEIDs are populated according to
the name of the fiberassign file, and any header information is
ignored.
"""
import glob
import re
files = []
for dirname in dirnames:
files0 = glob.glob(os.path.join(dirname, '**/*.fits*'),
recursive=True)
files = files + files0
rgx = re.compile(r'.*fiberassign-(\d+)\.fits(\.gz)?')
available_tileids = []
for fn in files:
match = rgx.match(fn)
if match:
available_tileids.append(int(match.group(1)))
available = np.zeros(self.tiles.ntiles, dtype='bool')
ind, mask = self.tiles.index(available_tileids, return_mask=True)
available[ind[mask]] = True
self.tile_available = self.tiles.in_desi & available
self.log.info('Observations possible for {} tiles.'.format(
np.count_nonzero(available)))
if np.count_nonzero(available) == 0:
self.log.error('No fiberassign files available for scheduling!')
if np.any(~mask):
self.log.debug(
'Ignoring {} tiles that were assigned, '.format(sum(~mask)) +
'but not found in the tile file.')
[docs] def afternoon_plan(self, night):
"""Update plan for a given night. Update tile availability and priority.
Parameters
----------
night : str
night string, YYYY-MM-DD, to be planned.
This argument has no effect for when Plan.simulate = False; in this
case, tile availability and priority is based entirely on what
files are currently present in the fiberassign directory and what
the planner believes the current tile completions are.
Returns
-------
new_observed (array), new_completed (array)
boolean arrays indicating newly observed and newly completed tiles.
"""
oldstatus = self.tile_status.copy()
newlystarted = ((self.donefrac > 0) & (self.tile_status == 'unobs'))
self.tile_status[newlystarted] = 'obsstart'
newlyobserved = ((self.donefrac >= self.min_snr2_fraction) &
((self.tile_status == 'obsstart') |
(self.tile_status == 'unobs')))
self.tile_status[newlyobserved] = 'obsend'
self.log.debug(('Starting afternoon planning for {} with {} / {} '
'tiles completed.').format(
night,
np.sum(self.tile_status == 'done'),
self.tiles.ntiles))
if self.simulate:
if self.first_night is None:
# Remember the first night of the survey.
self.first_night = night
day_number = (night - self.first_night).days
# Update fiber assignments this afternoon?
if self.fiberassign_cadence == 'monthly':
# Run fiber assignment on the afternoon before the full moon.
dt = self.ephem.get_night(night)['nearest_full_moon']
run_fiberassign = (dt > -0.5) and (dt <= 0.5)
assert run_fiberassign == self.ephem.is_full_moon(
night, num_nights=1)
elif isinstance(self.fiberassign_cadence, int):
run_fiberassign = (day_number % self.fiberassign_cadence) == 0
else:
run_fiberassign = True
if run_fiberassign:
self.fiberassign_simulate(night)
self.last_night = night
else:
fiber_assign_dirs = get_fiber_assign_dir()
if fiber_assign_dirs is None:
raise ValueError(
'fiber_assign_dir must be set either in '
'config.yaml or in FIBER_ASSIGN_DIR; failing!')
oldavail = self.tile_available.copy()
self.fiberassign(fiber_assign_dirs)
filesavailable = self.tile_available.copy()
self.tile_available = oldavail.copy()
# Update tile priorities.
if self.rules is not None:
self.tile_priority = self.rules.apply(self.donefrac)
self.last_night = night
pending = ((self.tile_status == 'obsstart') |
(self.tile_status == 'obsend'))
for tileid in self.tiles.tileID[pending]:
self.add_pending_tile(tileid)
if self.tiles_lowpass:
self.prefer_low_passnum()
zeropriority = ((self.tile_status != 'unobs') &
(self.tile_available == 0))
self.tile_priority[zeropriority] = 0
# tiles with priority > 0 may be designed.
if not self.simulate and not self.fa_on_the_fly:
m = self.tile_available & ~filesavailable
self.tile_available[~filesavailable] = 0
for prog, progmask in self.tiles.program_mask.items():
ma = m & progmask
nundesigned = np.sum(ma)
if nundesigned == 0:
continue
elif nundesigned < 200:
self.log.info(
'Newly available {} tiles that '.format(prog) +
'have not yet been designed: ' +
' '.join([str(x) for x in self.tiles.tileID[ma]]))
else:
self.log.info(
'There are many ({}) available {} tiles that have '
'not been designed. Suppressing full '
'list.'.format(nundesigned, prog))
newlycompleted = ((self.tile_status == 'done') &
(oldstatus != 'done'))
return newlystarted, newlycompleted
[docs] def prefer_low_passnum(self):
"""Mark only tiles available that are in the lowest pass
of any overlapping tiles."""
obsend = self.obsend()
mobservable = self.tile_available & ~obsend
mfree = np.ones(self.tiles.ntiles, dtype='bool')
passes = np.unique(self.tiles.tilepass[mobservable])
s = np.argsort(passes)
for pass0 in passes[s]:
mpass = self.tiles.tilepass == pass0
m = mpass & mobservable
for idx in np.flatnonzero(m):
if ~mfree[idx]:
continue
over = np.array(self.tiles.overlapping[idx], dtype='i4')
mfree[over] = 0
if np.any((self.tiles.tilepass[over] < pass0) & ~obsend[over]):
# unobserved, overlapping tiles with lower pass numbers.
mfree[idx] = 0
self.tile_available[~mfree] = 0