"""Script wrapper for creating a movie of survey progress.
To run this script from the command line, use the ``surveymovie`` entry point
that is created when this package is installed, and should be in your shell
command search path.
The optional matplotlib python package must be installed to use this script.
The external program ffmpeg must be installed to use this script.
At nersc, try ``module add ffmpeg``.
"""
from __future__ import print_function, division, absolute_import
import argparse
import os.path
import numpy as np
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.gridspec
import matplotlib.animation
import matplotlib.colors
import matplotlib.animation
import astropy.time
import astropy.io.fits
import astropy.units as u
import desiutil.log
import desisurvey.ephem
import desisurvey.utils
import desisurvey.config
import desisurvey.tiles
import desisurvey.plots
[docs]def parse(options=None):
"""Parse command-line options for running survey planning.
"""
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--verbose', action='store_true',
help='display log messages with severity >= info')
parser.add_argument('--debug', action='store_true',
help='display log messages with severity >= debug (implies verbose)')
parser.add_argument('--log-interval', type=int, default=100, metavar='N',
help='interval for logging periodic info messages')
parser.add_argument(
'--exposures', default='exposures_surveysim.fits', metavar='FITS',
help='name of FITS file with list of exposures taken')
parser.add_argument(
'--start', type=str, default=None, metavar='DATE',
help='movie starts on the evening of this day, formatted as YYYY-MM-DD')
parser.add_argument(
'--stop', type=str, default=None, metavar='DATE',
help='movie stops on the morning of this day, formatted as YYYY-MM-DD')
parser.add_argument(
'--expid', type=int, default=None, metavar='ID',
help='index of single exposure to display')
parser.add_argument(
'--nightly', action='store_true',
help='output one summary frame per night')
# The scores option needs to be re-implemented after the refactor.
##parser.add_argument(
## '--scores', action='store_true', help='display scheduler scores')
parser.add_argument(
'--save', type=str, default='surveymovie', metavar='NAME',
help='base name (without extension) of output file to write')
parser.add_argument(
'--fps', type=float, default=10., metavar='FPS',
help='frames per second to render')
parser.add_argument(
'--label', type=str, default='DESI', metavar='TEXT',
help='label to display on each frame')
parser.add_argument(
'--output-path', default=None, metavar='PATH',
help='path that desisurvey files are read from')
parser.add_argument(
'--tiles-file', default=None, metavar='TILES',
help='name of tiles file to use instead of config.tiles_file')
parser.add_argument(
'--config-file', default='config.yaml', metavar='CONFIG',
help='input configuration file')
if options is None:
args = parser.parse_args()
else:
args = parser.parse_args(options)
# The scores option needs to be re-implemented after the refactor.
args.scores = False
if args.nightly and args.scores:
log.warn('Cannot display scores in nightly summary.')
args.scores = False
# Validate start/stop date args and covert to datetime objects.
# Unspecified values are taken from our config.
config = desisurvey.config.Configuration(args.config_file)
if args.start is None:
args.start = config.first_day()
else:
try:
args.start = desisurvey.utils.get_date(args.start)
except ValueError as e:
raise ValueError('Invalid start: {0}'.format(e))
if args.stop is None:
args.stop = config.last_day()
else:
try:
args.stop = desisurvey.utils.get_date(args.stop)
except ValueError as e:
raise ValueError('Invalid stop: {0}'.format(e))
if args.start >= args.stop:
raise ValueError('Expected start < stop.')
return args
[docs]def wrap(angle, offset=-60):
"""Wrap values in the range [0, 360] to [offset, offset+360].
"""
return np.fmod(angle - offset + 360, 360) + offset
[docs]class Animator(object):
"""Manage animation of survey progress.
"""
def __init__(self, exposures_path, start, stop, label, show_scores):
self.log = desiutil.log.get_logger()
self.config = desisurvey.config.Configuration()
self.tiles = desisurvey.tiles.get_tiles()
self.ephem = desisurvey.ephem.get_ephem()
# Load exposures and associated tile data.
self.exposures = astropy.table.Table.read(exposures_path, hdu='EXPOSURES')
self.tiledata = astropy.table.Table.read(exposures_path, hdu='TILEDATA')
self.label = label
self.show_scores = show_scores
self.ra = wrap(self.tiles.tileRA)
self.dec = self.tiles.tileDEC
self.tileprogram = self.tiles.tileprogram
self.tileid = self.tiles.tileID
self.prognames = [p for p in self.tiles.programs]
# if DARK and BRIGHT are names, put them first
for pname in ['BRIGHT', 'DARK']:
if pname in self.prognames:
self.prognames.remove(pname)
self.prognames = [pname] + self.prognames
self.tiles_per_program = {p: np.sum(self.tiles.program_mask[p])
for p in self.prognames}
self.psels = [self.tiles.program_mask[p] for p in self.prognames]
self.start_date = self.config.first_day()
self.survey_weeks = int(np.ceil((self.config.last_day() - self.start_date).days / 7))
# Add some computed columns to the exposures table.
self.exposures['EXPID'] = np.arange(len(self.exposures))
self.exposures['INDEX'] = self.tiles.index(self.exposures['TILEID'])
self.exposures['PROGRAM'] = self.tiles.tileprogram[self.exposures['INDEX']]
self.exposures['STATUS'] = np.ones(len(self.exposures), np.int32)
self.exposures['STATUS'][self.exposures['SNR2FRAC'] == 0] = 0
self.exposures['STATUS'][
self.exposures['SNR2FRAC'] >= self.config.min_snr2_fraction()] = 2
# Convert tables to recarrays for much faster indexing.
self.exposures = self.exposures.as_array()
self.tiledata = self.tiledata.as_array()
# Restrict the list of exposures to [start, stop].
date_start = desisurvey.utils.get_date(start)
date_stop = desisurvey.utils.get_date(stop)
mjd_start = desisurvey.utils.local_noon_on_date(date_start).mjd
mjd_stop = desisurvey.utils.local_noon_on_date(date_stop).mjd
in_range = (self.exposures['MJD'] >= mjd_start) & (self.exposures['MJD'] < mjd_stop)
self.exposures = self.exposures[in_range]
self.num_exp = len(self.exposures)
# Count nights with at least one exposure.
day0 = desisurvey.utils.local_noon_on_date(date_start).mjd
day_number = np.floor(self.exposures['MJD'] - day0)
self.num_nights = len(np.unique(day_number))+1
# Calculate each exposure's LST window.
exp_midpt = astropy.time.Time(
self.exposures['MJD'] + self.exposures['EXPTIME'] / 86400.,
format='mjd', location=desisurvey.utils.get_location())
lst_midpt = exp_midpt.sidereal_time('apparent').to(u.deg).value
# convert from seconds to degrees.
lst_len = self.exposures['EXPTIME'] / 240.
self.lst = np.empty((self.num_exp, 2))
self.lst[:, 0] = wrap(lst_midpt - 0.5 * lst_len)
self.lst[:, 1] = wrap(lst_midpt + 0.5 * lst_len)
[docs] def init_date(self, date, ephem):
"""Initialize before drawing frames for a new night.
Parameters
----------
date : datetime.date
Date on which this night's observing starts.
night : astropy.table.Column
Ephemerides data for this night.
"""
# Update the observing program for this night.
dark, gray, bright = self.ephem.tabulate_program(ephem['noon'] + self.dmjd)
self.pdata[:] = 0
self.pdata[dark] = 1
self.pdata[gray] = 2
self.pdata[bright] = 3
self.programs.set_data(self.pdata.reshape(1, -1))
if self.show_scores:
# Load new scheduler scores for this night.
scores_name = self.config.get_path('scores_{0}.fits'.format(date))
if os.path.exists(scores_name):
hdus = astropy.io.fits.open(scores_name, memmap=False)
self.scores = hdus[0].data
hdus.close()
# Save index of first exposure on this date.
noon = desisurvey.utils.local_noon_on_date(date)
self.iexp0 = np.argmax(self.exposures['MJD'] > noon.mjd)
else:
self.warn('Missing scores file: {0}.'.format(scores_name))
# Get interpolator for moon, planet positions during this night.
for i, name in enumerate(self.avoid_names):
self.f_obj[i] = desisurvey.ephem.get_object_interpolator(
ephem, name)
if self.last_date is not None:
week_num = int(np.floor((date - self.start_date).days / 7.))
# Update progress graphs for each program.
for psel, iplot in zip(self.psels, self.iplots):
nprog = np.count_nonzero(psel)
ndone = np.count_nonzero(self.status[psel] == 2)
yprog = iplot.get_ydata()
yprog[week_num] = 1.0 * ndone / nprog
iplot.set_ydata(yprog)
# Lookup which tiles are available and planned for tonight.
day_number = desisurvey.utils.day_number(date)
# avail = self.tiledata['AVAIL']
# no longer makes sense in no-pass scheme.
# consider all tiles as available on day 0.
avail = self.tiledata['AVAIL']*0
self.available = (avail >= 0) & (avail <= day_number)
# planned = self.tiledata['PLANNED']
# no longer makes sense in no-pass scheme
# consider all tiles as planned on day 0.
planned = self.tiledata['PLANNED']*0
self.planned = (planned >= 0) & (planned <= day_number)
self.last_date = date
[docs] def draw_exposure(self, iexp, nightly):
"""Draw the frame for a single exposure.
Calls :meth:`init_date` if this is the first exposure of the night
that we have seen.
Parameters
----------
iexp : int
Index of the exposure to draw.
Returns
-------
bool
True if a new frame was drawn.
"""
info = self.exposures[iexp]
mjd = info['MJD']
date = desisurvey.utils.get_date(mjd)
night = self.ephem.get_night(date)
# Initialize status if necessary.
if self.status is None:
# Find the most recent SNR2FRAC for each tile before this exposure.
snr2frac = np.zeros(self.tiles.ntiles)
for j in range(iexp):
snr2frac[self.exposures['INDEX'][j]] = self.exposures['SNR2FRAC'][j]
# Determine which tiles are unobserved (0), partial (1) or done (2)
# before this exposure.
self.status = np.ones(self.tiles.ntiles)
self.status[snr2frac == 0] = 0
self.status[snr2frac >= self.config.min_snr2_fraction()] = 2
# Update the status for the current exposure.
self.status[info['INDEX']] = info['STATUS']
if date != self.last_date:
# Initialize for this night.
self.init_date(date, night)
elif nightly and iexp != len(self.exposures)-1:
return False
# Update the top-right label.
label = '{} {} #{:06d}'.format(self.label, date, info['EXPID'])
if not nightly:
label += ' ({:.1f}",{:.2f})'.format(
info['SEEING'], info['TRANSP'])
self.text.set_text(label)
if not nightly:
# Update current time in program.
dt1 = mjd - night['noon']
dt2 = dt1 + info['EXPTIME'] / 86400.
self.pline1.set_xdata([dt1, dt1])
self.pline2.set_xdata([dt2, dt2])
if self.show_scores:
# Update scores display for this exposure.
score = self.scores[iexp - self.iexp0]
max_score = np.max(score)
for progidx, scatter in enumerate(self.scatters):
sel = self.tiles.program_mask[self.prognames[progidx]]
done = self.status[sel] == 2
avail = self.available[sel]
inplan = self.planned[sel]
if self.show_scores:
fc = self.scorecmap(score[sel] / max_score)
else:
fc = scatter.get_facecolors()
fc[avail] = self.availcolor
sizes = scatter.get_sizes()
sizes[~inplan] = 20.
sizes[~done & inplan] = 90.
sizes[done] = 30.
fc[done] = self.completecolor
fc[~avail] = self.unavailcolor
if not nightly and (info['PROGRAM'] == self.prognames[progidx]):
# Highlight the tile being observed now.
jdx = np.where(self.tileid[sel] == info['TILEID'])[0][0]
fc[jdx] = self.nowcolor
scatter.get_sizes()[jdx] = 600.
scatter.set_facecolors(fc)
# Update percent complete label.
pct = (100. * np.count_nonzero(self.status[sel] == 2) /
self.tiles_per_program[self.prognames[progidx]])
self.labels[progidx].set_text('{} {:5.1f}%'.format(
self.prognames[progidx], pct))
if not nightly:
# Update LST lines.
x1, x2 = self.lst[iexp]
for progidx, (line1, line2) in enumerate(self.lstlines):
ls = '-' if info['PROGRAM'] == self.prognames[progidx] else '--'
line1.set_linestyle(ls)
line2.set_linestyle(ls)
line1.set_xdata([x1, x1])
line2.set_xdata([x2, x2])
# Fill the moon with a shade of gray corresponding to its illuminated
# fraction during this exposure.
moon_frac = self.ephem.get_moon_illuminated_fraction(mjd)
# Update moon and planet locations.
for i, f in enumerate(self.f_obj):
# Calculate this object's (dec,ra) path during the night.
obj_dec, obj_ra = f(mjd)
self.xy_avoid[i] = wrap(obj_ra), obj_dec
for scatter in self.avoids:
scatter.set_offsets(self.xy_avoid)
scatter.get_facecolors()[self.moon_index] = [
moon_frac, moon_frac, moon_frac, 1.]
return True
[docs]def main(args):
"""Command-line driver to visualize survey scheduling and progress.
"""
# Set up the logger
if args.debug:
log = desiutil.log.get_logger(desiutil.log.DEBUG)
args.verbose = True
elif args.verbose:
log = desiutil.log.get_logger(desiutil.log.INFO)
else:
log = desiutil.log.get_logger(desiutil.log.WARNING)
# Freeze IERS table for consistent results.
desisurvey.utils.freeze_iers()
# Set the output path if requested.
config = desisurvey.config.Configuration()
if args.output_path is not None:
config.set_output_path(args.output_path)
if args.tiles_file is not None:
config.tiles_file.set_value(args.tiles_file)
# Look for the exposures file in the output path by default.
args.exposures = config.get_path(args.exposures)
# Initialize.
animator = Animator(
args.exposures, args.start, args.stop, args.label, args.scores)
log.info('Found {0} exposures from {1} to {2} ({3} nights).'
.format(animator.num_exp, args.start, args.stop,
animator.num_nights))
animator.init_figure(args.nightly)
if args.expid is not None:
expid = animator.exposures['EXPID']
assert np.all(expid == expid[0] + np.arange(len(expid)))
if (args.expid < expid[0]) or (args.expid > expid[-1]):
raise RuntimeError('Requested exposure ID {0} not available.'
.format(args.expid))
animator.draw_exposure(args.expid - expid[0], args.nightly)
save_name = args.save + '.png'
plt.savefig(save_name)
log.info('Saved {0}.'.format(save_name))
else:
nframes = animator.num_nights if args.nightly else animator.num_exp
iexp = [0]
def init():
return animator.artists
def update(iframe):
if (iframe + 1) % args.log_interval == 0:
log.info('Drawing frame {0}/{1}.'
.format(iframe + 1, nframes))
if args.nightly:
while not animator.draw_exposure(iexp[0], nightly=True):
iexp[0] += 1
else:
animator.draw_exposure(iexp=iframe, nightly=False)
return animator.artists
log.info('Movie will be {:.1f} mins long at {:.1f} frames/sec.'
.format(nframes / (60 * args.fps), args.fps))
animation = matplotlib.animation.FuncAnimation(
animator.figure, update, init_func=init, blit=True, frames=nframes)
writer = matplotlib.animation.writers['ffmpeg'](
bitrate=2400, fps=args.fps, metadata=dict(artist='surveymovie'))
save_name = args.save + '.mp4'
animation.save(save_name, writer=writer, dpi=animator.dpi)
log.info('Saved {0}.'.format(save_name))