Source code for txpipe.photoz_stack

from .base_stage import PipelineStage
from .data_types import TomographyCatalog, HDFFile, PNGFile, QPPDFFile, QPNOfZFile
from .utils.mpi_utils import in_place_reduce
from .utils import rename_iterated
import numpy as np
import warnings
from ceci.config import StageParameter


[docs] class TXPhotozStack(PipelineStage): """ Naive stacker using QP. Can only cope with hist or interp PDF types. Ideally this should be replaced by a RAIL stage. """ name = "TXPhotozStack" inputs = [ ("photoz_pdfs", QPPDFFile), ("tomography_catalog", TomographyCatalog), ("weights_catalog", HDFFile), ] outputs = [ ("photoz_stack", QPNOfZFile), ] config_options = { "chunk_rows": StageParameter(int, 5000, msg="Number of rows to process in each chunk."), "tomo_name": StageParameter(str, "source", msg="Name of the tomographic binning."), "weight_col": StageParameter(str, "shear/00/weight", msg="Column name for weights in the input catalog."), "zmax": StageParameter(float, 0.0, msg="Maximum redshift to use if not specified in input PDFs."), "nz": StageParameter(int, 0, msg="Number of redshift histogram bins."), } def run(self): import qp qp_filename = self.get_input("photoz_pdfs") tomo_cat = self.open_input("tomography_catalog") if self.get_input("weights_catalog") == "none": weight_cat = None else: weight_cat = self.open_input("weights_catalog") weight_col = self.config["weight_col"] tomo_name = self.config["tomo_name"] chunk_rows = self.config["chunk_rows"] with self.open_input("tomography_catalog", wrapper=True) as f: nbin = f.read_nbin() with self.open_input("photoz_pdfs", wrapper=True) as f: z = f.get_z() pdf_type = f.get_pdf_type() nz = z.size if pdf_type == "interp" else z.size - 1 if pdf_type == "hist": nz = z.size - 1 elif pdf_type == "interp": nz = z.size else: raise ValueError(f"TXPipe cannot yet use QP PDF type {pdf_type}") pdfs = np.zeros((nbin + 1, nz)) total_weight = np.zeros(nbin + 1) with self.open_input("photoz_pdfs", wrapper=True) as f: for start, end, qp_chunk in f.iterate(chunk_rows, rank=self.rank, size=self.size): print(f"Rank {self.rank} stacking PDFs {start} to {end}") bins = tomo_cat[f"tomography/bin"][start:end] if weight_cat is None: weights = None else: weights = weight_cat[weight_col][start:end] for i in range(nbin): sel = bins == i qp_bin = qp_chunk[sel] # This is a bit of a hack - we want to leave the QP # object in whichever form it naturally came, so we # query it internally and stack either the PDFs or yvals. if pdf_type == "hist": pdfs_chunk = qp_bin.gen_obj.pdfs elif pdf_type == "interp": pdfs_chunk = qp_bin.gen_obj.yvals else: raise ValueError(f"TXPipe cannot yet use QP PDF type {pdf_type}") if weights is None: chunk_stack = pdfs_chunk.sum(axis=0) pdfs[i] += chunk_stack total_weight[i] += qp_bin.npdf # 2D bin pdfs[-1] += chunk_stack total_weight[-1] += qp_bin.npdf else: # This is not yet tested! chunk_stack = weights[sel] @ pdfs_chunk w = weights[sel].sum() pdfs[i] += chunk_stack total_weight[i] += w pdfs[-1] += chunk_stack total_weight[-1] += w # Collect the results from all processors in_place_reduce(pdfs, self.comm) in_place_reduce(total_weight, self.comm) # only root saves output if self.rank == 0: pdfs /= total_weight[:, None] # Create a qp object for the n(z) information that we have. if pdf_type == "interp": q = qp.Ensemble(qp.interp, data={"xvals": z, "yvals": pdfs}) elif pdf_type == "hist": q = qp.Ensemble(qp.hist, data={"bins": z, "pdfs": pdfs.T}) else: raise ValueError(f"TXPipe cannot yet use QP PDF type {pdf_type}") with self.open_output("photoz_stack", "w") as f: f.write_ensemble(q)
[docs] class TXTruePhotozStack(PipelineStage): """ Make an ideal true source n(z) using true redshifts Fake an n(z) by histogramming the true redshift values for each object. """ name = "TXTruePhotozStack" inputs = [("tomography_catalog", TomographyCatalog), ("catalog", HDFFile), ("weights_catalog", HDFFile)] outputs = [ ("photoz_stack", QPNOfZFile), ] config_options = { "chunk_rows": StageParameter(int, 5000, msg="Number of rows to read at once."), "zmax": StageParameter(float, 0.0, msg="Maximum redshift for stacking."), "nz": StageParameter(int, 0, msg="Number of redshift histogram bins."), "weight_col": StageParameter(str, "weight", msg="Column name for weights in the input catalog."), "redshift_group": StageParameter(str, "", msg="Group name for redshift column in input file."), "redshift_col": StageParameter(str, "redshift_true", msg="Column name for true redshift in input file."), } def run(self): import qp # Create the stack objects nz = self.config["nz"] zmax = self.config["zmax"] z = np.linspace(0, zmax, nz + 1) with self.open_input("tomography_catalog", wrapper=True) as f: nbin = f.read_nbin() if self.get_input("weights_catalog") == "none": weight_col = None else: weight_col = self.config["weight_col"] histograms = np.zeros((nbin + 1, nz)) total_weight = np.zeros((nbin + 1)) # So we just do a single loop through the pair of files. for s, e, data in self.data_iterator(weight_col): # Feed back on our progress print(f"Process {self.rank} read data chunk {s:,} - {e:,}") # Add data to the stacks self.stack_data(data, histograms, total_weight, zmax, weight_col) # Sum the stacks in_place_reduce(histograms, self.comm) in_place_reduce(total_weight, self.comm) # Normalize the histograms histograms /= total_weight[:, None] # only root saves output if self.rank == 0: # Create a qp object for the n(z) information that we have. q = qp.Ensemble(qp.hist, data={"bins": z, "pdfs": histograms}) with self.open_output("photoz_stack", wrapper=True) as f: f.write_ensemble(q) def data_iterator(self, weight_col): # This collects together matching inputs from the different # input files and returns an iterator to them which yields # start, end, data redshift_group = self.config["redshift_group"] redshift_col = self.config["redshift_col"] # basic arguments to the iterator function input_spec = [ "tomography_catalog", "tomography", ["bin"], "catalog", redshift_group, [redshift_col], ] # If we have weights, add them to the iterator if weight_col is not None: input_spec.extend(["weights_catalog", "/", [weight_col]]) return self.combined_iterators(self.config["chunk_rows"], *input_spec) def stack_data(self, data, histograms, total_weight, zmax, weight_col): # Stack the data from a single chunk into the histograms # and total weight arrays redshift_col = self.config["redshift_col"] z = data[redshift_col] bin = data["bin"] if weight_col: weights = data[weight_col] else: weights = np.ones_like(z) # Sizes for the histogram nbin = total_weight.shape[0] - 1 nz = histograms.shape[1] for i in range(nbin): sel = bin == i h = np.histogram(z[sel], bins=nz, range=(0, zmax), weights=weights[sel])[0] w = weights[sel].sum() histograms[i] += h total_weight[i] += w # The 2D bin histograms[nbin] += h total_weight[nbin] += w
[docs] class TXPhotozPlot(PipelineStage): """ Make n(z) plots of source and lens galaxies """ parallel = False name = "TXPhotozPlot" inputs = [("photoz_stack", QPNOfZFile)] outputs = [ ("nz_plot", PNGFile), ] config_options = { "label": StageParameter(str, "", msg="Label for the n(z) plot."), "zmax": StageParameter(float, 3.0, msg="Maximum redshift for plotting."), } def run(self): import matplotlib matplotlib.use("agg") import matplotlib.pyplot as plt with self.open_input("photoz_stack", wrapper=True) as f: ensemble = f.read_ensemble() label = self.config["label"] if label: label = f"{label} n(z)" else: label = "n(z)" with self.open_output("nz_plot", wrapper=True) as fig: ax = fig.file.gca() zmax = self.config["zmax"] ax.set_xlim(0, zmax) zg = np.linspace(0, zmax, int(zmax * 100)) pdfs = ensemble.pdf(zg) for i in range(ensemble.npdf - 1): ax.plot(zg, pdfs[i], label=f"Bin {i}") ax.plot(zg, pdfs[ensemble.npdf - 1] * (ensemble.npdf - 1), label=f"Total", linestyle="--") plt.legend() plt.title(label) plt.xlim(xmin=0) plt.xlabel("z") plt.ylabel("n(z)")