Source code for txpipe.twopoint

from .base_stage import PipelineStage
from .data_types import (
    HDFFile,
    ShearCatalog,
    SACCFile,
    TextFile,
    MapsFile,
    QPNOfZFile,
)
from .utils.patches import PatchMaker
from ceci.config import StageParameter
import numpy as np
import collections
import sys
import os
import pathlib
from time import perf_counter
import gc
from .utils import choose_pixelization

# This creates a little mini-type, like a struct,
# for holding individual measurements
Measurement = collections.namedtuple("Measurement", ["corr_type", "object", "i", "j"])

SHEAR_SHEAR = 0
SHEAR_POS = 1
POS_POS = 2

# external cross correlations
POS_EXT = 3
SHEAR_EXT = 4

TREECORR_CONFIG = {
    "min_sep": StageParameter(float, 0.5, msg="Minimum separation for correlation measurements"),
    "max_sep": StageParameter(float, 300.0, msg="Maximum separation for correlation measurements"),
    "nbins": StageParameter(int, 9, msg="Number of separation bins"),
    "bin_slop": StageParameter(float, 0.0, msg="Tolerance for bin sloppiness in TreeCorr"),
    "sep_units": StageParameter(str, "arcmin", msg="Units for separation (arcmin, degrees, etc.)"),
    "flip_g1": StageParameter(bool, False, msg="Whether to flip the sign of g1"),
    "flip_g2": StageParameter(bool, True, msg="Whether to flip the sign of g2"),
    "cores_per_task": StageParameter(int, 20, msg="Number of CPU cores to use per task"),
    "verbose": StageParameter(int, 1, msg="Verbosity level for TreeCorr output"),
}


