Source code for txpipe.twopoint_null_tests

from .base_stage import PipelineStage
from .data_types import (
    HDFFile,
    ShearCatalog,
    TomographyCatalog,
    RandomsCatalog,
    SACCFile,
    PNGFile,
    TextFile,
    QPNOfZFile,
)
import numpy as np
from .twopoint import TXTwoPoint, SHEAR_SHEAR, SHEAR_POS, POS_POS, TREECORR_CONFIG
from .utils import DynamicSplitter
from ceci.config import StageParameter


[docs] class TXStarCatalogSplitter(PipelineStage): """ Split a star catalog into bright and dim stars """ name = "TXStarCatalogSplitter" parallel = False inputs = [ ("star_catalog", HDFFile), ] outputs = [ ("binned_star_catalog", HDFFile), ] config_options = { "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), "initial_size": StageParameter(int, 100_000, msg="Initial size for star catalog bins."), } def run(self): cols = ["ra", "dec"] #  Object we use to make the separate lens bins catalog output = self.open_output("binned_star_catalog") group = output.create_group("stars") group.attrs["nbin"] = 2 group.attrs["nbin_lens"] = 2 bins = { "bright": self.config["initial_size"], "dim": self.config["initial_size"], } splitter = DynamicSplitter(group, "bin", cols, bins) # Also read the r band mag. We don't save it as we don't # need it later (?) but we do use it for the split it = self.iterate_hdf( "star_catalog", "stars", cols + ["mag_r"], self.config["chunk_rows"], ) for s, e, data in it: print(f"Process 0 binning data in range {s:,} - {e:,}") r = data["mag_r"] mag_bins = np.repeat(" ", r.size) mag_bins = np.zeros(r.size, dtype=int) dim_cut = (r > 18.2) & (r < 22.0) bright_cut = (r > 14.0) & (r < 18.2) dim = {name: col[dim_cut] for name, col in data.items()} bright = {name: col[bright_cut] for name, col in data.items()} splitter.write_bin(dim, "dim") splitter.write_bin(bright, "bright") splitter.finish() output.close()
[docs] class TXGammaTFieldCenters(TXTwoPoint): """ Make diagnostic 2pt measurements of tangential shear around field centers This subclass of the standard TXTwoPoint uses the centers of exposure fields as "lenses", as a systematics test. """ name = "TXGammaTFieldCenters" inputs = [ ("binned_shear_catalog", ShearCatalog), ("shear_photoz_stack", QPNOfZFile), ("lens_photoz_stack", QPNOfZFile), ("random_cats", RandomsCatalog), ("exposures", HDFFile), ("patch_centers", TextFile), ("tracer_metadata", HDFFile), ] outputs = [ ("gammat_field_center", SACCFile), ("gammat_field_center_plot", PNGFile), ] # Add values to the config file that are not previously defined config_options = TREECORR_CONFIG | { "calcs": StageParameter( list, [0, 1, 2], msg="Which calculations to perform: 0=shear-shear, 1=shear-position, 2=position-position" ), "reduce_randoms_size": StageParameter(float, 1.0, msg="Factor to reduce the size of random catalogs."), "var_method": StageParameter(str, "shot", msg="Method for computing variance (shot, jackknife, etc.)."), "npatch": StageParameter(int, 5, msg="Number of patches for null tests."), "use_true_shear": StageParameter(bool, False, msg="Whether to use true shear values."), "subtract_mean_shear": StageParameter(bool, False, msg="Whether to subtract mean shear."), "use_randoms": StageParameter(bool, True, msg="Whether to use random catalogs."), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files."), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode."), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes."), "use_subsampled_randoms": StageParameter(bool, False, msg="Use subsampled randoms file for RR calculation."), } def run(self): # Before running the parent class we add source_bins and lens_bins # options that it is expecting, both set to -1 to indicate that we # will choose them automatically (below). import matplotlib matplotlib.use("agg") super().run() def read_nbin(self): # We use only a single source and lens bin in this case - # the source is the complete 2D field and the lens is the # field centers return ["all"], [0] def get_random_catalog(self, i): import treecorr if not self.config["use_randoms"]: return None rancat = treecorr.Catalog( self.get_input("random_cats"), ext="randoms", ra_col="ra", dec_col="dec", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("random_cats", i), ) return rancat def get_lens_catalog(self, i): import treecorr # check if we are using the old format exposures file # or the new one with self.open_input("exposures", wrapper=False) as f: if "visits" in f.keys(): # This is the new format ext = "visits" ra_col = "ra" dec_col = "dec" else: # This is the old format ext = "exposures" ra_col = "ratel" dec_col = "dectel" assert i == 0 cat = treecorr.Catalog( self.get_input("exposures"), ext=ext, ra_col=ra_col, dec_col=dec_col, ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("exposures", i), ) return cat def select_calculations(self, source_list, lens_list): # We only want a single calculation, the 2D gamma_T around # the field centers return [("all", 0, SHEAR_POS)] def write_output(self, source_list, lens_list, meta, results): # This subclass only needs the root process for this task if self.rank != 0: return # we write output both to file for later and to # a plot self.write_output_sacc(meta, results) self.write_output_plot(results) def write_output_plot(self, results): import matplotlib.pyplot as plt d = results[0] dvalue = d.object.xi derror = np.sqrt(d.object.varxi) dtheta = np.exp(d.object.meanlogr) fig = self.open_output("gammat_field_center_plot", wrapper=True) plt.errorbar(dtheta, dtheta * dvalue, derror, fmt="ro", capsize=3) plt.xscale("log") plt.xlabel(r"$\theta$ / arcmin") plt.ylabel(r"$\theta \cdot \gamma_t(\theta)$") plt.title("Field Center Tangential Shear") fig.close() def write_output_sacc(self, meta, results): # We write out the results slightly differently here # beause they go to a different file and have different # tracers and tags. import sacc dt = "galaxyFieldCenter_shearDensity_xi_t" S = sacc.Sacc() with self.open_input("shear_photoz_stack", wrapper=True) as f: # The last entry represents the 2D n(z) z, Nz = f.get_2d_n_of_z(-1) # Add the data points that we have one by one, recording which # tracer they each require S.add_tracer("Misc", "fieldcenter") S.add_tracer("NZ", "source2d", z, Nz) d = results[0] assert len(results) == 1 dvalue = d.object.xi derror = np.sqrt(d.object.varxi) dtheta = np.exp(d.object.meanlogr) dnpair = d.object.npairs dweight = d.object.weight # Each of our Measurement objects contains various theta values, # and we loop through and add them all n = len(dvalue) for i in range(n): S.add_data_point( dt, ("source2d", "fieldcenter"), dvalue[i], theta=dtheta[i], error=derror[i], npair=dnpair[i], weight=dweight[i], ) self.write_metadata(S, meta) # Our data points may currently be in any order depending on which processes # ran which calculations. Re-order them. S.to_canonical_order() # Finally, save the output to Sacc file S.save_fits(self.get_output("gammat_field_center"), overwrite=True)
[docs] class TXGammaTStars(TXTwoPoint): """ Make diagnostic 2pt measurements of tangential shear around stars This subclass of the standard TXTwoPoint uses the centers of stars as "lenses", as a systematics test. """ name = "TXGammaTStars" inputs = [ ("binned_shear_catalog", ShearCatalog), ("shear_tomography_catalog", TomographyCatalog), ("shear_photoz_stack", QPNOfZFile), ("lens_photoz_stack", QPNOfZFile), ("random_cats", RandomsCatalog), ("binned_star_catalog", HDFFile), ("patch_centers", TextFile), ("tracer_metadata", HDFFile), ("binned_random_catalog", HDFFile), ] outputs = [ ("gammat_bright_stars", SACCFile), ("gammat_bright_stars_plot", PNGFile), ("gammat_dim_stars", SACCFile), ("gammat_dim_stars_plot", PNGFile), ] # Add values to the config file that are not previously defined config_options = TREECORR_CONFIG | { "calcs": StageParameter( list, [0, 1, 2], msg="Which calculations to perform: 0=shear-shear, 1=shear-position, 2=position-position" ), "reduce_randoms_size": StageParameter(float, 1.0, msg="Factor to reduce the size of random catalogs."), "var_method": StageParameter(str, "shot", msg="Method for computing variance (shot, jackknife, etc.)."), "npatch": StageParameter(int, 5, msg="Number of patches for null tests."), "use_true_shear": StageParameter(bool, False, msg="Whether to use true shear values."), "subtract_mean_shear": StageParameter(bool, False, msg="Whether to subtract mean shear."), "use_randoms": StageParameter(bool, True, msg="Whether to use random catalogs."), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files."), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode."), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes."), "use_subsampled_randoms": StageParameter(bool, False, msg="Use subsampled randoms file for RR calculation."), } def run(self): import matplotlib matplotlib.use("agg") super().run() def read_nbin(self): # We use two sets of stars, dim and bright return ["all"], ["bright", "dim"] def get_lens_catalog(self, i): import treecorr assert i == "bright" or i == "dim" cat = treecorr.Catalog( self.get_input("binned_star_catalog"), ext=f"stars/bin_{i}", ra_col="ra", dec_col="dec", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("binned_star_catalog", i), ) return cat def get_random_catalog(self, i): import treecorr if not self.config["use_randoms"]: return None rancat = treecorr.Catalog( self.get_input("random_cats"), ext="randoms", ra_col="ra", dec_col="dec", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("random_cats", i), ) return rancat def select_calculations(self, source_list, lens_list): # We only want a single calculation, the gamma_T around # the field centers return [("all", "bright", SHEAR_POS), ("all", "dim", SHEAR_POS)] def write_output(self, source_list, lens_list, meta, results): # This subclass only needs the root process for this task if self.rank != 0: return # we write output both to file for later and to a plot self.write_output_sacc(meta, results[0], "gammat_bright_stars", "Bright") self.write_output_sacc(meta, results[1], "gammat_dim_stars", "Dim") self.write_output_plot(results[0], "gammat_bright_stars_plot", "Bright") self.write_output_plot(results[1], "gammat_dim_stars_plot", "Dim") def write_output_plot(self, d, image_file, text): import matplotlib.pyplot as plt dvalue = d.object.xi derror = np.sqrt(d.object.varxi) dtheta = np.exp(d.object.meanlogr) fig = self.open_output(image_file, wrapper=True) # compute the mean and the chi^2/dof z = (dvalue) / derror chi2 = np.sum(z**2) chi2dof = chi2 / (len(dtheta) - 1) plt.errorbar( dtheta, dtheta * dvalue, dtheta * derror, fmt="ro", capsize=3, label=rf"$\chi^2/dof = {chi2dof:.2f}$", ) plt.legend(loc="best") plt.xscale("log") plt.xlabel(r"$\theta$ / arcmin") plt.ylabel(r"$\theta \cdot \gamma_t(\theta)$") plt.title(f"{text} Star Centers Tangential Shear") fig.close() def write_output_sacc(self, meta, d, sacc_file, text): # We write out the results slightly differently here # beause they go to a different file and have different # tracers and tags. import sacc dt = "galaxyStar_shearDensity_xi_t" S = sacc.Sacc() with self.open_input("shear_photoz_stack", wrapper=True) as f: z, Nz = f.get_2d_n_of_z(-1) # Add the data points that we have one by one, recording which # tracer they each require name = "{}_stars".format(text.lower()) S.add_tracer("Misc", name) S.add_tracer("NZ", "source2d", z, Nz) dvalue = d.object.xi derror = np.sqrt(d.object.varxi) dtheta = np.exp(d.object.meanlogr) dnpair = d.object.npairs dweight = d.object.weight # Each of our Measurement objects contains various theta values, # and we loop through and add them all n = len(dvalue) for i in range(n): S.add_data_point( dt, ("source2d", name), dvalue[i], theta=dtheta[i], error=derror[i], npair=dnpair[i], weight=dweight[i], ) self.write_metadata(S, meta) # Our data points may currently be in any order depending on which processes # ran which calculations. Re-order them. S.to_canonical_order() # Finally, save the output to Sacc file S.save_fits(self.get_output(sacc_file), overwrite=True)
[docs] class TXGammaTRandoms(TXTwoPoint): """ Make diagnostic 2pt measurements of tangential shear around randoms It's not clear to me that this is a useful null test; if it was we wouldn't need to subtrac this term in the Landay-Szalay estimator. This subclass of the standard TXTwoPoint uses the centers of stars as "lenses", as a systematics test. """ name = "TXGammaTRandoms" inputs = [ ("binned_shear_catalog", ShearCatalog), ("shear_photoz_stack", QPNOfZFile), ("random_cats", RandomsCatalog), ("patch_centers", TextFile), ("tracer_metadata", HDFFile), ] outputs = [ ("gammat_randoms", SACCFile), ("gammat_randoms_plot", PNGFile), ] # Add values to the config file that are not previously defined config_options = TREECORR_CONFIG | { "calcs": StageParameter( list, [0, 1, 2], msg="Which calculations to perform: 0=shear-shear, 1=shear-position, 2=position-position" ), "reduce_randoms_size": StageParameter(float, 1.0, msg="Factor to reduce the size of random catalogs."), "var_method": StageParameter(str, "shot", msg="Method for computing variance (shot, jackknife, etc.)."), "npatch": StageParameter(int, 5, msg="Number of patches for null tests."), "use_true_shear": StageParameter(bool, False, msg="Whether to use true shear values."), "subtract_mean_shear": StageParameter(bool, False, msg="Whether to subtract mean shear."), "use_randoms": StageParameter(bool, False, msg="Whether to use random catalogs."), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files."), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode."), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes."), "use_subsampled_randoms": StageParameter(bool, False, msg="Use subsampled randoms file for RR calculation."), } def run(self): # Before running the parent class we add source_bins and lens_bins # options that it is expecting, both set to -1 to indicate that we # will choose them automatically (below). import matplotlib matplotlib.use("agg") super().run() def read_nbin(self): # We use only a single source and lens bin in this case - # the source is the complete 2D field and the lens is the # field centers return ["all"], [0] def get_random_catalog(self, i): # override the parent method # so that we don't load the randoms here, # because if we subtract randoms from randoms # we get nothing. return None def get_lens_catalog(self, i): import treecorr assert i == 0 cat = treecorr.Catalog( self.get_input("random_cats"), ext=f"randoms", ra_col="ra", dec_col="dec", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("random_cats", i), ) return cat def select_calculations(self, source_list, lens_list): # We only want a single calculation, the gamma_T around # the field centers return [("all", 0, SHEAR_POS)] def write_output(self, source_list, lens_list, meta, results): # we write output both to file for later and to # a plot self.write_output_sacc(meta, results) self.write_output_plot(results) def write_output_plot(self, results): import matplotlib.pyplot as plt d = results[0] dvalue = d.object.xi derror = np.sqrt(d.object.varxi) dtheta = np.exp(d.object.meanlogr) fig = self.open_output("gammat_randoms_plot", wrapper=True) # compute the mean and the chi^2/dof flat1 = 0 z = (dvalue - flat1) / derror chi2 = np.sum(z**2) chi2dof = chi2 / (len(dtheta) - 1) plt.errorbar( dtheta, dtheta * dvalue, dtheta * derror, fmt="ro", capsize=3, label=r"$\chi^2/dof = $" + str(chi2dof), ) plt.legend(loc="best") plt.xscale("log") plt.xlabel(r"$\theta$ / arcmin") plt.ylabel(r"$\theta \cdot \gamma_t(\theta)$") plt.title("Randoms Tangential Shear") fig.close() def write_output_sacc(self, meta, results): # We write out the results slightly differently here # beause they go to a different file and have different # tracers and tags. import sacc dt = "galaxyRandoms_shearDensity_xi_t" S = sacc.Sacc() with self.open_input("shear_photoz_stack", wrapper=True) as f: z, Nz = f.get_2d_n_of_z(-1) # Add the data points that we have one by one, recording which # tracer they each require S.add_tracer("Misc", "randoms") S.add_tracer("NZ", "source2d", z, Nz) d = results[0] assert len(results) == 1 dvalue = d.object.xi derror = np.sqrt(d.object.varxi) dtheta = np.exp(d.object.meanlogr) dnpair = d.object.npairs dweight = d.object.weight # Each of our Measurement objects contains various theta values, # and we loop through and add them all n = len(dvalue) for i in range(n): S.add_data_point( dt, ("source2d", "randoms"), dvalue[i], theta=dtheta[i], error=derror[i], npair=dnpair[i], weight=dweight[i], ) self.write_metadata(S, meta) # Our data points may currently be in any order depending on which processes # ran which calculations. Re-order them. S.to_canonical_order() # Finally, save the output to Sacc file S.save_fits(self.get_output("gammat_randoms"), overwrite=True)
# Aperture Mass class that inherits from TXTwoPoint
[docs] class TXApertureMass(TXTwoPoint): """ Measure the aperture mass statistics with TreeCorr There are real and imaginary components of the aperture mass and its cross term. """ name = "TXApertureMass" inputs = [ ("binned_shear_catalog", ShearCatalog), ("shear_photoz_stack", QPNOfZFile), ("patch_centers", TextFile), ("tracer_metadata", HDFFile), ] outputs = [ ("aperture_mass_data", SACCFile), ] # Add values to the config file that are not previously defined config_options = TREECORR_CONFIG | { "calcs": StageParameter( list, [0, 1, 2], msg="Which calculations to perform: 0=shear-shear, 1=shear-position, 2=position-position" ), "source_bins": StageParameter(list, [-1], msg="List of source bins to use (-1 means all)."), "lens_bins": StageParameter(list, [-1], msg="List of lens bins to use (-1 means all)."), "reduce_randoms_size": StageParameter(float, 1.0, msg="Factor to reduce the size of random catalogs."), "var_method": StageParameter(str, "jackknife", msg="Method for computing variance (jackknife, sample, etc.)."), "use_true_shear": StageParameter(bool, False, msg="Whether to use true shear values."), "subtract_mean_shear": StageParameter(bool, False, msg="Whether to subtract mean shear."), "use_randoms": StageParameter(bool, False, msg="Whether to use random catalogs."), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode."), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files."), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes."), } # These two functions can be combined into a single one. def _read_nbin_from_tomography(self): with self.open_input("binned_shear_catalog") as f: nbin_source = f["shear"].attrs["nbin_source"] source_list = range(nbin_source) lens_list = [] # Not necessary in this subclass return source_list, lens_list def select_calculations(self, source_list, lens_list): # For shear-shear we omit pairs with j>i # Lens list is an empty list here, and is unused calcs = [] k = SHEAR_SHEAR for i in source_list: for j in range(i + 1): if j in source_list: calcs.append((i, j, k)) if self.rank == 0: print(f"Running these calculations: {calcs}") return calcs def calculate_shear_shear(self, i, j): gg = super().calculate_shear_shear(i, j) gg.mapsq = gg.calculateMapSq() return gg def write_output(self, source_list, lens_list, meta, results): # lens_list is unused in this function, but should always be passed as an empty list import sacc # Names for aperture-mass correlation functions MAPSQ = "galaxy_shear_apmass2" MAPSQ_IM = "galaxy_shear_apmass2_im" MXSQ = "galaxy_shear_apmass2_cross" MXSQ_IM = "galaxy_shear_apmass2_im_cross" # Initialise SACC object S = sacc.Sacc() # We include the n(z) data in the output. # So here we load it in and add it to the data # Load the tracer data N(z) from an input file and # copy it to the output, for convenience with self.open_input("shear_photoz_stack", wrapper=True) as f: for i in source_list: z, Nz = f.get_bin_n_of_z(i) S.add_tracer("NZ", f"source_{i}", z, Nz) # Now build up the collection of data points, adding them all to the sacc for d in results: # First the tracers and generic tags tracer1 = f"source_{d.i}" tracer2 = f"source_{d.j}" # Skip empty bins if d.object is None: continue theta = np.exp(d.object.meanlogr) weight = d.object.weight err = np.sqrt(d.object.mapsq[4]) n = len(theta) for j, CORR in enumerate([MAPSQ, MAPSQ_IM, MXSQ, MXSQ_IM]): map = d.object.mapsq[j] for i in range(n): S.add_data_point( CORR, (tracer1, tracer2), map[i], theta=theta[i], error=err[i], weight=weight[i], ) # Our data points may currently be in any order depending on which processes # ran which calculations. Re-order them. S.to_canonical_order() self.write_metadata(S, meta) # Finally, save the output to Sacc file S.save_fits(self.get_output("aperture_mass_data"), overwrite=True)