Source code for txpipe.covariance_nmt

from .base_stage import PipelineStage
from .data_types import ShearCatalog, HDFFile, FiducialCosmology, SACCFile, MapsFile
import numpy as np
import warnings
import os
import pickle
import sys
from ceci.config import StageParameter

# require TJPCov to be in PYTHONPATH
d2r = np.pi / 180

# Needed changes: 1) ell and theta spacing could be further optimized 2) coupling matrix


[docs] class TXFourierNamasterCovariance(PipelineStage): """ Compute a Gaussian Fourier-space covariance with NaMaster This functionality duplicates that of TXFourierTJPCovariance, and we should rationalize. """ name = "TXFourierNamasterCovariance" do_xi = False inputs = [ ("fiducial_cosmology", FiducialCosmology), # For the cosmological parameters ("twopoint_data_fourier", SACCFile), # For the binning information ("tracer_metadata", HDFFile), # For metadata ("mask", MapsFile), # For the mask ] outputs = [ ("summary_statistics_fourier", SACCFile), ] config_options = { "pickled_wigner_transform": StageParameter(str, "", msg="Path to pickled Wigner transform file."), "use_true_shear": StageParameter(bool, False, msg="Whether to use true shear values."), "scratch_dir": StageParameter(str, "temp", msg="Directory for temporary files."), "nside": StageParameter(int, 1024, msg="HEALPix nside parameter."), } def run(self): import pymaster as nmt import h5py import healpy as hp import scipy import pyccl as ccl import sacc import tjpcov import threadpoolctl comm = self.comm size = self.size rank = self.rank self.scratch_dir = self.config["scratch_dir"] if rank == 0: if not os.path.exists(self.scratch_dir): os.makedirs(self.scratch_dir) # read the fiducial cosmology cosmo = self.read_cosmology() # read binning two_point_data, any_source, any_lens = self.read_sacc() # read the n(z) and f_sky from the source summary stats meta = self.read_number_statistics(any_source, any_lens) # read the mask with self.open_input("mask", wrapper=True) as f: m = f.read_map("mask") nside = self.config["nside"] m = hp.ud_grade(m, nside) msk = 1 * (m == 1) msk = nmt.mask_apodization(msk, 1.0, apotype="Smooth") # get w-workspace if rank == 0: spinlist = self.get_w_spinlist() else: spinlist = None if comm is None: spinlist = spinlist[0] else: spinlist = comm.scatter(spinlist, root=0) self.get_w(msk, spinlist) if comm is not None: comm.Barrier() self.read_w() # get cw-workspace if rank == 0: spinlist = self.get_cw_spinlist() else: spinlist = None if comm is None: spinlist = spinlist[0] else: spinlist = comm.scatter(spinlist, root=0) self.get_cw(spinlist) if comm is not None: comm.Barrier() self.read_cw() # Binning choices. The ell binning is a linear piece with all the # integer values up to 500 -- these are from firecrown, might need # to change later meta["ell"] = np.concatenate( ( np.linspace(2, 500 - 1, 500 - 2), np.logspace(np.log10(500), np.log10(6e4), 500), ) ) # creating ell arrays for the tjp-block and nmt-block tjp_ell_mask = meta["ell"] > nside * 3 meta["ell_tjp"] = meta["ell"][tjp_ell_mask] meta["ell_nmt0"] = np.linspace(0, 3 * nside - 1, 3 * nside) meta["ell_nmt"] = meta["ell"][np.invert(tjp_ell_mask)] # Theta binning - log spaced between 1 .. 300 arcmin. meta["theta"] = np.logspace(np.log10(1 / 60), np.log10(300.0 / 60), 3000) if rank == 0: diclist, covsize = self.make_mpi_dict(cosmo, meta, two_point_data=two_point_data) else: diclist = None covsize = None if comm is None: diclist = diclist[0] else: diclist = comm.scatter(diclist, root=0) covsize = comm.bcast(covsize, root=0) self.compute_covariance(cosmo, meta, two_point_data, diclist) if self.comm is not None: comm.Barrier() cov = self.put_together(covsize) self.save_outputs(two_point_data, cov) def save_outputs(self, two_point_data, cov): filename = self.get_output("summary_statistics_fourier") two_point_data.add_covariance(cov, overwrite=True) two_point_data.save_fits(filename, overwrite=True) def read_cosmology(self): return self.open_input("fiducial_cosmology", wrapper=True).to_ccl() def read_sacc(self): import sacc f = self.get_input("twopoint_data_fourier") two_point_data = sacc.Sacc.load_fits(f) # Remove the data types that we won't use for inference mask = [ two_point_data.indices(sacc.standard_types.galaxy_shear_cl_ee), two_point_data.indices(sacc.standard_types.galaxy_shearDensity_cl_e), two_point_data.indices(sacc.standard_types.galaxy_density_cl), # not doing b-modes, do we want to? ] print("Length before cuts = ", len(two_point_data)) mask = np.concatenate(mask) two_point_data.keep_indices(mask) print("Length after cuts = ", len(two_point_data)) two_point_data.to_canonical_order() any_source = any( d.data_type in [sacc.standard_types.galaxy_shear_cl_ee, sacc.standard_types.galaxy_shearDensity_cl_e] for d in two_point_data.data ) any_lens = any( d.data_type in [sacc.standard_types.galaxy_shearDensity_cl_e, sacc.standard_types.galaxy_density_cl] for d in two_point_data.data ) return two_point_data, any_source, any_lens def read_number_statistics(self, any_source, any_lens): # Read the bits of the metadata that we need with self.open_input("tracer_metadata") as input_data: # area in sq deg area_deg2 = input_data["tracers"].attrs["area"] area_unit = input_data["tracers"].attrs["area_unit"] if any_source: N_eff = input_data["tracers/N_eff"][:] if self.config["use_true_shear"]: nbins = len(input_data["tracers/sigma_e"][:]) sigma_e = np.array([0.0 for i in range(nbins)]) else: sigma_e = input_data["tracers/sigma_e"][:] if any_lens: N_lens = input_data["tracers/lens_counts"][:] if area_unit != "deg^2": raise ValueError("Units of area have changed") # area in steradians and sky fraction area = area_deg2 * np.radians(1) ** 2 area_arcmin2 = area_deg2 * 60**2 full_sky = 4 * np.pi f_sky = area / full_sky # Output dict meta = { "f_sky": f_sky, } # General bits print(f"area = {area_deg2:.1f} deg^2") print(f"f_sky: {f_sky}") # Lens related bits if any_lens: n_lens = N_lens / area n_lens_arcmin = N_lens / area_arcmin2 print(f"N_lens: {N_lens} (totals)") print(f"lens density: {n_lens} / steradian") print(f" = {np.around(n_lens_arcmin, 2)} / arcmin") meta["n_lens"] = n_lens # Source-related bits if any_source: n_eff = N_eff / area n_eff_arcmin = N_eff / area_arcmin2 print(f"N_eff: {N_eff} (totals)") print(f"n_eff: {n_eff} / steradian") print(f" = {np.around(n_eff_arcmin, 2)} / sq arcmin") meta["sigma_e"] = sigma_e meta["n_eff"] = n_eff return meta def get_w_spinlist(self): size = self.size allspins = [(0, 0), (2, 0), (2, 2)] spinlist = [[] for i in range(size)] for i, spins in enumerate(allspins): num = i % size spinlist[num].append(spins) return spinlist def get_w(self, msk, spinlist): import pymaster as nmt nside = self.config["nside"] self.f0 = nmt.NmtField(msk, [msk], n_iter=0) # Spin-2 field self.f2 = nmt.NmtField(msk, [msk, msk], n_iter=2) # Binning self.b = nmt.NmtBin.from_nside_linear(nside, 48) # Workspace for spins in spinlist: s1 = spins[0] s2 = spins[1] self.w = nmt.NmtWorkspace() self.w.compute_coupling_matrix(getattr(self, f"f{s1}"), getattr(self, f"f{s2}"), self.b) self.w.write_to(f"{self.scratch_dir}/w{s1}{s2}.fits") def read_w(self): import pymaster as nmt # These are accessed via getattr in compute_covariance_block self.w00 = nmt.NmtWorkspace() self.w00.read_from(f"{self.scratch_dir}/w00.fits") self.w20 = nmt.NmtWorkspace() self.w20.read_from(f"{self.scratch_dir}/w20.fits") self.w22 = nmt.NmtWorkspace() self.w22.read_from(f"{self.scratch_dir}/w22.fits") def get_cw_spinlist(self): size = self.size allspins = [ (0, 0, 0, 0), (0, 0, 2, 0), (0, 0, 2, 2), (2, 0, 2, 0), (2, 0, 2, 2), (2, 2, 2, 2), ] spinlist = [[] for i in range(size)] for i, spins in enumerate(allspins): num = i % size spinlist[num].append(spins) return spinlist def get_cw(self, spinlist): import pymaster as nmt for spins in spinlist: s1 = spins[0] s2 = spins[1] s3 = spins[2] s4 = spins[3] cw = nmt.NmtCovarianceWorkspace() cw.compute_coupling_coefficients( getattr(self, f"f{s1}"), getattr(self, f"f{s2}"), getattr(self, f"f{s3}"), getattr(self, f"f{s4}"), ) cw.write_to(f"{self.scratch_dir}/cw{s1}{s2}{s3}{s4}.fits") def read_cw(self): import pymaster as nmt # These are accessed via getattr in compute_covariance_block self.cw0000 = nmt.NmtCovarianceWorkspace() self.cw0000.read_from(f"{self.scratch_dir}/cw0000.fits") self.cw0020 = nmt.NmtCovarianceWorkspace() self.cw0020.read_from(f"{self.scratch_dir}/cw0020.fits") self.cw0022 = nmt.NmtCovarianceWorkspace() self.cw0022.read_from(f"{self.scratch_dir}/cw0022.fits") self.cw2020 = nmt.NmtCovarianceWorkspace() self.cw2020.read_from(f"{self.scratch_dir}/cw2020.fits") self.cw2022 = nmt.NmtCovarianceWorkspace() self.cw2022.read_from(f"{self.scratch_dir}/cw2022.fits") self.cw2222 = nmt.NmtCovarianceWorkspace() self.cw2222.read_from(f"{self.scratch_dir}/cw2222.fits") def get_tracer_info(self, cosmo, meta, two_point_data): # Generates CCL tracers from n(z) information in the data file import pyccl as ccl import scipy ccl_tracers = {} tracer_noise = {} for tracer in two_point_data.tracers: # Pull out the integer corresponding to the tracer index tracer_dat = two_point_data.get_tracer(tracer) nbin = int(two_point_data.tracers[tracer].name.split("_")[1]) z = tracer_dat.z.copy().flatten() nz = tracer_dat.nz.copy().flatten() # Identify source tracers and gnerate WeakLensingTracer objects # based on them if "source" in tracer or "src" in tracer: sigma_e = meta["sigma_e"][nbin] n_eff = meta["n_eff"][nbin] ccl_tracers[tracer] = ccl.WeakLensingTracer(cosmo, dndz=(z, nz)) # CCL automatically normalizes dNdz tracer_noise[tracer] = sigma_e**2 / n_eff # or if it is a lens bin then generaete the corresponding # CCL tracer class elif "lens" in tracer: b = 1.0 * np.ones(len(z)) # place holder n_gal = meta["n_lens"][nbin] tracer_noise[tracer] = 1 / n_gal ccl_tracers[tracer] = ccl.NumberCountsTracer(cosmo, has_rsd=False, dndz=(z, nz), bias=(z, b)) return ccl_tracers, tracer_noise def get_spins(self, tracer_comb): # Get the Wigner Transform factors WT_factors = {} WT_factors["lens", "source"] = (0, 2) WT_factors["source", "lens"] = (2, 0) # same as (0,2) WT_factors["source", "source"] = {"plus": (2, 2), "minus": (2, -2)} WT_factors["lens", "lens"] = (0, 0) tracers = [] for i in tracer_comb: if "lens" in i: tracers += ["lens"] if "source" in i: tracers += ["source"] return WT_factors[tuple(tracers)] # compute a single covariance matrix for a given pair of C_ell or xi. def compute_covariance_block( self, cosmo, meta, ell_bins, tracer_comb1=None, tracer_comb2=None, ccl_tracers=None, tracer_Noise=None, two_point_data=None, xi_plus_minus1="plus", xi_plus_minus2="plus", cache=None, WT=None, ): import pyccl as ccl from tjpcov.wigner_transform import bin_cov import pymaster as nmt import scipy cl = {} # tracers 1,2,3,4 = tracer_comb1[0], tracer_comb1[1], tracer_comb2[0], tracer_comb2[1] # In the dicts below we use '13' to indicate C_ell_(1,3), etc. # This index maps to this usage reindex = { (0, 0): 13, (1, 1): 24, (0, 1): 14, (1, 0): 23, } ell = meta["ell"] ell_tjp = meta["ell_tjp"] ell_nmt = meta["ell_nmt"] # interpolate nmt covariance to this ell ell_nmt0 = meta["ell_nmt0"] # ell for calculating nmt covariance # Getting all the C_ell that we need, saving the results in a cache # for later re-use for i in (0, 1): for j in (0, 1): local_key = reindex[(i, j)] # For symmetric pairs we may have saved the C_ell the other # way around, so try both keys cache_key1 = (tracer_comb1[i], tracer_comb2[j]) cache_key2 = (tracer_comb2[j], tracer_comb1[i]) if cache_key1 in cache: cl[local_key] = cache[cache_key1] elif cache_key2 in cache: cl[local_key] = cache[cache_key2] else: # If not cached then we must compute t1 = tracer_comb1[i] t2 = tracer_comb2[j] c = ccl.angular_cl(cosmo, ccl_tracers[t1], ccl_tracers[t2], ell) print("Computed C_ell for ", cache_key1) cache[cache_key1] = c cl[local_key] = c # create cl's for tjp and nmt separately cl_tjp = {} for label in [13, 14, 23, 24]: cl_tjp[label] = np.interp(ell_tjp, ell, cl[label]) cl_nmt = {} for label in [13, 14, 23, 24]: cl_nmt[label] = np.interp(ell_nmt0, ell, cl[label]) # The shape noise C_ell values. # These are zero for cross bins and as computed earlier for auto bins SN = {} SN[13] = tracer_Noise[tracer_comb1[0]] if tracer_comb1[0] == tracer_comb2[0] else 0 SN[24] = tracer_Noise[tracer_comb1[1]] if tracer_comb1[1] == tracer_comb2[1] else 0 SN[14] = tracer_Noise[tracer_comb1[0]] if tracer_comb1[0] == tracer_comb2[1] else 0 SN[23] = tracer_Noise[tracer_comb1[1]] if tracer_comb1[1] == tracer_comb2[0] else 0 # The tjp part of the covariance # The coupling is an identity matrix at least when we neglect # the mask coupling_mat = {} coupling_mat[1324] = np.eye(len(ell_tjp)) coupling_mat[1423] = np.eye(len(ell_tjp)) # Initial covariance of C_ell components cov_tjp = {} cov_tjp[1324] = np.outer(cl_tjp[13] + SN[13], cl_tjp[24] + SN[24]) * coupling_mat[1324] cov_tjp[1423] = np.outer(cl_tjp[14] + SN[14], cl_tjp[23] + SN[23]) * coupling_mat[1423] # for shear-shear components we also add a B-mode contribution first_is_shear_shear = ("source" in tracer_comb1[0]) and ("source" in tracer_comb1[1]) second_is_shear_shear = ("source" in tracer_comb2[0]) and ("source" in tracer_comb2[1]) if self.do_xi and (first_is_shear_shear or second_is_shear_shear): # this adds the B-mode shape noise contribution. # We assume B-mode power (C_ell) is 0 Bmode_F = 1 if xi_plus_minus1 != xi_plus_minus2: # in the cross term, this contribution is subtracted. # eq. 29-31 of https://arxiv.org/pdf/0708.0387.pdf Bmode_F = -1 # below the we multiply zero to maintain the shape of the Cl array, these are effectively # B-modes cov_tjp[1324] += np.outer(cl_tjp[13] * 0 + SN[13], cl_tjp[24] * 0 + SN[24]) * coupling_mat[1324] * Bmode_F cov_tjp[1423] += np.outer(cl_tjp[14] * 0 + SN[14], cl_tjp[23] * 0 + SN[23]) * coupling_mat[1423] * Bmode_F cov_tjp["final"] = cov_tjp[1423] + cov_tjp[1324] # The nmt part of the covariance: w1spin, w2spin, nmtspin = self.get_nmt_spin(tracer_comb1, tracer_comb2) w1 = getattr(self, "w" + w1spin) w2 = getattr(self, "w" + w2spin) cw = getattr(self, "cw" + nmtspin) shape = self.get_nmt_shape(tracer_comb1, tracer_comb2, meta) nmt_input = self.get_nmt_input(tracer_comb1, tracer_comb2, cl_nmt, SN) nmt_input_bmode = self.get_nmt_input_bmode(tracer_comb1, tracer_comb2, cl_nmt, SN) nmt_cov = nmt.gaussian_covariance( cw, int(nmtspin[0]), int(nmtspin[1]), int(nmtspin[2]), int(nmtspin[3]), nmt_input[13], nmt_input[14], nmt_input[23], nmt_input[24], wa=w1, wb=w2, coupled=True, ).reshape(shape)[:, 0, :, 0] # for shear-shear components we also add a B-mode contribution first_is_shear_shear = ("source" in tracer_comb1[0]) and ("source" in tracer_comb1[1]) second_is_shear_shear = ("source" in tracer_comb2[0]) and ("source" in tracer_comb2[1]) if self.do_xi and (first_is_shear_shear or second_is_shear_shear): Bmode_F = 1 if xi_plus_minus1 != xi_plus_minus2: Bmode_F = -1 nmt_cov += ( nmt.gaussian_covariance( cw, int(nmtspin[0]), int(nmtspin[1]), int(nmtspin[2]), int(nmtspin[3]), nmt_input_bmode[13], nmt_input_bmode[14], nmt_input_bmode[23], nmt_input_bmode[24], wa=w1, wb=w2, coupled=True, ).reshape(shape)[:, 0, :, 0] * Bmode_F ) # Transform nmt part covariance back to the un-normalized # tjp part to combine them together nmt_cov *= 1.0 / (meta["f_sky"] ** 3) norm_nmt = (2 * ell_nmt0 + 1) * np.gradient(ell_nmt0) * meta["f_sky"] nmt_cov *= norm_nmt # interpolate nmt-part covariance to the original ell f = scipy.interpolate.interp2d(ell_nmt0, ell_nmt0, nmt_cov, kind="cubic") nmt_cov = f(ell_nmt, ell_nmt) # Combining tjp and nmt parts together cov = {} cov["final"] = np.zeros((len(ell), len(ell))) cov["final"][: len(ell_nmt), : len(ell_nmt)] = nmt_cov cov["final"][len(ell_nmt) :, len(ell_nmt) :] = cov_tjp["final"] # Normalization and/or wigner transform if self.do_xi: s1_s2_1 = self.get_spins(tracer_comb1) s1_s2_2 = self.get_spins(tracer_comb2) # For the shear-shear we have two sets of spins, plus and minus, # which are returned as a dict, so we need to pull out the one we need # Otherwise it's just specified as a tuple, e.g. (2,0) if isinstance(s1_s2_1, dict): s1_s2_1 = s1_s2_1[xi_plus_minus1] if isinstance(s1_s2_2, dict): s1_s2_2 = s1_s2_2[xi_plus_minus2] # Use these terms to project the covariance from C_ell to xi(theta) print( "tracer combos:", tracer_comb1, tracer_comb2, " xi+/xi-:", xi_plus_minus1, xi_plus_minus2, " spins:", s1_s2_1, s1_s2_2, ) th, cov["final"] = WT.projected_covariance( ell_cl=ell, s1_s2=s1_s2_1, s1_s2_cross=s1_s2_2, cl_cov=cov["final"] ) # Normalize # The overall normalization factor at the front of the matrix if self.do_xi: norm = np.pi * 4 * meta["f_sky"] else: norm = (2 * ell + 1) * np.gradient(ell) * meta["f_sky"] cov["final"] /= norm # Put the covariance into bins. # This is optional in the case of a C_ell covariance (only if bins in ell are # supplied, otherwise the matrix is for each ell value individually). It is # required for real-space covariances since these are always binned. if self.do_xi: thb, cov["final_b"] = bin_cov(r=th / d2r, r_bins=ell_bins, cov=cov["final"]) else: if ell_bins is not None: lb, cov["final_b"] = bin_cov(r=ell, r_bins=ell_bins, cov=cov["final"]) return cov["final_b"] def get_nmt_spin(self, tracer_comb1, tracer_comb2): s1_s2_1 = self.get_spins(tracer_comb1) s1_s2_2 = self.get_spins(tracer_comb2) if isinstance(s1_s2_1, dict): s1_s2_1 = s1_s2_1["plus"] if isinstance(s1_s2_2, dict): s1_s2_2 = s1_s2_2["plus"] w1spin = str(abs(s1_s2_1[0])) + str(abs(s1_s2_1[1])) w2spin = str(abs(s1_s2_2[0])) + str(abs(s1_s2_2[1])) nmtspin = str(abs(s1_s2_1[0])) + str(abs(s1_s2_1[1])) + str(abs(s1_s2_2[0])) + str(abs(s1_s2_2[1])) return w1spin, w2spin, nmtspin def get_nmt_shape(self, tracer_comb1, tracer_comb2, meta): nell = len(meta["ell_nmt0"]) s1_s2_1 = self.get_spins(tracer_comb1) s1_s2_2 = self.get_spins(tracer_comb2) if isinstance(s1_s2_1, dict): s1_s2_1 = s1_s2_1["plus"] if isinstance(s1_s2_2, dict): s1_s2_2 = s1_s2_2["plus"] s1 = (abs(s1_s2_1[0]), abs(s1_s2_1[1])) s2 = (abs(s1_s2_2[0]), abs(s1_s2_2[1])) dim2 = sum(s1) + 1 if sum(s1) == 0 else sum(s1) dim4 = sum(s2) + 1 if sum(s2) == 0 else sum(s2) return [nell, dim2, nell, dim4] def get_nmt_input(self, tracer_comb1, tracer_comb2, cl_nmt, SN): s1_s2_1 = self.get_spins(tracer_comb1) s1_s2_2 = self.get_spins(tracer_comb2) if isinstance(s1_s2_1, dict): s1_s2_1 = s1_s2_1["plus"] if isinstance(s1_s2_2, dict): s1_s2_2 = s1_s2_2["plus"] s1 = abs(s1_s2_1[0]) s2 = abs(s1_s2_1[1]) s3 = abs(s1_s2_2[0]) s4 = abs(s1_s2_2[1]) cl130 = 0 * cl_nmt[13] cl13 = [cl_nmt[13] + SN[13], cl130, cl130, cl130 + SN[13]] cl140 = 0 * cl_nmt[14] cl14 = [cl_nmt[14] + SN[14], cl140, cl140, cl140 + SN[14]] cl230 = 0 * cl_nmt[23] cl23 = [cl_nmt[23] + SN[23], cl230, cl230, cl230 + SN[23]] cl240 = 0 * cl_nmt[24] cl24 = [cl_nmt[24] + SN[24], cl240, cl240, cl240 + SN[24]] n13 = 1 if s1 + s3 == 0 else s1 + s3 n14 = 1 if s1 + s4 == 0 else s1 + s4 n23 = 1 if s2 + s3 == 0 else s2 + s3 n24 = 1 if s2 + s4 == 0 else s2 + s4 nmt_input = {} nmt_input[13] = cl13[:n13] nmt_input[14] = cl14[:n14] nmt_input[23] = cl23[:n23] nmt_input[24] = cl24[:n24] return nmt_input def get_nmt_input_bmode(self, tracer_comb1, tracer_comb2, cl_nmt, SN): s1_s2_1 = self.get_spins(tracer_comb1) s1_s2_2 = self.get_spins(tracer_comb2) if isinstance(s1_s2_1, dict): s1_s2_1 = s1_s2_1["plus"] if isinstance(s1_s2_2, dict): s1_s2_2 = s1_s2_2["plus"] s1 = abs(s1_s2_1[0]) s2 = abs(s1_s2_1[1]) s3 = abs(s1_s2_2[0]) s4 = abs(s1_s2_2[1]) cl130 = 0 * cl_nmt[13] cl13 = [cl130 + SN[13], cl130, cl130, cl130 + SN[13]] cl140 = 0 * cl_nmt[14] cl14 = [cl140 + SN[14], cl140, cl140, cl140 + SN[14]] cl230 = 0 * cl_nmt[23] cl23 = [cl230 + SN[23], cl230, cl230, cl230 + SN[23]] cl240 = 0 * cl_nmt[24] cl24 = [cl240 + SN[24], cl240, cl240, cl240 + SN[24]] n13 = 1 if s1 + s3 == 0 else s1 + s3 n14 = 1 if s1 + s4 == 0 else s1 + s4 n23 = 1 if s2 + s3 == 0 else s2 + s3 n24 = 1 if s2 + s4 == 0 else s2 + s4 nmt_input = {} nmt_input[13] = cl13[:n13] nmt_input[14] = cl14[:n14] nmt_input[23] = cl23[:n23] nmt_input[24] = cl24[:n24] return nmt_input def get_angular_bins(self, two_point_data): # Assume that the ell binning is the same for each of the bins. # This is true in the current pipeline. X = two_point_data.get_data_points("galaxy_shear_cl_ee", i=0, j=0) # Further assume that the ell ranges are contiguous, so that # the max value of one window is the min value of the next. # So we just need the lower edges of each bin and then the # final maximum value of the last bin ell_edges = [x["window"].min for x in X] ell_edges.append(X[-1]["window"].max) return np.array(ell_edges) def make_wigner_transform(self, meta): import threadpoolctl from tjpcov import wigner_transform path = self.config["pickled_wigner_transform"] if path: if os.path.exists(path): print(f"Loading precomputed wigner transform from {path}") WT = pickle.load(open(path, "rb")) return WT else: print(f"Precomputed wigner transform {path} not found.") print("Will compute it and then save it.") # We don't want to use n processes with n threads each by accident, # where n is the number of CPUs we have # so for this bit of the code, which uses python's multiprocessing, # we limit the number of threads that numpy etc can use. # After this is finished this will switch back to allowing all the CPUs # to be used for threading instead. num_processes = int(os.environ.get("OMP_NUM_THREADS", 1)) print("Generating Wigner Transform.") with threadpoolctl.threadpool_limits(1): WT = wigner_transform.WignerTransform( ell=meta["ell"], theta=meta["theta"] * d2r, s1_s2=[(2, 2), (2, -2), (0, 2), (2, 0), (0, 0)], ncpu=num_processes, ) print("Computed Wigner Transform.") if path: try: pickle.dump(WT, open(path, "wb")) except OSError: sys.stderr.write(f"Could not save wigner transform to {path}") return WT # make a list of list of dictionary, to be scattered to each node. i.e. # each node receives a list of dictionary, which contains info about # the row and column of the covariance block (i,j), as well as some other # info, e.g. tracer combo and xi_pm def make_mpi_dict(self, cosmo, meta, two_point_data): # we will loop over all these tracer_combs = two_point_data.get_tracer_combinations() N2pt = len(tracer_combs) # the bit below is just counting the number of 2pt functions, and accounting # for the fact that xi needs to be double counted N2pt0 = 0 if self.do_xi: N2pt0 = N2pt tracer_combs_temp = tracer_combs.copy() for combo in tracer_combs: if ("source" in combo[0]) and ("source" in combo[1]): N2pt += 1 tracer_combs_temp += [combo] tracer_combs = tracer_combs_temp.copy() ell_bins = self.get_angular_bins(two_point_data) Nell_bins = len(ell_bins) - 1 covsize = {"Nell_bins": Nell_bins, "N2pt": N2pt} count_xi_pm1 = 0 count_xi_pm2 = 0 cl_cache = {} xi_pm = [ [("plus", "plus"), ("plus", "minus")], [("minus", "plus"), ("minus", "minus")], ] xim_start = N2pt0 - (N2pt - N2pt0) xim_end = N2pt0 # create a list of list of dictionaries, and scatter it using MPI alldic = [] # Look through the chunk of matrix, tracer pair by tracer pair for i in range(0, N2pt): tracer_comb1 = tracer_combs[i] count_xi_pm1 = 1 if i in range(xim_start, xim_end) else 0 for j in range(i, N2pt): tracer_comb2 = tracer_combs[j] print(f"Computing {tracer_comb1} x {tracer_comb2}: chunk ({i},{j}) of ({N2pt},{N2pt})") count_xi_pm2 = 1 if j in range(xim_start, xim_end) else 0 if ( self.do_xi and ("source" in tracer_comb1[0] and "source" in tracer_comb1[1]) or ("source" in tracer_comb2[0] and "source" in tracer_comb2[1]) ): dic = { "ij": (i, j), "tracer_comb1": tracer_comb1, "tracer_comb2": tracer_comb2, "xi_plus_minus1": xi_pm[count_xi_pm1][count_xi_pm2][0], "xi_plus_minus2": xi_pm[count_xi_pm1][count_xi_pm2][1], } alldic.append(dic) else: dic = { "ij": (i, j), "tracer_comb1": tracer_comb1, "tracer_comb2": tracer_comb2, } alldic.append(dic) size = self.size diclist = [[] for i in range(size)] for i, dic in enumerate(alldic): num = i % size diclist[num].append(dic) return diclist, covsize # compute all the covariances and then combine them into one single giant matrix def compute_covariance(self, cosmo, meta, two_point_data, diclist): from tjpcov.wigner_transform import bin_cov ccl_tracers, tracer_Noise = self.get_tracer_info(cosmo, meta, two_point_data=two_point_data) # we will loop over all these tracer_combs = two_point_data.get_tracer_combinations() N2pt = len(tracer_combs) WT = self.make_wigner_transform(meta) # the bit below is just counting the number of 2pt functions, and accounting # for the fact that xi needs to be double counted N2pt0 = 0 if self.do_xi: N2pt0 = N2pt tracer_combs_temp = tracer_combs.copy() for combo in tracer_combs: if ("source" in combo[0]) and ("source" in combo[1]): N2pt += 1 tracer_combs_temp += [combo] tracer_combs = tracer_combs_temp.copy() ell_bins = self.get_angular_bins(two_point_data) Nell_bins = len(ell_bins) - 1 cov_full = np.zeros((Nell_bins * N2pt, Nell_bins * N2pt)) count_xi_pm1 = 0 count_xi_pm2 = 0 cl_cache = {} xi_pm = [ [("plus", "plus"), ("plus", "minus")], [("minus", "plus"), ("minus", "minus")], ] # Look through the chunk of matrix, tracer pair by tracer pair for num, dic in enumerate(diclist): tracer_comb1 = dic["tracer_comb1"] tracer_comb2 = dic["tracer_comb2"] if ( self.do_xi and ("source" in tracer_comb1[0] and "source" in tracer_comb1[1]) or ("source" in tracer_comb2[0] and "source" in tracer_comb2[1]) ): # print(tracer_comb1,tracer_comb2,dic['xi_plus_minus1'],dic['xi_plus_minus2']) cov_ij = self.compute_covariance_block( cosmo, meta, ell_bins, tracer_comb1=dic["tracer_comb1"], tracer_comb2=dic["tracer_comb2"], ccl_tracers=ccl_tracers, tracer_Noise=tracer_Noise, two_point_data=two_point_data, xi_plus_minus1=dic["xi_plus_minus1"], xi_plus_minus2=dic["xi_plus_minus2"], cache=cl_cache, WT=WT, ) else: cov_ij = self.compute_covariance_block( cosmo, meta, ell_bins, tracer_comb1=dic["tracer_comb1"], tracer_comb2=dic["tracer_comb2"], ccl_tracers=ccl_tracers, tracer_Noise=tracer_Noise, two_point_data=two_point_data, cache=cl_cache, WT=WT, ) i = dic["ij"][0] j = dic["ij"][1] np.savetxt(f"{self.scratch_dir}/cov_{i}_{j}.txt", cov_ij) def put_together(self, covsize): Nell_bins = covsize["Nell_bins"] N2pt = covsize["N2pt"] cov_full = np.zeros((Nell_bins * N2pt, Nell_bins * N2pt)) for i in range(0, N2pt): for j in range(i, N2pt): # Fill in this chunk of the matrix cov_ij = np.loadtxt(f"{self.scratch_dir}/cov_{i}_{j}.txt") # Find the right location in the matrix start_i = i * Nell_bins start_j = j * Nell_bins end_i = start_i + Nell_bins end_j = start_j + Nell_bins # and fill it in, and the transpose component cov_full[start_i:end_i, start_j:end_j] = cov_ij cov_full[start_j:end_j, start_i:end_i] = cov_ij.T try: np.linalg.cholesky(cov_full) except: print( "liAnalg.LinAlgError: Covariance not positive definite! " "Most likely this is a problem in xim. " "We will continue for now but this needs to be fixed." ) return cov_full
[docs] class TXRealNamasterCovariance(TXFourierNamasterCovariance): """ Compute a Gaussian real-space covariance with NaMaster We don't yet have another stage for this, but should rationalize when comparing to TJPCov. """ name = "TXRealNamasterCovariance" do_xi = True inputs = [ ("fiducial_cosmology", FiducialCosmology), # For the cosmological parameters ("twopoint_data_real", SACCFile), # For the binning information ("tracer_metadata", HDFFile), # For metadata ("mask", MapsFile), ] outputs = [ ("summary_statistics_real", SACCFile), ] config_options = { "pickled_wigner_transform": StageParameter(str, "", msg="Path to pickled Wigner transform file."), "use_true_shear": StageParameter(bool, False, msg="Whether to use true shear values."), "galaxy_bias": StageParameter(list, [0.0], msg="Galaxy bias values."), "scratch_dir": StageParameter(str, "temp", msg="Directory for temporary files."), } def run(self): super().run() def get_angular_bins(self, two_point_data): min_sep = float(two_point_data.metadata["provenance/config/min_sep"]) max_sep = float(two_point_data.metadata["provenance/config/max_sep"]) nbins = int(two_point_data.metadata["provenance/config/nbins"]) th_arcmin = np.logspace( np.log10(min_sep), np.log10(max_sep), nbins + 1, ) return th_arcmin / 60.0 def read_sacc(self): import sacc f = self.get_input("twopoint_data_real") two_point_data = sacc.Sacc.load_fits(f) mask = [ two_point_data.indices(sacc.standard_types.galaxy_density_xi), two_point_data.indices(sacc.standard_types.galaxy_shearDensity_xi_t), two_point_data.indices(sacc.standard_types.galaxy_shear_xi_plus), two_point_data.indices(sacc.standard_types.galaxy_shear_xi_minus), ] mask = np.concatenate(mask) two_point_data.keep_indices(mask) two_point_data.to_canonical_order() any_source = any( d.data_type in [ sacc.standard_types.galaxy_shearDensity_xi_t, sacc.standard_types.galaxy_shear_xi_plus, sacc.standard_types.galaxy_shear_xi_minus, ] for d in two_point_data.data ) any_lens = any( d.data_type in [sacc.standard_types.galaxy_density_xi, sacc.standard_types.galaxy_shearDensity_xi_t] for d in two_point_data.data ) return two_point_data, any_source, any_lens def save_outputs(self, two_point_data, cov): filename = self.get_output("summary_statistics_real") two_point_data.add_covariance(cov, overwrite=True) two_point_data.save_fits(filename, overwrite=True)