"""
This file contains TXPipe-specific file types, subclassing the more
generic types in base.py
"""
from .base import HDFFile, DataFile, YamlFile
import yaml
class ShearCatalog(HDFFile):
    """
    A generic shear catalog.

    The on-disk layout varies with the catalog type ("metacal",
    "metadetect", "lensfit", "hsc", ...); the type is determined
    lazily from the file and cached.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Cache for the catalog_type property; None until first access
        self._catalog_type = None

    def read_catalog_info(self):
        """
        Return the "catalog_type" attribute of the shear group.

        Returns
        -------
        str or None
            The stored catalog type, or None if the attribute is absent.

        Raises
        ------
        ValueError
            If the "shear" group cannot be read at all.
        """
        # Previously a bare except; narrow it to the errors h5py-style
        # group access can actually raise here.
        try:
            info = dict(self.file["shear"].attrs)
        except (KeyError, OSError) as err:
            raise ValueError("Unable to read shear catalog") from err
        return info.get("catalog_type")

    @property
    def catalog_type(self):
        """
        The shear catalog variant, determined once and cached.

        Uses the explicit "catalog_type" attribute when present, and
        otherwise falls back to sniffing characteristic column names.
        """
        if self._catalog_type is not None:
            return self._catalog_type
        group = self.file["shear"]
        if "catalog_type" in group.attrs:
            t = group.attrs["catalog_type"]
        elif "mcal_g1" in group.keys():
            t = "metacal"
        elif "1p" in group.keys():
            t = "metadetect"
        elif "c1" in group.keys():
            t = "lensfit"
        else:
            raise ValueError("Could not figure out catalog format")
        self._catalog_type = t
        return t

    def get_size(self):
        """Return the number of objects in the primary catalog."""
        # Metadetect stores its primary variant in the "00" subgroup
        if self.catalog_type == "metadetect":
            return self.file["shear/00/ra"].size
        else:
            return self.file["shear/ra"].size

    def get_primary_catalog_group(self):
        """Return the HDF5 path of the primary catalog group."""
        if self.catalog_type == "metadetect":
            return "shear/00"
        else:
            return "shear"

    def get_primary_catalog_names(self, true_shear=False):
        """
        Return the column names and renaming for the primary shear columns.

        Parameters
        ----------
        true_shear: bool
            If True, select true (input) shear columns instead of
            measured ones.

        Returns
        -------
        shear_cols: list of str
            Column names to read from the shear group.
        rename: dict
            Mapping applied after reading so callers always see
            g1/g2/ra/dec/weight names.
        """
        if true_shear:
            shear_cols = ["true_g1", "true_g2", "ra", "dec", "weight"]
            rename = {"true_g1": "g1", "true_g2": "g2"}
        elif self.catalog_type == "metacal":
            shear_cols = ["mcal_g1", "mcal_g2", "ra", "dec", "weight"]
            rename = {"mcal_g1": "g1", "mcal_g2": "g2"}
        elif self.catalog_type == "hsc":
            shear_cols = ["g1", "g2", "c1", "c2", "ra", "dec", "weight"]
            rename = {}
        elif self.catalog_type == "metadetect":
            shear_cols = ["00/g1", "00/g2", "00/ra", "00/dec", "00/weight"]
            # strip the leading "00/" from each name
            rename = {c: c[3:] for c in shear_cols}
        else:
            shear_cols = ["g1", "g2", "ra", "dec", "weight"]
            rename = {}
        return shear_cols, rename
class TomographyCatalog(HDFFile):
    """Tomographic bin information, with bin counts and edges stored as attributes."""

    required_datasets = []

    def read_zbins(self, bin_type):
        """
        Read saved redshift bin edges from attributes.

        Returns a list of (zmin, zmax) pairs, one per tomographic bin.
        """
        attrs = dict(self.file["tomography"].attrs)
        count = attrs[f"nbin_{bin_type}"]
        edges = []
        for i in range(count):
            lo = attrs[f"{bin_type}_zmin_{i}"]
            hi = attrs[f"{bin_type}_zmax_{i}"]
            edges.append((lo, hi))
        return edges

    def read_nbin(self, bin_type):
        """Return the number of tomographic bins of the given type."""
        attrs = dict(self.file["tomography"].attrs)
        return attrs[f"nbin_{bin_type}"]
class RandomsCatalog(HDFFile):
    """A catalog of random points, requiring at least RA and dec columns."""

    required_datasets = ["randoms/ra", "randoms/dec"]
class MapsFile(HDFFile):
    """
    An HDF5 file of sky maps.

    Each map is a group under "maps" containing "pixel" and "value"
    datasets plus metadata attributes (including "pixelization").
    """

    required_datasets = []

    def list_maps(self):
        """Return the names of all maps stored in the file."""
        import h5py

        maps = []

        # h5py uses this visititems method to walk through
        # a file, looking at everything underneath a path.
        # We use it here to search through everything in the
        # "maps" section of a maps file looking for any groups
        # that seem to be a map. You have to pass a function
        # like this to visititems.
        def visit(name, obj):
            if isinstance(obj, h5py.Group):
                keys = obj.keys()
                # we save maps with these two data sets,
                # so if they are both there then this will
                # be a map
                if "pixel" in keys and "value" in keys:
                    maps.append(name)

        # Now actually run this
        self.file["maps"].visititems(visit)
        # return the accumulated list
        return maps

    def read_healpix(self, map_name, return_all=False):
        """
        Read a healpix map into a full-sky array.

        Unobserved pixels are set to healpy.UNSEEN.

        Parameters
        ----------
        map_name: str
            Name of the map group under "maps".
        return_all: bool
            If True also return the observed pixel indices and nside.
        """
        import healpy
        import numpy as np

        group = self.file[f"maps/{map_name}"]
        nside = group.attrs["nside"]
        npix = healpy.nside2npix(nside)
        m = np.repeat(healpy.UNSEEN, npix)
        pix = group["pixel"][:]
        val = group["value"][:]
        m[pix] = val
        if return_all:
            return m, pix, nside
        else:
            return m

    def read_map_info(self, map_name):
        """Return the metadata attributes of the named map as a dict."""
        group = self.file[f"maps/{map_name}"]
        info = dict(group.attrs)
        # Every properly-saved map records its pixelization scheme
        if "pixelization" not in info:
            raise ValueError(
                f"Map '{map_name}' not found, "
                f"or not saved properly in file {self.path}"
            )
        return info

    def read_map(self, map_name):
        """Read a map, dispatching on its stored pixelization scheme."""
        info = self.read_map_info(map_name)
        pixelization = info["pixelization"]
        if pixelization == "gnomonic":
            m = self.read_gnomonic(map_name)
        elif pixelization == "healpix":
            m = self.read_healpix(map_name)
        else:
            raise ValueError(f"Unknown map pixelization type {pixelization}")
        return m

    def read_mask(self):
        """Read the "mask" map, clipping negative values to zero."""
        mask = self.read_map("mask")
        mask[mask < 0] = 0
        return mask

    def write_map(self, map_name, pixel, value, metadata):
        """
        Save an output map to an HDF5 subgroup.

        The pixel numbering and the metadata are also saved.

        Parameters
        ----------
        map_name: str
            The name of this map, used as the name of a subgroup
            under "maps" where the data is stored.
        pixel: array
            Array of indices of observed pixels
        value: array
            Array of values of observed pixels
        metadata: mapping
            Dict or other mapping of metadata to store along with the map
        """
        # Validate before touching the file so invalid calls leave it unchanged
        if "pixelization" not in metadata:
            raise ValueError("Map metadata should include pixelization")
        if pixel.shape != value.shape:
            raise ValueError(
                f"Map pixels and values should be same shape "
                f"but are {pixel.shape} vs {value.shape}"
            )
        # Create the top-level maps group on first use.
        # (This check was previously duplicated twice in this method.)
        if "maps" not in self.file:
            self.file.create_group("maps")
        subgroup = self.file["maps"].create_group(map_name)
        subgroup.attrs.update(metadata)
        subgroup.create_dataset("pixel", data=pixel)
        subgroup.create_dataset("value", data=value)

    def plot_healpix(self, map_name, view="cart", **kwargs):
        """
        Plot a healpix map with healpy.

        Parameters
        ----------
        map_name: str
            Name of the map to plot.
        view: str
            "cart" for cartview or "moll" for mollview.
        """
        import healpy
        import numpy as np

        m, pix, nside = self.read_healpix(map_name, return_all=True)
        lon, lat = healpy.pix2ang(nside, pix, lonlat=True)
        if len(pix) == 0:
            print(f"Empty map {map_name}")
            return
        if len(pix) == len(m):
            # Full-sky map: restrict the plot range to observed, non-zero pixels
            w = np.where((m != healpy.UNSEEN) & (m != 0))
        else:
            # Sparse map: every stored pixel counts towards the range
            w = slice(None)
        lon_range = [lon[w].min() - 0.1, lon[w].max() + 0.1]
        lat_range = [lat[w].min() - 0.1, lat[w].max() + 0.1]
        lat_range = np.clip(lat_range, -90, 90)
        m[m == 0] = healpy.UNSEEN
        title = kwargs.pop("title", map_name)
        if view == "cart":
            healpy.cartview(
                m, lonra=lon_range, latra=lat_range, title=title, hold=True, **kwargs
            )
        elif view == "moll":
            healpy.mollview(m, title=title, hold=True, **kwargs)
        else:
            # Bug fix: this previously referenced an undefined name "mode",
            # raising NameError instead of the intended ValueError
            raise ValueError(f"Unknown Healpix view mode {view}")

    def read_gnomonic(self, map_name):
        """
        Read a gnomonic (tangent-plane) map into a 2D (ny, nx) array.

        Unobserved pixels are set to NaN.
        """
        import numpy as np

        group = self.file[f"maps/{map_name}"]
        info = dict(group.attrs)
        nx = info["nx"]
        ny = info["ny"]
        m = np.full((ny, nx), np.nan)
        pix = group["pixel"][:]
        val = group["value"][:]
        # -9999 marks pixels outside the map footprint
        w = np.where(pix != -9999)
        pix = pix[w]
        val = val[w]
        # unflatten the 1D pixel index into (row, column)
        x = pix % nx
        y = pix // nx
        m[y, x] = val
        return m

    def plot_gnomonic(self, map_name, **kwargs):
        """Plot a gnomonic map with matplotlib's imshow on the current axes."""
        import matplotlib.pyplot as plt

        info = self.read_map_info(map_name)
        ra_min, ra_max = info["ra_min"], info["ra_max"]
        # Wrap maps straddling RA = 0 into a contiguous range
        if ra_min > 180 and ra_max < 180:
            ra_min -= 360
        ra_range = (ra_max, ra_min)
        dec_range = (info["dec_min"], info["dec_max"])
        # The view arg is needed for healpix but not gnomonic; discard it
        # if present (previously a bare pop that raised KeyError when this
        # method was called directly rather than via plot()).
        kwargs.pop("view", None)
        m = self.read_gnomonic(map_name)
        extent = list(ra_range) + list(dec_range)
        title = kwargs.pop("title", map_name)
        plt.imshow(m, aspect="equal", extent=extent, **kwargs)
        plt.title(title)
        plt.colorbar()

    def plot(self, map_name, **kwargs):
        """Plot a map, dispatching on its stored pixelization scheme."""
        info = self.read_map_info(map_name)
        pixelization = info["pixelization"]
        if pixelization == "gnomonic":
            m = self.plot_gnomonic(map_name, **kwargs)
        elif pixelization == "healpix":
            m = self.plot_healpix(map_name, **kwargs)
        else:
            raise ValueError(f"Unknown map pixelization type {pixelization}")
        return m
