from .base_stage import PipelineStage
from .utils import choose_pixelization
from .data_types import (
HDFFile,
ShearCatalog,
TextFile,
MapsFile,
FileCollection,
FiducialCosmology,
TomographyCatalog,
QPNOfZFile,
)
from ceci.config import StageParameter
import glob
import time
import numpy as np
[docs]
class TXLogNormalGlass(PipelineStage):
"""
Uses GLASS to generate a simulated catalog from lognormal fields
GLASS citation:
https://ui.adsabs.harvard.edu/abs/2023OJAp....6E..11T
Contamination is applied to the density field by poission sampling
the density field at a higher rate than target density, then
removing objects with prob proportional to 1/input_weight
This allows us to produce contaminated/ucontamined maps of the same
galaxy realisation
"""
name = "TXLogNormalGlass"
parallel = False
inputs = [
("mask", MapsFile),
("lens_photoz_stack", QPNOfZFile),
("fiducial_cosmology", FiducialCosmology),
("input_lss_weight_maps", MapsFile),
]
outputs = [
("photometry_catalog", HDFFile),
("lens_tomography_catalog_unweighted", TomographyCatalog),
("glass_cl_shells", HDFFile),
("glass_cl_binned", HDFFile),
("density_shells", HDFFile),
# TO DO: add shear maps
]
config_options = {
"num_dens": StageParameter(list, required=True, msg="Number density of galaxies per square arcmin"),
"zmin": StageParameter(float, 0.0, msg="Minimum redshift for the simulation"),
"zmax": StageParameter(float, 2.0, msg="Maximum redshift for the simulation"),
"dx": StageParameter(int, 100, msg="Comoving distance spacing for redshift shells in Mpc"),
"bias0": StageParameter(float, 2.0, msg="Linear bias at zpivot"),
"alpha_bz": StageParameter(float, 0.0, msg="Controls redshift evolution of bias"),
"zpivot": StageParameter(float, 0.6, msg="Pivot redshift for bias evolution"),
"shift": StageParameter(float, 1.0, msg="Lognormal shift parameter"),
"contaminate": StageParameter(bool, False, msg="Whether to apply contamination to the density field"),
"random_seed": StageParameter(int, 0, msg="Random seed for reproducibility"),
"cl_optional_file": StageParameter(
str, "none", msg="Optional file for input C(l) values. Otherwise they are computed (slow)"
),
"ell_binned_min": StageParameter(float, 0.1, msg="Minimum ell for binned C(l) output"),
"ell_binned_max": StageParameter(float, 5.0e5, msg="Maximum ell for binned C(l) output"),
"ell_binned_nbins": StageParameter(int, 100, msg="Number of ell bins for binned C(l) output"),
"output_density_shell_maps": StageParameter(bool, False, msg="Whether to output density maps for each shell"),
"density_shell_nest": StageParameter(bool, False, msg="Whether output density maps be in nest ordering"),
}
def run(self):
"""
Execute the TXLogNormalGlass pipeline stage
This method coordinates the execution of the GLASS-based pipeline,
generating simulated catalogs from lognormal fields
It calls the following sub-methods:
- set_z_shells
- generate_shell_cls #these are the C(l) used to generate the mocks
- generate_binned_cls #these are the C(l) the user can use for testing
- generate_catalogs
- save_catalogs
"""
# get nside and nest info from mask
with self.open_input("mask", wrapper=True) as map_file:
mask_map_info = map_file.read_map_info("mask")
self.nside = self.lmax = mask_map_info["nside"]
self.nest = mask_map_info["nest"]
self.set_z_shells()
self.generate_shell_cls()
self.generate_binned_cls()
self.generate_catalogs()
def set_z_shells(self):
"""
Set up redshift shells with uniform spacing in comoving distance
These shells determine the redshift resolution of the simulation so should
be narrower than the n(z)s
This method initializes redshift bins based on the provided configuration
options. It calculates redshift shells and associated weight functions
for later use
Uses CCl rather than glass.shells.distance_grid to avoid mixing CCL and CAMB
"""
import glass.shells
with self.open_input("fiducial_cosmology", wrapper=True) as f:
cosmo = f.to_ccl()
chi_min = cosmo.comoving_radial_distance(1 / (1 + self.config["zmin"]))
chi_max = cosmo.comoving_radial_distance(1 / (1 + self.config["zmax"]))
chi_grid = np.arange(chi_min, chi_max + self.config["dx"], self.config["dx"])
a_grid = cosmo.scale_factor_of_chi(chi_grid)
self.zb = (1.0 / a_grid) - 1.0
self.ws = glass.shells.tophat_windows(self.zb, weight=camb_tophat_weight)
self.nshells = len(self.ws)
# TO DO: figure out why GLASS needed the linear ramp weight here
def get_bias_at_z(self, z):
bias0 = self.config["bias0"]
alpha_bz = self.config["alpha_bz"]
zpivot = self.config["zpivot"]
return (
bias0
* (1.0 + (1.0 / 3.0) * ((1 + z) ** alpha_bz - 1))
/ (1.0 + (1.0 / 3.0) * ((1 + zpivot) ** alpha_bz - 1))
)
def generate_shell_cls(self):
"""
Generate angular power spectra (C(l)s) for each redshift shell
This method computes matter C(l)s using CCL for each pair of redshift shells based on the
provided fiducial cosmology
"""
import scipy.interpolate
import pyccl as ccl
import h5py
if self.config["cl_optional_file"] != "none":
with h5py.File(self.config["cl_optional_file"]) as cl_file:
self.ell = cl_file["lognormal_cl/ell"][:]
self.cls = cl_file["lognormal_cl/cls"][:]
else:
with self.open_input("fiducial_cosmology", wrapper=True) as f:
cosmo = f.to_ccl()
# In order for CCl to get the cross correlation between bins right,
# we need to interpolate the windows at all z
dz = self.ws[0].za[1] - self.ws[0].za[0]
zb_grid = np.arange(self.config["zmin"], self.config["zmax"] + dz, dz)
# Make density shell objects for CCL
density = []
for ishell in range(self.nshells):
# make bias const in shell
b_shell = self.get_bias_at_z(self.ws[ishell].zeff)
bz = np.ones(len(zb_grid)) * b_shell
wa_interped = scipy.interpolate.interp1d(
self.ws[ishell].za,
self.ws[ishell].wa,
bounds_error=False,
fill_value=0.0,
)(zb_grid)
density.append(
ccl.NumberCountsTracer(
cosmo,
dndz=(zb_grid, wa_interped),
has_rsd=False,
bias=(zb_grid, bz),
mag_bias=None,
)
)
self.ell = np.arange(self.lmax)
self.cls = []
self.cls_index = []
for i in range(1, self.nshells + 1):
for j in range(i, 0, -1):
cl_bin = cosmo.angular_cl(density[i - 1], density[j - 1], self.ell)
# fix monopole to 0
cl_bin[0] = 0
self.cls.append(cl_bin)
self.cls_index.append((i, j))
# save the C(l)
cl_output = self.open_output("glass_cl_shells")
group = cl_output.create_group("lognormal_cl")
group.create_dataset("ell", data=self.ell, dtype="f")
group.create_dataset("cls", data=self.cls, dtype="f")
group.create_dataset("cls_index", data=self.cls_index, dtype="f")
group.create_dataset("zb_grid", data=zb_grid, dtype="f")
cl_output.close()
print("shell Cls done")
def generate_binned_cls(self):
"""
Generate angular power spectra (C(l)s) for each tomographic redshift bin
The output of this method is not used in generating the simulation
It is just useful for comparing to the data
This method computes galaxy C(l)s using CCL for each pair of redshift bins based on the
provided fiducial cosmology
"""
import scipy.interpolate
import pyccl as ccl
import h5py
with self.open_input("fiducial_cosmology", wrapper=True) as f:
cosmo = f.to_ccl()
# load n(z)
nzs = []
with self.open_input("lens_photoz_stack", wrapper=True) as f:
for ibin in range(f.get_nbin()):
z_nz, nz_i = f.get_bin_n_of_z(ibin)
nzs.append(nz_i)
# Make density bin objects for CCL
density = []
for ibin in range(len(nzs)):
bz = self.get_bias_at_z(z_nz)
density.append(
ccl.NumberCountsTracer(
cosmo,
dndz=(z_nz, nzs[ibin]),
has_rsd=False,
bias=(z_nz, bz),
mag_bias=None,
)
)
# user can define the ell binning for the binned case
self.ell_binned = np.logspace(
np.log10(self.config["ell_binned_min"]),
np.log10(self.config["ell_binned_max"]),
self.config["ell_binned_nbins"],
)
self.cls_binned = []
self.cls_index_binned = []
for i in range(1, len(nzs) + 1):
for j in range(i, 0, -1):
cl_bin = cosmo.angular_cl(density[i - 1], density[j - 1], self.ell_binned)
self.cls_binned.append(cl_bin)
self.cls_index_binned.append((i, j))
# save the C(l)
cl_output = self.open_output("glass_cl_binned")
group = cl_output.create_group("lognormal_cl")
group.create_dataset("ell", data=self.ell_binned, dtype="f")
group.create_dataset("cls", data=self.cls_binned, dtype="f")
group.create_dataset("cls_index", data=self.cls_index_binned, dtype="f")
cl_output.close()
print("binned Cls done")
def generate_catalogs(self):
"""
Generate simulated galaxy catalogs based on lognormal fields
This method simulates galaxy positions within each matter shell and
populates them with corresponding redshifts and bin information
"""
import glass.fields
import glass.points
import glass.galaxies
import healpy as hp
rng = np.random.default_rng(int(self.config["random_seed"]))
# load n(z)
nzs = []
with self.open_input("lens_photoz_stack", wrapper=True) as f:
for ibin in range(f.get_nbin()):
z_nz, nz_i = f.get_bin_n_of_z(ibin)
nzs.append(nz_i)
# load mask
with self.open_input("mask", wrapper=True) as map_file:
mask = map_file.read_mask_healpix(
"mask", degrade_nside=self.config["nside"]
)
self.mask_map_info = map_file.read_map_info("mask")
mask[mask == hp.UNSEEN] = 0.0 # set UNSEEN pixels to 0 (GLASS needs this)
mask_area = np.sum(mask[mask != hp.UNSEEN]) * hp.nside2pixarea(self.nside, degrees=True)
# get number density arcmin^-2 for each z bin from config
target_num_dens = np.array(self.config["num_dens"])
if not len(target_num_dens) == len(nzs):
raise ValueError(f"num_dens config option length ({len(target_num_dens)}) does not match number of tomographic bins ({len(nzs)})")
self.nbin_lens = len(nzs)
# prepare the Lognormal C(l)
self.gls = glass.fields.lognormal_gls(
self.cls,
shift=self.config["shift"],
nside=self.nside,
lmax=self.lmax,
ncorr=3,
)
# check for negative values in the gls
for i, g in enumerate(self.gls):
if len(g) > 0:
if (g < 0).any():
print("negative values found in gC(l)", i, "setting to 0.")
print(g[g < 0])
g[g < 0] = 0.0
# generator for lognormal matter fields
matter = glass.fields.generate_lognormal(self.gls, self.nside, shift=self.config["shift"], ncorr=3, rng=rng)
# prepare for weight maps
if self.config["contaminate"]:
max_inv_w = self.get_max_inverse_weight()
# estimate max size of output catalog
self.est_max_n = int(1.5 * np.sum(target_num_dens) * mask_area * 60 * 60)
self.setup_output()
# simulate and add galaxies in each matter shell
shell_catalogs = []
count = 0
for ishell, delta_i in enumerate(matter):
print("computing shell", ishell, "at z =", self.zb[ishell])
# restrict galaxy distributions to this shell
z_i, dndz_i = glass.shells.restrict(z_nz, nzs, self.ws[ishell])
if (dndz_i == 0).all():
continue
# compute galaxy density (for each n(z)) in this shell
ngal_in_shell = target_num_dens * np.trapz(dndz_i, z_i) / np.trapz(nzs, z_nz)
self.write_shell_output(ishell, delta_i, ngal_in_shell, self.ws[ishell].zeff)
if self.config["contaminate"]:
ngal_in_shell *= max_inv_w
print("Ngal", ngal_in_shell * mask_area * 60 * 60)
# simulate positions from matter density
for gal_lon, gal_lat, gal_count in glass.points.positions_from_delta(
ngal_in_shell, delta_i, bias=None, vis=mask, rng=rng
):
# Figure out which bin was generated (len(ngal_in_shell) = Nbins)
occupied_bins = np.where(gal_count != 0)[0]
assert len(occupied_bins) == 1 # only one bin should be generated per call
ibin = occupied_bins[0]
gal_count_bin = gal_count[ibin]
# sample redshifts uniformly in shell
gal_z = glass.galaxies.redshifts_from_nz(gal_count_bin, self.ws[ishell].za, self.ws[ishell].wa)
gal_lon[gal_lon < 0] += 360 # keeps 0 < ra < 360
if self.config["contaminate"]:
obj_pixel_nest = hp.ang2pix(
self.mask_map_info["nside"],
gal_lon,
gal_lat,
lonlat=True,
nest=True,
)
obj_weight = self.get_obj_weight(ibin, obj_pixel_nest)
prob_accept = (1.0 / obj_weight) / max_inv_w[ibin]
obj_accept_contaminated = np.random.rand(len(gal_lon)) < prob_accept
gal_lon = gal_lon[obj_accept_contaminated]
gal_lat = gal_lat[obj_accept_contaminated]
gal_z = gal_z[obj_accept_contaminated]
gal_count_bin = np.sum(obj_accept_contaminated.astype("int"))
self.write_catalog_output_chunk(count, count + gal_count_bin, gal_lon, gal_lat, gal_z, ibin)
count += gal_count_bin
self.finalize_output(count)
def setup_output(self):
"""
Sets up the output data file for the catalog
Creates the data sets and groups for the generated photometry catalog
and lens tomography catalog output files
maxshape should be larger than a reasonable total Ngal
Note: We will saves RA, DEC and Z_TRUE in the photometry catalog and bin
information in the lens tomography catalog
"""
import healpy as hp
phot_output = self.open_output("photometry_catalog")
group = phot_output.create_group("photometry")
group.create_dataset("ra", (self.est_max_n,), maxshape=self.est_max_n, dtype="f")
group.create_dataset("dec", (self.est_max_n,), maxshape=self.est_max_n, dtype="f")
group.create_dataset("redshift_true", (self.est_max_n,), maxshape=self.est_max_n, dtype="f")
self.phot_output = phot_output
tomo_output = self.open_output("lens_tomography_catalog_unweighted")
group = tomo_output.create_group("tomography")
group.create_dataset("bin", (self.est_max_n,), maxshape=self.est_max_n, dtype="i")
group.create_dataset("lens_weight", (self.est_max_n,), maxshape=self.est_max_n, dtype="f")
group_counts = tomo_output.create_group("counts")
group_counts.create_dataset("counts", (self.nbin_lens,), dtype="i")
group_counts.create_dataset("counts_2d", (1,), dtype="i")
self.tomo_output = tomo_output
density_shell_output = self.open_output("density_shells")
if self.config["output_density_shell_maps"]:
group = density_shell_output.create_group("density_shell_maps")
group.attrs["nest"] = self.config["density_shell_nest"]
fullsky_npix = hp.nside2npix(self.nside)
for ishell in range(self.nshells):
group.create_dataset(f"shell{ishell}", (fullsky_npix,), dtype="f")
group = density_shell_output.create_group("num_dens_shell")
group.create_dataset(f"num_dens_shell", (self.nbin_lens, self.nshells), dtype="f")
group.create_dataset(f"zeff_shell", (self.nshells,), dtype="f")
self.density_shell_output = density_shell_output
def write_shell_output(self, ishell, delta_i, ngal_in_shell, zeff_shell):
"""
write a single density shell to output
"""
import healpy as hp
if self.config["output_density_shell_maps"]:
group = self.density_shell_output["density_shell_maps"]
if self.config["density_shell_nest"]:
#convert output shell maps to nest ordering
nside = hp.get_nside(delta_i)
npix = hp.nside2npix(hp.get_nside(delta_i))
pix_ring = np.arange(npix)
pix_nest = hp.ring2nest(nside, pix_ring)
delta_i_nest = np.empty_like(delta_i)
delta_i_nest[pix_nest] = delta_i[pix_ring]
group[f"shell{ishell}"][:] = delta_i_nest
else:
group[f"shell{ishell}"][:] = delta_i
group = self.density_shell_output["num_dens_shell"]
group["num_dens_shell"][:, ishell] = ngal_in_shell
group["zeff_shell"][ishell] = zeff_shell
def write_catalog_output_chunk(self, start, end, gal_lon, gal_lat, gal_z, tomobin):
"""
Writes a chunk of the photometry and tomography file
"""
assert end - start == len(gal_lat)
# write photometry catalog chunk
group = self.phot_output["photometry"]
group["ra"][start:end] = gal_lon
group["dec"][start:end] = gal_lat
group["redshift_true"][start:end] = gal_z
# write tomography catalog chunk
group = self.tomo_output["tomography"]
group["bin"][start:end] = np.full(end - start, tomobin)
group["lens_weight"][start:end] = np.ones(end - start)
def finalize_output(self, total_count):
"""
Removes any unused entrys in the catalog and adds the total counts to tomography file
"""
# remove unfilled objects
group = self.phot_output["photometry"]
group["ra"].resize((total_count,))
group["dec"].resize((total_count,))
group["redshift_true"].resize((total_count,))
# write global values
group = self.tomo_output["tomography"]
group["bin"].resize((total_count,))
group["lens_weight"].resize((total_count,))
counts = np.bincount(group["bin"][:])
assert total_count == np.sum(counts)
group_counts = self.tomo_output["counts"]
group_counts["counts"][:] = counts
group_counts["counts_2d"][:] = np.array([total_count])
group.attrs["nbin"] = self.nbin_lens
# close everything
self.phot_output.close()
self.tomo_output.close()
self.density_shell_output.close()
def get_max_inverse_weight(self):
"""
Get the maximum 1/weight for each tomographic bin
"""
max_inv_w = np.ones(self.nbin_lens)
with self.open_input("input_lss_weight_maps") as f:
for tomobin in range(self.nbin_lens):
value = f[f"maps/weight_map_bin_{tomobin}/value"][:]
max_inv_w[tomobin] = (1.0 / value).max()
assert f["maps"].attrs["nside"] == self.mask_map_info["nside"]
return max_inv_w
def get_obj_weight(self, tomobin, obj_pix_nest):
"""
Returns the weight for each object in a given tomographic bin
"""
import healpy as hp
with self.open_input("input_lss_weight_maps", wrapper=True) as f:
wmap = f.read_map(f"weight_map_bin_{tomobin}")
input_weight_map_is_nest = f.file[f"maps/weight_map_bin_{tomobin}"].attrs["nest"]
nside = self.mask_map_info["nside"]
if input_weight_map_is_nest:
return wmap[obj_pix_nest]
else:
return wmap[hp.nest2ring(nside, obj_pix_nest)]
def camb_tophat_weight(z):
"""
TAKEN FROM GLASS
This function returns a weight for a given redshift (z) based on a linear
ramp from 0 to 1, transitioning from z=0 to z=0.1.
Args:
z (float): The redshift at which to calculate the weight.
Returns:
float: The weight for the given redshift, between 0 and 1.
"""
return np.clip(z / 0.1, None, 1.0)