Source code for txpipe.lens_selector

from .base_stage import PipelineStage
from .data_types import (
    YamlFile,
    TomographyCatalog,
    HDFFile,
    TextFile,
    FiducialCosmology,
    PhotometryCatalog,
    FitsFile,
    MapsFile,
)
from .utils import LensNumberDensityStats, Splitter, rename_iterated
from .binning import build_tomographic_classifier, apply_classifier, read_training_data
import numpy as np
import warnings
from ceci.config import StageParameter


[docs] class TXBaseLensSelector(PipelineStage): """ Base class for lens object selection, using the BOSS Target Selection. Subclasses of this pipeline stage select objects to be used as the lens sample for the galaxy clustering and shear-position calibrations. The cut used here is simplistic and should be replaced. """ name = "TXBaseLensSelector" outputs = [ ("lens_tomography_catalog_unweighted", TomographyCatalog), ] config_options = { "verbose": StageParameter(bool, False, msg="Enable verbose output for lens selection."), "chunk_rows": StageParameter(int, 10000, msg="Number of rows to process in each chunk."), "lens_zbin_edges": StageParameter(list, [float], msg="Edges of lens redshift bins."), # Mag cuts # Default photometry cuts based on the BOSS Galaxy Target Selection: # http://www.sdss3.org/dr9/algorithms/boss_galaxy_ts.php "cperp_cut": StageParameter(float, 0.2, msg="cperp cut for BOSS selection."), "r_cpar_cut": StageParameter(float, 13.5, msg="r_cpar cut for BOSS selection."), "r_lo_cut": StageParameter(float, 16.0, msg="Lower r-band magnitude cut."), "r_hi_cut": StageParameter(float, 19.6, msg="Upper r-band magnitude cut."), "i_lo_cut": StageParameter(float, 17.5, msg="Lower i-band magnitude cut."), "i_hi_cut": StageParameter(float, 19.9, msg="Upper i-band magnitude cut."), "r_i_cut": StageParameter(float, 2.0, msg="r-i color cut."), "random_seed": StageParameter(int, 42, msg="Random seed for reproducibility."), "selection_type": StageParameter(str, "boss", msg="Type of lens selection (e.g., boss)."), "maglim_band": StageParameter(str, "i", msg="Band for magnitude limit."), "maglim_limit": StageParameter(float, 24.1, msg="Magnitude limit value."), "extra_cols": StageParameter(list, [""], msg="Extra columns to include in output."), "apply_mask": StageParameter(bool, False, msg="Whether to apply a mask to the selection."), } def run(self): """ Run the analysis for this stage. - Collect the list of columns to read - Create iterators to read chunks of those columns - Loop through chunks: - select objects for each bin - write them out - accumulate selection bias values - Average the selection biases - Write out biases and close the output """ import astropy.table import sklearn.ensemble if self.name == "TXBaseLensSelector": raise ValueError("Do not run TXBaseLensSelector - run a sub-class") # Suppress some warnings from numpy that are not relevant original_warning_settings = np.seterr(all="ignore") # The output file we will put the tomographic # information into output_file = self.setup_output() iterator = self.data_iterator() selector = self.prepare_selector() # If we are going to remove lens galaxies that sit outside the mask, load the mask if self.config["apply_mask"]: with self.open_input("mask", wrapper=True) as f: self.mask, self.mask_nside = f.read_healsparse("mask", return_all=True) # We will collect the selection biases for each bin # as a matrix. We will collect together the different # matrices for each chunk and do a weighted average at the end. nbin_lens = len(self.config["lens_zbin_edges"]) - 1 number_density_stats = LensNumberDensityStats(nbin_lens, self.comm) # Loop through the input data, processing it chunk by chunk for start, end, phot_data in iterator: print(f"Process {self.rank} running selection for rows {start:,}-{end:,}") pz_data = self.apply_redshift_cut(phot_data, selector) # Select lens bin objects lens_gals = self.select_lens(phot_data) # Combine this selection with size and snr cuts to produce a source selection # and calculate the shear bias it would generate tomo_bin, counts = self.calculate_tomography(pz_data, phot_data, lens_gals) # Save the tomography for this chunk self.write_tomography(output_file, start, end, tomo_bin) # Accumulate information on the number counts and the selection biases. # These will be brought together at the end. number_density_stats.add_data(tomo_bin) # Do the selection bias averaging and output that too. self.write_global_values(output_file, number_density_stats) # Save and complete output_file.close() # Restore the original warning settings in case we are being called from a library np.seterr(**original_warning_settings) def prepare_selector(self): return None def apply_redshift_cut(self, phot_data, _): pz_data = {} nbin = len(self.config["lens_zbin_edges"]) - 1 z = phot_data[f"z"] ntot = len(z) zbin = np.repeat(-1, ntot) nsel = 0 nfinite = np.isfinite(z).sum() zmean = np.nanmean(z) for zi in range(nbin): mask_zbin = (z >= self.config["lens_zbin_edges"][zi]) & (z < self.config["lens_zbin_edges"][zi + 1]) nsel += mask_zbin.sum() zbin[mask_zbin] = zi print( f"Rank {self.rank} found {nsel} / {ntot} galaxies within the selection redshift range, in any bin. {nfinite} galaxies had finite z. Mean z of these was {zmean}" ) pz_data[f"zbin"] = zbin return pz_data def setup_output(self): """ Set up the output data file. Creates the data sets and groups to put module output in the tomography_catalog output file. """ n = self.open_input("photometry_catalog")["photometry/ra"].size nbin_lens = len(self.config["lens_zbin_edges"]) - 1 output = self.open_output("lens_tomography_catalog_unweighted", parallel=True, wrapper=True) outfile = output.file group = outfile.create_group("tomography") output.write_zbins(self.config["lens_zbin_edges"]) group.create_dataset("bin", (n,), dtype="i") group.create_dataset("lens_weight", (n,), dtype="f") group["class_id"] = group["bin"] # hard link to bin column. needed for RAIL masked summarizer stages group_counts = outfile.create_group("counts") group_counts.create_dataset("counts", (nbin_lens,), dtype="i") group_counts.create_dataset("counts_2d", (1,), dtype="i") group.attrs["nbin"] = nbin_lens group.attrs[f"zbin_edges"] = self.config["lens_zbin_edges"] return outfile def write_tomography(self, outfile, start, end, lens_bin): """ Write out a chunk of tomography and response. Parameters ---------- outfile: h5py.File start: int The index into the output this chunk starts at end: int The index into the output this chunk ends at tomo_bin: array of shape (nrow,) The bin index for each output object R: array of shape (nrow,2,2) Multiplicative bias calibration factor for each object """ group = outfile["tomography"] group["bin"][start:end] = lens_bin group["lens_weight"][start:end] = 1.0 def write_global_values(self, outfile, number_density_stats): """ Write out overall selection biases Parameters ---------- outfile: h5py.File """ lens_counts, lens_counts_2d = number_density_stats.collect() if self.rank == 0: group = outfile["counts"] group["counts"][:] = lens_counts group["counts_2d"][:] = lens_counts_2d print("Final bin counts:") for i, b in enumerate(lens_counts): print(f"Bin {i}: {b:,}") print("Total: ", lens_counts_2d) def select_lens(self, phot_data): t = self.config["selection_type"] if t == "boss": s = self.select_lens_boss(phot_data) elif t == "maglim": s = self.select_lens_maglim(phot_data) elif t == "DESmaglim": s = self.select_lens_DESmaglim(phot_data) else: raise ValueError(f"Unknown lens selection type {t} - expected boss or maglim") # select only galaxies that are in the footprint s *= self.select_in_footprint(phot_data) ntot = s.size nsel = s.sum() print(f"Rank {self.rank} selected {nsel} objects out of {ntot} as potential lenses with method {t}") return s def select_lens_boss(self, phot_data): """Photometry cuts based on the BOSS Galaxy Target Selection: http://www.sdss3.org/dr9/algorithms/boss_galaxy_ts.php """ mag_i = phot_data["mag_i"] mag_r = phot_data["mag_r"] mag_g = phot_data["mag_g"] # Mag cuts cperp_cut_val = self.config["cperp_cut"] r_cpar_cut_val = self.config["r_cpar_cut"] r_lo_cut_val = self.config["r_lo_cut"] r_hi_cut_val = self.config["r_hi_cut"] i_lo_cut_val = self.config["i_lo_cut"] i_hi_cut_val = self.config["i_hi_cut"] r_i_cut_val = self.config["r_i_cut"] n = len(mag_i) # HDF does not support bools, so we will prepare a binary array # where 0 is a lens and 1 is not lens_gals = np.repeat(0, n) cpar = 0.7 * (mag_g - mag_r) + 1.2 * ((mag_r - mag_i) - 0.18) cperp = (mag_r - mag_i) - ((mag_g - mag_r) / 4.0) - 0.18 dperp = (mag_r - mag_i) - ((mag_g - mag_r) / 8.0) # LOWZ cperp_cut = np.abs(cperp) < cperp_cut_val # 0.2 r_cpar_cut = mag_r < r_cpar_cut_val + cpar / 0.3 r_lo_cut = mag_r > r_lo_cut_val # 16.0 r_hi_cut = mag_r < r_hi_cut_val # 19.6 lowz_cut = (cperp_cut) & (r_cpar_cut) & (r_lo_cut) & (r_hi_cut) # CMASS i_lo_cut = mag_i > i_lo_cut_val # 17.5 i_hi_cut = mag_i < i_hi_cut_val # 19.9 r_i_cut = (mag_r - mag_i) < r_i_cut_val # 2.0 # dperp_cut = dperp > 0.55 # this cut did not return any sources... cmass_cut = (i_lo_cut) & (i_hi_cut) & (r_i_cut) # If a galaxy is a lens under either LOWZ or CMASS give it a zero lens_mask = lowz_cut | cmass_cut lens_gals[lens_mask] = 1 return lens_gals def select_lens_maglim(self, phot_data): band = self.config["maglim_band"] limit = self.config["maglim_limit"] mag_i = phot_data[f"mag_{band}"] s = (mag_i < limit).astype(np.int8) return s def select_lens_DESmaglim(self, phot_data): band = self.config["maglim_band"] bright_limit = self.config["bright_limit"] # mag < a*zphot + b a = self.config["a"] b = self.config["b"] z = phot_data["z"] mag_i = phot_data[f"mag_{band}"] sg = phot_data["EXTENDED_CLASS_SOF"] flags = phot_data["FLAGS_GOLD"] cut1 = mag_i > bright_limit # cut very bright gaalxies cut2 = mag_i < a * z + b # the z-dependant mag cut cut3 = sg == 3 # star/galaxy separator # flags, selects objects flagged by bits 2^1, 2^2, 2^3, 2^4, 2^5, 2^6 but not 2^7 # from Rodriguez-Monroy et al cut4 = np.bitwise_and(flags, 126) == 0 s = (cut1 & cut2 & cut3 & cut4).astype(np.int8) return s def select_in_footprint(self, phot_data): """ Remove objects that fall outside of the mask """ import healpy as hp if self.config["apply_mask"]: assert "mask" in self.input_tags(), ( "If apply_mask if True you must use a sub-class that uses 'mask' as an input" ) # with self.open_input('mask', wrapper=True) as f: # mask, nside= f.read_healsparse('mask', return_all=True) pix = hp.ang2pix(self.mask_nside, phot_data["ra"], phot_data["dec"], lonlat=True) s = np.where(self.mask[pix] == hp.UNSEEN, 0, 1) print(f"{len(s) - np.sum(s)}/{len(s)} objects removed because they are outside the mask") return s else: return 1 def calculate_tomography(self, pz_data, phot_data, lens_gals): nbin = len(self.config["lens_zbin_edges"]) - 1 n = len(phot_data["mag_i"]) # The main output data - the tomographic # bin index for each object, or -1 for no bin. tomo_bin = np.repeat(-1, n) # We also keep count of total count of objects in each bin counts = np.zeros(nbin, dtype=int) for i in range(nbin): sel_00 = (pz_data["zbin"] == i) & (lens_gals == 1) tomo_bin[sel_00] = i counts[i] = sel_00.sum() return tomo_bin, counts
[docs] class TXTruthLensSelector(TXBaseLensSelector): """ Select lens objects based on true redshifts and BOSS criteria This is useful for testing with idealised lens bins. """ name = "TXTruthLensSelector" inputs = [ ("photometry_catalog", PhotometryCatalog), ] def data_iterator(self): print(f"We are cheating and using the true redshift.") chunk_rows = self.config["chunk_rows"] phot_cols = ["mag_i", "mag_r", "mag_g", "redshift_true"] extra_cols = [c for c in self.config["extra_cols"] if c] phot_cols += extra_cols # Input data. These are iterators - they lazily load chunks # of the data one by one later when we do the for loop. # This code can be run in parallel, and different processes will # each get different chunks of the data for s, e, data in self.iterate_hdf("photometry_catalog", "photometry", phot_cols, chunk_rows): data["z"] = data["redshift_true"] yield s, e, data
[docs] class TXMeanLensSelector(TXBaseLensSelector): """ Select lens objects based on mean redshifts and BOSS criteria This requires PDFs to have been estimated earlier. """ name = "TXMeanLensSelector" inputs = [ ("photometry_catalog", PhotometryCatalog), ("lens_photoz_pdfs", HDFFile), ] def data_iterator(self): chunk_rows = self.config["chunk_rows"] phot_cols = ["mag_i", "mag_r", "mag_g"] z_cols = ["zmean"] rename = {"zmean": "z"} extra_cols = [c for c in self.config["extra_cols"] if c] phot_cols += extra_cols it = self.combined_iterators( chunk_rows, "photometry_catalog", "photometry", phot_cols, "lens_photoz_pdfs", "ancil", z_cols, ) return rename_iterated(it, rename)
[docs] class TXModeLensSelector(TXBaseLensSelector): """ Select lens objects based on best-fit redshifts and BOSS criteria This requires PDFs to have been estimated earlier. """ name = "TXModeLensSelector" inputs = [ ("photometry_catalog", PhotometryCatalog), ("lens_photoz_pdfs", HDFFile), ] def data_iterator(self): chunk_rows = self.config["chunk_rows"] phot_cols = ["mag_i", "mag_r", "mag_g"] z_cols = ["zmode"] rename = {"zmode": "z"} extra_cols = [c for c in self.config["extra_cols"] if c] phot_cols += extra_cols it = self.combined_iterators( chunk_rows, "photometry_catalog", "photometry", phot_cols, "lens_photoz_pdfs", "ancil", z_cols, ) return rename_iterated(it, rename)
class TXCustomLensSelector(TXBaseLensSelector): """ Select lens objects based on best-fit redshifts and BOSS criteria This requires PDFs to have been estimated earlier. """ name = "TXCustomLensSelector" inputs = [ ("photometry_catalog", PhotometryCatalog), ("lens_photoz_pdfs", HDFFile), ("mask", MapsFile), ] def data_iterator(self): chunk_rows = self.config["chunk_rows"] phot_cols = ["mag_i", "mag_r", "mag_g", "ra", "dec"] z_cols = [self.config["zcol"]] rename = {self.config["zcol"]: "z"} extra_cols = [c for c in self.config["extra_cols"] if c] phot_cols += extra_cols it = self.combined_iterators( chunk_rows, "photometry_catalog", "photometry", phot_cols, "lens_photoz_pdfs", "ancil", z_cols, ) return rename_iterated(it, rename)
[docs] class TXRandomForestLensSelector(TXBaseLensSelector): name = "TXRandomForestLensSelector" inputs = [ ("photometry_catalog", PhotometryCatalog), ("spectroscopic_catalog", HDFFile), ] config_options = TXBaseLensSelector.config_options.copy() config_options.update( { "verbose": StageParameter(bool, False, msg="Enable verbose output for lens selection."), "bands": StageParameter(str, "ugrizy", msg="Photometric bands to use for classification."), "chunk_rows": StageParameter(int, 10000, msg="Number of rows to process in each chunk."), "lens_zbin_edges": StageParameter(list, required=True, msg="Edges of lens redshift bins."), "random_seed": StageParameter(int, 42, msg="Random seed for reproducibility."), "spec_mag_column_format": StageParameter( str, "photometry/{band}", msg="Format string for spectroscopic magnitude columns." ), "spec_redshift_column": StageParameter( str, "photometry/redshift", msg="Column name for spectroscopic redshift." ), } ) def data_iterator(self): chunk_rows = self.config["chunk_rows"] phot_cols = ["mag_u", "mag_g", "mag_r", "mag_i", "mag_z", "mag_y"] extra_cols = [c for c in self.config["extra_cols"] if c] phot_cols += extra_cols for s, e, data in self.iterate_hdf("photometry_catalog", "photometry", phot_cols, chunk_rows): yield s, e, data def prepare_selector(self): with self.open_input("spectroscopic_catalog") as spec_file: training_data = read_training_data( spec_file, self.config["bands"], self.config["spec_mag_column_format"], self.config["spec_redshift_column"], ) return build_tomographic_classifier( self.config["bands"], training_data, self.config["lens_zbin_edges"], self.config["random_seed"], self.comm, ) def apply_redshift_cut(self, phot_data, selector): classifier, features = selector shear_catalog_type = "not applicable" bands = self.config["bands"] pz_data = apply_classifier(classifier, features, bands, shear_catalog_type, phot_data) return pz_data
[docs] class TXLensCatalogSplitter(PipelineStage): """ Split a lens catalog file into a new file with separate bins Splitting up like this helps reduce memory usage in TreeCorr later """ name = "TXLensCatalogSplitter" inputs = [ ("lens_tomography_catalog_unweighted", TomographyCatalog), ("photometry_catalog", PhotometryCatalog), ("fiducial_cosmology", FiducialCosmology), ("lens_photoz_pdfs", HDFFile), ] outputs = [ ("binned_lens_catalog_unweighted", HDFFile), ] config_options = { "initial_size": StageParameter(int, 100_000, msg="Initial size of the output catalog."), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), "extra_cols": StageParameter(list, [""], msg="Extra columns to include in output."), "redshift_column": StageParameter(str, "zmean", msg="Name of the redshift column to use."), } def get_lens_tomo_name(self): # can overwrite this in a weighted subclass return "lens_tomography_catalog_unweighted" def get_binned_lens_name(self): # can overwrite this in a weighted subclass return "binned_lens_catalog_unweighted" def get_bands(self): with self.open_input("photometry_catalog", wrapper=True) as f: bands = f.get_bands() return bands def run(self): with self.open_input(self.get_lens_tomo_name()) as f: nbin = f["tomography"].attrs["nbin"] counts = f["counts/counts"][:] count2d = f["counts/counts_2d"][:] # We also copy over the magnitudes and their errors to tbe new # per-bin catalogs. This makes it easier to run photo-z with RAIL # on each of the bins. So here we add those columns to our list bands = self.get_bands() if self.rank == 0: print(f"Copying photometry bands {bands} to sub-catalogs") band_cols = [f"mag_{b}" for b in bands] band_cols += [f"mag_err_{b}" for b in bands] self.config["extra_cols"] += band_cols # For some reason we briefly ended up with None or empty columns # in the configuration. extra_cols = [c for c in self.config["extra_cols"] if c] # Regular columns. cols = ["ra", "dec", "weight", "comoving_distance"] # Object we use to make the separate lens bins catalog cat_output = self.open_output(self.get_binned_lens_name(), parallel=True) cat_group = cat_output.create_group("lens") cat_group.attrs["nbin"] = len(counts) cat_group.attrs["nbin_lens"] = len(counts) bins = {b: c for b, c in enumerate(counts)} bins["all"] = count2d dtypes = {"id": "i8", "flags": "i8"} splitter = Splitter(cat_group, "bin", cols + extra_cols, bins, dtypes=dtypes) my_bins = list(self.split_tasks_by_rank(bins)) if my_bins: my_bins_text = ", ".join(str(x) for x in my_bins) print(f"Process {self.rank} collating bins: [{my_bins_text}]") else: print(f"Note: Process {self.rank} will not do anything.") for s, e, data in self.data_iterator(): if self.rank == 0: print(f"Process 0 binning data in range {s:,} - {e:,}") data["weight"] = data["lens_weight"] for b in my_bins: if b == "all": w = np.where(data["bin"] >= 0) else: w = np.where(data["bin"] == b) d = {name: col[w] for name, col in data.items()} splitter.write_bin(d, b) splitter.finish(my_bins) cat_output.close() def data_iterator(self): z_col = self.config["redshift_column"] extra_cols = [c for c in self.config["extra_cols"] if c] it = self.combined_iterators( self.config["chunk_rows"], # first file self.get_lens_tomo_name(), "tomography", ["bin", "lens_weight"], # second file "photometry_catalog", "photometry", ["ra", "dec"] + extra_cols, "lens_photoz_pdfs", "ancil", [z_col], parallel=False, ) return self.add_redshifts(it) def add_redshifts(self, iterator): import pyccl z_col = self.config["redshift_column"] # This iterates through chunks of the input catalogs, but for each # chunk it also intercepts and adds the radial distance. So this function # returns an iterator, just like combined_iterators does. with self.open_input("fiducial_cosmology", wrapper=True) as f: cosmo = f.to_ccl() for s, e, data in iterator: z = data[z_col] a = 1.0 / (1 + z) # if redshift estimate is negative set radial distance to 0 d = np.zeros(len(a)) d[z > 0] = pyccl.comoving_radial_distance(cosmo, a[z > 0]) data["comoving_distance"] = d yield s, e, data
[docs] class TXTruthLensCatalogSplitter(TXLensCatalogSplitter): """ Split a lens catalog file into a new file with separate bins with true redshifts. The redshifts are used to calculate the comoving distances. """ name = "TXTruthLensCatalogSplitter" inputs = [ ("lens_tomography_catalog_unweighted", TomographyCatalog), ("photometry_catalog", PhotometryCatalog), ("fiducial_cosmology", FiducialCosmology), ] config_options = TXLensCatalogSplitter.config_options.copy() config_options["redshift_column"] = StageParameter(str, "redshift_true", msg="Column name for true redshift.") def data_iterator(self): z_col = self.config["redshift_column"] extra_cols = [c for c in self.config["extra_cols"] if c] it = self.combined_iterators( self.config["chunk_rows"], # first file self.get_lens_tomo_name(), "tomography", ["bin", "lens_weight"], # second file "photometry_catalog", "photometry", ["ra", "dec", z_col] + extra_cols, parallel=False, ) return self.add_redshifts(it)
[docs] class TXExternalLensCatalogSplitter(TXLensCatalogSplitter): """ Split an external lens catalog into bins Implemented as a subclass of TXLensCatalogSplitter, and changes only file names. """ config_options = TXLensCatalogSplitter.config_options.copy() config_options["redshift_column"] = StageParameter(str, "redshift", msg="Column name for redshift.") name = "TXExternalLensCatalogSplitter" inputs = [ ("lens_tomography_catalog_unweighted", TomographyCatalog), ("lens_catalog", HDFFile), ("fiducial_cosmology", FiducialCosmology), ] def get_bands(self): with self.open_input("lens_catalog", wrapper=True) as f: bands = f.file["lens"].attrs["bands"] return bands def data_iterator(self): z_col = self.config["redshift_column"] extra_cols = [c for c in self.config["extra_cols"] if c and c != "comoving_distance"] iterator = self.combined_iterators( self.config["chunk_rows"], # first file self.get_lens_tomo_name(), "tomography", ["bin", "lens_weight"], # second file "lens_catalog", "lens", ["ra", "dec", z_col] + extra_cols, parallel=False, ) return self.add_redshifts(iterator)
[docs] class TXTruthLensCatalogSplitterWeighted(TXTruthLensCatalogSplitter): """ Split a lens catalog file into a new file with separate bins with true redshifts. The redshifts are used to calculate the comoving distances. """ name = "TXTruthLensCatalogSplitterWeighted" inputs = [ ("lens_tomography_catalog", TomographyCatalog), ("photometry_catalog", PhotometryCatalog), ("fiducial_cosmology", FiducialCosmology), ] outputs = [ ("binned_lens_catalog", HDFFile), ] def get_lens_tomo_name(self): # can overwrite this in a weighted subclass return "lens_tomography_catalog" def get_binned_lens_name(self): # can overwrite this in a weighted subclass return "binned_lens_catalog" def data_iterator(self): z_col = self.config["redshift_column"] extra_cols = [c for c in self.config["extra_cols"] if c and c != "comoving_distance"] iterator = self.combined_iterators( self.config["chunk_rows"], # first file self.get_lens_tomo_name(), "tomography", ["bin", "lens_weight"], # second file "photometry_catalog", "photometry", ["ra", "dec", z_col] + extra_cols, parallel=False, ) return self.add_redshifts(iterator)
if __name__ == "__main__": PipelineStage.main()