class LensingNoiseMaps(MapsFile):
    """Rotated-shear noise realizations stored as maps."""

    required_datasets = []

    def read_rotation(self, realization_index, bin_index):
        """Read the (g1, g2) map pair for one rotation realization and bin."""
        prefix = f"rotation_{realization_index}"
        g1 = self.read_map(f"{prefix}/g1_{bin_index}")
        g2 = self.read_map(f"{prefix}/g2_{bin_index}")
        return g1, g2

    def number_of_realizations(self):
        """Return the number of lensing noise realizations in the file."""
        return self.file["maps"].attrs["lensing_realizations"]
class ClusteringNoiseMaps(MapsFile):
    """Density-split noise realizations stored as maps."""

    def read_density_split(self, realization_index, bin_index):
        """Read the (rho1, rho2) map pair for one split realization and bin."""
        prefix = f"split_{realization_index}"
        rho1 = self.read_map(f"{prefix}/rho1_{bin_index}")
        rho2 = self.read_map(f"{prefix}/rho2_{bin_index}")
        return rho1, rho2

    def number_of_realizations(self):
        """Return the number of clustering noise realizations in the file."""
        return self.file["maps"].attrs["clustering_realizations"]
class PhotozPDFFile(HDFFile):
    """An HDF5 file of photometric redshift PDFs."""

    required_datasets = []
