from .base_stage import PipelineStage
from .data_types import ShearCatalog, TomographyCatalog, FiducialCosmology
from .utils import read_shear_catalog_type, Splitter, rename_iterated
from .shear_calibration import Calibrator
from ceci.config import StageParameter
import numpy as np
[docs]
class TXShearCalibration(PipelineStage):
"""Split the shear catalog into calibrated bins
This class runs after source selection has been done, because the final
calibration factor can only be estimated once we have read the entire catalog
and chosen tomographic bins (since it is an ensemble average of cal factors).
Once that stage has run and computed both the tomographic bin for each sample
and the calibration factors, this stage takes the full catalog and splits it
into one HDF5 group per bin.
We are not (yet) saving per-patch catalogs for TreeCorr here. We might want to
do that later.
"""
name = "TXShearCalibration"
inputs = [
("shear_catalog", ShearCatalog),
("shear_tomography_catalog", TomographyCatalog),
("fiducial_cosmology", FiducialCosmology),
]
outputs = [
("binned_shear_catalog", ShearCatalog),
]
config_options = {
"use_true_shear": StageParameter(bool, False, msg="Use true shear values instead of observed shear"),
"chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk"),
"subtract_mean_shear": StageParameter(
bool, True, msg="Whether to subtract the mean shear from the calibrated shear"
),
"copy_redshift": StageParameter(bool, False, msg="Whether to copy the redshift column from the input catalog"),
"add_fiducial_distance": StageParameter(
bool, False, msg="Whether to add fiducial comoving distance to the output catalog"
),
"redshift_name": StageParameter(str, "redshift_true", msg="Name of the redshift column"),
"extra_cols": StageParameter(list, [""], msg="Additional columns to copy from the input catalog"),
"shear_catalog_type": StageParameter(
str, "", msg="Type of shear catalog (e.g., metadetect, metacal, lensfit, hsc)"
),
}
def run(self):
# Extract the configuration parameters
cat_type = read_shear_catalog_type(self)
use_true = self.config["use_true_shear"]
extra_cols = [c for c in self.config["extra_cols"] if c]
subtract_mean_shear = self.config["subtract_mean_shear"]
add_fiducial_distance = self.config["add_fiducial_distance"]
copy_redshift = self.config["copy_redshift"]
z_name = self.config["redshift_name"]
with self.open_input("shear_catalog", wrapper=True) as f:
bands = f.get_bands()
# this is the names of the columns in the input catalog
mag_cols_out = [f"mag_{b}" for b in bands] + [f"mag_err_{b}" for b in bands]
mag_cols_in = mag_cols_out
if self.rank == 0:
print("Copying extra columns: ", extra_cols)
print("Copying magnitude: ", mag_cols_in)
# Prepare the output file, and create a splitter object,
# whose job is to save the separate bins to separate HDF5
# extensions depending on the tomographic bin
output_file, splitter, nbin = self.setup_output(extra_cols + mag_cols_out)
# Load the calibrators. If using the true shear no calibration
# is needed
tomo_file = self.get_input("shear_tomography_catalog")
cals, cal2d = Calibrator.load(tomo_file, null=use_true)
print("Using calibration method:", cal2d.__class__.__name__)
# The catalog columns are named differently in different cases
# Get the correct shear catalogs
with self.open_input("shear_catalog", wrapper=True) as f:
cat_cols, renames = f.get_primary_catalog_names()
g = f.get_primary_catalog_group()
# cat_cols is everything we are reading in
if cat_type == "metadetect":
cat_cols = cat_cols + [f"00/{c}" for c in extra_cols + mag_cols_in]
mag_cols_in = [f"00/{c}" for c in mag_cols_in]
renames.update({f"00/{c}": c for c in extra_cols})
else:
cat_cols = cat_cols + extra_cols + mag_cols_in
renames.update(zip(mag_cols_in, mag_cols_out))
if add_fiducial_distance:
if copy_redshift:
cat_cols += [z_name]
else:
raise ValueError(f"To add fiducial distances the shear catalog needs a redshift")
if cat_type != "hsc":
output_cols = ["ra", "dec", "weight", "g1", "g2"] + extra_cols + mag_cols_out
else:
output_cols = ["ra", "dec", "weight", "g1", "g2", "c1", "c2"] + extra_cols + mag_cols_out
if add_fiducial_distance:
output_cols.append("r")
output_cols.append("z")
print("Adding fiducial distances; hopefully a mean redshift is defined")
# We parallelize by bin. This isn't ideal but we don't know the number
# of objects in each bin per chunk, so we can't parallelize in full. This
# is a quick stage though.
bins = list(range(nbin)) + ["all"]
my_bins = list(self.split_tasks_by_rank(bins))
# Print out which bins this proc will do, also as a prompt to the user
# in case they're wondering why adding procs doesn't help
if my_bins:
my_bins_text = ", ".join(str(x) for x in my_bins)
print(f"Process {self.rank} collating bins: [{my_bins_text}]")
else:
print(f"Note: Process {self.rank} will not do anything.")
# make the iterator that loops through data
it = self.combined_iterators(
self.config["chunk_rows"],
# first file
"shear_catalog",
"shear",
cat_cols,
"shear_tomography_catalog",
"tomography",
["bin"],
parallel=False,
)
# Main loop
for s, e, data in rename_iterated(it, renames):
if self.rank == 0:
print(f"Rank 0 processing data {s:,} - {e:,}")
# Rename mcal_g1 -> g1 etc
self.rename_metacal(data)
if add_fiducial_distance:
self.redshift_to_comoving(data, z_name)
# Now output the calibrated bin data for this processor
for b in my_bins:
# Select objects to go in this bin
if b == "all":
# the 2D case is any object from any other bin
w = np.where(data["bin"] >= 0)
cal = cal2d
else:
# otherwise just objects in this bin
w = np.where(data["bin"] == b)
cal = cals[b]
# Cut down the data to just this selection for output
d = {name: data[name][w] for name in output_cols}
# Calibrate the shear columns
if cat_type == "hsc":
d["g1"], d["g2"] = cal.apply(
d["g1"],
d["g2"],
d["c1"],
d["c2"],
d["aselepsf1"],
d["aselepsf2"],
d["msel"],
subtract_mean=subtract_mean_shear,
)
elif cat_type == "lensfit":
# In KiDS, the additive bias is calculated and removed per North and South field
# therefore, we add dec to split data into these fields.
# You can choose not to by setting dec_cut = 90 in the config, for example.
d["g1"], d["g2"] = cal.apply(d["g1"], d["g2"], d["dec"], subtract_mean=subtract_mean_shear)
else:
d["g1"], d["g2"] = cal.apply(d["g1"], d["g2"], subtract_mean=subtract_mean_shear)
# Write output, keeping track of sizes
splitter.write_bin(d, b)
splitter.finish(my_bins)
output_file.close()
def setup_output(self, extra_cols):
add_fiducial_distance = self.config["add_fiducial_distance"]
# count the expected number of objects per bin from the tomo data
with self.open_input("shear_tomography_catalog") as f:
counts = f["counts/counts"][:]
count2d = f["counts/counts_2d"][0]
nbin = len(counts)
# Prepare the calibrated output catalog
f = self.open_output("binned_shear_catalog", parallel=True)
# we only retain these columns
cols = ["ra", "dec", "weight", "g1", "g2"] + extra_cols
if add_fiducial_distance:
cols.extend(["r", "z"])
# structure is /shear/bin_1, /shear/bin_2, etc
g = f.create_group("shear")
# These are both the same here, but there may be some stages
# that are still expecting it to be called "nbin_source"
g.attrs["nbin"] = nbin
g.attrs["nbin_source"] = nbin
# This maps the bin numbers (and name, in the case
# of the non-tomographic "all" bin) to the number
# of objects in each, and is used by the splitter
# to initialize the output groups.
bins = {b: c for b, c in enumerate(counts)}
bins["all"] = count2d
# These are the possible integer columns
dtypes = {"id": "i8", "flags": "i8"}
splitter = Splitter(g, "bin", cols, bins, dtypes=dtypes)
return f, splitter, nbin
def rename_metacal(self, d):
# rename the columns so they're always just g1, g2.
# First determine the renaming we should do
if "true_g1" in d:
prefix = "true"
elif "mcal_g1" in d:
raise ValueError("The mcal_ prefix should have been removed in input catalogs by strip_mcal.py.")
else:
return
# Then rename
d["g1"] = d[f"{prefix}_g1"]
d["g2"] = d[f"{prefix}_g2"]
del d[f"{prefix}_g1"], d[f"{prefix}_g2"]
def redshift_to_comoving(self, d, name):
import pyccl as ccl
cosmo = self.open_input("fiducial_cosmology", wrapper=True).to_ccl()
# renaming the redshift name
d["z"] = d[name]
d["r"] = ccl.background.comoving_radial_distance(cosmo, 1 / (1 + d[name]))