Source code for txpipe.ingest.ssi

from ..base_stage import PipelineStage
from ..data_types import (
    HDFFile,
    FitsFile,
)
from ..utils import (
    nanojansky_to_mag_ab,
    nanojansky_err_to_mag_ab,
)
from .base import TXIngestCatalogBase, TXIngestCatalogFits
import numpy as np
import warnings
from ceci.config import StageParameter

# translate between GCR and TXpipe column names
column_names = {
    "coord_ra": "ra",
    "coord_dec": "dec",
}


[docs] class TXIngestSSIGCR(TXIngestCatalogBase): """ Ingest SSI catalogs using GCR Does not treat the injection or ssi photometry catalogs as formal inputs since they are not in a format TXPipe can recognize """ name = "TXIngestSSIGCR" parallel = False inputs = [] outputs = [ ("injection_catalog", HDFFile), ("ssi_photometry_catalog", HDFFile), ("ssi_uninjected_photometry_catalog", HDFFile), ] config_options = { "injection_catalog_name": StageParameter(str, "", msg="Catalog of objects manually injected."), "ssi_photometry_catalog_name": StageParameter( str, "", msg="Catalog of objects from real data with no injections." ), "ssi_uninjected_photometry_catalog_name": StageParameter( str, "", msg="Catalog of objects from real data with no injections." ), "GCRcatalog_path": StageParameter(str, "", msg="Path to GCRCatalogs for SSI runs."), "flux_name": StageParameter(str, "gaap3p0Flux", msg="Flux column name to use."), } def run(self): """ Run the analysis for this stage. Loads the catalogs using gcr and saves the relevent columns to a hdf5 format that TXPipe can read """ # This is needed to access the SSI runs currently on NERSC run by SRV team # As the final runs become more formalized, this could be removed if self.config["GCRcatalog_path"] != "": import sys sys.path.insert(0, self.config["GCRcatalog_path"]) import GCRCatalogs # attempt to ingest three types of catalog catalogs_labels = [ "injection_catalog", "ssi_photometry_catalog", "ssi_uninjected_photometry_catalog", ] for catalog_label in catalogs_labels: input_catalog_name = self.config[f"{catalog_label}_name"] if input_catalog_name == "": print(f"No catalog {catalog_label} name provided") continue output_file = self.open_output(catalog_label) group = output_file.create_group("photometry") gc0 = GCRCatalogs.load_catalog(input_catalog_name) native_quantities = gc0.list_all_native_quantities() # iterate over native quantities as some might be missing (or be nans) for q in native_quantities: try: qobj = gc0.get_quantities(q) # TO DO: load with iterator (currently returns full datatset) # GCRCatalogs.lsst_object.LSSTInjectedObjectTable iterator currently # returns full dataset if np.isnan(qobj[q]).all(): continue # skip the quantities that are empty group.create_dataset(q, data=qobj[q], dtype=qobj[q].dtype) if q in column_names: # also save with TXPipe names group[column_names[q]] = group[q] except KeyError: # skip quantities that are missing warnings.warn(f"quantity {q} was missing from the GCRCatalog object") continue except TypeError: warnings.warn(f"Quantity {q} coud not be saved as it has a data type not recognised by hdf5") # convert fluxes to mags using txpipe/utils/conversion.py bands = "ugrizy" flux_name = self.config["flux_name"] for b in bands: try: mag = nanojansky_to_mag_ab(group[f"{b}_{flux_name}"][:]) mag_err = nanojansky_err_to_mag_ab(group[f"{b}_{flux_name}"][:], group[f"{b}_{flux_name}Err"][:]) group.create_dataset(f"mag_{b}", data=mag) group.create_dataset(f"mag_err_{b}", data=mag_err) except KeyError: warnings.warn(f"no flux {b}_{flux_name} in SSI GCR catalog") output_file.close()
[docs] class TXMatchSSI(PipelineStage): """ Match an SSI injection catalog and a photometry catalog Default inputs are in TXPipe photometry catalog format Outpus a matched SSI catalog for further use """ name = "TXMatchSSI" # TO DO: switch inputs from TXPipe format to GCR inputs = [ ("injection_catalog", HDFFile), ("ssi_photometry_catalog", HDFFile), ] outputs = [ ("matched_ssi_photometry_catalog", HDFFile), ] config_options = { "chunk_rows": StageParameter(int, 100000, msg="Number of rows to process in each chunk."), "match_radius": StageParameter(float, 0.5, msg="Matching radius in arcseconds."), "magnification": StageParameter(int, 0, msg="Magnification label."), } def run(self): """ Run the analysis for this stage. """ # loop through chunks of the ssi photometry # catalog and match to the injections matched_cat = self.match_cats() def match_cats(self): """ Match the injected catalogs with astropy tools TO DO: check if this step can be replaced with LSST probabalistic matching https://github.com/lsst/meas_astrom/blob/main/python/lsst/meas/astrom/match_probabilistic_task.py """ import astropy.units as u from astropy.coordinates import SkyCoord # prep the catalogs for reading # read ALL the ra and dec for the injection catalog inj_cat = self.open_input("injection_catalog") inj_coord = SkyCoord( ra=inj_cat["photometry/ra"][:] * u.degree, dec=inj_cat["photometry/dec"][:] * u.degree, ) # loop over chunk of the photometry catalog phot_cat = self.open_input("ssi_photometry_catalog") nrows = phot_cat["photometry/ra"].shape[0] batch_size = self.config["chunk_rows"] n_chunk = int(np.ceil(nrows / batch_size)) max_n = nrows match_outfile = self.setup_output( "matched_ssi_photometry_catalog", "photometry", inj_cat["photometry"], phot_cat["photometry"], max_n, ) out_start = 0 for ichunk, (in_start, in_end, data) in enumerate( self.iterate_hdf("ssi_photometry_catalog", "photometry", ["ra", "dec"], batch_size) ): phot_coord = SkyCoord( ra=data["ra"] * u.degree, dec=data["dec"] * u.degree, ) idx, d2d, d3d = phot_coord.match_to_catalog_sky(inj_coord) select_matches = d2d.value <= self.config["match_radius"] / 60.0 / 60.0 nmatches = np.sum(select_matches) out_end = out_start + nmatches if nmatches != 0: print(out_start, out_end, nmatches) self.write_output( match_outfile, "photometry", inj_cat["photometry"], phot_cat["photometry"], idx, select_matches, ichunk, batch_size, out_start, out_end, ) out_start = out_end self.finalize_output(match_outfile, "photometry", out_end) def setup_output(self, tag, group, inj_group, phot_group, n): """ Prepare the hdf5 files """ import h5py f = self.open_output(tag) g = f.create_group(group) for name, col in phot_group.items(): g.create_dataset(name, shape=(n,), maxshape=n, dtype=col.dtype) for name, col in inj_group.items(): g.create_dataset("inj_" + name, shape=(n,), maxshape=n, dtype=col.dtype) g.attrs["magnification"] = self.config["magnification"] # TO DO: add aditional metadata from inputs return f def write_output( self, outfile, group, inj_group, phot_group, idx, select_matches, ichunk, batch_size, start, end, ): """ Write the matched catalog for a single chunk """ g = outfile[group] for name, col in phot_group.items(): g[name][start:end] = col[ichunk * batch_size : (ichunk + 1) * batch_size][select_matches] for name, col in inj_group.items(): g["inj_" + name][start:end] = col[idx][select_matches] def finalize_output(self, outfile, group, ntot): """ Remove the excess rows,,c lose file """ g = outfile[group] for name, col in g.items(): col.resize((ntot,)) outfile.close() return
[docs] class TXIngestSSIDESBalrog(TXIngestCatalogFits): """ Base-stage for ingesting a DES SSI catalog AKA "Balrog" """ name = "TXIngestSSIDESBalrog" def setup_output(self, output_name, column_names, dtypes, n): """ For balrog, we need to include if statements to catch the 2D data entries and add an extendedness column """ cols = list(column_names.keys()) output = self.open_output(output_name) g = output.create_group("photometry") for col in cols: dtype = dtypes[column_names[col]] if dtype.subdtype is not None: # this is a multi-dimentional column assert dtype.subdtype[1] == ( 4, ) # We are assuming this entry is a 2D array with 4 columns (corresponding to griz) dtype = dtype.subdtype[0] for b in "griz": g.create_dataset(col + f"_{b}", (n,), dtype=dtype) else: g.create_dataset(col, (n,), dtype=dtype) if col == "meas_EXTENDED_CLASS_SOF": # also create an "extendedness" column g.create_dataset("extendedness", (n,), dtype=dtype) return output, g def add_columns(self, g, input_name, column_names, chunk_rows, n): """ For balrog, we need to include if statements to catch the 2D data entries and possibly add a extendedness column """ cols = list(column_names.keys()) input_cols = np.unique(list(column_names.values())) for s, e, data in self.iterate_fits(input_name, 1, input_cols, chunk_rows): print(s, e, n) for col in cols: if len(data[column_names[col]].shape) == 2: assert data[column_names[col]].shape[1] == 4 for iband, b in enumerate("griz"): g[col + f"_{b}"][s:e] = data[column_names[col]][:, iband] else: g[col][s:e] = data[column_names[col]] if col == "extendedness": # The DES classifier "meas_EXTENDED_CLASS_SOF" is (0 or 1) for star, (2 or 3) for galaxy # The current TXPipe classifier "extendedness" is 0 for star, 1 for galaxy extendedness = np.where((data[column_names[col]] == 2) | (data[column_names[col]] == 3), 1, 0) g[col][s:e] = extendedness
[docs] class TXIngestSSIMatchedDESBalrog(TXIngestSSIDESBalrog): """ Ingest a matched "SSI" catalog from DES (AKA Balrog) """ name = "TXIngestSSIMatchedDESBalrog" inputs = [ ("balrog_matched_catalog", FitsFile), ] outputs = [ ("matched_ssi_photometry_catalog", HDFFile), ] def run(self): """ Run the analysis for this stage. """ print("Ingesting DES Balrog matched catalog") # we will only load a subset of columns to save space column_names = { "bal_id": "bal_id", # Unique identifier for object (created during balrog process) "inj_mag": "true_bdf_mag_deredden", # Magnitude of the original deep field object, dereddened "inj_id": "true_id", # Original coadd_obj_id of deep field object "id": "meas_id", # Coadd_object_id of injection "ra": "meas_ra", # measured RA of the injection "dec": "meas_dec", # measured DEC of the injection "mag": "meas_cm_mag_deredden", # measured magnitude of the injection "snr": "meas_cm_max_flux_s2n", # measured S2N of the injection "cm_T": "meas_cm_T", # measured size parameter T (x^2+y^2) "EXTENDED_CLASS_SOF": "meas_EXTENDED_CLASS_SOF", # Star galaxy classifier (0,1=star, 2,3=Galaxy) "FLAGS_GOLD": "meas_FLAGS_GOLD_SOF_ONLY", # Measured flags (short version) } dummy_columns = { "redshift_true": 10.0, "mag_err_g": -99.0, "mag_err_r": -99.0, "mag_err_i": -99.0, "mag_err_z": -99.0, } self.process_catalog( "balrog_matched_catalog", "matched_ssi_photometry_catalog", column_names, dummy_columns, )
[docs] class TXIngestSSIDetectionDESBalrog(TXIngestSSIDESBalrog): """ Ingest an "SSI" "detection" catalog from DES (AKA Balrog) """ name = "TXIngestSSIDetectionDESBalrog" inputs = [ ("balrog_detection_catalog", FitsFile), ] outputs = [ ("injection_catalog", HDFFile), ("ssi_detection_catalog", HDFFile), ] def run(self): """ We will split the Balrog "detection" catalog into two catalogs One containing the injected catalog The other contains info on whether a given injection has been detected """ ## Extract the injection catalog column_names_inj = { "bal_id": "bal_id", # Unique identifier for object (created during balrog process) "ra": "true_ra", # *injected* ra "dec": "true_dec", # *injected* dec "inj_mag": "true_bdf_mag_deredden", # true magnitude (de-reddened) "flags": "meas_FLAGS_GOLD_SOF_ONLY", # measured data flags "meas_EXTENDED_CLASS_SOF": "meas_EXTENDED_CLASS_SOF", # star galaxy separator } self.process_catalog("balrog_detection_catalog", "injection_catalog", column_names_inj, {}) # Extract the "detection" file # We will only load a subset of columns to save space column_names_det = { "bal_id": "bal_id", # Unique identifier for object (created during balrog process) "detected": "detected", # 0 or 1. Is there a match between this object in injection_catalog and the ssi_photometry_catalog (search radius of 0.5'') "match_flag_0.5_asec": "match_flag_0.5_asec", # 0,1 or 2. Is there a match between this object in injection_catalog and the ssi_uninjected_photometry_catalog (search radius 0.5''). 1 if detection is lower flux than injection, 2 if brighter "match_flag_0.75_asec": "match_flag_0.75_asec", # as above but with search radius of 0.75'' "match_flag_1.0_asec": "match_flag_1.0_asec", # etc "match_flag_1.25_asec": "match_flag_1.25_asec", "match_flag_1.5_asec": "match_flag_1.5_asec", "match_flag_1.75_asec": "match_flag_1.75_asec", "match_flag_2.0_asec": "match_flag_2.0_asec", } self.process_catalog("balrog_detection_catalog", "ssi_detection_catalog", column_names_det, {})