from ceci.config import StageParameter
from .base_stage import PipelineStage
from .data_types import (
ShearCatalog,
HDFFile,
FiducialCosmology,
SACCFile,
YamlFile,
MapsFile,
)
from .twopoint_fourier import TXTwoPointFourier
import numpy as np
import warnings
import os
import pickle
import sys
# require TJPCov to be in PYTHONPATH
d2r = np.pi / 180
sq_deg_on_sky = 360**2 / np.pi
# Needed changes: 1) ell and theta spacing could be further optimized 2) coupling matrix
[docs]
class TXFourierGaussianCovariance(PipelineStage):
"""
Compute a Gaussian Fourier-space covariance with TJPCov using f_sky only
It imports TJPCov to do so, and runs at a fiducial cosmology.
This version does not account for mask geometry, only the total sky area
measured.
"""
name = "TXFourierGaussianCovariance"
parallel = False
do_xi = False
inputs = [
("fiducial_cosmology", FiducialCosmology), # For the cosmological parameters
("twopoint_data_fourier", SACCFile), # For the binning information
("tracer_metadata", HDFFile), # For metadata
]
outputs = [
("summary_statistics_fourier", SACCFile),
]
config_options = {
"pickled_wigner_transform": StageParameter(str, default="", msg="Path to pickled Wigner transform."),
"use_true_shear": StageParameter(bool, default=False, msg="Use true shear values."),
"galaxy_bias": StageParameter(list, default=[0.0], msg="Galaxy bias values, or one zero for unit bias."),
"gaussian_sims_factor": StageParameter(
list,
default=[1.0],
msg="Factor by which to decrease lens density to account for increased density contrast.",
),
}
def run(self):
import pyccl as ccl
import sacc
import tjpcov
import threadpoolctl
# read the fiducial cosmology
cosmo = self.read_cosmology()
# read binning
two_point_data = self.read_sacc()
# read the n(z) and f_sky from the source summary stats
meta = self.read_number_statistics()
# Binning choices. The ell binning is a linear piece with all the
# integer values up to 500 -- these are from firecrown, might need
# to change later
meta["ell"] = np.concatenate(
(
np.linspace(2, 500 - 1, 500 - 2),
np.logspace(np.log10(500), np.log10(6e4), 500),
)
)
# Theta binning - log spaced between 1 .. 300 arcmin.
meta["theta"] = np.logspace(np.log10(1 / 60), np.log10(300.0 / 60), 3000)
# C_ell covariance
cov = self.compute_covariance(cosmo, meta, two_point_data=two_point_data)
self.save_outputs(two_point_data, cov)
def save_outputs(self, two_point_data, cov):
filename = self.get_output("summary_statistics_fourier")
two_point_data.add_covariance(cov, overwrite=True)
two_point_data.save_fits(filename, overwrite=True)
def read_cosmology(self):
return self.open_input("fiducial_cosmology", wrapper=True).to_ccl()
def read_sacc(self):
import sacc
f = self.get_input("twopoint_data_fourier")
two_point_data = sacc.Sacc.load_fits(f)
# Remove the data types that we won't use for inference
mask = [
two_point_data.indices(sacc.standard_types.galaxy_shear_cl_ee),
two_point_data.indices(sacc.standard_types.galaxy_shearDensity_cl_e),
two_point_data.indices(sacc.standard_types.galaxy_density_cl),
# not doing b-modes, do we want to?
]
print("Length before cuts = ", len(two_point_data))
mask = np.concatenate(mask)
two_point_data.keep_indices(mask)
print("Length after cuts = ", len(two_point_data))
two_point_data.to_canonical_order()
return two_point_data
def read_number_statistics(self):
input_data = self.open_input("tracer_metadata")
# per-bin quantities
N_eff = input_data["tracers/N_eff"][:] # for sources
N_lens = input_data["tracers/lens_counts"][:]
# For the gaussian sims, lambda = nbar(1+b*delta),
# instead of lambda = nbar(1+delta), where delta is the density contrast field.
# Then, we need to scale up the shot noise term for the lenses
# in the covariance for the same b factor.
# Here we decrease the number density for this factor, since shot noise term is 1/nbar.
print("N_lens:", N_lens)
N_lens = N_lens / np.array(self.config["gaussian_sims_factor"]) ** 2
if self.config["gaussian_sims_factor"] != [1.0]:
print(
"ATTENTION: We are dividing N_lens by the gaussian sims factor squared:",
np.array(self.config["gaussian_sims_factor"]) ** 2,
)
print("Scaled N_lens is:", N_lens)
if self.config["use_true_shear"]:
nbins = len(input_data["tracers/sigma_e"][:])
sigma_e = np.array([0.0 for i in range(nbins)])
else:
sigma_e = input_data["tracers/sigma_e"][:]
# area in sq deg
area_deg2 = input_data["tracers"].attrs["area"]
area_unit = input_data["tracers"].attrs["area_unit"]
if area_unit != "deg^2":
raise ValueError("Units of area have changed")
input_data.close()
# area in steradians and sky fraction
area = area_deg2 * np.radians(1) ** 2
area_arcmin2 = area_deg2 * 60**2
full_sky = 4 * np.pi
f_sky = area / full_sky
# Density information from counts
n_eff = N_eff / area
n_lens = N_lens / area
# for printing out only
n_eff_arcmin = N_eff / area_arcmin2
n_lens_arcmin = N_lens / area_arcmin2
# Feedback
print(f"area = {area_deg2:.1f} deg^2")
print(f"f_sky: {f_sky}")
print(f"N_eff: {N_eff} (totals)")
print(f"N_lens: {N_lens} (totals)")
print(f"n_eff: {n_eff} / steradian")
print(f" = {np.around(n_eff_arcmin, 2)} / sq arcmin")
print(f"lens density: {n_lens} / steradian")
print(f" = {np.around(n_lens_arcmin, 2)} / arcmin")
# Pass all this back as a dictionary
meta = {
"f_sky": f_sky,
"sigma_e": sigma_e,
"n_eff": n_eff,
"n_lens": n_lens,
}
return meta
def get_tracer_info(self, cosmo, meta, two_point_data):
# Generates CCL tracers from n(z) information in the data file
import pyccl as ccl
ccl_tracers = {}
tracer_noise = {}
for tracer in two_point_data.tracers:
# Pull out the integer corresponding to the tracer index
tracer_dat = two_point_data.get_tracer(tracer)
nbin = int(two_point_data.tracers[tracer].name.split("_")[1])
z = tracer_dat.z.copy().flatten()
nz = tracer_dat.nz.copy().flatten()
# Identify source tracers and gnerate WeakLensingTracer objects
# based on them
if "source" in tracer or "src" in tracer:
sigma_e = meta["sigma_e"][nbin]
n_eff = meta["n_eff"][nbin]
ccl_tracers[tracer] = ccl.WeakLensingTracer(cosmo, dndz=(z, nz)) # CCL automatically normalizes dNdz
tracer_noise[tracer] = sigma_e**2 / n_eff
# or if it is a lens bin then generaete the corresponding
# CCL tracer class
elif "lens" in tracer:
# Get galaxy bias for this sample. Default value = 1.
if self.config["galaxy_bias"] == [0.0]:
b0 = 1
print(f"Using galaxy bias = 1 for {tracer} (since you didn't specify any biases)")
else:
b0 = self.config["galaxy_bias"][nbin]
print(f"Using galaxy bias = {b0} for {tracer}")
b = b0 * np.ones(len(z))
n_gal = meta["n_lens"][nbin]
tracer_noise[tracer] = 1 / n_gal
ccl_tracers[tracer] = ccl.NumberCountsTracer(cosmo, has_rsd=False, dndz=(z, nz), bias=(z, b))
return ccl_tracers, tracer_noise
def get_spins(self, tracer_comb):
# Get the Wigner Transform factors
WT_factors = {}
WT_factors["lens", "source"] = (0, 2)
WT_factors["source", "lens"] = (2, 0) # same as (0,2)
WT_factors["source", "source"] = {"plus": (2, 2), "minus": (2, -2)}
WT_factors["lens", "lens"] = (0, 0)
tracers = []
for i in tracer_comb:
if "lens" in i:
tracers += ["lens"]
if "source" in i:
tracers += ["source"]
return WT_factors[tuple(tracers)]
# compute a single covariance matrix for a given pair of C_ell or xi.
def compute_covariance_block(
self,
cosmo,
meta,
ell_bins,
tracer_comb1=None,
tracer_comb2=None,
ccl_tracers=None,
tracer_Noise=None,
two_point_data=None,
xi_plus_minus1="plus",
xi_plus_minus2="plus",
cache=None,
WT=None,
):
import pyccl as ccl
from tjpcov.wigner_transform import bin_cov
cl = {}
# tracers 1,2,3,4 = tracer_comb1[0], tracer_comb1[1], tracer_comb2[0], tracer_comb2[1]
# In the dicts below we use '13' to indicate C_ell_(1,3), etc.
# This index maps to this usae
reindex = {
(0, 0): 13,
(1, 1): 24,
(0, 1): 14,
(1, 0): 23,
}
ell = meta["ell"]
# Getting all the C_ell that we need, saving the results in a cache
# for later re-use
for i in (0, 1):
for j in (0, 1):
local_key = reindex[(i, j)]
# For symmetric pairs we may have saved the C_ell the other
# way around, so try both keys
cache_key1 = (tracer_comb1[i], tracer_comb2[j])
cache_key2 = (tracer_comb2[j], tracer_comb1[i])
if cache_key1 in cache:
cl[local_key] = cache[cache_key1]
elif cache_key2 in cache:
cl[local_key] = cache[cache_key2]
else:
# If not cached then we must compute
t1 = tracer_comb1[i]
t2 = tracer_comb2[j]
c = ccl.angular_cl(cosmo, ccl_tracers[t1], ccl_tracers[t2], ell)
print("Computed C_ell for ", cache_key1)
cache[cache_key1] = c
cl[local_key] = c
# The shape noise C_ell values.
# These are zero for cross bins and as computed earlier for auto bins
SN = {}
SN[13] = tracer_Noise[tracer_comb1[0]] if tracer_comb1[0] == tracer_comb2[0] else 0
SN[24] = tracer_Noise[tracer_comb1[1]] if tracer_comb1[1] == tracer_comb2[1] else 0
SN[14] = tracer_Noise[tracer_comb1[0]] if tracer_comb1[0] == tracer_comb2[1] else 0
SN[23] = tracer_Noise[tracer_comb1[1]] if tracer_comb1[1] == tracer_comb2[0] else 0
# The overall normalization factor at the front of the matrix
if self.do_xi:
norm = np.pi * 4 * meta["f_sky"]
else:
norm = (2 * ell + 1) * np.gradient(ell) * meta["f_sky"]
# The coupling is an identity matrix at least when we neglect
# the mask
coupling_mat = {}
coupling_mat[1324] = np.eye(len(ell))
coupling_mat[1423] = np.eye(len(ell))
# Initial covariance of C_ell components
cov = {}
cov[1324] = np.outer(cl[13] + SN[13], cl[24] + SN[24]) * coupling_mat[1324]
cov[1423] = np.outer(cl[14] + SN[14], cl[23] + SN[23]) * coupling_mat[1423]
# for shear-shear components we also add a B-mode contribution
first_is_shear_shear = ("source" in tracer_comb1[0]) and ("source" in tracer_comb1[1])
second_is_shear_shear = ("source" in tracer_comb2[0]) and ("source" in tracer_comb2[1])
if self.do_xi and (first_is_shear_shear or second_is_shear_shear):
# this adds the B-mode shape noise contribution.
# We assume B-mode power (C_ell) is 0
Bmode_F = 1
if xi_plus_minus1 != xi_plus_minus2:
# in the cross term, this contribution is subtracted.
# eq. 29-31 of https://arxiv.org/pdf/0708.0387.pdf
Bmode_F = -1
# below the we multiply zero to maintain the shape of the Cl array, these are effectively
# B-modes
cov[1324] += np.outer(cl[13] * 0 + SN[13], cl[24] * 0 + SN[24]) * coupling_mat[1324] * Bmode_F
cov[1423] += np.outer(cl[14] * 0 + SN[14], cl[23] * 0 + SN[23]) * coupling_mat[1423] * Bmode_F
cov["final"] = cov[1423] + cov[1324]
if self.do_xi:
s1_s2_1 = self.get_spins(tracer_comb1)
s1_s2_2 = self.get_spins(tracer_comb2)
# For the shear-shear we have two sets of spins, plus and minus,
# which are returned as a dict, so we need to pull out the one we need
# Otherwise it's just specified as a tuple, e.g. (2,0)
if isinstance(s1_s2_1, dict):
s1_s2_1 = s1_s2_1[xi_plus_minus1]
if isinstance(s1_s2_2, dict):
s1_s2_2 = s1_s2_2[xi_plus_minus2]
# Use these terms to project the covariance from C_ell to xi(theta)
th, cov["final"] = WT.projected_covariance(
ell_cl=ell, s1_s2=s1_s2_1, s1_s2_cross=s1_s2_2, cl_cov=cov["final"]
)
# Normalize
cov["final"] /= norm
# Put the covariance into bins.
# This is optional in the case of a C_ell covariance (only if bins in ell are
# supplied, otherwise the matrix is for each ell value individually). It is
# required for real-space covariances since these are always binned.
if self.do_xi:
thb, cov["final_b"] = bin_cov(r=th / d2r, r_bins=ell_bins, cov=cov["final"])
else:
if ell_bins is not None:
lb, cov["final_b"] = bin_cov(r=ell, r_bins=ell_bins, cov=cov["final"])
return cov
def get_angular_bins(self, cl_sacc):
from .utils.nmt_utils import choose_ell_bins
return choose_ell_bins(**cl_sacc.metadata)
def make_wigner_transform(self, meta):
import threadpoolctl
from tjpcov.wigner_transform import WignerTransform
path = self.config["pickled_wigner_transform"]
if path:
if os.path.exists(path):
print(f"Loading precomputed wigner transform from {path}")
WT = pickle.load(open(path, "rb"))
return WT
else:
print(f"Precomputed wigner transform {path} not found.")
print("Will compute it and then save it.")
# We don't want to use n processes with n threads each by accident,
# where n is the number of CPUs we have
# so for this bit of the code, which uses python's multiprocessing,
# we limit the number of threads that numpy etc can use.
# After this is finished this will switch back to allowing all the CPUs
# to be used for threading instead.
num_processes = int(os.environ.get("OMP_NUM_THREADS", 1))
print("Generating Wigner Transform.")
with threadpoolctl.threadpool_limits(1):
WT = WignerTransform(
ell=meta["ell"],
theta=meta["theta"] * d2r,
s1_s2=[(2, 2), (2, -2), (0, 2), (2, 0), (0, 0)],
ncpu=num_processes,
)
print("Computed Wigner Transform.")
if path:
try:
pickle.dump(WT, open(path, "wb"))
except OSError:
sys.stderr.write(f"Could not save wigner transform to {path}")
return WT
# compute all the covariances and then combine them into one single giant matrix
def compute_covariance(self, cosmo, meta, two_point_data):
from tjpcov.wigner_transform import bin_cov
ccl_tracers, tracer_Noise = self.get_tracer_info(cosmo, meta, two_point_data=two_point_data)
# we will loop over all these
tracer_combs = two_point_data.get_tracer_combinations()
N2pt = len(tracer_combs)
WT = self.make_wigner_transform(meta)
# the bit below is just counting the number of 2pt functions, and accounting
# for the fact that xi needs to be double counted
N2pt0 = 0
if self.do_xi:
N2pt0 = N2pt
tracer_combs_temp = tracer_combs.copy()
for combo in tracer_combs:
if ("source" in combo[0]) and ("source" in combo[1]):
N2pt += 1
tracer_combs_temp += [combo]
tracer_combs = tracer_combs_temp.copy()
ell_bins = self.get_angular_bins(two_point_data)
Nell_bins = len(ell_bins) - 1
cov_full = np.zeros((Nell_bins * N2pt, Nell_bins * N2pt))
count_xi_pm1 = 0
count_xi_pm2 = 0
cl_cache = {}
xi_pm = [
[("plus", "plus"), ("plus", "minus")],
[("minus", "plus"), ("minus", "minus")],
]
print("Total number of 2pt functions:", N2pt)
print("Number of 2pt functions without xim:", N2pt0)
# Look through the chunk of matrix, tracer pair by tracer pair
# Order of the covariance needs to be the cannonical order of saac. For a 3x2pt matrix that is:
# -galaxy_density_xi
# -galaxy_shearDensity_xi_t
# -galaxy_shear_xi_minus
# -galaxy_shear_xi_plus
xim_start = N2pt0 - (N2pt - N2pt0)
xim_end = N2pt0
for i in range(N2pt):
tracer_comb1 = tracer_combs[i]
count_xi_pm1 = 1 if i in range(xim_start, xim_end) else 0
for j in range(i, N2pt):
tracer_comb2 = tracer_combs[j]
print(f"Computing {tracer_comb1} x {tracer_comb2}: chunk ({i},{j}) of ({N2pt},{N2pt})")
count_xi_pm2 = 1 if j in range(xim_start, xim_end) else 0
if (
self.do_xi
and ("source" in tracer_comb1[0] and "source" in tracer_comb1[1])
or ("source" in tracer_comb2[0] and "source" in tracer_comb2[1])
):
cov_ij = self.compute_covariance_block(
cosmo,
meta,
ell_bins,
tracer_comb1=tracer_comb1,
tracer_comb2=tracer_comb2,
ccl_tracers=ccl_tracers,
tracer_Noise=tracer_Noise,
two_point_data=two_point_data,
xi_plus_minus1=xi_pm[count_xi_pm1][count_xi_pm2][0],
xi_plus_minus2=xi_pm[count_xi_pm1][count_xi_pm2][1],
cache=cl_cache,
WT=WT,
)
else:
cov_ij = self.compute_covariance_block(
cosmo,
meta,
ell_bins,
tracer_comb1=tracer_comb1,
tracer_comb2=tracer_comb2,
ccl_tracers=ccl_tracers,
tracer_Noise=tracer_Noise,
two_point_data=two_point_data,
cache=cl_cache,
WT=WT,
)
# Fill in this chunk of the matrix
cov_ij = cov_ij["final_b"]
# Find the right location in the matrix
start_i = i * Nell_bins
start_j = j * Nell_bins
end_i = start_i + Nell_bins
end_j = start_j + Nell_bins
# and fill it in, and the transpose component
cov_full[start_i:end_i, start_j:end_j] = cov_ij
cov_full[start_j:end_j, start_i:end_i] = cov_ij.T
try:
np.linalg.cholesky(cov_full)
except:
print(
"liAnalg.LinAlgError: Covariance not positive definite! "
"Most likely this is a problem in xim. "
"We will continue for now but this needs to be fixed."
)
return cov_full
[docs]
class TXRealGaussianCovariance(TXFourierGaussianCovariance):
"""
Compute a Gaussian real-space covariance with TJPCov using f_sky only
This version does not account for mask geometry, only the total sky area
measured.
It is implemented as a subclass of the Fourier-space version, so also uses
TJPCov and a fiducial cosmology.
"""
name = "TXRealGaussianCovariance"
parallel = False
do_xi = True
inputs = [
("fiducial_cosmology", FiducialCosmology), # For the cosmological parameters
("twopoint_data_real", SACCFile), # For the binning information
("tracer_metadata", HDFFile), # For metadata
]
outputs = [
("summary_statistics_real", SACCFile),
]
config_options = {
"min_sep": StageParameter(float, default=2.5, msg="Minimum separation in arcmin."),
"max_sep": StageParameter(float, default=250, msg="Maximum separation in arcmin."),
"nbins": StageParameter(int, default=20, msg="Number of bins."),
"pickled_wigner_transform": StageParameter(str, default="", msg="Path to pickled Wigner transform."),
"use_true_shear": StageParameter(bool, default=False, msg="Use true shear values."),
"galaxy_bias": StageParameter(list, default=[0.0], msg="Galaxy bias values."),
"gaussian_sims_factor": StageParameter(list, default=[1.0], msg="Gaussian simulation factor."),
}
def run(self):
super().run()
def get_angular_bins(self, two_point_data):
# this should be changed to read from sacc file
th_arcmin = np.logspace(
np.log10(self.config["min_sep"]),
np.log10(self.config["max_sep"]),
self.config["nbins"] + 1,
)
return th_arcmin / 60.0
def read_sacc(self):
import sacc
f = self.get_input("twopoint_data_real")
two_point_data = sacc.Sacc.load_fits(f)
mask = [
two_point_data.indices(sacc.standard_types.galaxy_density_xi),
two_point_data.indices(sacc.standard_types.galaxy_shearDensity_xi_t),
two_point_data.indices(sacc.standard_types.galaxy_shear_xi_plus),
two_point_data.indices(sacc.standard_types.galaxy_shear_xi_minus),
]
mask = np.concatenate(mask)
two_point_data.keep_indices(mask)
two_point_data.to_canonical_order()
return two_point_data
def save_outputs(self, two_point_data, cov):
filename = self.get_output("summary_statistics_real")
two_point_data.add_covariance(cov, overwrite=True)
two_point_data.save_fits(filename, overwrite=True)
[docs]
class TXFourierTJPCovariance(PipelineStage):
"""
Compute a Gaussian Fourier-space covariance with TJPCov using mask geometry
This also calls out to TJPCov, using more recent additions to that package.
This version, for speed, re-uses the workspace objects cached in the twopoint
fourier measurement stage.
"""
name = "TXFourierTJPCovariance"
do_xi = False
inputs = [
("fiducial_cosmology", FiducialCosmology), # For the cosmological parameters
("twopoint_data_fourier", SACCFile), # For the binning information
("tracer_metadata_yml", YamlFile), # For metadata
("mask", MapsFile), # For the lens mask
("density_maps", MapsFile), # For the clustering mask
("source_maps", MapsFile), # For the sources masks
]
outputs = [
("summary_statistics_fourier", SACCFile),
]
config_options = {
"galaxy_bias": StageParameter(list, default=[0.0], msg="Galaxy bias values, or one zero for unit bias."),
"IA": StageParameter(float, default=0.5, msg="Intrinsic alignment parameter."),
"cache_dir": StageParameter(str, default="", msg="Cache directory."),
"cov_type": StageParameter(
list, default=["FourierGaussianNmt", "FourierSSCHaloModel"], msg="Covariance types to use."
),
}
def run(self):
from tjpcov.covariance_calculator import CovarianceCalculator
import healpy
# Read the metadata from earlier in the pipeline
with self.open_input("tracer_metadata_yml", wrapper=True) as f:
meta = f.content
# check the units are what we are expecting
assert meta["area_unit"] == "deg^2"
assert meta["density_unit"] == "arcmin^{-2}"
# get the number of bins from metadata
nbin_lens = meta["nbin_lens"]
nbin_source = meta["nbin_source"]
# set up some config options for TJPCov
# tjp_config corresponds to the "tjpcov" section of the input config
tjp_config = {}
tjp_config["cov_type"] = self.config["cov_type"]
cl_sacc = self.read_sacc()
tjp_config["sacc_file"] = cl_sacc
# The Gaussian classes use this
fsky = meta["area"] / sq_deg_on_sky
# Get the CCL cosmo object to pass to TJPCov
with self.open_input("fiducial_cosmology", wrapper=True) as f:
tjp_config["cosmo"] = f.to_ccl()
# Choose linear bias values to pass to CCL.
# Based on configuration option for user.
if self.config["galaxy_bias"] == [0.0]:
bias = [1.0 for i in range(nbin_lens)]
else:
bias = self.config["galaxy_bias"]
if not len(bias) == nbin_lens:
raise ValueError("Wrong number of bias values supplied")
# Set more TJPCov config options
for i in range(nbin_lens):
tjp_config[f"bias_lens_{i}"] = bias[i]
tjp_config[f"Ngal_lens_{i}"] = meta["lens_density"][i]
# Load masks
# For clustering, we follow twopoint_fourier.py:214-219
with self.open_input("mask", wrapper=True) as f:
mask = f.read_map("mask")
mask[mask == healpy.UNSEEN] = 0.0
if self.rank == 0:
print("Loaded mask")
# Set any unseen pixels to zero weight.
# TODO: unify this code with the code in twopoint_fourier.py
with self.open_input("density_maps", wrapper=True) as f:
nbin_lens = f.file["maps"].attrs["nbin_lens"]
d_maps = [f.read_map(f"delta_{b}") for b in range(nbin_lens)]
print(f"Loaded {nbin_lens} overdensity maps")
# twopoint_fourier.py:219
for d in d_maps:
mask[d == healpy.UNSEEN] = 0
# twopoint_fourier.py:225
with self.open_input("source_maps", wrapper=True) as f:
lensing_weights = []
for b in range(nbin_source):
lw = f.read_map(f"lensing_weight_{b}")
lw[lw == healpy.UNSEEN] = 0.0
lensing_weights.append(lw)
if self.rank == 0:
print(f"Loaded {nbin_source} lensing weight maps")
# Following twopoint_fourier.py:197 all clustering maps use this mask
masks = {f"lens_{i}": mask for i in range(nbin_lens)}
masks.update({f"source_{i}": lensing_weights[i] for i in range(nbin_source)})
masks_names = {f"lens_{i}": "mask_lens" for i in range(nbin_lens)}
masks_names.update({f"source_{i}": f"mask_source_{i}" for i in range(nbin_source)})
tjp_config[f"mask_file"] = masks
tjp_config[f"mask_names"] = masks_names
# Set the TJPCov specific cache:
if self.config["cache_dir"]:
tjp_config["outdir"] = self.config["cache_dir"]
else:
tjp_config["outdir"] = cl_sacc.metadata.get("cache_dir", ".")
# MPI
if self.comm:
self.comm.Barrier()
tjp_config["use_mpi"] = True
# Run TJPCov
# I shouldn't need to pass the binnning (unless the cache is not set or
# there is one of the workspaces missing). For generality, I will pass
# it.
tjp_config["binning_info"] = self.recover_NmtBin(cl_sacc)
# For now, since they're only strings, pass the workspaces even if not
# requested
workspaces = self.get_workspaces_dict(cl_sacc, masks_names)
cache = {"workspaces": workspaces}
# Compute the covariance and save it in the cache folder. This will
# save also the independent terms.
calculator = CovarianceCalculator({"tjpcov": tjp_config, "cache": cache, "GaussianFsky": {"fsky": fsky}})
calculator.create_sacc_cov("summary_statistics_fourier", save_terms=True)
# Write the sacc file with the covariance in the TXPipe output folder
if self.rank == 0:
cov = calculator.get_covariance()
cl_sacc.add_covariance(cov, overwrite=True)
output_filename = self.get_output("summary_statistics_fourier")
cl_sacc.save_fits(output_filename, overwrite=True)
print("Saved power spectra with its Gaussian covariance")
def get_workspaces_dict(self, cl_sacc, masks_names):
# Based on txpipe/twopoint_fourier.py
# TODO: Move this to txpipe/utils/nmt_utils.py
cache_dir = cl_sacc.metadata.get("cache_dir", None)
cache = self.load_workspace_cache(cache_dir)
if cache == {}:
return {}
hashes = {}
masks_names_list = list(masks_names.values())
for m in masks_names_list:
if m not in hashes:
hashes[m] = cl_sacc.metadata[f"hash/{m}"]
ell_hash = cl_sacc.metadata["hash/ell_hash"]
w = {"00": {}, "02": {}, "22": {}}
# Get the number of data points per Cell
dtype = cl_sacc.get_data_types()[0]
trs = cl_sacc.get_tracer_combinations()[0]
ell_eff, _ = cl_sacc.get_ell_cl(dtype, *trs)
n_ell = ell_eff.size
for tr1, tr2 in cl_sacc.get_tracer_combinations():
# This assumes that the name of the tracers will include 'lens'or
# 'source'
s1 = 0 if "lens" in tr1 else 2
s2 = 0 if "lens" in tr2 else 2
sk = "".join(sorted(f"{s1}{s2}"))
m1 = masks_names[tr1]
m2 = masks_names[tr2]
key = (m1, m2)
# checking if the combination m1,m2 or m2,m1 is already done.
if (key in w[sk]) or (key[::-1] in w[sk]):
continue
# Build workspace hash (twopoint_fourier.py:387-395)
h1 = hashes[m1]
h2 = hashes[m2]
cache_key = h1 ^ ell_hash
if h2 != h1:
cache_key ^= h2
w[sk][key] = str(cache.get_path(cache_key))
return w
def load_workspace_cache(self, dirname):
# Copied from twopoint_fourier.py
from .utils.nmt_utils import WorkspaceCache
if not dirname:
if self.rank == 0:
print("Not using an on-disc cache. Set cache_dir to use one")
return {}
cache = WorkspaceCache(dirname)
return cache
def recover_NmtBin(self, cl_sacc):
# This function replicates `choose_ell_bins` in twopoint_fourier.py
# TODO: Move this to txpipe/utils/nmt_utils.py
from .utils.nmt_utils import choose_ell_bins
ell_bins = choose_ell_bins(**cl_sacc.metadata)
# Check that the binning is compatible with the one in the file
dtype = cl_sacc.get_data_types()[0]
trs = cl_sacc.get_tracer_combinations()[0]
ell_eff, _ = cl_sacc.get_ell_cl(dtype, *trs)
if not np.all(ell_bins.get_effective_ells() == ell_eff):
print(ell_bins.get_effective_ells())
print(ell_eff)
raise ValueError("The reconstructed NmtBin object is not " + "compatible with the ells in the sacc file")
return ell_bins
def read_sacc(self):
# Loads a sacc file.
import sacc
f = self.get_input("twopoint_data_fourier")
two_point_data = sacc.Sacc.load_fits(f)
# Since NaMaster computes all terms (B-modes included). Keep all of
# them.
return two_point_data