[docs] class TXTwoPoint(PipelineStage): """ Make 2pt measurements using TreeCorr This stage make the full set of cosmic shear, galaxy-galaxy lensing, and galaxy density measurements on the tomographic catalog using TreeCorr. Results are saved to a sacc file. """ name = "TXTwoPoint" inputs = [ ("binned_shear_catalog", ShearCatalog), ("binned_lens_catalog", HDFFile), ("binned_random_catalog", HDFFile), ("binned_random_catalog_sub", HDFFile), ("shear_photoz_stack", QPNOfZFile), ("lens_photoz_stack", QPNOfZFile), ("patch_centers", TextFile), ("tracer_metadata", HDFFile), ] outputs = [("twopoint_data_real_raw", SACCFile), ("twopoint_gamma_x", SACCFile)] # Add values to the config file that are not previously defined config_options = TREECORR_CONFIG | { "calcs": StageParameter( list, [0, 1, 2], msg="Which calculations to perform: 0=shear-shear, 1=shear-position, 2=position-position" ), "source_bins": StageParameter(list, [-1], msg="List of source bins to use (-1 means all)"), "lens_bins": StageParameter(list, [-1], msg="List of lens bins to use (-1 means all)"), "reduce_randoms_size": StageParameter(float, 1.0, msg="Factor to reduce the size of random catalogs"), "do_shear_shear": StageParameter(bool, True, msg="Whether to compute shear-shear correlations"), "do_shear_pos": StageParameter(bool, True, msg="Whether to compute shear-position correlations"), "do_pos_pos": StageParameter(bool, True, msg="Whether to compute position-position correlations"), "auto_only": StageParameter(bool, False, msg="Whether to compute only auto-correlations"), "var_method": StageParameter(str, "jackknife", msg="Method for computing variance (jackknife, sample, etc.)"), "use_randoms": StageParameter(bool, True, msg="Whether to use random catalogs"), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode"), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files"), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk"), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes"), "metric": StageParameter(str, "Euclidean", msg="Distance metric to use (Euclidean, Arc, etc.)"), "gaussian_sims_factor": StageParameter( list, default=[1.0], msg="Factor by which to decrease lens density to account for increased density contrast.", ), "use_subsampled_randoms": StageParameter(bool, True, msg="Use subsampled randoms file for RR calculation"), } def validate(self): if self.__class__ is not TXTwoPoint: return if self.config["do_pos_pos"] or self.config["do_shear_pos"]: with self.open_input("lens_photoz_stack", wrapper=True) as f: # For both source and lens z, Nz = f.get_bin_n_of_z(0) if self.config["do_shear_shear"] or self.config["do_shear_pos"]: with self.open_input("shear_photoz_stack", wrapper=True) as g: z, Nz = g.get_bin_n_of_z(0) def run(self): """ Run the analysis for this stage. """ import sacc import healpy import treecorr # Binning information source_list, lens_list = self.read_nbin() self.config["num_threads"] = int(os.environ.get("OMP_NUM_THREADS", 1)) if self.rank == 0: # This is a workaround for the fact the the ceci config stuff doesn't # quite handle the get method properly. # Which metrics are available, and how they are interpreted, depends on # whether a distance is in the catalogs returned in get_shear_catalog # and friends, below. In this base class only the 2D metrics will be # available, but subclasses can specify to load a distance column too. metric = self.config["metric"] if "metric" in self.config else "Euclidean" print(f'Running TreeCorr with metric "{metric}"') # Calculate metadata like the area and related # quantities meta = self.read_metadata() # Choose which pairs of bins to calculate calcs = self.select_calculations(source_list, lens_list) sys.stdout.flush() # Split the catalogs into patch files self.prepare_patches(calcs, meta) results = [] for i, j, k in calcs: result = self.call_treecorr(i, j, k) results.append(result) if self.comm: self.comm.Barrier() # Save the results self.write_output(source_list, lens_list, meta, results) def select_calculations(self, source_list, lens_list): calcs = [] # For shear-shear we omit pairs with j>i if self.config["do_shear_shear"]: print("DOING SHEAR-SHEAR") k = SHEAR_SHEAR for i in source_list: for j in range(i + 1): if j in source_list: calcs.append((i, j, k)) # For shear-position we use all pairs if self.config["do_shear_pos"]: print("DOING SHEAR-POS") k = SHEAR_POS for i in source_list: for j in lens_list: calcs.append((i, j, k)) # For position-position we omit pairs with j>i if self.config["do_pos_pos"]: print("DOING POS-POS") if not self.config["use_randoms"]: raise ValueError("You need to have a random catalog to calculate position-position correlations") k = POS_POS if self.config["auto_only"]: for i in lens_list: calcs.append((i, i, k)) else: for i in lens_list: for j in range(i + 1): if j in lens_list: calcs.append((i, j, k)) if self.rank == 0: print(f"Running {len(calcs)} calculations: {calcs}") return calcs def read_nbin(self): """ Determine the bins to use in this analysis, either from the input file or from the configuration. """ if self.config["source_bins"] == [-1] and self.config["lens_bins"] == [-1]: source_list, lens_list = self._read_nbin_from_tomography() else: source_list, lens_list = self._read_nbin_from_config() ns = len(source_list) nl = len(lens_list) if self.rank == 0: print(f"Running with {ns} source bins and {nl} lens bins") return source_list, lens_list # These two functions can be combined into a single one. def _read_nbin_from_tomography(self): if self.get_input("binned_shear_catalog") == "none": nbin_source = 0 else: with self.open_input("binned_shear_catalog") as f: nbin_source = f["shear"].attrs["nbin_source"] if self.get_input("binned_lens_catalog") == "none": nbin_lens = 0 else: with self.open_input("binned_lens_catalog") as f: nbin_lens = f["lens"].attrs["nbin_lens"] source_list = list(range(nbin_source)) lens_list = list(range(nbin_lens)) return source_list, lens_list def _read_nbin_from_config(self): # TODO handle the case where the user only specefies # bins for only sources or only lenses source_list = self.config["source_bins"] lens_list = self.config["lens_bins"] # catch bad input tomo_source_list, tomo_lens_list = self._read_nbin_from_tomography() tomo_nbin_source = len(tomo_source_list) tomo_nbin_lens = len(tomo_lens_list) nbin_source = len(source_list) nbin_lens = len(lens_list) if source_list == [-1]: source_list = tomo_source_list if lens_list == [-1]: lens_list = tomo_lens_list # if more bins are input than exist, raise an error if not nbin_source <= tomo_nbin_source: raise ValueError(f"Requested too many source bins in the config ({nbin_source}): max is {tomo_nbin_source}") if not nbin_lens <= tomo_nbin_lens: raise ValueError(f"Requested too many lens bins in the config ({nbin_lens}): max is {tomo_nbin_lens}") # make sure the bin numbers actually exist for i in source_list: if i not in tomo_source_list: raise ValueError(f"Requested source bin {i} that is not in the input file") for i in lens_list: if i not in tomo_lens_list: raise ValueError(f"Requested lens bin {i} that is not in the input file") return source_list, lens_list def add_data_points(self, S, results): import treecorr import sacc XI = "combined" XIP = sacc.standard_types.galaxy_shear_xi_plus XIM = sacc.standard_types.galaxy_shear_xi_minus GAMMAT = sacc.standard_types.galaxy_shearDensity_xi_t GAMMAX = sacc.standard_types.galaxy_shearDensity_xi_x WTHETA = sacc.standard_types.galaxy_density_xi comb = [] for index, d in enumerate(results): # First the tracers and generic tags tracer1 = f"source_{d.i}" if d.corr_type in [XI, GAMMAT] else f"lens_{d.i}" tracer2 = f"source_{d.j}" if d.corr_type in [XI] else f"lens_{d.j}" # This happens when there is an empty bin. We can't do a covariance # here, or anything useful, really, so we just skip this bin. if d.object is None: continue theta = np.exp(d.object.meanlogr) npair = d.object.npairs weight = d.object.weight # xip / xim is a special case because it has two observables. # the other two are together below if d.corr_type == XI: xip = d.object.xip xim = d.object.xim xiperr = np.sqrt(d.object.varxip) ximerr = np.sqrt(d.object.varxim) n = len(xip) # add all the data points to the sacc for i in range(n): S.add_data_point( XIP, (tracer1, tracer2), xip[i], theta=theta[i], error=xiperr[i], npair=npair[i], weight=weight[i], ) for i in range(n): S.add_data_point( XIM, (tracer1, tracer2), xim[i], theta=theta[i], error=ximerr[i], npair=npair[i], weight=weight[i], ) else: if self.config["gaussian_sims_factor"] != [1.0]: # only for gammat and wtheta, for the gaussian simulations we need to scale the measurements up to correct for # the scaling of the density field when building the simulations. if "lens" in tracer2: if "lens" in tracer1: scaling_factor = ( self.config["gaussian_sims_factor"][int(tracer1[-1])] * self.config["gaussian_sims_factor"][int(tracer2[-1])] ) else: scaling_factor = self.config["gaussian_sims_factor"][int(tracer2[-1])] d.object.xi *= scaling_factor d.object.varxi *= scaling_factor**2 xi = d.object.xi err = np.sqrt(d.object.varxi) n = len(xi) for i in range(n): S.add_data_point( d.corr_type, (tracer1, tracer2), xi[i], theta=theta[i], error=err[i], weight=weight[i], ) # We build up the comb list to get the covariance of it later # in the same order as our data points comb.append(d.object) # Add the covariance. There are several different jackknife approaches # available - see the treecorr docs if treecorr.__version__.startswith("4.2."): if self.rank == 0: print("Using old TreeCorr - covariance may be slow. Consider using 4.3 from github main branch.") cov = treecorr.estimate_multi_cov(comb, self.config["var_method"]) else: if self.rank == 0: print("Using new TreeCorr 4.3 or above") cov = treecorr.estimate_multi_cov(comb, self.config["var_method"], comm=self.comm) S.add_covariance(cov) def add_gamma_x_data_points(self, S, results): import treecorr import sacc XI = "combined" GAMMAT = sacc.standard_types.galaxy_shearDensity_xi_t GAMMAX = sacc.standard_types.galaxy_shearDensity_xi_x covs = [] for index, d in enumerate(results): tracer1 = f"source_{d.i}" if d.corr_type in [XI, GAMMAT] else f"lens_{d.i}" tracer2 = f"source_{d.j}" if d.corr_type in [XI] else f"lens_{d.j}" if d.corr_type == GAMMAT: theta = np.exp(d.object.meanlogr) npair = d.object.npairs weight = d.object.weight xi_x = d.object.xi_im covX = d.object.estimate_cov("shot") # TreeCorr v5 returns the diagonal of the covariance matrix # instead of a full but diagal (so almost all zero) format. if treecorr.__version_info__[0] >= 5: covX = np.diag(covX) covs.append(covX) err = np.sqrt(np.diag(covX)) n = len(xi_x) for i in range(n): S.add_data_point( GAMMAX, (tracer1, tracer2), xi_x[i], theta=theta[i], error=err[i], weight=weight[i], ) S.add_covariance(covs) def write_output(self, source_list, lens_list, meta, results): import sacc import treecorr XI = "combined" XIP = sacc.standard_types.galaxy_shear_xi_plus XIM = sacc.standard_types.galaxy_shear_xi_minus GAMMAT = sacc.standard_types.galaxy_shearDensity_xi_t GAMMAX = sacc.standard_types.galaxy_shearDensity_xi_x WTHETA = sacc.standard_types.galaxy_density_xi S = sacc.Sacc() S2 = sacc.Sacc() # We include the n(z) data in the output. # So here we load it in and add it to the data # Load the tracer data N(z) from an input file and # copy it to the output, for convenience if self.config["do_shear_pos"] or self.config["do_shear_shear"]: if source_list: with self.open_input("shear_photoz_stack", wrapper=True) as f: for i in source_list: z, Nz = f.get_bin_n_of_z(i) S.add_tracer("NZ", f"source_{i}", z, Nz) if self.config["do_shear_pos"] == True: S2.add_tracer("NZ", f"source_{i}", z, Nz) else: sys.exit("Requesting a measurement that requires source galaxies but no source_list provided") if self.config["do_pos_pos"] or self.config["do_shear_pos"]: if lens_list: with self.open_input("lens_photoz_stack", wrapper=True) as f: # For both source and lens for i in lens_list: z, Nz = f.get_bin_n_of_z(i) S.add_tracer("NZ", f"lens_{i}", z, Nz) if self.config["do_shear_pos"] == True: S2.add_tracer("NZ", f"lens_{i}", z, Nz) else: sys.exit("Requesting a measurement that requires lens galaxies but no lens_list provided") # Now build up the collection of data points, adding them all to # the sacc data one by one. self.add_data_points(S, results) # The other processes are only needed for the covariance estimation. # They do a bunch of other stuff here that isn't actually needed, but # it should all be very fast. After this point they are not needed # at all so return if self.rank != 0: return # Our data points may currently be in any order depending on which processes # ran which calculations. Re-order them. S.to_canonical_order() self.write_metadata(S, meta) # Finally, save the output to Sacc file S.save_fits(self.get_output("twopoint_data_real_raw"), overwrite=True) # Adding the gammaX calculation: if self.config["do_shear_pos"] == True: self.add_gamma_x_data_points(S2, results) S2.to_canonical_order() self.write_metadata(S2, meta) # always write the file, even if it is empty S2.save_fits(self.get_output("twopoint_gamma_x"), overwrite=True) def write_metadata(self, S, meta): # We also save the associated metadata to the file for k, v in meta.items(): if np.isscalar(v): S.metadata[k] = v else: for i, vi in enumerate(v): S.metadata[f"{k}_{i}"] = vi # Add provenance metadata. In managed formats this is done # automatically, but because the Sacc library is external # we do it manually here. provenance = self.gather_provenance() provenance.update(SACCFile.generate_provenance()) for key, value in provenance.items(): if isinstance(value, str) and "\n" in value: values = value.split("\n") for i, v in enumerate(values): S.metadata[f"provenance/{key}_{i}"] = v else: S.metadata[f"provenance/{key}"] = value def call_treecorr(self, i, j, k): """ This is a wrapper for interaction with treecorr. """ import sacc import pickle # TODO: fix up the caching code if self.name == "TXTwoPoint" or self.name == "TXTwoPointPixel": pickle_filename = self.get_output("twopoint_data_real_raw") + f".checkpoint-{i}-{j}-{k}.pkl" # pickle_filename = f"treecorr-cache-{i}-{j}-{k}.pkl" if os.path.exists(pickle_filename): print(f"{self.rank} WARNING USING THIS PICKLE FILE I FOUND: {pickle_filename}") with open(pickle_filename, "rb") as f: result = pickle.load(f) return result if k == SHEAR_SHEAR: xx = self.calculate_shear_shear(i, j) xtype = "combined" elif k == SHEAR_POS: xx = self.calculate_shear_pos(i, j) xtype = sacc.standard_types.galaxy_shearDensity_xi_t elif k == POS_POS: xx = self.calculate_pos_pos(i, j) xtype = sacc.standard_types.galaxy_density_xi else: raise ValueError(f"Unknown correlation function {k}") # Force garbage collection here to make sure all the # catalogs are definitely freed gc.collect() # The measurement object collects the results and type info. # we use it because the ordering will not be simple if we have # parallelized, so it's good to keep explicit track. result = Measurement(xtype, xx, i, j) sys.stdout.flush() if self.comm: self.comm.Barrier() if self.name == "TXTwoPoint" or self.name == "TXTwoPointPixel": if self.rank == 0: print(f"Pickling result to {pickle_filename}") with open(pickle_filename, "wb") as f: pickle.dump(result, f) return result def prepare_patches(self, calcs, meta): """ For each catalog to be generated, have one process load the catalog and write its patch files out to disc. These are then re-used later by all the different processes. Parameters ---------- calcs: list A list of (bin1, bin2, bin_type) where bin1 and bin2 are indices or bin labels and bin_type is one of the constants SHEAR_SHEAR, SHEAR_POS, or POS_POS. meta: dict A dict to which the number of patches (or zero, if no patches) will be added for each catalog type, with keys "npatch_shear", "npatch_pos", and "npatch_ran". """ # Make the full list of catalogs to run cats = set() # Use shear-shear and pos-pos only here as they represent # catalogs not pairs. for i, j, k in calcs: if k == SHEAR_SHEAR: cats.add((i, SHEAR_SHEAR)) cats.add((j, SHEAR_SHEAR)) elif k == SHEAR_POS: cats.add((i, SHEAR_SHEAR)) cats.add((j, POS_POS)) elif k == POS_POS: cats.add((i, POS_POS)) cats.add((j, POS_POS)) cats = list(cats) cats.sort(key=str) chunk_rows = self.config["chunk_rows"] npatch_shear = 0 npatch_pos = 0 npatch_ran = 0 self.empty_patch_exists = {} # Parallelization is now done at the patch level for h, k in cats: ktxt = "shear" if k == SHEAR_SHEAR else "position" print(f"Rank {self.rank} making patches for {ktxt} catalog bin {h}") # For shear we just have the one catalog. For position we may # have randoms also. We explicitly delete catalogs after loading # them to ensure we don't have two in memory at once. if k == SHEAR_SHEAR: cat = self.get_shear_catalog(h) npatch_shear, contains_empty = PatchMaker.run(cat, chunk_rows, self.comm) self.empty_patch_exists[cat.save_patch_dir] = contains_empty del cat else: cat = self.get_lens_catalog(h) npatch_pos, contains_empty = PatchMaker.run(cat, chunk_rows, self.comm) self.empty_patch_exists[cat.save_patch_dir] = contains_empty del cat ran_cat = self.get_random_catalog(h) # support use_randoms = False if ran_cat is None: continue npatch_ran, contains_empty = PatchMaker.run(ran_cat, chunk_rows, self.comm) self.empty_patch_exists[ran_cat.save_patch_dir] = contains_empty del ran_cat if self.config["use_subsampled_randoms"]: ran_cat = self.get_subsampled_random_catalog(h) npatch_ran, contains_empty = PatchMaker.run(ran_cat, chunk_rows, self.comm) self.empty_patch_exists[ran_cat.save_patch_dir] = contains_empty del ran_cat meta["npatch_shear"] = npatch_shear meta["npatch_pos"] = npatch_pos meta["npatch_ran"] = npatch_ran # stop other processes progressing to the rest of the code and # trying to load things we have not written yet if self.comm is not None: self.comm.Barrier() def get_patch_dir(self, input_tag, b): """ Select a patch directory for the file with the given input tag and with a bin number/label. To ensure that if you change the catalog the patch dir will also change, the directory path includes the unique ID of the input file. Parameters ---------- input_tag: str One of the tags in the class's inputs attribute b: any An additional label used as the last component in the returned directory Returns ------- str: a directory, which has been created if it did not exist already. """ # start from a user-specified base directory patch_base = self.config["patch_dir"] # append the unique identifier for the parent catalog file with self.open_input(input_tag, wrapper=True) as f: p = f.read_provenance() uuid = p["uuid"] pth = pathlib.Path(f.path).resolve() ctime = os.stat(pth).st_ctime # We expect the input files to be generated within a pipeline and so always # have input files to have a unique ID. But if for some reason it doesn't # have one we handle that too. if uuid == "UNKNOWN": ident = hash(f"{pth}{ctime}").to_bytes(8, "big", signed=True).hex() name = f"{input_tag}_{ident}" else: name = f"{input_tag}_{uuid}" # Include a tag for the current stage name, so that # if we are running several subclasses at the same time # they don't interfere with each other. This is a waste of # disc space, but hopefully we are not short of that. if not self.config["share_patch_files"]: name = self.instance_name + name # And finally append the bin name or number patch_dir = pathlib.Path(patch_base) / name / str(b) # Make the directory and return it pathlib.Path(patch_dir).mkdir(exist_ok=True, parents=True) return patch_dir def get_shear_catalog(self, i): import treecorr # Load and calibrate the appropriate bin data cat = treecorr.Catalog( self.get_input("binned_shear_catalog"), ext=f"/shear/bin_{i}", g1_col="g1", g2_col="g2", ra_col="ra", dec_col="dec", w_col="weight", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("binned_shear_catalog", i), flip_g1=self.config["flip_g1"], flip_g2=self.config["flip_g2"], ) return cat def get_lens_catalog(self, i): import treecorr # Load and calibrate the appropriate bin data cat = treecorr.Catalog( self.get_input("binned_lens_catalog"), ext=f"/lens/bin_{i}", ra_col="ra", dec_col="dec", w_col="weight", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("binned_lens_catalog", i), ) return cat def get_random_catalog(self, i): import treecorr if not self.config["use_randoms"]: return None rancat = treecorr.Catalog( self.get_input("binned_random_catalog"), ext=f"/randoms/bin_{i}", ra_col="ra", dec_col="dec", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("binned_random_catalog", i), ) return rancat def get_subsampled_random_catalog(self, i): import treecorr if not self.config["use_randoms"]: return None rancat = treecorr.Catalog( self.get_input("binned_random_catalog_sub"), ext=f"/randoms/bin_{i}", ra_col="ra", dec_col="dec", ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), save_patch_dir=self.get_patch_dir("binned_random_catalog_sub", i), ) return rancat def touch_patches(self, cat): # If any patches were empty for this cat # run get_patches on rank 0 and bcast # this will re-make patches but prevents processes conflicting # in the gg.process # If no patches are empty returns the cat, unaltered if cat is None: return cat if self.empty_patch_exists[cat.save_patch_dir]: if self.rank == 0: cat.get_patches() if self.comm is not None: cat = self.comm.bcast(cat, root=0) return cat def calculate_shear_shear(self, i, j): import treecorr cat_i = self.get_shear_catalog(i) cat_i = self.touch_patches(cat_i) n_i = cat_i.nobj if i == j: cat_j = None n_j = n_i else: cat_j = self.get_shear_catalog(j) cat_j = self.touch_patches(cat_j) n_j = cat_j.nobj if self.rank == 0: print(f"Calculating shear-shear bin pair ({i},{j}): {n_i} x {n_j} objects using MPI") if n_i == 0 or n_j == 0: if self.rank == 0: print("Empty catalog: returning None") return None gg = treecorr.GGCorrelation(self.config) t1 = perf_counter() gg.process(cat_i, cat_j, low_mem=self.config["low_mem"], comm=self.comm) t2 = perf_counter() if self.rank == 0: print(f"Processing took {t2 - t1:.1f} seconds") return gg def calculate_shear_pos(self, i, j): import treecorr cat_i = self.get_shear_catalog(i) cat_i = self.touch_patches(cat_i) n_i = cat_i.nobj cat_j = self.get_lens_catalog(j) cat_j = self.touch_patches(cat_j) rancat_j = self.get_random_catalog(j) rancat_j = self.touch_patches(rancat_j) n_j = cat_j.nobj n_rand_j = rancat_j.nobj if rancat_j is not None else 0 if self.rank == 0: print(f"Calculating shear-position bin pair ({i},{j}): {n_i} x {n_j} objects, {n_rand_j} randoms") if n_i == 0 or n_j == 0: if self.rank == 0: print("Empty catalog: returning None") return None ng = treecorr.NGCorrelation(self.config) t1 = perf_counter() ng.process(cat_j, cat_i, comm=self.comm, low_mem=self.config["low_mem"]) if rancat_j: rg = treecorr.NGCorrelation(self.config) rg.process(rancat_j, cat_i, comm=self.comm, low_mem=self.config["low_mem"]) else: rg = None ng.calculateXi(rg=rg) t2 = perf_counter() if self.rank == 0: print(f"Processing took {t2 - t1:.1f} seconds") return ng def calculate_pos_pos(self, i, j): import treecorr cat_i = self.get_lens_catalog(i) cat_i = self.touch_patches(cat_i) rancat_i = self.get_random_catalog(i) rancat_i = self.touch_patches(rancat_i) n_i = cat_i.nobj n_rand_i = rancat_i.nobj if rancat_i is not None else 0 if i == j: cat_j = None rancat_j = rancat_i n_j = n_i n_rand_j = n_rand_i else: cat_j = self.get_lens_catalog(j) cat_j = self.touch_patches(cat_j) rancat_j = self.get_random_catalog(j) rancat_j = self.touch_patches(rancat_j) n_j = cat_j.nobj n_rand_j = rancat_j.nobj if self.config["use_subsampled_randoms"]: rancat_sub_i = self.get_subsampled_random_catalog(i) rancat_sub_i = self.touch_patches(rancat_sub_i) n_rand_sub_i = rancat_sub_i.nobj if rancat_sub_i is not None else 0 if i == j: rancat_sub_j = rancat_sub_i n_rand_sub_j = n_rand_sub_i else: rancat_sub_j = self.get_subsampled_random_catalog(j) rancat_sub_j = self.touch_patches(rancat_sub_j) n_rand_sub_j = rancat_sub_j.nobj if rancat_sub_j is not None else 0 if self.rank == 0: print( f"Calculating position-position bin pair ({i}, {j}): {n_i} x {n_j} objects, {n_rand_i} x {n_rand_j} randoms" ) if self.config["use_subsampled_randoms"]: print(f"and for the rr term, {n_rand_sub_i} x {n_rand_sub_j} pairs") if n_i == 0 or n_j == 0: if self.rank == 0: print("Empty catalog: returning None") return None t1 = perf_counter() nn = treecorr.NNCorrelation(self.config) nn.process(cat_i, cat_j, comm=self.comm, low_mem=self.config["low_mem"]) nr = treecorr.NNCorrelation(self.config) nr.process(cat_i, rancat_j, comm=self.comm, low_mem=self.config["low_mem"]) # The next calculation is faster if we explicitly tell TreeCorr # that its two catalogs here are the same one. if i == j: rancat_j = None rancat_sub_j = None rr = treecorr.NNCorrelation(self.config) if self.config["use_subsampled_randoms"]: rr.process(rancat_sub_i, rancat_sub_j, comm=self.comm, low_mem=self.config["low_mem"]) else: rr.process(rancat_i, rancat_j, comm=self.comm, low_mem=self.config["low_mem"]) if i == j: rn = None else: rn = treecorr.NNCorrelation(self.config) rn.process(rancat_i, cat_j, comm=self.comm, low_mem=self.config["low_mem"]) t2 = perf_counter() nn.calculateXi(rr=rr, dr=nr, rd=rn) if self.rank == 0: print(f"Processing took {t2 - t1:.1f} seconds") return nn def read_metadata(self): meta_data = self.open_input("tracer_metadata") area = meta_data["tracers"].attrs["area"] meta = {} meta["area"] = area try: sigma_e = meta_data["tracers/sigma_e"][:] N_eff = meta_data["tracers/N_eff"][:] mean_e1 = meta_data["tracers/mean_e1"][:] mean_e2 = meta_data["tracers/mean_e2"][:] meta["neff"] = N_eff meta["area"] = area meta["sigma_e"] = sigma_e meta["mean_e1"] = mean_e1 meta["mean_e2"] = mean_e2 except KeyError: # will happen for lens only runs pass return meta
[docs] class TXTwoPointPixel(TXTwoPoint): """ This subclass of the standard TXTwoPoint uses maps to compute pixelized versions of the real space correlation functions. This is useful when the number density of the galaxy samples is too high to use random points to sample the mask. """ name = "TXTwoPointPixel" inputs = [ ("density_maps", MapsFile), ("source_maps", MapsFile), ("binned_shear_catalog", ShearCatalog), ("binned_lens_catalog", HDFFile), ("binned_random_catalog", HDFFile), ("shear_photoz_stack", QPNOfZFile), ("lens_photoz_stack", QPNOfZFile), ("patch_centers", TextFile), ("tracer_metadata", HDFFile), ("mask", MapsFile), ] outputs = [("twopoint_data_real_raw", SACCFile), ("twopoint_gamma_x", SACCFile)] # Add values to the config file that are not previously defined config_options = TREECORR_CONFIG | { # TODO: Allow more fine-grained selection of 2pt subsets to compute "calcs": StageParameter( list, [0, 1, 2], msg="Which calculations to perform: 0=shear-shear, 1=shear-position, 2=position-position" ), "source_bins": StageParameter(list, [-1], msg="List of source bins to use (-1 means all)"), "lens_bins": StageParameter(list, [-1], msg="List of lens bins to use (-1 means all)"), "reduce_randoms_size": StageParameter(float, 1.0, msg="Factor to reduce the size of random catalogs"), "do_shear_shear": StageParameter(bool, True, msg="Whether to compute shear-shear correlations"), "do_shear_pos": StageParameter(bool, True, msg="Whether to compute shear-position correlations"), "do_pos_pos": StageParameter(bool, True, msg="Whether to compute position-position correlations"), "var_method": StageParameter(str, "jackknife", msg="Method for computing variance (jackknife, sample, etc.)"), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode"), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files"), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk"), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes"), "metric": StageParameter(str, "Euclidean", msg="Distance metric to use (Euclidean, Arc, etc.)"), "use_randoms": StageParameter(bool, True, msg="Whether to use random catalogs"), "auto_only": StageParameter(bool, False, msg="Whether to compute only auto-correlations"), "gaussian_sims_factor": StageParameter( list, default=[1.0], msg="Factor by which to decrease lens density to account for increased density contrast.", ), "use_subsampled_randoms": StageParameter( bool, False, msg="Use subsampled randoms file for RR calculation (not used for pixel estimator)" ), } def get_density_map(self, i): import treecorr with self.open_input("density_maps", wrapper=True) as f: info = f.read_map_info(f"delta_{i}") map_d = f.read_map(f"delta_{i}") pix = map_d.valid_pixels print(f"Loaded {i} overdensity maps") # Read the mask to get fracdet weights with self.open_input("mask", wrapper=True) as f: mask = f.read_map("mask") scheme = choose_pixelization(**info) ra_pix, dec_pix = scheme.pix2ang(pix) # Load and calibrate the appropriate bin data cat = treecorr.Catalog( ra=ra_pix, dec=dec_pix, w=mask[pix], # weight pixels by their fracdet k=map_d[pix], ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), ) return cat def get_shear_map(self, i): import treecorr import pdb with self.open_input("source_maps", wrapper=True) as f: info_g1 = f.read_map_info(f"g1_{i}") map_g1 = f.read_map(f"g1_{i}") pix_g1 = map_g1.valid_pixels print(f"Loaded shear 1 {i} maps") info_g2 = f.read_map_info(f"g2_{i}") map_g2 = f.read_healpix(f"g2_{i}") pix_g2 = map_g2.valid_pixels print(f"Loaded shear 2 {i} maps") scheme = choose_pixelization(**info_g1) ra_pix, dec_pix = scheme.pix2ang(pix_g1) mask_unseen = (map_g1[pix_g1] > -1e30) * (map_g2[pix_g2] > -1e30) cat = treecorr.Catalog( ra=ra_pix[mask_unseen], dec=dec_pix[mask_unseen], # w=, g1=map_g1[pix_g1][mask_unseen], g2=map_g2[pix_g2][mask_unseen], ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), flip_g1=self.config["flip_g1"], flip_g2=self.config["flip_g2"], ) return cat def calculate_shear_pos(self, i, j): import treecorr cat_i = self.get_shear_map(i) cat_j = self.get_density_map(j) if self.rank == 0: print(f"Calculating shear-position bin pair ({i},{j}).") kg = treecorr.KGCorrelation(self.config) t1 = perf_counter() kg.process(cat_j, cat_i, comm=self.comm, low_mem=self.config["low_mem"]) return kg def calculate_pos_pos(self, i, j): import treecorr cat_i = self.get_density_map(i) if i == j: cat_j = cat_i else: cat_j = self.get_density_map(j) if self.rank == 0: print(f"Calculating position-position bin pair ({i}, {j})") t1 = perf_counter() kk = treecorr.KKCorrelation(self.config) kk.process(cat_i, cat_j, comm=self.comm, low_mem=self.config["low_mem"]) return kk
[docs] class TXTwoPointPixelExtCross(TXTwoPointPixel): """ TXTwoPointPixel - External - Cross correlation This subclass of TXTwoPointPixel uses maps to compute cross correlations between the galaxy density maps and an external set of Survey Property (or other contaminant) maps """ name = "TXTwoPointPixelExtCross" inputs = [ ("density_maps", MapsFile), ("source_maps", MapsFile), ("binned_shear_catalog", ShearCatalog), ("binned_lens_catalog", HDFFile), ("binned_random_catalog", HDFFile), ("shear_photoz_stack", QPNOfZFile), ("lens_photoz_stack", QPNOfZFile), ("patch_centers", TextFile), ("tracer_metadata", HDFFile), ("mask", MapsFile), ] outputs = [("twopoint_data_ext_cross_raw", SACCFile)] # Add values to the config file that are not previously defined config_options = TREECORR_CONFIG | { "supreme_path_root": StageParameter(str, "", msg="Root path for supreme files"), "do_pos_ext": StageParameter(bool, True, msg="Whether to compute position-external correlations"), "do_shear_ext": StageParameter(bool, True, msg="Whether to compute shear-external correlations"), # TODO: Remove any unnesesary config options here "calcs": StageParameter( list, [0, 1, 2], msg="Which calculations to perform: 0=shear-shear, 1=shear-position, 2=position-position" ), "source_bins": StageParameter(list, [-1], msg="List of source bins to use (-1 means all)"), "lens_bins": StageParameter(list, [-1], msg="List of lens bins to use (-1 means all)"), "reduce_randoms_size": StageParameter(float, 1.0, msg="Factor to reduce the size of random catalogs"), "do_shear_shear": StageParameter(bool, False, msg="Whether to compute shear-shear correlations"), "do_shear_pos": StageParameter(bool, False, msg="Whether to compute shear-position correlations"), "do_pos_pos": StageParameter(bool, False, msg="Whether to compute position-position correlations"), "var_method": StageParameter(str, "jackknife", msg="Method for computing variance (jackknife, sample, etc.)"), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode"), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files"), "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk"), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes"), "metric": StageParameter(str, "Euclidean", msg="Distance metric to use (Euclidean, Arc, etc.)"), "use_randoms": StageParameter(bool, True, msg="Whether to use random catalogs"), "auto_only": StageParameter(bool, False, msg="Whether to compute only auto-correlations"), "gaussian_sims_factor": StageParameter( list, default=[1.0], msg="Factor by which to decrease lens density to account for increased density contrast.", ), "use_subsampled_randoms": StageParameter( bool, False, msg="Use subsampled randoms file for RR calculation (not used for pixel estimator)" ), } def get_ext_list(self): import glob root = self.config["supreme_path_root"] sys_files = glob.glob(f"{root}*.hs") return sys_files def select_calculations(self, source_list, lens_list): """ For TXTwoPointPixelExtCross, this method only selects the cross correlations between data and external maps """ calcs = [] # get the list of external map files self.ext_list = self.get_ext_list() ext_list = np.arange(len(self.ext_list)) # For shear-ext if self.config["do_shear_ext"]: k = SHEAR_EXT for i in source_list: for j in ext_list: calcs.append((i, j, k)) if self.config["do_pos_ext"]: k = POS_EXT for i in lens_list: for j in ext_list: calcs.append((i, j, k)) if self.rank == 0: print(f"Running {len(calcs)} calculations: {calcs}") return calcs def call_treecorr(self, i, j, k): """ call_treecorr is modified for this sub-class to include the external-cross-correlations This is a wrapper for interaction with treecorr. """ import sacc import pickle # TODO: fix up the caching code if self.name == "TXTwoPointPixelExtCross": pickle_filename = self.get_output("twopoint_data_ext_cross_raw") + f".checkpoint-{i}-{j}-{k}.pkl" if os.path.exists(pickle_filename): print(f"{self.rank} WARNING USING THIS PICKLE FILE I FOUND: {pickle_filename}") with open(pickle_filename, "rb") as f: result = pickle.load(f) return result if k == SHEAR_SHEAR: xx = self.calculate_shear_shear(i, j) xtype = "combined" elif k == SHEAR_POS: xx = self.calculate_shear_pos(i, j) xtype = sacc.standard_types.galaxy_shearDensity_xi_t elif k == POS_POS: xx = self.calculate_pos_pos(i, j) xtype = sacc.standard_types.galaxy_density_xi elif k == POS_EXT: assert self.name == "TXTwoPointPixelExtCross" xx = self.calculate_pos_ext(i, j) xtype = sacc.build_data_type_name("galaxy", ["density", "ext"], "xi") elif k == SHEAR_EXT: assert self.name == "TXTwoPointPixelExtCross" xx = self.calculate_shear_ext(i, j) xtype = sacc.build_data_type_name("galaxy", ["shear", "ext"], "xi") else: raise ValueError(f"Unknown correlation function {k}") # Force garbage collection here to make sure all the # catalogs are definitely freed gc.collect() # The measurement object collects the results and type info. # we use it because the ordering will not be simple if we have # parallelized, so it's good to keep explicit track. result = Measurement(xtype, xx, i, j) sys.stdout.flush() if self.comm: self.comm.Barrier() if self.name == "TXTwoPointPixelExtCross": if self.rank == 0: print(f"Pickling result to {pickle_filename}") with open(pickle_filename, "wb") as f: pickle.dump(result, f) return result def calculate_pos_ext(self, i, j): import treecorr cat_i = self.get_density_map(i) cat_j = self.get_ext_map(j) if self.rank == 0: print(f"Calculating position-external bin pair ({i}, {j})") t1 = perf_counter() kk = treecorr.KKCorrelation(self.config) kk.process(cat_i, cat_j, comm=self.comm, low_mem=self.config["low_mem"]) return kk def calculate_shear_ext(self, i, j): import treecorr cat_i = self.get_shear_map(i) cat_j = self.get_ext_map(j) if self.rank == 0: print(f"Calculating shear-external bin pair ({i},{j}).") kg = treecorr.KGCorrelation(self.config) t1 = perf_counter() kg.process(cat_j, cat_i, comm=self.comm, low_mem=self.config["low_mem"]) return kg def get_ext_map(self, i): """ get the ith external map (e.g. SP map) from the directory specified in the config """ import treecorr import healsparse import healpy as hp # Read the mask to get fracdet weights with self.open_input("mask", wrapper=True) as f: info = f.read_map_info("mask") mask = f.read_map("mask") pix = mask.valid_pixels nside = mask.nside_sparse # open the input healsparse map sys_files = self.get_ext_list() sys_file = sys_files[i] m = healsparse.HealSparseMap.read(sys_file) m = m.degrade(nside) if info["nest"] == True: map_d = m[pix] else: map_d = m[hp.ring2nest(nside, pix)] # convert SP map to an overdensity mean_data = np.average(map_d, weights=mask[pix]) map_d = map_d / mean_data - 1.0 scheme = choose_pixelization(**info) ra_pix, dec_pix = scheme.pix2ang(pix) # Load and calibrate the appropriate bin data cat = treecorr.Catalog( ra=ra_pix, dec=dec_pix, w=mask[pix], # weight pixels by their fracdet k=map_d, ra_units="degree", dec_units="degree", patch_centers=self.get_input("patch_centers"), ) return cat def add_data_points(self, S, results, tracers_later=False): """ modify add_data_points to know about external map cross correlations and allow tracers_later=True """ import treecorr import sacc XI = "combined" XIP = sacc.standard_types.galaxy_shear_xi_plus XIM = sacc.standard_types.galaxy_shear_xi_minus GAMMAT = sacc.standard_types.galaxy_shearDensity_xi_t GAMMAX = sacc.standard_types.galaxy_shearDensity_xi_x WTHETA = sacc.standard_types.galaxy_density_xi POS_EXT_TYPE = sacc.build_data_type_name("galaxy", ["density", "ext"], "xi") SHEAR_EXT_TYPE = sacc.build_data_type_name("galaxy", ["shear", "ext"], "xi") comb = [] for index, d in enumerate(results): # First the tracers and generic tags if d.corr_type in [XI, XIP, XIM]: tracer1 = f"source_{d.i}" tracer2 = f"source_{d.j}" elif d.corr_type in [GAMMAT, GAMMAX]: tracer1 = f"source_{d.i}" tracer2 = f"lens_{d.j}" elif d.corr_type == WTHETA: tracer1 = f"lens_{d.i}" tracer2 = f"lens_{d.j}" elif d.corr_type == POS_EXT_TYPE: tracer1 = f"lens_{d.i}" tracer2 = f"external_{d.j}" elif d.corr_type == SHEAR_EXT_TYPE: tracer1 = f"source_{d.i}" tracer2 = f"external_{d.j}" else: raise RuntimeError("unrecognised corr_type") # This happens when there is an empty bin. We can't do a covariance # here, or anything useful, really, so we just skip this bin. if d.object is None: continue theta = np.exp(d.object.meanlogr) npair = d.object.npairs weight = d.object.weight # xip / xim is a special case because it has two observables. # the other two are together below if d.corr_type == XI: xip = d.object.xip xim = d.object.xim xiperr = np.sqrt(d.object.varxip) ximerr = np.sqrt(d.object.varxim) n = len(xip) # add all the data points to the sacc for i in range(n): S.add_data_point( XIP, (tracer1, tracer2), xip[i], theta=theta[i], error=xiperr[i], npair=npair[i], weight=weight[i], tracers_later=tracers_later, ) for i in range(n): S.add_data_point( XIM, (tracer1, tracer2), xim[i], theta=theta[i], error=ximerr[i], npair=npair[i], weight=weight[i], tracers_later=tracers_later, ) else: if self.config["gaussian_sims_factor"] != [1.0]: # only for gammat and wtheta, for the gaussian simulations we need to scale the measurements up to correct for # the scaling of the density field when building the simulations. if "lens" in tracer2: if "lens" in tracer1: scaling_factor = ( self.config["gaussian_sims_factor"][int(tracer1[-1])] * self.config["gaussian_sims_factor"][int(tracer2[-1])] ) else: scaling_factor = self.config["gaussian_sims_factor"][int(tracer2[-1])] d.object.xi *= scaling_factor d.object.varxi *= scaling_factor**2 xi = d.object.xi err = np.sqrt(d.object.varxi) n = len(xi) for i in range(n): S.add_data_point( d.corr_type, (tracer1, tracer2), xi[i], theta=theta[i], error=err[i], weight=weight[i], tracers_later=tracers_later, ) # We build up the comb list to get the covariance of it later # in the same order as our data points comb.append(d.object) # Add the covariance. There are several different jackknife approaches # available - see the treecorr docs if treecorr.__version__.startswith("4.2."): if self.rank == 0: print("Using old TreeCorr - covariance may be slow. Consider using 4.3 from github main branch.") cov = treecorr.estimate_multi_cov(comb, self.config["var_method"]) else: if self.rank == 0: print("Using new TreeCorr 4.3 or above") cov = treecorr.estimate_multi_cov(comb, self.config["var_method"], comm=self.comm) S.add_covariance(cov) def write_output(self, source_list, lens_list, meta, results): """ Re-define method to use external cross-correlation outputs """ import sacc import treecorr S = sacc.Sacc() # We include the n(z) data in the output. # So here we load it in and add it to the data # Load the tracer data N(z) from an input file and # copy it to the output, for convenience if source_list: with self.open_input("shear_photoz_stack", wrapper=True) as f: for i in source_list: z, Nz = f.get_bin_n_of_z(i) S.add_tracer("NZ", f"source_{i}", z, Nz) if lens_list: with self.open_input("lens_photoz_stack", wrapper=True) as f: # For both source and lens for i in lens_list: z, Nz = f.get_bin_n_of_z(i) S.add_tracer("NZ", f"lens_{i}", z, Nz) # Now build up the collection of data points, adding them all to # the sacc data one by one. self.add_data_points(S, results, tracers_later=True) # The other processes are only needed for the covariance estimation. # They do a bunch of other stuff here that isn't actually needed, but # it should all be very fast. After this point they are not needed # at all so return if self.rank != 0: return # Our data points may currently be in any order depending on which processes # ran which calculations. Re-order them. S.to_canonical_order() # add list of sp map names to the metadata meta["ext_list"] = self.ext_list self.write_metadata(S, meta) # Finally, save the output to Sacc file S.save_fits(self.get_output("twopoint_data_ext_cross_raw"), overwrite=True)
if __name__ == "__main__": PipelineStage.main()