"""Manage and apply tile observing priorities using rules.
"""
from __future__ import print_function, division
import os
import re
import collections
import yaml
import numpy as np
import astropy.table
import astropy.utils.data
import astropy.units as u
import desimodel.io
import desiutil.log
import desisurvey.config
import desisurvey.utils
import desisurvey.tiles
try:
from astropy.utils.data import get_pkg_data_path
except ImportError:
# Astropy < 4.3
from astropy.utils.data import _find_pkg_data_path as get_pkg_data_path
# Loads a YAML file with dictionary key ordering preserved.
# https://stackoverflow.com/questions/5121931/
# in-python-how-can-you-load-yaml-mappings-as-ordereddicts/21048064#21048064
def _ordered_load(stream, Loader=yaml.Loader,
                  object_pairs_hook=collections.OrderedDict):
    """Parse YAML from ``stream``, building every mapping with a custom type.

    Parameters
    ----------
    stream : str or file-like
        YAML source, passed straight through to ``yaml.load``.
    Loader : type
        PyYAML loader class to derive from.  NOTE: the default
        ``yaml.Loader`` can construct arbitrary Python objects, so pass
        ``yaml.SafeLoader`` for any input that is not fully trusted.
    object_pairs_hook : callable
        Called with the list of (key, value) pairs of each YAML mapping;
        its return value replaces the mapping.  The default preserves the
        key order found in the document.

    Returns
    -------
    object
        The document decoded by ``yaml.load``.
    """
    class OrderedLoader(Loader):
        """Private ``Loader`` subclass that receives the mapping constructor."""

    def _mapping_from_pairs(loader, node):
        # Expand any merge keys before collecting the pairs.
        loader.flatten_mapping(node)
        pairs = loader.construct_pairs(node)
        return object_pairs_hook(pairs)

    # The constructor is registered on the subclass rather than on ``Loader``.
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(mapping_tag, _mapping_from_pairs)
    return yaml.load(stream, Loader=OrderedLoader)
