from .base_stage import PipelineStage
from .data_types import (
TomographyCatalog,
FiducialCosmology,
SACCFile,
MapsFile,
HDFFile,
PhotozPDFFile,
LensingNoiseMaps,
ClusteringNoiseMaps,
PNGFile,
QPNOfZFile,
)
from ceci.config import StageParameter
import numpy as np
import collections
from .utils import choose_pixelization, array_hash
from .utils.theory import theory_3x2pt
import sys
import warnings
import pathlib
# Using the same convention as in twopoint.py
SHEAR_SHEAR = 0
SHEAR_POS = 1
POS_POS = 2
NAMES = {
SHEAR_SHEAR: "shear-shear",
SHEAR_POS: "shear-position",
POS_POS: "position-position",
}
Measurement = collections.namedtuple(
"Measurement",
["corr_type", "l", "value", "noise", "noise_coupled", "win", "i", "j"],
)
[docs]
class TXTwoPointFourier(PipelineStage):
"""
Make Fourier space 3x2pt measurements using NaMaster
This Pipeline Stage computes all auto- and cross-correlations
for a list of tomographic bins, including all galaxy-galaxy,
galaxy-shear and shear-shear power spectra. Sources and lenses
both come from the same shear_catalog and tomography_catalog objects.
The power spectra are computed after deprojecting a number of
systematic-dominated modes, represented as input maps.
In the future we will want to include the following generalizations:
- TODO: specify which cross-correlations in particular to include (e.g. which bin pairs and which source/lens combinations).
- TODO: include flags for rejected objects. Is this included in the tomography_catalog?
- TODO: ell-binning is currently static.
"""
name = "TXTwoPointFourier"
inputs = [
("shear_photoz_stack", QPNOfZFile), # Photoz stack
("lens_photoz_stack", QPNOfZFile), # Photoz stack
("fiducial_cosmology", FiducialCosmology), # For the cosmological parameters
("tracer_metadata", HDFFile), # For density info
("lens_maps", MapsFile),
("source_maps", MapsFile),
("density_maps", MapsFile),
("mask", MapsFile),
("source_noise_maps", LensingNoiseMaps),
("lens_noise_maps", ClusteringNoiseMaps),
]
output_main = "twopoint_data_fourier"
outputs = [(output_main, SACCFile)]
config_options = {
"mask_threshold": StageParameter(float, 0.0, msg="Threshold for masking pixels"),
"flip_g1": StageParameter(bool, False, msg="Whether to flip the sign of g1"),
"flip_g2": StageParameter(bool, False, msg="Whether to flip the sign of g2"),
"cache_dir": StageParameter(str, "./cache/twopoint_fourier", msg="Directory for caching intermediate results"),
"low_mem": StageParameter(bool, False, msg="Whether to use low memory mode"),
"deproject_syst_clustering": StageParameter(
bool, False, msg="Whether to deproject systematic modes from clustering"
),
"systmaps_clustering_dir": StageParameter(str, "", msg="Directory containing systematic maps for clustering"),
"ell_min": StageParameter(int, 100, msg="Minimum ell value for power spectra"),
"ell_max": StageParameter(int, 1500, msg="Maximum ell value for power spectra"),
"n_ell": StageParameter(int, 20, msg="Number of ell bins"),
"ell_spacing": StageParameter(str, "log", msg="Spacing of ell bins (log or linear)"),
"true_shear": StageParameter(bool, False, msg="Whether to use true shear values"),
"analytic_noise": StageParameter(bool, False, msg="Whether to use analytic noise estimates"),
"gaussian_sims_factor": StageParameter(
list,
default=[1.0],
msg="Factor by which to decrease lens density to account for increased density contrast.",
),
"b0": StageParameter(float, 1.0, msg="Galaxy bias parameter"),
"do_shear_shear": StageParameter(bool, True, msg="Whether to compute shear-shear power spectra"),
"do_shear_pos": StageParameter(bool, True, msg="Whether to compute shear-position power spectra"),
"do_pos_pos": StageParameter(bool, True, msg="Whether to compute position-position power spectra"),
"compute_theory": StageParameter(bool, True, msg="Whether to compute theory predictions"),
}
def run(self):
import pymaster
import healpy
import sacc
import pyccl
from .utils.nmt_utils import choose_ell_bins
config = self.config
if self.comm:
self.comm.Barrier()
self.setup_results()
# Generate namaster fields
pixel_scheme, maps, f_sky = self.load_maps()
if self.rank == 0:
print("Loaded maps.")
nbin_source = maps["nbin_source"]
nbin_lens = maps["nbin_lens"]
# Do this at the beginning since its fast and sometimes crashes.
# It is best to avoid losing running time later.
# Load the n(z) values, which are both saved in the output
# file alongside the spectra, and then used to calculate the
# fiducial theory C_ell, which is used in the deprojection calculation
tracer_sacc = self.load_tracers(nbin_source, nbin_lens)
if self.config["compute_theory"]:
with self.open_input("fiducial_cosmology", wrapper=True) as f:
cosmo = f.to_ccl()
theory_ell = np.unique(np.geomspace(self.config["ell_min"], self.config["ell_max"], 100).astype(int))
theory_sacc = theory_3x2pt(cosmo, tracer_sacc, bias=self.config["b0"], smooth=True, ell_values=theory_ell)
else:
theory_sacc = sacc.Sacc()
# Get the complete list of calculations to be done,
# for all the three spectra and all the bin pairs.
# This will be used for parallelization.
calcs = self.select_calculations(nbin_source, nbin_lens)
# Binning scheme, currently chosen from the geometry.
ell_bins = choose_ell_bins(**config)
# Initiate and populate hash_metadata for workspaces
self.hash_metadata = {}
self.populate_hash_metadata(maps, ell_bins)
workspace_cache = self.make_workspaces(maps, calcs, ell_bins)
# If we are rank zero print out some info
if self.rank == 0:
nband = ell_bins.get_n_bands()
ell_effs = ell_bins.get_effective_ells()
print(f"Chosen {nband} ell bin bands with effective ell values and ranges:")
for i in range(nband):
leff = ell_effs[i]
lmin = ell_bins.get_ell_min(i)
lmax = ell_bins.get_ell_max(i)
print(f" {leff:.0f} ({lmin:.0f} - {lmax:.0f})")
# Run the compute power spectra portion in parallel
# This splits the calculations among the parallel bins
# It's not the most optimal way of doing it
# as it's not dynamic, just a round-robin assignment.
for i, j, k in calcs:
self.compute_power_spectra(pixel_scheme, i, j, k, maps, workspace_cache, ell_bins, theory_sacc, f_sky)
if self.rank == 0:
print(f"Collecting results together")
# Pull all the results together to the master process.
self.collect_results()
# Write the collect results out to HDF5.
if self.rank == 0:
self.save_power_spectra(tracer_sacc, nbin_source, nbin_lens)
print("Saved power spectra")
def load_maps(self):
import pymaster as nmt
import healpy
# We will assert that the maps being correlated (density, shear etc)
# must have been generated at the config nside
# This does not apply to the mask which will be degraded at read
nside = self.config["nside"]
# Load the maps from their files.
# First the mask
with self.open_input("mask", wrapper=True) as f:
info = f.read_map_info("mask")
area = info["area"]
f_sky = info["f_sky"]
mask = f.read_mask_healpix(
thresh=self.config["mask_threshold"], degrade_nside=nside
)
if self.rank == 0:
print("Loaded mask")
# Using a flat mask as the clustering weight for now, since I need to know
# how to turn the depth map into a weight
clustering_weight = mask
if self.config["do_shear_shear"] or self.config["do_shear_pos"]:
# Then the shear maps and weights
with self.open_input("source_maps", wrapper=True) as f:
nbin_source = f.file["maps"].attrs["nbin_source"]
g1_maps = [f.read_map_healpix(f"g1_{b}") for b in range(nbin_source)]
g2_maps = [f.read_map_healpix(f"g2_{b}") for b in range(nbin_source)]
lensing_weights = [
f.read_map_healpix(f"lensing_weight_{b}")
for b in range(nbin_source)
]
assert all(healpy.npix2nside(len(m)) == nside for m in g1_maps)
assert all(healpy.npix2nside(len(m)) == nside for m in g2_maps)
assert all(healpy.npix2nside(len(m)) == nside for m in lensing_weights)
if self.rank == 0:
print(f"Loaded 2 x {nbin_source} shear maps")
print(f"Loaded {nbin_source} lensing weight maps")
else:
g1_maps, g2_maps, lensing_weights = [], [], []
nbin_source = 0
if self.config["do_shear_pos"] or self.config["do_pos_pos"]:
# And finally the density maps
with self.open_input("density_maps", wrapper=True) as f:
nbin_lens = f.file["maps"].attrs["nbin_lens"]
d_maps = [f.read_map_healpix(f"delta_{b}") for b in range(nbin_lens)]
assert all(healpy.npix2nside(len(m)) == nside for m in d_maps)
print(f"Loaded {nbin_lens} overdensity maps")
else:
d_maps = []
nbin_lens = 0
# Choose pixelization and read mask and systematics maps
pixel_scheme = choose_pixelization(**info)
if pixel_scheme.name != "healpix":
raise ValueError("TXTwoPointFourier can only run on healpix maps")
if self.rank == 0:
print(f"Unmasked area = {area:.2f} deg^2, fsky = {f_sky:.2e}")
print(f"Nside = {pixel_scheme.nside}")
# Mask any pixels which have the healpix bad value
for g1, g2, lw in zip(g1_maps, g2_maps, lensing_weights):
lw[g1 == healpy.UNSEEN] = 0
lw[g2 == healpy.UNSEEN] = 0
lw[lw == healpy.UNSEEN] = 0
# When running on the CosmoDC2 mocks I've found I need to flip
# both g1 and g2 in order to get both positive galaxy-galaxy lensing
# and shear-shear spectra.
if self.config["flip_g1"]:
for g1 in g1_maps:
w = np.where(g1 != healpy.UNSEEN)
g1[w] *= -1
if self.config["flip_g2"]:
for g2 in g2_maps:
w = np.where(g2 != healpy.UNSEEN)
g2[w] *= -1
# Load HEALPix systematics maps
deproject_syst_clustering = self.config["deproject_syst_clustering"]
if deproject_syst_clustering:
s_maps, nside = self.read_systematics_maps(clustering_weight)
if s_maps is not None:
# reshape systmaps array (needed for NaMaster):
npix = healpy.nside2npix(nside)
n_systmaps = len(s_maps)
s_maps = np.array(s_maps).reshape([n_systmaps, 1, npix])
else:
print("Not using systematics maps for deprojection in NaMaster")
# This seems to be an off-by-one bug or ambiguity in namaster v2.
lmax1 = self.config["ell_max"] - 1
if self.config["do_shear_pos"] or self.config["do_pos_pos"]:
if deproject_syst_clustering:
density_fields = [
(nmt.NmtField(clustering_weight, [d], templates=s_maps, n_iter=0, lmax=lmax1)) for d in d_maps
]
else:
density_fields = [(nmt.NmtField(clustering_weight, [d], n_iter=0, lmax=lmax1)) for d in d_maps]
else:
density_fields = []
if self.config["do_shear_pos"] or self.config["do_shear_shear"]:
lensing_fields = [
(nmt.NmtField(lw, [g1, g2], n_iter=0, lmax=lmax1))
for (lw, g1, g2) in zip(lensing_weights, g1_maps, g2_maps)
]
else:
lensing_fields = []
# Collect together all the maps we will output
maps = {
"dw": clustering_weight,
"lw": lensing_weights,
"g": list(zip(g1_maps, g2_maps)),
"d": d_maps,
"lf": lensing_fields,
"df": density_fields,
"nbin_source": nbin_source,
"nbin_lens": nbin_lens,
}
return pixel_scheme, maps, f_sky
def read_systematics_maps(self, mask_gc):
import healpy as hp
print("Deprojecting systematics maps for number counts")
n_systmaps = 0
s_maps = []
systmaps_clustering_dir = self.config["systmaps_clustering_dir"]
systmaps_path = pathlib.Path(systmaps_clustering_dir)
for systmap in systmaps_path.iterdir():
try:
if systmap.is_file():
if pathlib.Path(systmap).suffix != ".fits":
print(
"Warning: Problem reading systematics map file",
systmap,
"Not a HEALPix .fits file.",
)
warnings.warn("Systematics map file must be a HEALPix .fits file.")
print("Ignoring", systmap)
else:
systmap_file = str(systmap)
self.config[f"clustering_deproject_{n_systmaps}"] = systmap_file # for provenance
print("Reading clustering systematics map file:", systmap_file)
syst_map = hp.read_map(systmap_file, verbose=False)
# normalize map for Namaster
# calculate the mean, accounting for case where mask isn't binary
unmasked = mask_gc > 0.0
mean = (syst_map[unmasked] * mask_gc[unmasked]).sum() / mask_gc[unmasked].sum()
print("Syst map: mean value = ", mean)
# subtract the mean rather than normalise by it, as some systematics will have ~0 mean
# (other pixels can stay as hp.UNSEEN since they are masked anyway)
syst_map[unmasked] -= mean
s_maps.append(syst_map)
n_systmaps += 1
except:
print("Warning: Problem with systematics map file", systmap)
print("Ignoring", systmap)
print("Number of systematics maps read: ", n_systmaps)
if n_systmaps == 0:
print("No systematics maps found. Skipping deprojection.")
s_maps = None
nside = None
self.config["deproject_syst_clustering"] = False
else:
print("Using systematics maps for galaxy number counts.")
# We assume all systematics maps have the same nside
nside = hp.pixelfunc.get_nside(syst_map)
# needed for NaMaster:
s_maps = np.array(s_maps)
return s_maps, nside
def populate_hash_metadata(self, maps, ell_bins):
# Make a hash for the ell binning
self.hash_metadata["ell_hash"] = array_hash(ell_bins.get_effective_ells())
# For map-based case, do the same for the weight maps
# (for catalog-based case, these hashes are dynamically computed in
# get_field)
if "lw" in maps:
lensing_weights = maps["lw"]
for i, lw in enumerate(lensing_weights):
self.hash_metadata[f"mask_source_{i}"] = array_hash(lw)
if "dw" in maps:
dw = maps["dw"]
# Use same mask for all lens bins
for i in range(maps["nbin_lens"]):
self.hash_metadata[f"mask_lens_{i}"] = array_hash(dw)
def make_workspaces(self, maps, calcs, ell_bins):
import pymaster as nmt
from .utils.nmt_utils import WorkspaceCache
# Make the field object
if self.rank == 0:
print("Preparing workspaces")
# load the cache
cache = WorkspaceCache(self.config["cache_dir"], low_mem=self.config["low_mem"])
# Get hash metadata and fields
ell_hash = self.hash_metadata["ell_hash"]
for i, j, k in calcs:
if k == SHEAR_SHEAR:
f1 = self.get_field(maps, i, "shear")
f2 = self.get_field(maps, j, "shear")
h1 = self.hash_metadata[f"mask_source_{i}"]
h2 = self.hash_metadata[f"mask_source_{j}"]
elif k == SHEAR_POS:
f1 = self.get_field(maps, i, "shear")
f2 = self.get_field(maps, j, "pos")
h1 = self.hash_metadata[f"mask_source_{i}"]
h2 = self.hash_metadata[f"mask_lens_{j}"]
else:
f1 = self.get_field(maps, i, "pos")
f2 = self.get_field(maps, j, "pos")
h1 = self.hash_metadata[f"mask_lens_{i}"]
h2 = self.hash_metadata[f"mask_lens_{j}"]
# First we derive a hash which will change whenever either
# the mask changes or the ell binning.
# We combine the hashes of those three objects together
# using xor (python ^ operator), which seems to be standard.
key = h1 ^ ell_hash
# ... except that if we are using the same field twice then
# x ^ x = 0, which causes problems (all the auto-bins would
# have the same key). So we only combine the hashes if the
# fields are different.
if h2 != h1:
key ^= h2
# Check on disc to see if we have one saved already.
space = cache.get(i, j, k, key=key)
# If not, compute it. We will save it later
if space is None:
print(f"Rank {self.rank} computing coupling matrix {i}, {j}, {k}")
space = nmt.NmtWorkspace.from_fields(f1, f2, ell_bins)
else:
print(f"Rank {self.rank} getting coupling matrix {i}, {j}, {k} from cache.")
# This is a bit awkward - we attach the key to the
# object to avoid more book-keeping. This is used inside
# the workspace cache
space.txpipe_key = key
# We now automatically cache everything, non-optionally,
# but to save memory we don't keep them all loaded
cache.put(i, j, k, space)
return cache
def collect_results(self):
if self.comm is None:
return
self.results = self.comm.gather(self.results, root=0)
if self.rank == 0:
# Concatenate results
self.results = sum(self.results, [])
def setup_results(self):
self.results = []
def select_calculations(self, nbins_source, nbins_lens):
# Build up a big list of all the calculations we want to
# perform. We should probably expose this in the configuration
# file so you can skip some.
calcs = []
# For shear-shear we omit pairs with j>i
if self.config["do_shear_shear"]:
k = SHEAR_SHEAR
for i in range(nbins_source):
for j in range(i + 1):
calcs.append((i, j, k))
# For shear-position we use all pairs
if self.config["do_shear_pos"]:
k = SHEAR_POS
for i in range(nbins_source):
for j in range(nbins_lens):
calcs.append((i, j, k))
# For position-position we omit pairs with j>i.
# We do keep cross-pairs, since even though we may not want to
# do parameter estimation with them they are useful diagnostics.
if self.config["do_pos_pos"]:
k = POS_POS
for i in range(nbins_lens):
for j in range(i + 1):
calcs.append((i, j, k))
calcs = [calc for calc in self.split_tasks_by_rank(calcs)]
return calcs
def compute_power_spectra(self, pixel_scheme, i, j, k, maps, workspace_cache, ell_bins, cl_theory, f_sky):
# Compute power spectra
# TODO: now all possible auto- and cross-correlation are computed.
# This should be tunable.
# k refers to the type of measurement we are making
import sacc
import pymaster as nmt
import healpy
CEE = sacc.standard_types.galaxy_shear_cl_ee
CEB = sacc.standard_types.galaxy_shear_cl_eb
CBE = sacc.standard_types.galaxy_shear_cl_be
CBB = sacc.standard_types.galaxy_shear_cl_bb
CdE = sacc.standard_types.galaxy_shearDensity_cl_e
CdB = sacc.standard_types.galaxy_shearDensity_cl_b
Cdd = sacc.standard_types.galaxy_density_cl
type_name = NAMES[k]
print(f"Process {self.rank} calculating {type_name} spectrum for bin pair {i},{j}")
sys.stdout.flush()
if k == SHEAR_SHEAR:
field_i = self.get_field(maps, i, "shear")
field_j = self.get_field(maps, j, "shear")
results_to_use = [(0, CEE), (1, CEB), (2, CBE), (3, CBB)]
elif k == POS_POS:
field_i = self.get_field(maps, i, "pos")
field_j = self.get_field(maps, j, "pos")
results_to_use = [(0, Cdd)]
elif k == SHEAR_POS:
field_i = self.get_field(maps, i, "shear")
field_j = self.get_field(maps, j, "pos")
results_to_use = [(0, CdE), (1, CdB)]
workspace = workspace_cache.get(i, j, k)
# Load mask for calculation of cl_guess and (optionally) analytic noise calculation.
with self.open_input("mask", wrapper=True) as f:
mask = f.read_mask_healpix(
thresh=self.config["mask_threshold"], degrade_nside=self.config["nside"]
)
if self.rank == 0:
print("Loaded mask")
# we are going to subtract the noise afterwards
pcl = nmt.compute_coupled_cell(field_i, field_j)
c = workspace.decouple_cell(pcl)
if self.config["analytic_noise"]:
# noise to subtract (already decoupled)
n_ell, n_ell_coupled = self.compute_noise_analytic(
i, j, k, maps, f_sky, workspace, mask
)
else:
# Get the coupled noise C_ell values to give to the master algorithm
n_ell, n_ell_coupled = self.compute_noise(
i, j, k, ell_bins, maps, workspace
)
if n_ell is None:
n_ell_coupled = np.zeros((c.shape[0], 3 * self.config["nside"]))
n_ell = np.zeros_like(c)
c = c - n_ell
# Deprojection bias (computable for map-based fields only)
if field_i.lite or field_j.lite:
cb = np.zeros_like(c)
else:
cl_guess = nmt.compute_coupled_cell(field_i, field_j) / np.mean(mask * mask)
pclb = nmt.deprojection_bias(field_i, field_j, cl_guess)
cb = workspace.decouple_cell(pclb)
c = c - cb
def window_pixel(ell, nside):
r_theta = 1 / (np.sqrt(3.0) * nside)
x = ell * r_theta
f = 0.532 + 0.006 * (x - 0.5) ** 2
y = f * x
return np.exp(-(y**2) / 2)
# The binning information - effective (mid) ell values and
# the window information
ls = ell_bins.get_effective_ells()
# Current treatment of the beam is incorrect so commented out for now.
# Issue has been opened on GitHub (#344)
c_beam = c # / window_pixel(ls, pixel_scheme.nside) ** 2
print("c_beam, k, i, j", c_beam, k, i, j)
# this has shape n_cls, n_bpws, n_cls, lmax+1
bandpowers = workspace.get_bandpower_windows()
# Save all the results, skipping things we don't want like EB modes
for index, name in results_to_use:
win = bandpowers[index, :, index, :]
self.results.append(
Measurement(
name,
ls,
c_beam[index],
n_ell[index],
n_ell_coupled[index],
win,
i,
j,
)
)
def get_field(self, maps, i, kind):
# In this class this is very simple, just retrieving a map from
# the dictionary. But we want to avoid loading the full catalogs
# for all of the tomographic bins at once in the sub-class that
# uses them, so we make this a method that it can override to load
# them dynamically.
if kind == "shear":
return maps["lf"][i]
else:
return maps["df"][i]
def compute_noise(self, i, j, k, ell_bins, maps, workspace):
if self.config["true_shear"] and k == SHEAR_SHEAR:
# if using true shear (i.e. no shape noise) we do not need to add any noise for the shear-shear case
return None
import pymaster as nmt
import healpy
# No noise contribution in cross-correlations
if (i != j) or (k == SHEAR_POS):
return None, None
if k == SHEAR_SHEAR:
noise_maps = self.open_input("source_noise_maps", wrapper=True)
weight = maps["lw"][i]
else:
noise_maps = self.open_input("lens_noise_maps", wrapper=True)
weight = maps["dw"]
nreal = noise_maps.number_of_realizations()
noise_c_ells = []
for r in range(nreal):
print(f"Analyzing noise map {r}")
# Load the noise map - may be either (g1, g2)
# or rho1 - rho2
# Also in the event there are any UNSEENs in these,
# downweight them
w = weight.copy()
if k == SHEAR_SHEAR:
realization = noise_maps.read_rotation_healpix(r, i)
w[realization[0] == healpy.UNSEEN] = 0
w[realization[1] == healpy.UNSEEN] = 0
else:
rho1, rho2 = noise_maps.read_density_split_healpix(r, i)
realization = [rho1 - rho2]
w[realization[0] == healpy.UNSEEN] = 0
# Analyze it with namaster
field = nmt.NmtField(w, realization, n_iter=0, lmax=self.config["ell_max"] - 1)
cl_coupled = nmt.compute_coupled_cell(field, field)
# Accumulate
noise_c_ells.append(cl_coupled)
mean_noise = np.mean(noise_c_ells, axis=0)
# Since this is the noise on the half maps the real
# noise C_ell will be (0.5)**2 times the size
if k == POS_POS:
mean_noise /= 4
return workspace.decouple_cell(mean_noise), mean_noise
def compute_noise_analytic(self, i, j, k, maps, f_sky, workspace, mask):
# Copied from the HSC branch
import pymaster
import healpy as hp
if (i != j) or (k == SHEAR_POS):
return None, None
# This bit only works with healpix maps but it's checked beforehand so that's fine
if k == SHEAR_SHEAR:
with self.open_input("source_maps", wrapper=True) as f:
var_map = f.read_map_healpix(f"var_e_{i}")
print("i, j", i, j)
var_map[var_map == hp.UNSEEN] = 0.0
nside = hp.get_nside(var_map)
pxarea = hp.nside2pixarea(nside)
n_ls = np.mean(var_map) * pxarea
# pdb.set_trace()
print("np.mean(var_map) * pxarea", i, j, k, n_ls)
n_ell_coupled = np.zeros((4, 3 * nside))
# Note that N_ell = 0 for ell = 1, 2 for shear
n_ell_coupled[0, 2:] = n_ls
n_ell_coupled[3, 2:] = n_ls
if k == POS_POS:
### New method ###
with self.open_input("lens_maps", wrapper=True) as f:
lens_map = f.read_map_healpix(f"ngal_{i}")
lens_map[lens_map == hp.UNSEEN] = 0.0
nside = hp.get_nside(lens_map)
pxarea = hp.nside2pixarea(nside)
# compute the mean galaxy counts (in pixels above the mask threshold)
above_thresh = mask > 0.0
mu_N = np.sum(lens_map[above_thresh]) / np.sum(mask[above_thresh])
# coupled N_ells:
n_ls = pxarea * np.mean(mask) / mu_N
n_ell_coupled = n_ls * np.ones((1, 3 * nside))
n_ell = workspace.decouple_cell(n_ell_coupled)
return n_ell, n_ell_coupled
def load_tomographic_quantities(self, nbin_source, nbin_lens, f_sky):
# Get lots of bits of metadata from the input file,
# per tomographic bin.
metadata = self.open_input("tracer_metadata")
sigma_e = metadata["tracers/sigma_e"][:]
N_eff = metadata["tracers/N_eff"][:]
lens_counts = metadata["tracers/lens_counts"][:]
metadata.close()
area = 4 * np.pi * f_sky
n_eff = N_eff / area
n_lens = lens_counts / area
tomo_info = {
"area_steradians": area,
"n_eff_steradian": n_eff,
"n_lens_steradian": n_lens,
"sigma_e": sigma_e,
"f_sky": f_sky,
}
warnings.warn("Using unweighted lens samples here")
return tomo_info
def load_tracers(self, nbin_source, nbin_lens):
# Load the N(z) and convert to sacc tracers.
# We need this both to put it into the output file,
# but also potentially to compute the theory guess
# for projecting out modes
import sacc
sacc_data = sacc.Sacc()
if nbin_source > 0:
with self.open_input("shear_photoz_stack", wrapper=True) as f:
for i in range(nbin_source):
z, Nz = f.get_bin_n_of_z(i)
sacc_data.add_tracer("NZ", f"source_{i}", z, Nz)
if nbin_lens > 0:
with self.open_input("lens_photoz_stack", wrapper=True) as f:
for i in range(nbin_lens):
z, Nz = f.get_bin_n_of_z(i)
sacc_data.add_tracer("NZ", f"lens_{i}", z, Nz)
return sacc_data
def save_power_spectra(self, tracer_sacc, nbin_source, nbin_lens):
import sacc
from sacc.windows import BandpowerWindow
CEE = sacc.standard_types.galaxy_shear_cl_ee
CEB = sacc.standard_types.galaxy_shear_cl_eb
CBE = sacc.standard_types.galaxy_shear_cl_be
CBB = sacc.standard_types.galaxy_shear_cl_bb
CdE = sacc.standard_types.galaxy_shearDensity_cl_e
CdB = sacc.standard_types.galaxy_shearDensity_cl_b
Cdd = sacc.standard_types.galaxy_density_cl
S = tracer_sacc.copy()
# We have saved the results in a big list. Each entry contains a single
# bin pair and spectrum type, but many data points at different angles.
# Here we pull them all out to add to sacc
for d in self.results:
tracer1 = f"source_{d.i}" if d.corr_type in [CEE, CEB, CBE, CBB, CdE, CdB] else f"lens_{d.i}"
tracer2 = f"source_{d.j}" if d.corr_type in [CEE, CEB, CBE, CBB] else f"lens_{d.j}"
n = len(d.l)
# The full set of ell values (unbinned), used to store the
# bandpowers
ell_full = np.arange(d.win.shape[1])
win = BandpowerWindow(ell_full, d.win.T)
if self.config["gaussian_sims_factor"] != [1.0] and "lens" in tracer2:
print(
"ATTENTION: We are multiplying the measurement and the noise saved in the sacc file by the gaussian sims factor."
)
value = d.value * self.config["gaussian_sims_factor"][int(tracer2[-1])]
noise = d.noise * self.config["gaussian_sims_factor"][int(tracer2[-1])]
if "lens" in tracer1:
value *= self.config["gaussian_sims_factor"][int(tracer1[-1])]
noise *= self.config["gaussian_sims_factor"][int(tracer1[-1])]
else:
value = d.value
noise = d.noise
for i in range(n):
# We use optional tags i and j here to record the bin indices, as well
# as in the tracer names, in case it helps to select on them later.
S.add_data_point(
d.corr_type,
(tracer1, tracer2),
value[i],
ell=d.l[i],
window=win,
i=d.i,
j=d.j,
n_ell=noise[i],
window_ind=i,
)
# Add n_ell_coupled to tracer metadata. This will work as far as
# the coupled noise is constant
tr = S.tracers[tracer1]
if (tracer1 == tracer2) and ("n_ell_coupled" not in tr.metadata):
if self.config["analytic_noise"] is False:
# If computed through simulations, it might be better to
# take the mean since, for now, only a float can be passed
i = 0 if "lens" in tracer1 else 2
tr.metadata["n_ell_coupled"] = np.mean(d.noise_coupled)
else:
# Save the last element because the first one is zero for
# shear
tr.metadata["n_ell_coupled"] = d.noise_coupled[-1]
if self.config["gaussian_sims_factor"] != [1.0] and "lens" in tracer1:
print(tracer1)
print(
"ATTENTION: We are multiplying the coupled noise saved in the sacc file by the gaussian sims factor."
)
print("Original noise:", d.noise_coupled)
tr.metadata["n_ell_coupled"] *= self.config["gaussian_sims_factor"][int(tracer1[-1])] ** 2
# Save provenance information
provenance = self.gather_provenance()
provenance.update(SACCFile.generate_provenance())
for key, value in provenance.items():
if isinstance(value, str) and "\n" in value:
values = value.split("\n")
for i, v in enumerate(values):
S.metadata[f"provenance/{key}_{i}"] = v
else:
S.metadata[f"provenance/{key}"] = value
# Save hashes needed to recover workspace hash
for k, v in self.hash_metadata.items():
S.metadata[f"hash/{k}"] = v
if self.config["cache_dir"]:
S.metadata["cache_dir"] = self.config["cache_dir"]
# Adding binning information to pass later to TJPCov. I shouldn't be
# necessary, but it is at the moment.
S.metadata["binning/ell_min"] = self.config["ell_min"]
S.metadata["binning/ell_max"] = self.config["ell_max"]
S.metadata["binning/ell_spacing"] = self.config["ell_spacing"]
S.metadata["binning/n_ell"] = self.config["n_ell"]
# Get output name (will be different if subclass used)
output = self.output_main
# And we're all done!
output_filename = self.get_output(output)
S.save_fits(output_filename, overwrite=True)
class TXTwoPointFourierCatalog(TXTwoPointFourier):
"""
Make Fourier space 3x2pt measurements directly from catalogues using NaMaster
This subclass of TXTwoPointFourier computes the angular power spectra
directly from the catalogs using the catalog-based pseudo-Cl formalism,
instead of requiring that maps of the relevant fields be created in advance.
Template deprojection can still be used to mitigate systematic effects if
the systematics templates are provided as input. The mask for galaxy clustering
fields can either be provided directly (in map form), or one can instead
provide a catalog of randoms.
"""
name = "TXTwoPointFourierCatalog"
inputs = [
("binned_shear_catalog", HDFFile),
("binned_lens_catalog", HDFFile),
("binned_random_catalog", HDFFile),
("tracer_metadata", HDFFile),
("shear_photoz_stack", QPNOfZFile), # Photoz stack
("lens_photoz_stack", QPNOfZFile), # Photoz stack
("mask", MapsFile),
("fiducial_cosmology", FiducialCosmology),
("source_maps", MapsFile),
("lens_maps", MapsFile),
]
output_main = "twopoint_data_fourier_cat"
outputs = [(output_main, SACCFile)]
config_options_cat = { # Config specific to catalog-based C_ells
"use_randoms_clustering": StageParameter(
bool, False, msg="Whether to use randoms instead of a map-based mask for clustering"
),
"calc_noise_dp_bias": StageParameter(
bool, False, msg="Whether to analytically compute bias in the shot noise component induced by deprojection."
),
"ell_max_deproj": StageParameter(int, -1, "Maximum multipole to use for mode deprojection."),
}
config_options = TXTwoPointFourier.config_options | config_options_cat
def load_maps(self):
# In this subclass this function is just used for loading the mask (if provided)
# and the clustering systematics templates (if deprojecting).
import pymaster as nmt
import healpy as hp
with self.open_input("binned_lens_catalog", wrapper=True) as f:
nbin_lens = f.file["lens"].attrs["nbin_lens"]
with self.open_input("binned_shear_catalog", wrapper=True) as f:
nbin_source = f.file["shear"].attrs["nbin_source"]
# We will obtain nside from somewhere if needed, but for now set to None
nside = self.config["nside"]
# Load mask for galaxy clustering
if self.config["deproject_syst_clustering"] or not self.config["use_randoms_clustering"]:
with self.open_input("mask", wrapper=True) as f:
info = f.read_map_info("mask")
mask_gc = f.read_mask_healpix(
thresh=self.config["mask_threshold"], degrade_nside=nside
)
if self.rank == 0:
print("Loaded mask")
else:
mask_gc = None
# Load HEALPix systematics maps if required
if self.config["deproject_syst_clustering"]:
s_maps, nside = self.read_systematics_maps(mask_gc)
else:
print("Not using systematics maps for deprojection in NaMaster")
s_maps = None
maps = {
"mask_lens": mask_gc,
"nbin_source": nbin_source,
"nbin_lens": nbin_lens,
"systmaps": s_maps,
"nside": nside,
}
print(maps)
# Have to return Nones as placeholders to be consistent with TXTwoPointFourier
return None, maps, None
def get_field(self, maps, i, kind, get_shears=False):
# In this subclass we load the catalog field objects dynamically,
# because they are much larger than the map objects, or can be
# at full LSST scale
import pymaster as nmt
import healpy as hp
# If I don't do the -1 here I get an error in namaster
lmax = self.config["ell_max"] - 1
if kind == "shear":
if self.rank == 0:
print("Loading shear catalog for bin", i)
# TODO: If memory issues do occur, they will probably occur here.
# Need to reduce memory usage.
with self.open_input("binned_shear_catalog") as f:
group = f[f"shear/bin_{i}"]
weight = group["weight"][:]
positions = np.array([group["ra"][:], group["dec"][:]])
if get_shears:
g1 = group["g1"][:] * (-1) ** self.config["flip_g1"]
g2 = group["g2"][:] * (-1) ** self.config["flip_g2"]
shear = [g1, g2]
else:
shear = None
field = nmt.NmtFieldCatalog(positions, weight, shear, lmax=lmax, lmax_mask=lmax, spin=2, lonlat=True)
# MCM depends on positions and weights of sources
self.hash_metadata[f"mask_source_{i}"] = array_hash(positions) ^ array_hash(weight)
else:
# Retrieve templates (in map form, if loaded at all)
templates = maps["systmaps"]
if self.rank == 0:
print("Loading lens catalog for bin", i)
with self.open_input("binned_lens_catalog") as f:
group = f[f"lens/bin_{i}"]
positions = np.array([group["ra"][:], group["dec"][:]])
weight = group["weight"][:]
# Load randoms for this bin if using them
if self.config["use_randoms_clustering"]:
with self.open_input("binned_random_catalog") as f:
group = f[f"randoms/bin_{i}"]
pos_rand = np.array([group["ra"][:], group["dec"][:]])
if "weight" in group.keys():
weight_rand = group["weight"][:]
else:
weight_rand = np.ones_like(pos_rand[0])
if self.config["deproject_syst_clustering"]:
# Get values of each template at position of each random
ipix_rand = hp.ang2pix(maps["nside"], *pos_rand, lonlat=True)
templates = maps["systmaps"][:, ipix_rand]
# No need for mask - set to None
mask_gc = None
# MCM depends on positions and weights of data AND of randoms;
# final hash ensures there is no chance of reusing an existing workspace
# if a future run uses a mask instead of randoms
# TODO: find faster alternative
self.hash_metadata[f"mask_lens_{i}"] = (
array_hash(positions)
^ array_hash(weight)
^ array_hash(pos_rand)
^ array_hash(weight_rand)
^ hash("randoms")
)
else:
pos_rand = None
weight_rand = None
# In this case we do need the mask - retrieve from maps
mask_gc = maps["mask_lens"]
# MCM depends only on the mask
self.hash_metadata[f"mask_lens_{i}"] = array_hash(mask_gc) ^ hash("mask")
# Set lmax_deproj to None if not provided explicitly (will only matter if deprojecting)
if self.config["ell_max_deproj"] < 0:
lmax_deproj = None
else:
lmax_deproj = self.config["ell_max_deproj"]
field = nmt.NmtFieldCatalogClustering(
positions,
weight,
positions_rand=pos_rand,
weights_rand=weight_rand,
lmax=lmax,
mask=mask_gc,
lmax_mask=lmax,
templates=templates,
lmax_deproj=lmax_deproj,
lonlat=True,
calculate_noise_dp_bias=self.config["calc_noise_dp_bias"],
)
return field
def compute_power_spectra(self, pixel_scheme, i, j, k, maps, workspace_cache, ell_bins, cl_theory, f_sky):
# Compute power spectra
# TODO: now all possible auto- and cross-correlation are computed.
# This should be tunable.
# k refers to the type of measurement we are making
import sacc
import pymaster as nmt
import healpy
CEE = sacc.standard_types.galaxy_shear_cl_ee
CEB = sacc.standard_types.galaxy_shear_cl_eb
CBE = sacc.standard_types.galaxy_shear_cl_be
CBB = sacc.standard_types.galaxy_shear_cl_bb
CdE = sacc.standard_types.galaxy_shearDensity_cl_e
CdB = sacc.standard_types.galaxy_shearDensity_cl_b
Cdd = sacc.standard_types.galaxy_density_cl
type_name = NAMES[k]
print(f"Process {self.rank} calculating {type_name} spectrum for bin pair {i},{j}")
sys.stdout.flush()
if k == SHEAR_SHEAR:
field_i = self.get_field(maps, i, "shear", get_shears=True)
field_j = self.get_field(maps, j, "shear", get_shears=True)
results_to_use = [(0, CEE), (1, CEB), (2, CBE), (3, CBB)]
elif k == POS_POS:
field_i = self.get_field(maps, i, "pos", get_shears=True)
field_j = self.get_field(maps, j, "pos", get_shears=True)
results_to_use = [(0, Cdd)]
elif k == SHEAR_POS:
field_i = self.get_field(maps, i, "shear", get_shears=True)
field_j = self.get_field(maps, j, "pos", get_shears=True)
results_to_use = [(0, CdE), (1, CdB)]
workspace = workspace_cache.get(i, j, k)
# Cannot use compute_full_master here as deproj. bias not implemented for cat-based C_ells
pcl = nmt.compute_coupled_cell(field_i, field_j)
# Noise is automatically computed analytically and subtracted for cat-based C_ells,
# but noise deprojection bias needs subtracting manually
noise_db = 0
if i == j and k == POS_POS:
n_ell_coupled = field_i.Nf * np.ones((len(results_to_use), self.config["ell_max"]))
if self.config["calc_noise_dp_bias"] and self.config["deproject_syst_clustering"]:
noise_db = field_i.get_noise_deprojection_bias()
else:
n_ell_coupled = np.zeros((len(results_to_use), self.config["ell_max"]))
n_ell = workspace.decouple_cell(n_ell_coupled)
c = workspace.decouple_cell(pcl - noise_db)
# The binning information - effective (mid) ell values and
# the window information
ls = ell_bins.get_effective_ells()
print("c_beam, k, i, j", c, k, i, j)
# this has shape n_cls, n_bpws, n_cls, lmax+1
bandpowers = workspace.get_bandpower_windows()
# Save all the results, skipping things we don't want like EB modes
for index, name in results_to_use:
win = bandpowers[index, :, index, :]
self.results.append(
Measurement(
name,
ls,
c[index],
n_ell[index],
n_ell_coupled[index],
win,
i,
j,
)
)
if __name__ == "__main__":
PipelineStage.main()