Source code for txpipe.ingest.gcr

from ..base_stage import PipelineStage
from ..data_types import ShearCatalog, HDFFile
from ..shear_calibration import band_variants, metacal_variants
from ..utils import moments_to_shear
from ceci.config import StageParameter
import numpy as np
import glob
import re


[docs] class TXMetacalGCRInput(PipelineStage): """ Ingest metacal catalogs from GCRCatalogs This loads a matched shear and photometry catalog. """ name = "TXMetacalGCRInput" parallel = False inputs = [] outputs = [ ("shear_catalog", ShearCatalog), ("photometry_catalog", HDFFile), ] config_options = { "cat_name": StageParameter(str, "", msg="Name of the GCR catalog to load."), "single_tract": StageParameter(str, "", msg="Single tract to use (optional)."), "length": StageParameter(int, 0, msg="Pre-known length, if the catalog has been checked at previously."), "table_dir": StageParameter(str, "", msg="Directory for table files (optional)."), "data_release": StageParameter(str, "", msg="Data release identifier (optional)."), } def run(self): import GCRCatalogs import GCR import h5py # Open input data. We do not treat this as a formal "input" # since it's the starting point of the whol pipeline and so is # not in a TXPipe format. cat_name = self.config["cat_name"] # If set, override some keys from the config config_overwrite = {} for key in ["table_dir", "data_release"]: if self.config[key]: config_overwrite[key] = self.config[key] print(f"Loading catalog {cat_name} + {config_overwrite}") cat = GCRCatalogs.load_catalog(cat_name, config_overwrite=config_overwrite) # Total size is needed to set up the output file, # although in larger files it is a little slow to compute this. if self.config["length"] == 0: n = len(cat) print(f"Total catalog size = {n}") else: n = self.config["length"] print(f"Using fixed specified size = {n}") # This option, which prevented memory leaks with a previous # catalog format, appears not to work for Parquet catalogs, # but the memory leaks don't happen in that case either. try: cat.master.use_cache = False except AttributeError: pass available = cat.list_all_quantities() bands = [] for b in "ugrizy": if f"mcal_mag_{b}" in available: bands.append(b) # Columns that we will need. # These are the input names; we strip off the mcal_ prefix # for the output shear_cols = ( [ "id", "ra", "dec", "mcal_psf_g1", "mcal_psf_g2", "mcal_psf_T_mean", "mcal_flags", ] + metacal_variants("mcal_g1", "mcal_g2", "mcal_T", "mcal_s2n") + band_variants(bands, "mcal_mag", "mcal_mag_err", shear_catalog_type="metacal") ) # Input columns for photometry photo_cols = ["id", "ra", "dec", "extendedness", "tract"] # Photometry columns (non-metacal) for band in "ugrizy": photo_cols.append(f"mag_{band}") photo_cols.append(f"magerr_{band}") photo_cols.append(f"snr_{band}_cModel") # For shear we just add a weight column, and the non-rounded PSF estimates shear_out_cols = shear_cols + ["weight"] # We want these in the input but not the output as we construct # other values from them instead shear_cols += ["IxxPSF", "IxyPSF", "IyyPSF"] # For the photometry output we strip off the _cModeel suffix. photo_out_cols = [col[:-7] if col.endswith("_cModel") else col for col in photo_cols] # eliminate duplicates before loading cols = list(set(shear_cols + photo_cols)) start = 0 shear_output = None photo_output = None # Remove the mcal_ prefix from the output names. shear_out_cols = self.strip_mcal(shear_out_cols) # Loop through the data, as chunke natively by GCRCatalogs single_tract = self.config["single_tract"] if single_tract: kwargs = {"native_filters": f"tract == {single_tract}"} print(f"Selecting one tract only: {single_tract}") else: kwargs = {} for data in cat.get_quantities(cols, return_iterator=True, **kwargs): # Some columns have different names in input than output self.rename_columns(data) self.add_weight_column(data) data = self.strip_mcal(data) # First chunk of data we use to set up the output # It is easier this way (no need to check types etc) # if we change the column list if shear_output is None: shear_output = self.setup_output("shear_catalog", "shear", data, shear_out_cols, n) photo_output = self.setup_output("photometry_catalog", "photometry", data, photo_out_cols, n) # Write out this chunk of data to HDF end = start + len(data["ra"]) print(f" Saving {start} - {end}") self.write_output(shear_output, "shear", shear_out_cols, start, end, data) self.write_output(photo_output, "photometry", photo_out_cols, start, end, data) start = end # All done! photo_output.close() shear_output.close() def strip_mcal(d): if isinstance(d, list): return [col.remove_prefix("mcal_") for col in d] else: return {k.remove_prefix("mcal_"): v for k, v in d.items()} def rename_columns(self, data): for band in "ugrizy": data[f"snr_{band}"] = data[f"snr_{band}_cModel"] del data[f"snr_{band}_cModel"] Ixx = data["IxxPSF"] Ixy = data["IxyPSF"] Iyy = data["IyyPSF"] data["psf_g1"], data["psf_g2"] = moments_to_shear(Ixx, Iyy, Ixy) def setup_output(self, name, group, cat, cols, n): import h5py f = self.open_output(name) g = f.create_group(group) g.attrs["bands"] = "ugrizy" for name in cols: g.create_dataset(name, shape=(n,), dtype=cat[name].dtype) return f def add_weight_column(self, data): n = len(data["ra"]) data["weight"] = np.ones(n) def write_output(self, output_file, group_name, cols, start, end, data): g = output_file[group_name] for name in cols: g[name][start:end] = data[name]
[docs] class TXIngestStars(PipelineStage): """ Ingest a star catalog from GCRCatalogs Includes shape information (i.e. PSF samples) and whether the star was used in PSF estimation. """ name = "TXIngestStars" parallel = False inputs = [] outputs = [ ("star_catalog", HDFFile), ] config_options = { "single_tract": StageParameter(str, "", msg="Single tract to use (optional)."), "cat_name": StageParameter(str, "", msg="Name of the GCR catalog to load."), "length": StageParameter(int, 0, msg="Pre-known length, if the catalog has been checked at previously."), } def run(self): import GCRCatalogs import GCR import h5py from ..utils.hdf_tools import repack, h5py_shorten cat_name = self.config["cat_name"] cat = GCRCatalogs.load_catalog(cat_name) # This is the max possible length of the stars. # Actually much smaller of course if self.config["length"]: n = self.config["length"] print(f"Using fixed size {n}") else: n = len(cat) print(f"Full catalog size = {n}") # Columns we need to load in for the star data - # the measured object moments and the identifier telling us # if it was used in PSF measurement star_cols = [ "id", "ra", "dec", "calib_psf_used", "calib_psf_reserved", "extendedness", "tract", "mag_u", "mag_g", "mag_r", "mag_i", "mag_z", "mag_y", "Ixx", "Ixy", "Iyy", "IxxPSF", "IxyPSF", "IyyPSF", ] # The star output names are mostly different to the input names star_out_cols = [ # These are read directly "id", "ra", "dec", "calib_psf_used", "calib_psf_reserved", "extendedness", "tract", "mag_u", "mag_g", "mag_r", "mag_i", "mag_z", "mag_y", # These are calculated "measured_e1", "measured_e2", "model_e1", "model_e2", "measured_T", "model_T", ] single_tract = self.config["single_tract"] if single_tract: kwargs = {"native_filters": f"tract == {single_tract}"} print(f"Selecting one tract only: {single_tract}") else: kwargs = {} # As with the galaxy ingestion, this option doesn't # work with Parquet catalogs. try: cat.master.use_cache = False except AttributeError: pass start = 0 star_start = 0 star_output = None for data in cat.get_quantities(star_cols, return_iterator=True, **kwargs): end = start + len(data["ra"]) print(f"Reading data {start:,} - {end:,}") # Some columns have different names in input than output star_data = self.compute_star_data(data) star_end = star_start + len(star_data["ra"]) if star_output is None: star_output = self.setup_output("star_catalog", "stars", star_data, star_out_cols, n) self.write_output(star_output, "stars", star_out_cols, star_start, star_end, star_data) start = end star_start = star_end # Cut down to just include stars. for col in star_out_cols: h5py_shorten(star_output["stars"], col, star_end) star_output.close() # Run h5repack on the file repack(self.get_output("star_catalog")) def setup_output(self, name, group, cat, cols, n): import h5py f = self.open_output(name) g = f.create_group(group) for name in cols: g.create_dataset(name, shape=(n,), dtype=cat[name].dtype) return f def write_output(self, output_file, group_name, cols, start, end, data): g = output_file[group_name] for name in cols: g[name][start:end] = data[name] def compute_star_data(self, data): star_data = {} # We specifically use the stars chosen for PSF measurement star = data["calib_psf_used"] | data["calib_psf_reserved"] for col in [ "id", "ra", "dec", "calib_psf_used", "calib_psf_reserved", "extendedness", "tract", ]: star_data[col] = data[col][star] for b in "ugrizy": star_data[f"mag_{b}"] = data[f"mag_{b}"][star] # HSM reports moments. We convert these into # ellipticities. We do this for both the star shape # itself and the PSF model. kinds = [("", "measured_"), ("PSF", "model_")] for in_name, out_name in kinds: # Pulling out the correct moment columns Ixx = data[f"Ixx{in_name}"][star] Iyy = data[f"Iyy{in_name}"][star] Ixy = data[f"Ixy{in_name}"][star] # Conversion of moments to e1, e2 T = Ixx + Iyy e1, e2 = moments_to_shear(Ixx, Iyy, Ixy) # save to output star_data[f"{out_name}e1"] = e1 star_data[f"{out_name}e2"] = e2 star_data[f"{out_name}T"] = T return star_data
# response to an old Stack Overflow question of mine: # https://stackoverflow.com/questions/33529057/indices-that-intersect-and-sort-two-numpy-arrays def intersecting_indices(x, y): u_x, u_idx_x = np.unique(x, return_index=True) u_y, u_idx_y = np.unique(y, return_index=True) i_xy = np.intersect1d(u_x, u_y, assume_unique=True) i_idx_x = u_idx_x[np.in1d(u_x, i_xy, assume_unique=True)] i_idx_y = u_idx_y[np.in1d(u_y, i_xy, assume_unique=True)] return i_idx_x, i_idx_y