class Rules(object):
    """Load rules from the specified file.

    Read tile group definitions and observing rules from the specified
    YAML file.  Each top-level key of the file names a tile group whose node
    may contain:

    - ``programs`` (required): comma-separated program names.  One subgroup
      named ``GROUP(PROGRAM)`` is created per listed program.
    - ``rules`` (required): mapping of ``GROUP(PROGRAM)`` targets to a
      mapping of triggers (``START`` or another ``GROUP(PROGRAM)``) to the
      priority weight that applies once the trigger has fired.
    - ``cap``: ``N`` or ``S`` to restrict the group to one galactic cap.
    - ``dec_min`` / ``dec_max``: keep tiles with ``dec_min <= DEC < dec_max``.
    - ``dec_order``: scale of an exponential priority gradient in DEC.
    - ``max_orphans``: number of tiles that may remain incomplete when the
      subgroup is declared complete (defaults to 0).

    Parameters
    ----------
    file_name : str
        Name of YAML file containing the rules to use.  A relative path refers
        to our configured output path, falling back to the package ``data/``
        directory when no such file exists there.

    Raises
    ------
    RuntimeError
        If a group is missing ``programs`` or ``rules``, or if a rule target,
        trigger or weight is invalid.
    """
    def __init__(self, file_name='rules.yaml'):
        self.log = desiutil.log.get_logger()
        config = desisurvey.config.Configuration()
        self.min_snr2_fraction = config.min_snr2_fraction()
        self.finish_started_priority = config.finish_started_priority()
        self.ignore_completed_priority = config.ignore_completed_priority()
        self.boost_priority_by_passnum = config.boost_priority_by_passnum()
        self.adjacency_priority = config.adjacency_priority()

        tiles = desisurvey.tiles.get_tiles()
        NGC = (tiles.tileRA > 75.0) & (tiles.tileRA < 300.0)
        SGC = ~NGC

        # Initialize regexp for parsing "GROUP_NAME(PROGRAM)".  A raw string
        # is required so that "\(" is not treated as a string escape.
        parser = re.compile(r'([^\(]+)\(([^\(]+)\)$')

        # Get the full path of the YAML file to read.
        if os.path.isabs(file_name):
            full_path = file_name
        else:
            full_path = config.get_path(file_name)
            if not os.path.exists(full_path):
                # Locate the config file in our package data/ directory.
                full_path = get_pkg_data_path(os.path.join('data', file_name))

        # Read the YAML file into memory.
        with open(full_path) as f:
            rules_dict = _ordered_load(f, yaml.SafeLoader)

        group_names = []
        # Subgroup ids are 1-based; 0 marks a tile not assigned to any group.
        group_ids = np.zeros(tiles.ntiles, int)
        dec_priority = np.ones(tiles.ntiles, float)
        group_rules = {}
        group_max_orphans = {}

        for group_name in rules_dict:
            group_sel = np.ones(tiles.ntiles, bool)
            node = rules_dict[group_name]

            # Parse optional geographical attributes.
            cap = node.get('cap')
            if cap == 'N':
                group_sel[SGC] = False
            elif cap == 'S':
                group_sel[NGC] = False
            dec_min = node.get('dec_min')
            if dec_min is not None:
                group_sel[tiles.tileDEC < float(dec_min)] = False
            dec_max = node.get('dec_max')
            if dec_max is not None:
                group_sel[tiles.tileDEC >= float(dec_max)] = False
            max_orphans = node.get('max_orphans') or 0

            # Parse required "programs" attribute.
            programs = node.get('programs')
            if programs is None:
                raise RuntimeError(
                    'Missing required programs for {0}.'.format(group_name))
            programs = [p.strip() for p in str(programs).split(',')]
            for p in tiles.programs:
                if p not in programs:
                    group_sel[tiles.tileprogram == p] = False

            # Create GROUP(PROGRAM) subgroup combinations.
            final_group_sel = np.zeros(tiles.ntiles, bool)
            for p in programs:
                program_name = '{0}({1})'.format(group_name, p)
                group_names.append(program_name)
                group_id = len(group_names)
                program_sel = group_sel & (tiles.tileprogram == p)
                # Remove any tiles in this program that have already been
                # assigned to a previously defined subgroup.
                program_sel &= (group_ids == 0)
                final_group_sel |= program_sel
                group_ids[program_sel] = group_id
                group_rules[program_name] = {'START': 0.0}
                group_max_orphans[program_name] = max_orphans

            # Some tiles may be dropped by covering requirements in this
            # or previous groups.
            assert not np.any(~group_sel & final_group_sel)
            group_sel = final_group_sel

            # Calculate priority multipliers to implement optional DEC
            # ordering.  A positive dec_order gives the largest multiplier to
            # the lowest DEC in the group; a negative one to the highest DEC.
            # NOTE(review): dec_order == 0 is not guarded and divides by zero.
            dec_order = node.get('dec_order')
            if dec_order is not None and np.any(group_sel):
                dec_group = tiles.tileDEC[group_sel]
                lo, hi = np.min(dec_group), np.max(dec_group)
                slope = float(dec_order)
                if lo == hi:
                    dec_priority[group_sel] = 1.0
                elif slope > 0:
                    dec_priority[group_sel] *= np.exp(-(dec_group-lo)/slope)
                else:
                    dec_priority[group_sel] *= np.exp((hi-dec_group)/slope)
            else:
                assert np.all(dec_priority[group_sel] == 1)

            # Parse rules for this group.
            rules = node.get('rules')
            if rules is None:
                raise RuntimeError(
                    'Missing required rules for {0}.'.format(group_name))
            for target in rules:
                target_parsed = parser.match(target)
                # A target must be a subgroup of the group being defined.
                if (not target_parsed or
                        target_parsed.group(1) != group_name):
                    raise RuntimeError('Invalid rule target: {0}'.format(target))
                # Raise explicitly (an assert would vanish under -O) when the
                # program part does not name one of this group's subgroups.
                if target not in group_rules:
                    raise RuntimeError('Invalid rule target: {0}'.format(target))
                for trigger in rules[target]:
                    if trigger != 'START':
                        trigger_parsed = parser.match(trigger)
                        if not trigger_parsed:
                            raise RuntimeError(
                                'Invalid rule trigger: {0}.'.format(trigger))
                    try:
                        new_weight = float(rules[target][trigger])
                    except (TypeError, ValueError):
                        raise RuntimeError(
                            'Invalid new weight for trigger {0}: {1}.'
                            .format(trigger, rules[target][trigger]))
                    group_rules[target][trigger] = new_weight

        # Check that all tiles in the footprint are assigned to a group.
        orphans = (group_ids == 0) & (tiles.in_desi)
        if np.any(orphans):
            programs = ','.join(
                [str(s) for s in np.unique(tiles.tileprogram[orphans])])
            self.log.warning(
                '{0} tiles in passes {1} not assigned to any group.  These '
                'tiles will be given zero priority. '
                .format(np.count_nonzero(orphans), programs))

        # Check that all rule triggers are valid subgroup names.
        known_names = set(group_names)
        for name in group_names:
            for trigger in group_rules[name]:
                if trigger == 'START':
                    continue
                if trigger not in known_names:
                    raise RuntimeError(
                        'Invalid target {0} in {1} rule.'.format(trigger, name))

        self.group_names = group_names
        self.group_ids = group_ids
        self.group_rules = group_rules
        self.dec_priority = dec_priority
        self.group_max_orphans = group_max_orphans

    def apply(self, donefrac):
        """Apply rules to determine tile priorities based on progress so far.

        A subgroup triggers once its number of completed tiles plus its
        ``max_orphans`` allowance reaches its tile count.  Each subgroup's
        base priority is the largest weight among its triggered conditions;
        tiles not assigned to any subgroup keep zero priority.

        Parameters
        ----------
        donefrac : array
            Array of per-tile completion fractions.  A tile counts as
            completed when its value is at least ``min_snr2_fraction``.

        Returns
        -------
        array
            Array of per-tile observing priorities.
        """
        tiles = desisurvey.tiles.get_tiles()
        nogray = tiles.nogray
        # Completion status does not depend on the group, so compute it once.
        completed = donefrac >= self.min_snr2_fraction

        # First pass through groups to check trigger conditions.
        triggered = {'START': True}
        notilescoveredrules = []
        for gid, name in enumerate(self.group_names, start=1):
            group_sel = self.group_ids == gid
            ngroup = np.count_nonzero(group_sel)
            if ngroup == 0:
                notilescoveredrules.append(name)
            ndone = np.count_nonzero(completed[group_sel])
            max_orphans = self.group_max_orphans[name]
            triggered[name] = (ndone + max_orphans >= ngroup)
        if nogray:
            # Empty GRAY subgroups are expected when gray tiles are disabled.
            notilescoveredrules = [x for x in notilescoveredrules
                                   if '(GRAY)' not in x]
        if len(notilescoveredrules) > 0:
            self.log.debug('No tiles covered by rules {}'.format(
                ' '.join(notilescoveredrules)))

        # Second pass through groups to apply rules.  Ids are derived from
        # the position in group_names (as in the first pass) so that the
        # pairing stays correct when some tiles are unassigned (id 0) or a
        # subgroup has no tiles.
        priorities = np.zeros_like(self.dec_priority)
        for gid, name in enumerate(self.group_names, start=1):
            priority = 0
            for condition, value in self.group_rules[name].items():
                if triggered[condition]:
                    priority = max(priority, value)
            sel = self.group_ids == gid
            priorities[sel] = priority * self.dec_priority[sel]

        # Apply the configured multiplicative adjustments.
        priorities *= (1 + self.boost_priority_by_passnum)**tiles.tilepass
        priorities *= (1 + self.finish_started_priority*(donefrac > 0))
        neighborfrac = completed_neighbor_fraction(tiles, completed)
        priorities *= (1 + self.adjacency_priority*neighborfrac)
        if self.ignore_completed_priority > 0:
            priorities *= np.where(completed,
                                   self.ignore_completed_priority, 1)
        priorities *= tiles.priority_boostfac
        return priorities
