Source code for desisurvey.tiles

"""Manage static information associated with tiles, programs and passes.

Each tile has an assigned program name. The program names
(DARK, BRIGHT) are predefined in terms of conditions on the
ephemerides, but not all programs need to be present in a tiles file.
Pass numbers are arbitrary integers and do not need to be consecutive or dense.

To ensure consistent and efficient usage of static tile info, all code
should use::

    tiles = desisurvey.tiles.get_tiles()

To use a non-standard tiles file, change the configuration before the
first call to ``get_tiles()`` with::

    config = desisurvey.config.Configuration()
    config.tiles_file.set_value(name)

The :class:`Tiles` class returned by :func:`get_tiles` is a wrapper around
the FITS table contained in a tiles file, that adds some precomputed derived
attributes for consistency and efficiency.
"""
from __future__ import print_function, division

import os

import numpy as np

import astropy.units as u
from astropy.time import Time
from astropy.table import Table

import desimodel.io
import desiutil.log

import desisurvey.config
import desisurvey.utils
import desisurvey.etc


[docs]class Tiles(object): """Manage static info associated with the tiles file. Parameters ---------- tile_file : str or None Name of the tiles file to use or None for the default specified in our configuration. """ def __init__(self, tiles_file=None): config = desisurvey.config.Configuration() self.nogray = config.tiles_nogray() bright_allowed_in_dark = getattr( config, 'bright_allowed_in_dark', None) if bright_allowed_in_dark is not None: self.bright_allowed_in_dark = bright_allowed_in_dark() else: self.bright_allowed_in_dark = False # Read the specified tiles file. self.tiles_file = tiles_file or config.tiles_file() self.tiles_file = find_tile_file(self.tiles_file) tiles = self.read_tiles_table() # Copy tile arrays. self.tileID = tiles['TILEID'].data.copy() self.tileRA = tiles['RA'].data.copy() self.tileDEC = tiles['DEC'].data.copy() self.tileprogram = np.array([p.strip() for p in tiles['PROGRAM']]) self.tilepass = tiles['PASS'].data.copy() self.designha = None if 'DESIGNHA' in tiles.dtype.names: self.designha = tiles['DESIGNHA'].data.copy() self.priority_boostfac = np.ones(len(tiles), dtype='f4') if 'PRIORITY_BOOSTFAC' in tiles.dtype.names: self.priority_boostfac = tiles['PRIORITY_BOOSTFAC'] self.tileobsconditions = self.get_conditions() if self.nogray: mgray = self.tileobsconditions == 'GRAY' self.tileobsconditions[mgray] = 'DARK' self.in_desi = tiles['IN_DESI'].data.copy() != 0 # Count tiles. self.ntiles = len(self.tileID) # Can remove this when tile_index no longer uses searchsorted. if not np.all(np.diff(self.tileID) > 0): raise RuntimeError('Tile IDs are not increasing.') self.programs = [x for x in np.unique(tiles['PROGRAM'].data)] self.program_index = {pname: pidx for pidx, pname in enumerate(self.programs)} # Build tile masks for each program. A program will no tiles with have an empty mask. self.program_mask = {} for p in self.programs: self.program_mask[p] = (self.tileprogram == p) & self.in_desi # Calculate and save dust exposure factors. self.dust_factor = desisurvey.etc.dust_exposure_factor(tiles['EBV_MED'].data) # Precompute coefficients to calculate tile observing airmass. latitude = np.radians(config.location.latitude()) tile_dec_rad = np.radians(self.tileDEC) self.tile_coef_A = np.sin(tile_dec_rad) * np.sin(latitude) self.tile_coef_B = np.cos(tile_dec_rad) * np.cos(latitude) # Placeholders for overlap attributes that are expensive to calculate # so we use lazy evaluation the first time they are accessed. self._overlapping = None self._neighbors = None self._fiberassign_delay = None # Calculate the maximum |HA| in degrees allowed for each tile to stay # above the survey minimum altitude cosZ_min = np.cos(90 * u.deg - config.min_altitude()) cosHA_min = ( (cosZ_min - np.sin(self.tileDEC * u.deg) * np.sin(latitude)) / (np.cos(self.tileDEC * u.deg) * np.cos(latitude))).value cosHA_min = np.clip(cosHA_min, -1, 1) self.max_abs_ha = np.degrees(np.arccos(cosHA_min)) m = ~np.isfinite(self.max_abs_ha) | (self.max_abs_ha < 3.75) self.max_abs_ha[m] = 7.5 # always give at least a half hour window. CONDITIONS = ['DARK', 'GRAY', 'BRIGHT'] CONDITION_INDEX = {cond: i for i, cond in enumerate(CONDITIONS)}
[docs] def airmass(self, hour_angle, mask=None): """Calculate tile airmass given hour angle. Parameters ---------- hour_angle : array Array of hour angles in degrees to use. If mask is None, then should have length ``self.ntiles``. Otherwise, should have a value per non-zero entry in the mask. mask : array or None Boolean mask of which tiles to perform the calculation for. Returns ------- array Array of airmasses corresponding to each input hour angle. """ hour_angle = np.deg2rad(hour_angle) if mask is None: mask = slice(None) cosZ = self.tile_coef_A[mask] + self.tile_coef_B[mask] * np.cos(hour_angle) return desisurvey.utils.cos_zenith_to_airmass(cosZ)
[docs] def airmass_at_mjd(self, mjd, mask=None): """Calculate tile airmass at given MJD. Parameters ---------- mjd : array Array of MJD to use. If mask is None, then should have length ``self.ntiles``. Otherwise, should have a value per non-zero entry in the mask. mask : array or None Boolean mask of which tiles to perform the calculation for. Returns ------- array Array of airmasses corresponding to each input hour angle. """ mjd = np.atleast_1d(mjd) if len(mjd) == 0: return np.zeros(0, dtype='f8') tt = Time(mjd, format='mjd', location=desisurvey.utils.get_location()) lst = tt.sidereal_time('apparent').to(u.deg).value ha = lst - self.tileRA[mask] return self.airmass(ha, mask=mask)
[docs] def airmass_second_derivative(self, HA, mask=None): """Calculate second derivative of airmass with HA. Useful for determining how close to design airmass we have to get for different tiles. When this is large, we really need to observe things right at their design angles. When it's small, we have more flexibility. """ x = self.airmass(HA, mask=mask) if mask is not None: b = self.tile_coef_B[mask] else: b = self.tile_coef_B d2rad = b*x**2 * (2*b*x*np.sin(np.radians(HA))**2 + np.cos(np.radians(HA))) return d2rad * (np.pi/180)**2
[docs] def index(self, tileID, return_mask=False): """Map tile ID to array index. Parameters ---------- tileID : int or array Tile ID value(s) to convert. mask : bool if mask=True, an additional mask array is returned, indicating which IDs were present in the tile array. Otherwise, an exception is raised if tiles were not found. Returns ------- int or array Index into internal per-tile arrays corresponding to each input tile ID. """ scalar = np.isscalar(tileID) tileID = np.atleast_1d(tileID) if np.any(tileID < 0): raise ValueError('tileIDs must positive!') idx = np.searchsorted(self.tileID, tileID) idx = np.clip(idx, 0, len(self.tileID)-1) bad = self.tileID[idx] != tileID if not return_mask and np.any(bad): raise ValueError('Invalid tile ID(s): {}.'.format(tileID[bad])) mask = ~bad idx = idx[0] if scalar else idx mask = mask[0] if scalar else mask res = idx if return_mask: res = (res, mask) return res
def get_conditions(self): res = [] config = desisurvey.config.Configuration() for program in self.tileprogram: tprogram = getattr(config.programs, program, None) if tprogram is None: res.append('NONE') else: res.append(tprogram.conditions()) return np.array(res) def allowed_in_conditions(self, cond): if self.nogray and (cond == 'GRAY'): cond = 'DARK' res = (self.tileobsconditions == cond) if self.bright_allowed_in_dark and (cond == 'DARK'): res = res | (self.tileobsconditions == 'BRIGHT') return res @property def overlapping(self): """Dictionary of tile overlap matrices. overlapping[i] is the list of tile row numbers that overlap the tile with row number i. Overlapping tiles are only computed within a program; a tile cannot overlap a tile of a different program. If fiber_assignment_delay is negative, tile do not overlap one another within a program. """ if self._overlapping is None: self._calculate_overlaps() return self._overlapping @property def neighbors(self): """Dictionary of tile neighbor matrices. neighbors[i] is the list of tile row numbers that neighbor the tile with row number i within a pass. Neighboring tiles are only computed within a program and pass. """ if self._neighbors is None: self._calculate_neighbors() return self._neighbors @property def fiberassign_delay(self): """Delay between covering a tile and when it can be fiber assigned. Units are determined by the value of the fiber_assignment_cadence configuration parameter. """ if self._fiberassign_delay is None: self._calculate_overlaps() return self._fiberassign_delay
[docs] def _calculate_overlaps(self): """Initialize attributes _overlapping. Uses the config parameters ``fiber_assignment_delay`` and ``tile_diameter`` to determine overlap dependencies. This is relatively slow, so only used the first time ``overlapping`` properties are accessed. """ self._overlapping = [[] for _ in range(self.ntiles)] self._fiberassign_delay = np.full(self.ntiles, -1, int) config = desisurvey.config.Configuration() tile_diameter = 2 * config.tile_radius() fiber_assignment_delay = config.fiber_assignment_delay for program in self.programs: delay = getattr(fiber_assignment_delay, program, None) if delay is not None: delay = delay() else: delay = -1 m = self.program_mask[program] & (self.in_desi != 0) rownum = np.flatnonzero(m) self._fiberassign_delay[m] = delay # self._overlapping: list of lists, giving tiles overlapping each # tile if delay < 0: # this program doesn't have overlapping tile requirements continue from astropy.coordinates import SkyCoord, search_around_sky c = SkyCoord(self.tileRA[m]*u.deg, self.tileDEC[m]*u.deg) idx1, idx2, sep2d, dist3d = search_around_sky(c, c, tile_diameter) for ind1, ind2 in zip(idx1, idx2): if ind1 == ind2: # ignore self matches continue self._overlapping[rownum[ind1]].append(rownum[ind2])
[docs] def _calculate_neighbors(self): """Initialize attribute _neighbors. A neigbor is defined as a tile in the same pass within 3 * config.tile_radius if config.tiles_lowpass is True. Otherwise, it's all tiles within the program within 3 * config.tile_radius. This is relatively slow, so only used the first time ``overlapping`` properties are accessed. """ self._neighbors = [[] for _ in range(self.ntiles)] config = desisurvey.config.Configuration() match_distance = 3 * config.tile_radius() lowpass = config.tiles_lowpass() for program in self.programs: mprogram = self.program_mask[program] rownum = np.flatnonzero(mprogram) from astropy.coordinates import SkyCoord, search_around_sky c = SkyCoord(self.tileRA[mprogram]*u.deg, self.tileDEC[mprogram]*u.deg) idx1, idx2, sep2d, dist3d = search_around_sky( c, c, match_distance) if lowpass: m = self.tilepass[idx1] == self.tilepass[idx2] idx1 = idx1[m] idx2 = idx2[m] for ind1, ind2 in zip(idx1, idx2): if ind1 == ind2: # ignore self matches continue self._neighbors[rownum[ind1]].append(rownum[ind2]) for passnum in np.unique(self.tilepass[mprogram]): m = (mprogram & (self.tilepass == passnum) & (self.in_desi != 0)) rownum = np.flatnonzero(m)
# self._neighbors: list of lists, giving tiles neighboring each # tile
[docs] def read_tiles_table(self): """Read and trim the tiles table. Must be called after self.tiles_file and self.nogray member variables have been set. """ config = desisurvey.config.Configuration() tiles = Table.read(self.tiles_file) # control truncation tileprogram = np.zeros(len(tiles), dtype='U20') tileprogram[:] = tiles['PROGRAM'] # convert dark/gray/bright to canonical DARK/GRAY/BRIGHT progupper = np.array([t.upper().strip() for t in tileprogram]) for p in ['DARK', 'BRIGHT', 'GRAY', 'BACKUP']: m = progupper == p tileprogram[m] = p if self.nogray: m = (tileprogram == 'GRAY') | (tileprogram == 'DARK') tileprogram[m] = 'DARK' tiles['PROGRAM'] = tileprogram trim = config.tiles_trim() if trim: tiles = tiles[tiles['IN_DESI'] != 0] tprograms = np.unique(tiles['PROGRAM']) programinconfig = np.isin(tprograms, [x for x in config.programs.keys]) log = desiutil.log.get_logger() keep = np.ones(len(tiles), dtype='bool') if np.any(~programinconfig): for program in tprograms[~programinconfig]: keep[tiles['PROGRAM'] == program] = 0 log.info('Removing the following programs from the tile ' 'file: ' + ' '.join(tprograms[~programinconfig])) tiles = tiles[keep] if not np.all(np.diff(tiles['TILEID']) > 0): tiles['TILEID'] = np.arange(len(tiles)) log = desiutil.log.get_logger() log.warning('Tile file TILEID are not ascending; rewriting.') return tiles
_cached_tiles = {}
[docs]def get_tiles(tiles_file=None, use_cache=True, write_cache=True): """Return a Tiles object with optional caching. You should normally always use the default arguments to ensure that tiles are defined consistently and efficiently between different classes. Parameters ---------- tiles_file : str or None Use the specified name to override config.tiles_file. use_cache : bool Use tiles previously cached in memory when True. Otherwise, (re)load tiles from disk. write_cache : bool If tiles need to be loaded from disk with this call, save them in a memory cache for future calls. """ global _cached_tiles log = desiutil.log.get_logger() config = desisurvey.config.Configuration() tiles_file = tiles_file or config.tiles_file() if use_cache and tiles_file in _cached_tiles: tiles = _cached_tiles[tiles_file] log.debug('Using cached tiles for "{}".'.format(tiles_file)) else: tiles = Tiles(tiles_file) log.info('Initialized tiles from "{}".'.format(tiles_file)) for pname in tiles.programs: log.debug('{:6s}: {} tiles'.format( pname, np.sum(tiles.program_mask[pname]))) if write_cache: _cached_tiles[tiles_file] = tiles else: log.info('Tiles not cached for "{}".'.format(tiles_file)) return tiles
def find_tile_file(filename): log = desiutil.log.get_logger() if os.path.isabs(filename): if not os.path.exists(filename): raise FileNotFoundError( 'tile file not found at {}'.format(filename)) return filename dmname = desimodel.io.findfile(os.path.join('footprint', filename)) dsdirname = os.environ.get('DESISURVEY_OUTPUT', None) if dsdirname is not None: dsname = os.path.join(dsdirname, filename) else: dsname = '' localname = filename namedict = dict(DESISURVEY=(dsname, os.path.exists(dsname)), DESIMODEL=(dmname, os.path.exists(dmname)), LOCAL=(localname, os.path.exists(localname))) for key in namedict: if namedict[key][1]: fn = namedict.pop(key)[0] others = [key for (key, (name, exists)) in namedict.items() if exists] if len(others) > 0: log.info('Using {} filename, '.format(fn) + 'ignoring other files of same name: ' + ' '.join(others)) return fn raise FileNotFoundError('tile file not found at {}'.format(filename))
[docs]def get_nominal_program_times(tileprogram, config=None, return_timetypes=False): """Return nominal times for given programs in seconds.""" if config is None: config = desisurvey.config.Configuration() progconf = config.programs nomtimes = [] timetypes = [] unknownprograms = [] nunknown = 0 for program in tileprogram: tprogconf = getattr(progconf, program, None) if tprogconf is None: nomprogramtime = 300 unknownprograms.append(program) nunknown += 1 timetype = 'ELG' else: nomprogramtime = getattr(tprogconf, 'efftime')() timetype = getattr(tprogconf, 'efftime_type')() if not isinstance(nomprogramtime, int): nomprogramtime = nomprogramtime.to(u.s).value nomtimes.append(nomprogramtime) timetypes.append(timetype) if nunknown > 0: log = desiutil.log.get_logger() log.debug(('%d observations of unknown programs:' % nunknown) + ' '.join(np.unique(unknownprograms))) nomtimes = np.array(nomtimes) timetypes = np.array(timetypes) ret = nomtimes if return_timetypes: ret = (ret, timetypes) return ret