class CSVFile(DataFile):
    """A comma-separated-values text file."""

    suffix = "csv"

    def save_file(self, name, dataframe):
        """Write a pandas dataframe out as CSV under the given name."""
        dataframe.to_csv(name)
class SACCFile(DataFile):
    """A two-point data file in the SACC format."""

    suffix = "sacc"

    @classmethod
    def open(cls, path, mode, **kwargs):
        """
        Open a sacc file for reading.

        Writing through this method is not supported; use
        sacc.write_fits directly instead.
        """
        import sacc

        if mode == "w":
            raise ValueError(
                "Do not use the open_output method to write sacc files. Use sacc.write_fits"
            )
        return sacc.Sacc.load_fits(path)

    def read_provenance(self):
        """Return provenance info from the file metadata, defaulting to UNKNOWN."""
        meta = self.file.metadata
        fields = ("uuid", "creation", "domain", "username")
        return {key: meta.get(f"provenance/{key}", "UNKNOWN") for key in fields}
class NOfZFile(HDFFile):
    """
    Number density n(z) curves.

    Each kind of n(z) (e.g. per-tomographic-group) lives in a group
    under "n_of_z" with a shared "z" dataset and one "bin_{i}" dataset
    per tomographic bin.
    """

    # Must have at least one bin in
    required_datasets = []

    def get_nbin(self, kind):
        """Return the number of tomographic bins for this kind of n(z)."""
        return self.file["n_of_z"][kind].attrs["nbin"]

    def get_n_of_z(self, kind, bin_index):
        """
        Return (z, nz) arrays for one tomographic bin.

        Parameters
        ----------
        kind: str
            Which n(z) group under "n_of_z" to read.
        bin_index: int
            Tomographic bin index.
        """
        group = self.file["n_of_z"][kind]
        z = group["z"][:]
        nz = group[f"bin_{bin_index}"][:]
        return (z, nz)

    def get_n_of_z_spline(self, bin_index, kind="cubic", *, nz_kind="source", **kwargs):
        """
        Return a scipy interpolating spline of n(z) for one bin.

        Parameters
        ----------
        bin_index: int
            Tomographic bin index.
        kind: str
            Interpolation kind passed to scipy.interpolate.interp1d.
        nz_kind: str
            Which n(z) group to read (see get_n_of_z).  Added because the
            previous code called get_n_of_z with a missing argument, which
            always raised TypeError.  Default "source" is an assumption —
            TODO confirm against callers.
        **kwargs:
            Extra arguments for scipy.interpolate.interp1d.
        """
        import scipy.interpolate

        z, nz = self.get_n_of_z(nz_kind, bin_index)
        spline = scipy.interpolate.interp1d(z, nz, kind=kind, **kwargs)
        return spline

    def save_plot(self, filename, kind="source", **fig_kw):
        """
        Plot all bins of one kind of n(z) and save the figure to filename.

        Parameters
        ----------
        filename: str
            Output image path.
        kind: str
            Which n(z) group to plot.  Added because the previous code
            called plot() without its required argument, which always
            raised TypeError.  Default "source" is an assumption — TODO
            confirm against callers.
        **fig_kw:
            Passed through to plt.figure.
        """
        import matplotlib.pyplot as plt

        plt.figure(**fig_kw)
        self.plot(kind)
        plt.legend()
        plt.savefig(filename)
        plt.close()

    def plot(self, kind):
        """Plot n(z) for every bin of the given kind on the current axes."""
        import matplotlib.pyplot as plt

        for b in range(self.get_nbin(kind)):
            z, nz = self.get_n_of_z(kind, b)
            plt.plot(z, nz, label=f"Bin {b}")
