from .maps import TXBaseMaps, map_config_options
import numpy as np
from .base_stage import PipelineStage
from .mapping import (
make_coverage_map,
make_dask_shear_maps,
make_dask_flag_maps,
make_dask_bright_object_map,
make_dask_depth_map,
make_dask_depth_map_det_prob,
)
from .data_types import MapsFile, HDFFile, ShearCatalog
from .utils import choose_pixelization, import_dask
from ceci.config import StageParameter
[docs]
class TXAuxiliarySourceMaps(PipelineStage):
name = "TXAuxiliarySourceMaps"
dask_parallel = True
inputs = [
("shear_catalog", ShearCatalog), # for psfs
("shear_tomography_catalog", HDFFile), # for per-bin psf maps
("source_maps", MapsFile), # we copy the pixel scheme from here
]
outputs = [
("aux_source_maps", MapsFile),
]
config_options = {
"block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)."),
"flag_exponent_max": StageParameter(int, 8, msg="Maximum exponent for flag bits (default 8)."),
**map_config_options,
}
def choose_pixel_scheme(self):
with self.open_input("source_maps", wrapper=True) as maps_file:
pix_info = dict(maps_file.file["maps"].attrs)
return choose_pixelization(**pix_info)
def run(self):
dask, da = import_dask()
import healsparse as hsp
pixel_scheme = self.choose_pixel_scheme()
block_size = self.config["block_size"]
if block_size == 0:
block_size = "auto"
flag_exponent_max = self.config["flag_exponent_max"]
# We have to keep this open throughout the process, because
# dask will internally load chunks of the input hdf5 data.
shear_cat = self.open_input("shear_catalog", wrapper=True)
shear_tomo = self.open_input("shear_tomography_catalog", wrapper=True)
nbin = shear_tomo.file["tomography"].attrs["nbin"]
# The "all" bin is the non-tomographic case.
bins = list(range(nbin)) + ["all"]
maps = {}
group = shear_cat.get_primary_catalog_group()
# These don't actually load all the data - everything is lazy
ra = da.from_array(shear_cat.file[f"{group}/ra"], block_size)
# force all columns to use the same block size
block_size = ra.chunksize
dec = da.from_array(shear_cat.file[f"{group}/dec"], block_size)
psf_g1 = da.from_array(shear_cat.file[f"{group}/g1"], block_size)
psf_g2 = da.from_array(shear_cat.file[f"{group}/g2"], block_size)
weight = da.from_array(shear_cat.file[f"{group}/weight"], block_size)
flag_name = "flags"
flag = da.from_array(shear_cat.file[f"{group}/{flag_name}"], block_size)
b = da.from_array(shear_tomo.file["tomography/bin"], block_size)
# collate metadata
metadata = {key: self.config[key] for key in map_config_options}
metadata["flag_exponent_max"] = flag_exponent_max
metadata["nbin"] = nbin
metadata["nbin_source"] = nbin
metadata.update(pixel_scheme.metadata)
cov_map = make_coverage_map(ra, dec, pixel_scheme)
for i in bins:
if i == "all":
w = b >= 0
else:
w = b == i
shear_map_results = make_dask_shear_maps(
ra[w], dec[w], psf_g1[w], psf_g2[w], weight[w], pixel_scheme, cov_map
)
# Change output name
if i == "all":
i = "2D"
maps[f"psf/count_{i}"] = shear_map_results["count_map"]
maps[f"psf/g1_{i}"] = shear_map_results["g1_map"]
maps[f"psf/g2_{i}"] = shear_map_results["g2_map"]
maps[f"psf/var_g1_{i}"] = shear_map_results["var1_map"]
maps[f"psf/var_g2_{i}"] = shear_map_results["var2_map"]
maps[f"psf/var_e_{i}"] = shear_map_results["esq_map"]
maps[f"psf/lensing_weight_{i}"] = shear_map_results["weight_map"]
# Now add the flag maps. These are not tomographic.
flag_maps = make_dask_flag_maps(
ra, dec, flag, flag_exponent_max, pixel_scheme, cov_map
)
for j in range(flag_exponent_max):
maps[f"flags/flag_{2**j}"] = flag_maps[j]
(maps,) = dask.compute(maps)
# Print out some info about the flag maps
for i in range(flag_exponent_max):
f = 2**i
count = maps[f"flags/flag_{f}"].sum()
print(f"Map shows total {count} objects with flag {f}")
# convert sparse_map arrays into healsparse map objects
hsp_maps = {}
for name, map in maps.items():
hsp_maps[name] = hsp.HealSparseMap(
cov_map=cov_map, sparse_map=map, nside_sparse=cov_map.nside_sparse
)
# write the output maps
with self.open_output("aux_source_maps", wrapper=True) as out:
for map_name, hsp_map in hsp_maps.items():
out.write_map(map_name, hsp_map, metadata)
out.file["maps"].attrs.update(metadata)
[docs]
class TXAuxiliaryLensMaps(TXBaseMaps):
"""
Generate auxiliary maps from the lens catalog
This class generates maps of:
- depth
- bright object counts
"""
name = "TXAuxiliaryLensMaps"
dask_parallel = True
inputs = [
("photometry_catalog", HDFFile), # for mags etc
]
outputs = [
("aux_lens_maps", MapsFile),
]
config_options = {
"block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)."),
"bright_obj_threshold": StageParameter(float, 22.0, msg="Magnitude threshold for bright objects."),
"depth_band": StageParameter(str, "i", msg="Band for depth maps."),
"snr_threshold": StageParameter(float, 10.0, msg="S/N value for depth maps."),
"snr_delta": StageParameter(float, 1.0, msg="Delta for S/N thresholding."),
}
def run(self):
# Import dask and alias it as 'da'
_, da = import_dask()
import healsparse as hsp
# Retrieve configuration parameters
block_size = self.config["block_size"]
if block_size == 0:
block_size = "auto"
band = self.config["depth_band"]
# Open the input photometry catalog file.
# We can't use a "with" statement because we need to keep the file open
# while we're using dask.
f = self.open_input("photometry_catalog", wrapper=True)
# Load photometry data into dask arrays.
# This is lazy in dask, so we're not actually loading the data here.
ra = da.from_array(f.file["photometry/ra"], block_size)
block_size = ra.chunksize
dec = da.from_array(f.file["photometry/dec"], block_size)
extendedness = da.from_array(f.file["photometry/extendedness"], block_size)
snr = da.from_array(f.file[f"photometry/snr_{band}"], block_size)
mag = da.from_array(f.file[f"photometry/mag_{band}"], block_size)
# Choose the pixelization scheme based on the configuration.
# Might need to review this to make sure we use the same scheme everywhere
pixel_scheme = choose_pixelization(**self.config)
assert pixel_scheme.nest
cov_map = make_coverage_map(ra, dec, pixel_scheme)
# Initialize a dictionary to store the maps.
# To start with this is all lazy too, until we call compute
maps = {}
# Create a bright object map using dask
bright_object_results = make_dask_bright_object_map(
ra,
dec,
mag,
extendedness,
self.config["bright_obj_threshold"],
pixel_scheme,
cov_map,
)
maps["bright_objects/count"] = bright_object_results["count"]
# Create depth maps using dask
depth_map_results = make_dask_depth_map(
ra,
dec,
mag,
snr,
self.config["snr_threshold"],
self.config["snr_delta"],
pixel_scheme,
cov_map,
)
maps["depth/depth"] = depth_map_results["depth_map"]
maps["depth/depth_count"] = depth_map_results["count_map"]
maps["depth/depth_var"] = depth_map_results["depth_var"]
(maps,) = da.compute(maps)
# convert sparse_map arrays into healsparse map objects
hsp_maps = {}
for name, map in maps.items():
hsp_maps[name] = hsp.HealSparseMap(
cov_map=cov_map, sparse_map=map, nside_sparse=cov_map.nside_sparse
)
# Prepare metadata for the maps. Copy the pixelization-related
# configuration options only here
metadata = {key: self.config[key] for key in map_config_options if key in self.config}
# Then add the other configuration options
metadata["depth_band"] = band
metadata["depth_snr_threshold"] = self.config["snr_threshold"]
metadata["depth_snr_delta"] = self.config["snr_delta"]
metadata["bright_obj_threshold"] = self.config["bright_obj_threshold"]
metadata.update(pixel_scheme.metadata)
# Write the output maps to the output file
with self.open_output("aux_lens_maps", wrapper=True) as out:
for map_name, hsp_map in hsp_maps.items():
out.write_map(map_name, hsp_map, metadata)
out.file["maps"].attrs.update(metadata)
[docs]
class TXAuxiliarySSIMaps(TXBaseMaps):
"""
Generate auxiliary maps from SSI catalogs
This class generates maps of:
- depth (measured magnitude)
- depth (true magnitude)
"""
name = "TXAuxiliarySSIMaps"
dask_parallel = True
inputs = [
("matched_ssi_photometry_catalog", HDFFile), # injected objhects that were detected
("injection_catalog", HDFFile), # injection locations
("ssi_detection_catalog", HDFFile), # detection info on each injection
]
outputs = [
("aux_ssi_maps", MapsFile),
]
###################
##################
config_options = {
"block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)."),
"depth_band": StageParameter(str, "i", msg="Band for depth maps."),
"snr_threshold": StageParameter(float, 10.0, msg="S/N value for depth maps."),
"snr_delta": StageParameter(float, 1.0, msg="Delta for S/N thresholding."),
"det_prob_threshold": StageParameter(float, 0.8, msg="Detection probability threshold for SSI depth."),
"mag_delta": StageParameter(float, 0.01, msg="Magnitude bin size for detection probability depth."),
"min_depth": StageParameter(float, 18, msg="Minimum magnitude for detection probability depth."),
"max_depth": StageParameter(float, 26, msg="Maximum magnitude for detection probability depth."),
"smooth_det_frac": StageParameter(bool, True, msg="Apply smoothing to detection fraction vs magnitude."),
"smooth_window": StageParameter(float, 0.5, msg="Smoothing window size in magnitudes."),
}
def run(self):
# Import dask and alias it as 'da'
_, da = import_dask()
import healsparse as hsp
# Retrieve configuration parameters
block_size = self.config["block_size"]
if block_size == 0:
block_size = "auto"
band = self.config["depth_band"]
# Open the input catalog files
# We can't use "with" statements because we need to keep the file open
# while we're using dask.
f_matched = self.open_input("matched_ssi_photometry_catalog", wrapper=True)
f_inj = self.open_input("injection_catalog", wrapper=True)
f_det = self.open_input("ssi_detection_catalog", wrapper=True)
# Load matched catalog data into dask arrays.
# This is lazy in dask, so we're not actually loading the data here.
ra = da.from_array(f_matched.file["photometry/ra"], block_size)
block_size = ra.chunksize
dec = da.from_array(f_matched.file["photometry/dec"], block_size)
snr = da.from_array(f_matched.file[f"photometry/snr_{band}"], block_size)
mag_meas = da.from_array(f_matched.file[f"photometry/mag_{band}"], block_size)
mag_true = da.from_array(f_matched.file[f"photometry/inj_mag_{band}"], block_size)
# Choose the pixelization scheme based on the configuration.
# Might need to review this to make sure we use the same scheme everywhere
pixel_scheme = choose_pixelization(**self.config)
# make coverage map for these ra,dec
cov_map = make_coverage_map(ra, dec, pixel_scheme)
# Load detection catalog data into dask arrays.
# This is lazy in dask, so we're not actually loading the data here.
ra_inj = da.from_array(f_inj.file["photometry/ra"], block_size)
dec_inj = da.from_array(f_inj.file["photometry/dec"], block_size)
inj_mag = da.from_array(f_inj.file[f"photometry/inj_mag_{band}"], block_size)
det = da.from_array(f_det.file[f"photometry/detected"], block_size)
# Initialize a dictionary to store the maps.
# To start with this is all lazy too, until we call compute
maps = {}
# Create depth maps using dask and measured magnitudes
depth_map_results = make_dask_depth_map(
ra,
dec,
mag_meas,
snr,
self.config["snr_threshold"],
self.config["snr_delta"],
pixel_scheme,
cov_map,
)
maps["depth_meas/depth"] = depth_map_results["depth_map"]
maps["depth_meas/depth_count"] = depth_map_results["count_map"]
maps["depth_meas/depth_var"] = depth_map_results["depth_var"]
# Create depth maps using dask and true magnitudes
depth_map_results = make_dask_depth_map(
ra,
dec,
mag_true,
snr,
self.config["snr_threshold"],
self.config["snr_delta"],
pixel_scheme,
cov_map,
)
maps["depth_true/depth"] = depth_map_results["depth_map"]
maps["depth_true/depth_count"] = depth_map_results["count_map"]
maps["depth_true/depth_var"] = depth_map_results["depth_var"]
# Create depth maps using injection catalog
# depth is defined at given detection probability
depth_map_results = make_dask_depth_map_det_prob(
ra_inj,
dec_inj,
inj_mag,
det,
self.config["det_prob_threshold"],
self.config["mag_delta"],
self.config["min_depth"],
self.config["max_depth"],
pixel_scheme,
cov_map,
self.config["smooth_det_frac"],
self.config["smooth_window"],
)
maps["depth_det_prob/depth"] = depth_map_results["depth_map"]
maps["depth_det_prob/depth_det_count"] = depth_map_results["det_count_map"]
maps["depth_det_prob/depth_inj_count"] = depth_map_results["inj_count_map"]
maps["depth_det_prob/det_frac_by_mag_thres"] = depth_map_results[
"det_frac_by_mag_thres"
]
maps["depth_det_prob/inj_count_by_mag_thres"] = depth_map_results[
"inj_count_by_mag_thres"
]
(maps,) = da.compute(maps)
# convert sparse_map arrays into healsparse map objects
hsp_maps = {}
for name, map in maps.items():
if map.ndim == 2: # is a 2D map, save as recarray
map = np.rec.fromarrays(
map, names=[f"bin{i}" for i in range(map.shape[0])]
)
primary = "bin0"
else:
primary = None
hsp_maps[name] = hsp.HealSparseMap(
cov_map=cov_map,
sparse_map=map,
nside_sparse=cov_map.nside_sparse,
primary=primary,
)
# Prepare metadata for the maps. Copy the pixelization-related
# configuration options only here
metadata = {key: self.config[key] for key in map_config_options if key in self.config}
# Then add the other configuration options
metadata["depth_band"] = band
metadata["depth_snr_threshold"] = self.config["snr_threshold"]
metadata["depth_snr_delta"] = self.config["snr_delta"]
metadata["mag_delta"] = self.config["mag_delta"]
metadata["min_depth"] = self.config["min_depth"]
metadata["max_depth"] = self.config["max_depth"]
metadata["mag_edges"] = depth_map_results["mag_edges"]
metadata.update(pixel_scheme.metadata)
# Write the output maps to the output file
with self.open_output("aux_ssi_maps", wrapper=True) as out:
for map_name, m in hsp_maps.items():
out.write_map(map_name, m, metadata)
out.file["maps"].attrs.update(metadata)
class TXPredictDensitySelectionFunction(TXBaseMaps):
"""
Model selection function across footprint using survey property maps.
Uses the selection measured in a subset of the footprint to model its dependence on
survey property maps, then uses that model to predict the selection function across
the remainder of the footprint. The model is fit via polynomial regression with
respect to the survey property maps, where the degree of the polynomial is specified
via the config.
TODO: Make it possible to select the degree of the fit for each SP map individually.
TODO: Add other options for the form of the model.
"""
name = "TXPredictDensitySelectionFunction"
dask_parallel = True
inputs = [
("aux_ssi_maps", MapsFile), # Measured selection function and uncertainties
("mask", MapsFile) # Mask defining survey geometry
]
outputs = [
("sel_func_pred_maps", MapsFile), # Model prediction of selection function and its uncertainties
("sel_func_model_info", HDFFile), # Best-fit params and their covariance matrix
]
config_options = {
"block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)."),
"systmaps_dir": StageParameter(str, "", msg="Directory containing systematic maps."),
"degree": StageParameter(int, 1, msg="Degree of the polynomial fit."),
"mask_thresh": StageParameter(float, 0.0, msg="Threshold for masking pixels at native resolution of mask."),
"mask_thresh_coarse": StageParameter(float, 0.0, msg="Threshold for masking pixels after mask is degraded."),
"inj_count_thresh": StageParameter(int, 1, msg="Exclude pixels containing fewer injections than this number."),
"sel_func_err_type": StageParameter(
str,
"none",
msg="Type of uncertainty attributed to measured selection function. Can be either 'none' (assumes no "
"uncertainty) or 'gaussian' (uses a Gaussian approximation to estimate uncertainties)."
),
"bins_to_model": StageParameter(
list,
[-1],
msg="List of depth bins for which the selection function is to be modelled. -1 means selection functions will be "
"modelled for every bin for which fracdet maps were created in TXAuxiliarySSIMaps."
),
**map_config_options
}
def run(self):
import glob
import healpy as hp
import healsparse as hsp
from functools import reduce
# Import dask and alias it as 'da'
_, da = import_dask()
# Assign imports to self to avoid repeated imports in other functions
self.da = da
# Retrieve configuration parameters
block_size = self.config["block_size"]
if block_size == 0:
block_size = "auto"
# Type of uncertainty estimate to use
err_type = self.config["sel_func_err_type"].lower()
pixel_scheme = choose_pixelization(**self.config)
# Load the map of the measured selection function and its uncertainties
with self.open_input("aux_ssi_maps", wrapper=True) as f:
sel_func_meas = f.read_map("depth_det_prob/det_frac_by_mag_thres")
ninj = f.read_map("depth_det_prob/inj_count_by_mag_thres")
# Ensure correct resolution (desired resolution can only be lower than
# or equal to the native resolution of the selection function map).
# "sum" reduction used for ninj since this is a counts map
sel_func_meas = sel_func_meas.degrade(
pixel_scheme.nside,
reduction="wmean",
weights=ninj
)
ninj = ninj.degrade(pixel_scheme.nside, reduction="sum")
with self.open_input("mask", wrapper=True) as f:
mask = f.read_mask(
thresh=self.config["mask_thresh"],
degrade_nside=pixel_scheme.nside
)
# Load survey property maps at the valid pixels
# TODO: Define these as individual inputs rather than glob search?
root = self.config["systmaps_dir"]
sysfiles = glob.glob(f"{root}*.hs")
nsys = len(sysfiles)
print(f"Found {nsys} total systematic maps")
spmaps = [
hsp.HealSparseMap.read(
sf,
degrade_nside=pixel_scheme.nside
)
for sf in sysfiles
]
# Identify valid pixels across mask and all SP maps
goodmap = hsp.operations.product_intersection([mask] + spmaps)
goodpix = goodmap.valid_pixels
# Training pixels must also be valid in the selection function
pix_train = np.intersect1d(sel_func_meas.valid_pixels, goodpix)
# Remove any pixels below specified coverage fraction after degrading
pix_train = pix_train[mask[pix_train] >= self.config["mask_thresh_coarse"]]
# Determine which samples are to have their selection functions modelled
if self.config["bins_to_model"] == [-1]:
samples = sel_func_meas.dtype.names
else:
samples = [f"bin{k}" for k in self.config["bins_to_model"]]
# Cycle through the samples for which selection functions are to be modelled
maps = {
"sel_func": [],
"err_sel_func": []
}
model_info = {
"alphas": [],
"cov_alphas": []
}
for k in samples:
print(f"Modelling selection function for sample {k}")
# Remove pixels with fewer than the specified no. of injections
pix_train_k = pix_train[ninj[k][pix_train] >= self.config["inj_count_thresh"]]
# Check there is at least one pixel which has survived all cuts
errmsg = f"Cannot model selection function for sample {k}; no valid pixels."
assert len(pix_train_k) > 0, errmsg
# Select training data
sel_func_train = da.from_array(sel_func_meas[k][pix_train_k], block_size)
block_size = sel_func_train.chunksize
ninj_train = da.from_array(ninj[k][pix_train_k], block_size)
spmaps_train = da.stack(
[
da.from_array(m[pix_train_k], block_size) for m in spmaps
]
).T
# Get uncertainties on training data
err_sel_func_train = self.sel_func_uncertainties(
sel_func_train,
err_type=err_type,
ninj=(None if err_type == "none" else ninj_train)
)
# Get survey property values across remainder of footprint
pix_pred = np.setdiff1d(goodpix, pix_train_k)
spmaps_pred = da.stack(
[
da.from_array(m[pix_pred], block_size) for m in spmaps
]
).T
# Perform polynomial regression and retrieve the following:
# - mean prediction on the selection function (in each pixel)
# - 1-sigma uncertainty on these predictions (in each pixel)
# - best-fit coefficients for each survey property (+ an intercept term)
# - covariance matrix for the best-fit parameters
sel_func_pred, err_sel_func_pred, alphas, cov_alphas = self.polynomial_model(
spmaps_train,
sel_func_train,
err_sel_func_train,
spmaps_pred,
err_type
)
# Combine training data and predictions into single HealSparse maps
pix_all = np.concatenate([pix_train_k, pix_pred])
sel_func_full = da.concatenate([sel_func_train, sel_func_pred])
err_sel_func_full = da.concatenate([err_sel_func_train, err_sel_func_pred])
# Sort by pixel number (will make saving to HealSparse format easier)
inds = pix_all.argsort()
chunk_sizes = sel_func_full.chunks[0]
chunk_bounds = np.cumsum([0] + list(chunk_sizes))
sel_func_full = self.dask_sort(sel_func_full, inds, chunk_bounds)
err_sel_func_full = self.dask_sort(err_sel_func_full, inds, chunk_bounds)
maps["sel_func"].append(sel_func_full)
maps["err_sel_func"].append(err_sel_func_full)
# Store model info
model_info["alphas"].append(alphas)
model_info["cov_alphas"].append(cov_alphas)
(maps,) = da.compute(maps)
# Convert maps to healsparse map objects
hsp_maps = {}
dtypes = [(k, float) for k in samples]
for n, m in maps.items():
# Both sel_func and err_sel_func are 2D; save as recarrays
primary = "bin0"
m_hsp = hsp.HealSparseMap.make_empty(
pixel_scheme.nside_coverage,
pixel_scheme.nside,
dtypes,
primary=primary
)
m_hsp.update_values_pix(
goodpix,
np.zeros(len(goodpix), dtype=dtypes)
)
for i,k in enumerate(samples):
m_hsp[k][goodpix] = m[i]
hsp_maps[n] = m_hsp
# Prepare metadata for the maps. Copy the pixelization-related
# configuration options only here
metadata = {key: self.config[key] for key in map_config_options if key in self.config}
# Then add the other configuration options
metadata.update(pixel_scheme.metadata)
# Write the output maps to the output file
with self.open_output("sel_func_pred_maps", wrapper=True) as out:
for map_name, hsp_map in hsp_maps.items():
out.write_map(map_name, hsp_map, metadata)
out.file["maps"].attrs.update(metadata)
# Prepare metadata for model parameter info
params_metadata = {
"model": f"polynomial (deg = {self.config['degree']})",
"inputs": [
sf.split('/')[-1] for sf in sysfiles
]
}
# Write the model parameters and covariances to the output file
with self.open_output("sel_func_model_info", wrapper=True) as out:
mi = out.file.create_group("model_info")
out.file["model_info"].attrs.update(params_metadata)
gp_alphas = mi.create_group("alphas")
gp_cov = mi.create_group("cov_alphas")
for i,k in enumerate(samples):
gp_alphas.create_dataset(k, data=model_info["alphas"][i])
gp_cov.create_dataset(k, data=model_info["cov_alphas"][i])
def sel_func_uncertainties(self, sel_func, err_type="none", ninj=None):
"""
Estimates uncertainties on the measured selection function.
Parameters
----------
sel_func: dask.array, shape=(N,)
Selection function computed in TXAuxiliarySSIMaps, evaluated at pixels
which are valid in the N pixels which have at least the requisite
number of injections and are valid in all survey property maps.
err_type: str, optional (default="none")
Determines the treatment of uncertainties on the measured selection
function. Currently two values for err_type are accommodated (case-independent):
- "none": selection function is treated as exact; zero uncertainty
- "gaussian": uncertainties are calculated under a Gaussian approx.
TODO: add other options, e.g. Wilson score interval.
ninj: dask.array (shape=N,) or None; optional (default=None)
Total injected counts in each of the pixels covered by sel_func. Only required
if err_type=="gaussian".
Returns
-------
dask.array, shape=(N,)
Estimates of the uncertainty on the measured selection function in each pixel.
"""
if err_type == "none":
return self.da.zeros_like(sel_func)
elif err_type == "gaussian":
return self.da.sqrt(sel_func * (1 - sel_func) / ninj)
else:
raise ValueError(
"err_type must be either 'none' or 'gaussian'."
)
def sel_func_weights(self, err_type="none", err_sel_func=None):
"""
Converts selection function uncertainties into weights for modelling.
Parameters
----------
err_type: str, optional (default="none")
Determines the treatment of uncertainties on the measured selection
function. Currently two values for err_type are accommodated (case-independent):
- "none": selection function is treated as exact; zero uncertainty
- "gaussian": uncertainties are calculated under a Gaussian approx.
err_sel_func: dask.array, shape=(N,) or None; optional (default=None)
Uncertainty on the measured selection function in the N pixels which have at
least the requisite number of injections and are valid in all survey property
maps. Only required if err_type=="gaussian".
Returns
-------
dask.array, shape=(N,)
Weights corresponding to the uncertainty on the measured selection function in
each pixel.
"""
if err_type == "none":
# Return ones if uncertanties are treated as being zero
return self.da.ones(len(err_sel_func))
else:
# Return inverse of variance
return 1 / (self.da.power(err_sel_func, 2))
def polynomial_model(self, X_train, y_train, yerr_train, X_pred, err_type="none"):
"""
Polynomial regression and model predictions for test data.
Performs a polynomial regression of y with respect to X_train, then
generates predictions at X_pred. In this context, X is an array
containing survey property maps in certain pixels, and y is
the selection function measured in those pixels.
Parameters
----------
X_train: dask.array, shape=(N_train, N_sys)
Survey property maps evaluated at the N_train pixels that have been selected
for use as training data for the model.
y_train: dask.array, shape=(N_train,)
Measured selection function in the N_train training data pixels.
yerr_train: dask.array, shape=(N_train,)
Uncertainties on the measured selection function in the N_train training data
pixels.
X_pred: dask.array, shape=(N_pred, N_sys)
Survey property maps evaluated at all pixels in the survey footprint (as defined
by the mask) not included in the training set, which are also valid across all
survey property maps. I.e. if the footprint comprises N_tot of these valid pixels,
then N_pred = N_tot - N_train.
err_type: str, optional (default="none")
Determines the treatment of uncertainties on the measured selection
function. Currently two values for err_type are accommodated (case-independent):
- "none": selection function is treated as exact; zero uncertainty
- "gaussian": uncertainties are calculated under a Gaussian approx.
Returns
-------
y_pred: dask.array, shape=(N_pred,)
Model predictions for the selection function in the N_pred pixels at which the
survey property maps were evaluated to construct X_pred.
yerr_pred: dask.array, shape=(N_pred,)
Uncertainties on the model preditions for the selection function.
alphas: numpy.array, shape=(deg*N_sys+1,)
Best-fit parameters from the model. Shape will depend on the degree of the
polynomial (deg) specified in the config file. The first parameter is the bias
term; the next N_sys parameters correspond to each survey property map. If
deg >= 2, then the following N_sys parameters correspond to the squares of the
survey property maps, etc. The indices of the parameters corresponding to
the i'th survey property, its square, its cube, etc., can be obtained via:
`inds = [1 + i + d * N_sys for d in range(deg)].`
The names of the survey property maps are saved in the correct order as part of
the outputs of this stage so that they can be associated with their corresponding
parameters.
cov_alphas: numpy.array, shape=(deg*N_sys+1, deg*N_sys+1)
Covariance matrix of the best-fit parameters.
"""
da = self.da
# Degree of polynomial fit
deg = self.config["degree"]
# Weights for each training pixel
W = self.sel_func_weights(err_type, yerr_train)
# If Gaussian approx was used for uncertainty estimation, pixels in
# which the selection function is 0 or 1 will have zero uncertainty
# (and thus infinite weight); remove these pixels here.
(keep,) = da.compute(da.isfinite(W))
X_train = X_train[keep, :]
y_train = y_train[keep]
W = W[keep]
# Construct inputs matrix for regression
m = len(X_train[:, 0])
ones = da.ones((m, 1), chunks=(X_train.chunks[0], 1))
X = da.hstack([ones, X_train])
for n in range(2, deg + 1):
X = da.hstack([X, da.power(X_train, n)])
# Explicitly compute necessary array combinations
# NOTE: this assumes a diagonal covariance matrix for y,
# i.e. cov_y = diag(W)
(XtWX,) = da.compute(X.T @ (W[:, None] * X))
(XtWy,) = da.compute(X.T @ (W * y_train))
# Compute best-fit coeffs (alphas) and their covariance
alphas = np.linalg.solve(XtWX, XtWy)
cov_alphas = np.linalg.inv(XtWX)
# Now make predictions at X_pred
m = len(X_pred[:,0])
ones = da.ones((m, 1), chunks=(X_pred.chunks[0], 1))
X = da.hstack([ones, X_pred])
for n in range(2, deg + 1):
X = da.hstack([X, da.power(X_pred, n)])
y_pred = alphas @ X.T
# The following is a cheaper equivalent of doing
# `sqrt(diag(X @ cov_alphas @ X.T))`
yerr_pred = (X @ cov_alphas * X).sum(axis=1)
return y_pred, yerr_pred, alphas, cov_alphas
def dask_sort(self, X, inds, chunk_boundaries):
"""
Reorders a dask array using `inds`, without creating more chunks.
This is motivated by wanting to order certain dask arrays by their corresponding
HEALPix pixel indices, which can result in dask creating many more chunks and
slowing down subsequent computations. This function sorts the pixel indices by chunk,
and then gathers from the target array one chunk at a time.
Parameters
----------
X: dask.array, shape=(N,)
Dask array to be sorted according to `inds`.
inds: numpy.array, shape=(N,)
Array containing the indices of X in order of ascending HEALPix pixel ID.
chunk_boundaries: numpy.array, shape=(ceil(N/m)+1,)
Boundaries of each chunk of size m, including the left edge and excluding the right.
E.g. The first chunk will have boundaries [0, m), the next will have [m, 2*m), etc.
Returns
-------
X_sorted: dask.array, shape=(N,)
Sorted version of the input array
"""
n_chunks = len(chunk_boundaries) - 1
# Assign each index in `inds` to a chunk
chunk_ids = np.searchsorted(chunk_boundaries[1:], inds, side='right')
# Sort inds by chunk, preserving the mapping back to output positions
order = np.argsort(chunk_ids, kind='stable')
inds_sorted = inds[order]
# Gather from X one chunk at a time
X_sorted = self.da.empty(len(inds), dtype=X.dtype, chunks=X.chunks)
for c in range(n_chunks):
mask = chunk_ids[order] == c
if not mask.any():
continue
local_inds = inds_sorted[mask] - chunk_boundaries[c]
chunk_data = X[chunk_boundaries[c]:chunk_boundaries[c+1]]
X_sorted[order[mask]] = chunk_data[local_inds]
return X_sorted