def completed_neighbor_fraction(tiles, completed):
    """Return, for each tile, the fraction of its neighbors that are completed.

    The flattened neighbor index arrays are cached on the function object and
    rebuilt whenever a different ``tiles`` object is passed.

    Parameters
    ----------
    tiles : object
        Object providing ``ntiles`` and ``neighbors``, where ``neighbors[i]``
        is a sequence of indices (into ``completed``) of tile ``i``'s
        neighbors.
    completed : array
        Boolean array of per-tile completion status, of length
        ``tiles.ntiles``.

    Returns
    -------
    array
        Float array with one entry per element of ``tiles.neighbors``.
        Tiles without neighbors get a fraction of zero.

    Raises
    ------
    ValueError
        If ``completed`` does not have ``tiles.ntiles`` entries.
    """
    completed = np.asarray(completed)
    if len(completed) != tiles.ntiles:
        raise ValueError('shape mismatch between completed and tiles!')

    cache = getattr(completed_neighbor_fraction, 'neighborcache', None)
    owner = getattr(completed_neighbor_fraction, 'neighborcache_tiles', None)
    if cache is None or owner is not tiles:
        # n1[k] is the tile that owns the k-th neighbor entry and n2[k] is
        # the index of that neighbor.
        counts = [len(x) for x in tiles.neighbors]
        n1 = np.repeat(np.arange(len(counts)), counts)
        nonempty = [x for x in tiles.neighbors if len(x) > 0]
        if nonempty:
            n2 = np.concatenate(nonempty)
        else:
            # np.concatenate rejects an empty list.
            n2 = np.zeros(0, dtype=int)
        completed_neighbor_fraction.neighborcache = (n1, n2)
        # Remember which tiles the cache was built for so that a different
        # tiles object never reuses stale indices.
        completed_neighbor_fraction.neighborcache_tiles = tiles
    else:
        n1, n2 = cache

    ntiles = len(tiles.neighbors)
    res = np.bincount(n1, weights=completed[n2], minlength=ntiles)
    nneighbor = np.bincount(n1, minlength=ntiles)
    # Substitute 1 for a zero neighbor count to avoid dividing by zero.
    res = res / (nneighbor + (nneighbor == 0))
    return res