class FiducialCosmology(YamlFile):
    """A fiducial cosmology, stored as a CCL-style YAML parameter file."""

    # TODO replace when CCL has more complete serialization tools.

    def to_ccl(self, **kwargs):
        """
        Build a pyccl.Cosmology object from the stored parameters.

        Keyword arguments override the values read from the file.
        """
        import pyccl as ccl

        with open(self.path, "r") as fp:
            params = yaml.load(fp, Loader=yaml.Loader)

        # The YAML stores unset parameters as the string "nan";
        # the Cosmology constructor wants None for those.
        def none_if_nan(name):
            value = params[name]
            return None if value == "nan" else value

        # Assemble an init for the object, since the CCL YAML has
        # extra info we don't need and different formatting.
        inits = dict(
            Omega_c=params["Omega_c"],
            Omega_b=params["Omega_b"],
            h=params["h"],
            n_s=params["n_s"],
            sigma8=none_if_nan("sigma8"),
            A_s=none_if_nan("A_s"),
            Omega_k=params["Omega_k"],
            Neff=params["Neff"],
            w0=params["w0"],
            wa=params["wa"],
            bcm_log10Mc=params["bcm_log10Mc"],
            bcm_etab=params["bcm_etab"],
            bcm_ks=params["bcm_ks"],
            mu_0=params["mu_0"],
            sigma_0=params["sigma_0"],
        )
        # Optional modified-gravity arrays travel together
        if "z_mg" in params:
            inits["z_mg"] = params["z_mg"]
            inits["df_mg"] = params["df_mg"]
        # Optional neutrino masses, always given as a list
        if "m_nu" in params:
            inits["m_nu"] = params["m_nu"]
            inits["m_nu_type"] = "list"
        inits.update(kwargs)
        return ccl.Cosmology(**inits)
class QPFile(DataFile):
    """An HDF5 file in the qp format (stub)."""

    # TODO: Flesh this out
    suffix = "hdf5"