Source code for desisurvey.utils

"""Utility functions for survey planning and scheduling.
"""
from __future__ import print_function, division

import datetime
import os
import warnings

import numpy as np

import pytz

import astropy.time
from astropy.coordinates import EarthLocation
import astropy.units as u

import desiutil.log
import desiutil.iers

import desimodel.weather

from .config import Configuration



_telescope_location = None
#
# This global variable appears to be unused.
#
_dome_closed_fractions = None

# Temporary assignment for backward compatibility
freeze_iers = desiutil.iers.freeze_iers


[docs]def get_location(): """Return the telescope's earth location. The location object is cached after the first call, so there is no need to cache this function's return value externally. Returns ------- astropy.coordinates.EarthLocation """ global _telescope_location if _telescope_location is None: config = Configuration() _telescope_location = EarthLocation.from_geodetic( lat=config.location.latitude(), lon=config.location.longitude(), height=config.location.elevation()) return _telescope_location
[docs]def get_observer(when, alt=None, az=None): """Return the AltAz frame for the telescope at the specified time(s). Refraction corrections are not applied (for now). The returned object is automatically broadcast over input arrays. Parameters ---------- when : astropy.time.Time One or more times when the AltAz transformations should be calculated. alt : astropy.units.Quantity or None Local altitude angle(s) az : astropy.units.Quantity or None Local azimuth angle(s) Returns ------- astropy.coordinates.AltAz AltAz frame object suitable for transforming to/from local horizon (alt, az) coordinates. """ if alt is not None and az is not None: kwargs = dict(alt=alt, az=az) elif alt is not None or az is not None: raise ValueError('Must specify both alt and az.') else: kwargs = {} return astropy.coordinates.AltAz( location=get_location(), obstime=when, pressure=0, **kwargs)
[docs]def cos_zenith_to_airmass(cosZ): """Convert a zenith angle to an airmass. Uses the Rozenberg 1966 interpolation formula, which gives reasonable results for high zenith angles, with a horizon air mass of 40. https://en.wikipedia.org/wiki/Air_mass_(astronomy)#Interpolative_formulas Rozenberg, G. V. 1966. "Twilight: A Study in Atmospheric Optics." New York: Plenum Press, 160. The value of cosZ is clipped to [0,1], so observations below the horizon return the horizon value (~40). Parameters ---------- cosZ : float or array Cosine of angle(s) to convert. Returns ------- float or array Airmass value(s) >= 1. """ cosZ = np.clip(np.asarray(cosZ), 0., 1.) return np.clip(1. / (cosZ + 0.025 * np.exp(-11 * cosZ)), 1., None)
[docs]def get_airmass(when, ra, dec): """Return the airmass of (ra,dec) at the specified observing time. Uses :func:`cos_zenith_to_airmass`. Parameters ---------- when : astropy.time.Time Observation time, which specifies the local zenith. ra : astropy.units.Quantity Target RA angle(s) dec : astropy.units.Quantity Target DEC angle(s) Returns ------- array or float Value of the airmass for each input (ra,dec). """ target = astropy.coordinates.ICRS(ra=ra, dec=dec) zenith = get_observer(when, alt=90 * u.deg, az=0 * u.deg ).transform_to(astropy.coordinates.ICRS) # Calculate zenith angle in degrees. zenith_angle = target.separation(zenith) # Convert to airmass. return cos_zenith_to_airmass(np.cos(zenith_angle))
[docs]def cos_zenith(ha, dec, latitude=None): """Calculate cos(zenith) for specified hour angle, DEC and latitude. Combine with :func:`cos_zenith_to_airmass` to calculate airmass. Parameters ---------- ha : astropy.units.Quantity Hour angle(s) to use, with units convertible to angle. dec : astropy.units.Quantity Declination angle(s) to use, with units convertible to angle. latitude : astropy.units.Quantity or None Latitude angle to use, with units convertible to angle. Defaults to the latitude of :func:`get_location` if None. Returns ------- numpy array cosine of zenith angle(s) corresponding to the inputs. """ if latitude is None: # Use the observatory latitude by default. latitude = Configuration().location.latitude() # Calculate sin(altitude) = cos(zenith). cosZ = (np.sin(dec) * np.sin(latitude) + np.cos(dec) * np.cos(latitude) * np.cos(ha)) # Return a plain array (instead of a unitless Quantity). return cosZ.value
[docs]def is_monsoon(night): """Test if this night's observing falls in the monsoon shutdown. Uses the monsoon date ranges defined in the :class:`desisurvey.config.Configuration`. Parameters ---------- night : date Converted to a date using :func:`desisurvey.utils.get_date`. Returns ------- bool True if this night's observing falls during the monsoon shutdown. """ date = get_date(night) # Fetch our configuration. config = Configuration() # Test if date falls within any of the shutdowns. for key in config.monsoon.keys: node = getattr(config.monsoon, key) if date >= node.start() and date < node.stop(): return True # If we get here, date does not fall in any of the shutdowns. return False
[docs]def local_noon_on_date(day): """Convert a date to an astropy time at local noon. Local noon is used as the separator between observing nights. The purpose of this function is to standardize the boundary between observing nights and the mapping of dates to times. Generates astropy ErfaWarnings for times in the future. Parameters ---------- day : datetime.date The day to use for generating a time object. Returns ------- astropy.time.Time A Time object with the input date and a time corresponding to local noon at the telescope. """ # Fetch our configuration. config = Configuration() # Build a datetime object at local noon. tz = pytz.timezone(config.location.timezone()) local_noon = tz.localize( datetime.datetime.combine(day, datetime.time(hour=12))) # Convert to UTC. utc_noon = local_noon.astimezone(pytz.utc) # Return a corresponding astropy Time. return astropy.time.Time(utc_noon)
[docs]def get_current_date(): """Give current date following get_date convention (date changes at noon). Returns ------- datetime.date object for current night, following get_date convention """ date = datetime.datetime.now().astimezone() return get_date(date)
[docs]def get_date(date): """Convert different date specifications into a datetime.date object. We use strptime() to convert an input string, so leading zeros are not required for strings in the format YYYY-MM-DD, e.g. 2019-8-3 is considered valid. Instead of testing the input type, we try different conversion methods: ``.datetime.date()`` for an astropy time and ``datetime.date()`` for a datetime. Date specifications that include a time of day (datetime, astropy time, MJD) are rounded down to the previous local noon before converting to a date. This ensures that all times during a local observing night are mapped to the same date, when the night started. A "naive" (un-localized) datetime is assumed to refer to UTC. Generates astropy ERFA warnings for future dates. Parameters ---------- date : astropy.time.Time, datetime.date, datetime.datetime, string or number Specification of the date to return. A string must have the format YYYY-MM-DD (but leading zeros on MM and DD are optional). A number will be interpreted as a UTC MJD value. Returns ------- datetime.date """ input_date = date # valid types: string, number, Time, datetime, date try: # Convert bytes to str. date = date.decode() except AttributeError: pass try: # Convert a string of the form YYYY-MM-DD into a date. # This will raise a ValueError for a badly formatted string # or invalid date such as 2019-13-01. try: date = datetime.datetime.strptime(date, '%Y-%m-%d').date() except ValueError: try: date = datetime.datetime.strptime(date, '%Y%m%d').date() except ValueError: raise except TypeError: pass # valid types: number, Time, datetime, date try: # Convert a number to an astropy time, assuming it is a UTC MJD value. date = astropy.time.Time(date, format='mjd') except ValueError: pass # valid types: Time, datetime, date try: # Convert an astropy time into a datetime date = date.datetime except AttributeError: pass # valid types: datetime, date try: # Localize a naive datetime assuming it refers to UTC. date = pytz.utc.localize(date) except (AttributeError, ValueError): pass # valid types: localized datetime, date try: # Convert a localized datetime into the date of the previous noon. local_tz = pytz.timezone( Configuration().location.timezone()) local_time = date.astimezone(local_tz) date = local_time.date() if local_time.hour < 12: date -= datetime.timedelta(days=1) except AttributeError: pass # valid types: date if not isinstance(date, datetime.date): raise ValueError('Invalid date specification: {0}.'.format(input_date)) return date
[docs]def night_to_str(date): """Return DESI string format (YYYYMMDD) of datetime night. Parameters ---------- date : datetime.date object, as from get_date() Returns ------- str YYYMMDD formatted date string """ return date.isoformat().replace('-', '')
[docs]def day_number(date): """Return the number of elapsed days since the start of the survey. Does not perform any range check that the date is within the nominal survey schedule. Parameters ---------- date : astropy.time.Time, datetime.date, datetime.datetime, string or number Converted to a date using :func:`get_date`. Returns ------- int Number of elapsed days since the start of the survey. """ config = Configuration() return (get_date(date) - config.first_day()).days
[docs]def slewtime(ra1, dec1, ra2, dec2, freeslewtime=10, ignore_positive_ra=False): """Estimate slew times. Uses slew model from DESI-3687. Assumes that freeslewtime s of slew time is "free"---i.e., it can be overlapped with other overheads. Parameters ---------- ra1 : float right ascension (deg) dec1 : float declination (deg) ra2 : float right ascension (deg) dec2 : float declination (deg) freeslewtime : float amount of time during which one can slew "for free" (s) ignore_positive_ra : float if True, slew time in the positive RA direction doesn't count. Intended to provide no penalty for slews that are just keeping up with the sky. In this case, slews in dec don't count if they fit within the slew in RA. Returns ------- Estimated slew time in s needed to reach target. """ slewconstants = dict(ra=[0.45, 0.05, 8], dec=[0.45, 0.05, 8]) isscalar = np.ndim(ra1) == 0 ra1 = np.atleast_1d(ra1) dec1 = np.atleast_1d(dec1) ra2 = np.atleast_1d(ra2) dec2 = np.atleast_1d(dec2) if ((ra1.shape != ra2.shape) or (ra1.shape != dec1.shape) or (ra1.shape != dec2.shape)): raise ValueError('ra1, ra2, dec1, dec2 must have same shape.') def slewtimefun(dx, slewtype, ignore_positive=False): v, a, t = slewconstants[slewtype] dx = ((dx + 180) % 360)-180 if ignore_positive: m = dx > 0 dx[m] = 0 dx = np.abs(dx) m = dx < v**2/a tt = np.zeros(len(dx), dtype='f4') tt[m] = 2*np.sqrt(dx[m]/a)+t tt[~m] = dx[~m]/v+t+v/a return np.clip(tt-freeslewtime, 0, np.inf) tra = slewtimefun(ra2-ra1, 'ra') tdec = slewtimefun(dec2-dec1, 'dec') if ignore_positive_ra: tra2 = slewtimefun(ra2-ra1, 'ra', ignore_positive=ignore_positive_ra) m = tdec < tra tdec[m] = np.clip(tdec[m], 0, tra2[m]) tra = tra2 tra = np.atleast_1d(tra) tdec = np.atleast_1d(tdec) tslew = np.max([tra, tdec], axis=0) if isscalar: tslew = tslew[0] return tslew
[docs]def separation_matrix(ra1, dec1, ra2, dec2, max_separation=None): """Build a matrix of pair-wise separation between (ra,dec) pointings. The ra1 and dec1 arrays must have the same shape. The ra2 and dec2 arrays must also have the same shape, but it can be different from the (ra1,dec1) shape, resulting in a non-square return matrix. Uses the Haversine formula for better accuracy at low separations. See https://en.wikipedia.org/wiki/Haversine_formula for details. Equivalent to using the separations() method of astropy.coordinates.ICRS, but faster since it bypasses any units. Parameters ---------- ra1 : array 1D array of n1 RA coordinates in degrees (without units attached). dec1 : array 1D array of n1 DEC coordinates in degrees (without units attached). ra2 : array 1D array of n2 RA coordinates in degrees (without units attached). dec2 : array 1D array of n2 DEC coordinates in degrees (without units attached). max_separation : float or None When present, the matrix elements are replaced with booleans given by (value <= max_separation), which saves some computation. Returns ------- array Array with shape (n1,n2) with element [i1,i2] giving the 3D separation angle between (ra1[i1],dec1[i1]) and (ra2[i2],dec2[i2]) in degrees or, if max_separation is not None, booleans (value <= max_separation). """ ra1, ra2 = np.deg2rad(ra1), np.deg2rad(ra2) dec1, dec2 = np.deg2rad(dec1), np.deg2rad(dec2) if ra1.shape != dec1.shape: raise ValueError('Arrays ra1, dec1 must have the same shape.') if len(ra1.shape) != 1: raise ValueError('Arrays ra1, dec1 must be 1D.') if ra2.shape != dec2.shape: raise ValueError('Arrays ra2, dec2 must have the same shape.') if len(ra2.shape) != 1: raise ValueError('Arrays ra2, dec2 must be 1D.') havRA12 = 0.5 * (1 - np.cos(ra2 - ra1[:, np.newaxis])) havDEC12 = 0.5 * (1 - np.cos(dec2 - dec1[:, np.newaxis])) havPHI = havDEC12 + np.cos(dec1)[:, np.newaxis] * np.cos(dec2) * havRA12 if max_separation is not None: # Replace n1 x n2 arccos calls with a single sin call. threshold = np.sin(0.5 * np.deg2rad(max_separation)) ** 2 return havPHI <= threshold else: return np.rad2deg(np.arccos(np.clip(1 - 2 * havPHI, -1, +1)))
[docs]def match(a, b): """Find matching elements of b in unique array a by index. Returns indices ma, mb such that a[ma] == b[mb] Parameters ---------- a : unique array b : array Returns ------- ma, mb : indices such that a[ma] == b[mb] """ sa = np.argsort(a) _, ua = np.unique(a[sa], return_index=True) if len(ua) != len(a): raise ValueError('All keys in a must be unique.') ind = np.searchsorted(a[sa], b) m = (ind >= 0) & (ind < len(a)) matches = a[sa[ind[m]]] == b[m] m[m] &= matches return sa[ind[m]], np.flatnonzero(m)
[docs]def yesno(question): """Simple Yes/No Function.""" prompt = f'{question} (y/n): ' ans = input(prompt).strip().lower() if ans not in ['y', 'n', 'yes', 'no']: print(f'{ans} is invalid, please try again...') return yesno(question) if ans == 'y': return True return False
[docs]def get_average_dome_closed_fractions(first, last, smooth=7): """Get daily averaged dome-closed fractions between first and last. Returns the fraction of the time the dome is closed on each night. Parameters ---------- first : datetime.date Date of first night. last : datetime.date Date of last night. Survey stops the morning of this date. smooth : float Number of days to smooth dome closed fraction by. Returns ------- np.ndarray giving dome closed fraction on each night. """ years = np.arange(2007, 2018) fractions = [] for year in years: fractions.append( desimodel.weather.dome_closed_fractions( first, first+datetime.timedelta(days=365), replay='Y{}'.format(year))) from scipy.ndimage import gaussian_filter fractions = gaussian_filter(fractions, smooth, mode='wrap') fractions = np.mean(fractions, axis=0) nnight = (last-first).days nyear = nnight // 365 + 1 fractions = np.tile(fractions, nyear) return fractions[:nnight]