Source code for txpipe.ingest.dp02

from ..base_stage import PipelineStage
from ..data_types import ShearCatalog, HDFFile
from .lsst import process_photometry_data, process_shear_data
import numpy as np
import glob
import math
from ceci.config import StageParameter


[docs] class TXIngestDataPreview02(PipelineStage): """ Ingest galaxy catalogs from DP0.2 There is no metacal on this, and there won't be. """ name = "TXIngestDataPreview02" parallel = False inputs = [] outputs = [ ("photometry_catalog", HDFFile), ("shear_catalog", ShearCatalog), ] config_options = { "pq_path": StageParameter( str, "/global/cfs/cdirs/lsst/shared/rubin/DP0.2/objectTable/", msg="Path to Parquet objectTable files." ), "tracts": StageParameter(str, "", msg="Comma-separated list of tracts to use (empty for all)."), } def run(self): from pyarrow.parquet import ParquetFile import h5py from ..utils.hdf_tools import h5py_shorten, repack tracts = self.config["tracts"] pq_path = self.config["pq_path"] cat_files = glob.glob(f"{pq_path}/objectTable*.parq") if tracts: tracts = [tract.strip() for tract in tracts.split(",")] print(f"Using {len(tracts)} tracts out of {len(cat_files)}") cat_files = [c for c in cat_files if c.split("/")[-1].split("_")[2] in tracts] if len(cat_files) != len(tracts): raise ValueError("Some tracts not found") n = 0 for fn in cat_files: with ParquetFile(fn) as f: n += f.metadata.num_rows print(f"Full catalog size = {n:,}") # Input columns for photometry photo_cols = ["objectId", "coord_ra", "coord_dec", "refExtendedness", "tract"] shape_cols = [ "objectId", "coord_ra", "coord_dec", "refExtendedness", "tract", "i_hsmShapeRegauss_e1", "i_hsmShapeRegauss_e2", "i_hsmShapeRegauss_sigma", "i_hsmShapeRegauss_flag", "i_ixx", "i_ixy", "i_iyy", "i_ixxPSF", "i_ixyPSF", "i_iyyPSF", ] # Magnitude columns, given to both photometry and shear catalogs for band in "ugrizy": for cols in [photo_cols, shape_cols]: cols.append(f"{band}_cModelFlux") cols.append(f"{band}_cModelFluxErr") cols.append(f"{band}_cModel_flag") cols = list(set(shape_cols + photo_cols)) nfile = len(cat_files) # This is the default batch size for pyarrow batch_size = 65536 start1 = 0 start2 = 0 for i, fn in enumerate(cat_files): with ParquetFile(fn) as f: n_chunk = math.ceil(f.metadata.num_rows / batch_size) it = f.iter_batches(columns=cols) for j, d in enumerate(it): d = {col.name: d[col.name].to_numpy(zero_copy_only=False) for col in d.schema} if i == 0 and j == 0: output_names = set(d.keys()) for col in cols: assert col in output_names, f"Column {col} not found" photo_data = process_photometry_data(d) shear_data = process_shear_data(d) if i == 0 and j == 0: photo_outfile = self.setup_output("photometry_catalog", "photometry", photo_data, n) shear_outfile = self.setup_output("shear_catalog", "shear", shear_data, n) end1 = start1 + len(photo_data["ra"]) end2 = start2 + len(shear_data["ra"]) self.write_output(photo_outfile, "photometry", photo_data, start1, end1) self.write_output(shear_outfile, "shear", shear_data, start2, end2) print(f"Processing chunk {j + 1}/{n_chunk} of file {i + 1}/{nfile} into rows {start1:,} - {end1:,}") start1 = end1 start2 = end2 print("Final selected objects: {end1:,} in photometry and {end2:,} in shear") # Cut down to just include stars. print("Trimming photometry columns:") for col in photo_data.keys(): print(" ", col) h5py_shorten(photo_outfile["photometry"], col, end1) print("Trimming shear columns:") for col in shear_data.keys(): print(" ", col) h5py_shorten(shear_outfile["shear"], col, end2) photo_outfile.close() shear_outfile.close() # Run h5repack on the file print("Repacking file") repack(self.get_output("photometry_catalog")) repack(self.get_output("shear_catalog")) def setup_output(self, tag, group, first_chunk, n): f = self.open_output(tag) g = f.create_group(group) for name, col in first_chunk.items(): g.create_dataset(name, shape=(n,), dtype=col.dtype) return f def write_output(self, outfile, group, data, start, end): g = outfile[group] for name, col in data.items(): g[name][start:end] = col