Source code for txpipe.maps

from .base_stage import PipelineStage
from .data_types import TomographyCatalog, MapsFile, HDFFile
from ceci.config import StageParameter
import numpy as np
from .utils import unique_list, choose_pixelization, import_dask
from .mapping import make_dask_shear_maps, make_dask_lens_maps, make_coverage_map


SHEAR_SHEAR = 0
SHEAR_POS = 1
POS_POS = 2

NON_TOMO_BIN = 999999

# These generic mapping options are used by multiple different
# map types.
# TODO: consider dropping support for gnomonic maps.
# Also consider adding support for pixell
map_config_options = {
    "chunk_rows": StageParameter(int, 100000, msg="The number of rows to read in each chunk of data at a time"),
    "pixelization": StageParameter(str, "healpix", msg="The pixelization scheme to use, currently just healpix"),
    "nside": StageParameter(
        int, 0, msg="The Healpix resolution parameter for the generated maps. Only required if using healpix"
    ),
    "sparse": StageParameter(
        bool, True, msg="Whether to generate sparse maps - faster and less memory for small sky areas"
    ),
    "ra_cent": StageParameter(
        float, np.nan, msg="Central RA for gnomonic projection (only required if pixelization==tan)"
    ),
    "dec_cent": StageParameter(
        float, np.nan, msg="Central Dec for gnomonic projection (only required if pixelization==tan)"
    ),
    "npix_x": StageParameter(int, -1, msg="Number of pixels in x direction for gnomonic projection"),
    "npix_y": StageParameter(int, -1, msg="Number of pixels in y direction for gnomonic projection"),
    "pixel_size": StageParameter(float, np.nan, msg="Pixel size of pixelization scheme"),
}


[docs] class TXBaseMaps(PipelineStage): """ A base class for mapping stages This is an abstract base class, which other subclasses inherit from to use the same basic structure, which is: - select pixelization - prepare some mapper objects - iterate through selected columns - update each mapper with each chunk - finalize the mappers - save the maps """ name = "TXBaseMaps" inputs = [] outputs = [] config_options = {} def run(self): # Read input configuration information # and select pixel scheme. Also save the scheme # metadata so we can save it later pixel_scheme = self.choose_pixel_scheme() self.pixel_metadata = pixel_scheme.metadata self.config.update(self.pixel_metadata) # Initialize our maps mappers = self.prepare_mappers(pixel_scheme) # Loop through the data for s, e, data in self.data_iterator(): # Give an idea of progress print(f"Process {self.rank} read data chunk {s:,} - {e:,}") # Build up any maps we are using self.accumulate_maps(pixel_scheme, data, mappers) # Finalize and output the maps maps = self.finalize_mappers(pixel_scheme, mappers) if self.rank == 0: self.save_maps(pixel_scheme, maps) def choose_pixel_scheme(self): """ Subclasses can override to instead load pixelization from an existing map """ return choose_pixelization(**self.config) def prepare_mappers(self, pixel_scheme): """ Subclasses must override to init any mapper objects """ raise RuntimeError("Do not use TXBaseMaps - use a subclass") def data_iterator(self): """ Subclasses must override to create an iterator looping over input data """ raise RuntimeError("Do not use TXBaseMaps - use a subclass") def accumulate_maps(self, pixel_scheme, data, mappers): """ Subclasses must override to supply the next chunk "data" to their mappers """ raise RuntimeError("Do not use TXBaseMaps - use a subclass") def finalize_mappers(self, pixel_scheme, mappers): """ Subclasses must override to finalize their maps and return a dictionary of (output_tag, map_name) -> (pixels, values) """ raise RuntimeError("Do not use TXBaseMaps - use a subclass") def save_maps(self, pixel_scheme, maps): """ Subclasses can use this directly, by generating maps as described in finalize_mappers """ import healsparse as hsp import healpy as hp # Find and open all output files tags = unique_list([outfile for outfile, _ in maps.keys()]) output_files = {tag: self.open_output(tag, wrapper=True) for tag in tags if tag.endswith("maps")} # add a maps section to each for output_file in output_files.values(): output_file.file.create_group("maps") # this is a bit of a hack, but if you have set an alias # it can't be added as an attr to the hdf5 file # this saves everythinkg but aliases from ceci.config import StageConfig config_no_alias = {} for k in self.config.keys(): if k == "aliases": continue config_no_alias[k] = self.config[k] config_no_alias = StageConfig(**config_no_alias) # output_file.file["maps"].attrs.update(self.config) output_file.file["maps"].attrs.update(config_no_alias) # same the relevant maps in each for (tag, map_name), (pix, map_data) in maps.items(): output_files[tag].write_map_pixval( map_name, pix, map_data, self.pixel_metadata )
[docs] class TXSourceMaps(PipelineStage): """Generate source maps directly from binned, calibrated shear catalogs. This implementation uses DASK, which offers a numpy-like syntax and hides the complicated parallelization details. """ name = "TXSourceMaps" dask_parallel = True inputs = [ ("binned_shear_catalog", HDFFile), ] outputs = [ ("source_maps", MapsFile), ] config_options = { "block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)"), **map_config_options, } def run(self): dask, da = import_dask() import healpy import healsparse as hsp # Configuration options pixel_scheme = choose_pixelization(**self.config) nside = self.config["nside"] npix = healpy.nside2npix(nside) block_size = self.config["block_size"] if block_size == 0: block_size = "auto" # We have to keep this open throughout the process, because # dask will internally load chunks of the input hdf5 data. f = self.open_input("binned_shear_catalog") nbin = f["shear"].attrs["nbin_source"] # Build a "master" coverage map using all the ra and dec values in the catalog # This way we can use the same coverage mask for all the maps ra = da.from_array(f[f"shear/bin_all/ra"], block_size) dec = da.from_array(f[f"shear/bin_all/dec"], block_size) cov_map = make_coverage_map(ra, dec, pixel_scheme) # The "all" bin is the non-tomographic case. bins = list(range(nbin)) + ["all"] output = {} for i in bins: # These don't actually load all the data - everything is lazy ra = da.from_array(f[f"shear/bin_{i}/ra"], block_size) block_size = ra.chunksize dec = da.from_array(f[f"shear/bin_{i}/dec"], block_size) g1 = da.from_array(f[f"shear/bin_{i}/g1"], block_size) g2 = da.from_array(f[f"shear/bin_{i}/g2"], block_size) weight = da.from_array(f[f"shear/bin_{i}/weight"], block_size) shear_map_results = make_dask_shear_maps( ra, dec, g1, g2, weight, pixel_scheme, cov_map ) # slight change in output name if i == "all": i = "2D" # Save all the stuff we want here. # fmt: off output[f"count_{i}"] = shear_map_results["count_map"] output[f"g1_{i}"] = shear_map_results["g1_map"] output[f"g2_{i}"] = shear_map_results["g2_map"] output[f"lensing_weight_{i}"] = shear_map_results["weight_map"] output[f"var_e_{i}"] = shear_map_results["esq_map"] output[f"var_g1_{i}"] = shear_map_results["var1_map"] output[f"var_g2_{i}"] = shear_map_results["var2_map"] # fmt: on # mask is where a pixel is hit in any of the tomo bins mask = da.zeros(shear_map_results["count_map"].shape[0], dtype=bool) for i in bins: if i == "all": i = "2D" mask |= output[f"lensing_weight_{i}"] > 0 output["mask"] = mask # Everything above is lazy - this forces the computation. # It works out an efficient (we hope) way of doing everything in parallel (output,) = dask.compute(output) f.close() # convert sparse_map arrays into healsparse map objects output_hsp = {} for name, map in output.items(): output_hsp[name] = hsp.HealSparseMap( cov_map=cov_map, sparse_map=map, nside_sparse=cov_map.nside_sparse ) # collate metadata metadata = {key: self.config[key] for key in map_config_options} metadata["nbin"] = nbin metadata["nbin_source"] = nbin metadata.update(pixel_scheme.metadata) # write the output maps with self.open_output("source_maps", wrapper=True) as out: for i in bins: # again rename "all" to "2D" if i == "all": i = "2D" # We save the pixels in the mask - i.g. any pixel that is hit in any # tomographic bin is included. Some will be UNSEEN. for key in "g1", "g2", "count", "var_e", "var_g1", "var_g2", "lensing_weight": out.write_map(f"{key}_{i}", output_hsp[f"{key}_{i}"], metadata) out.file["maps"].attrs.update(metadata)
[docs] class TXLensMaps(PipelineStage): """ Make tomographic lens number count maps Uses photometry and lens tomography catalogs. Density maps are made later once masks are generated. """ name = "TXLensMaps" dask_parallel = True inputs = [ ("photometry_catalog", HDFFile), ("lens_tomography_catalog", TomographyCatalog), ] outputs = [ ("lens_maps", MapsFile), ] config_options = { "block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)"), **map_config_options, } def ra_dec_inputs(self): return "photometry_catalog", "photometry" def run(self): import healsparse as hsp _, da = import_dask() # The subclass reads the ra and dec of the lenses # from a different input file, so we allow that here # using this method which is overwrites cat_name, cat_group = self.ra_dec_inputs() # open our two input files. They will be read lazily tomo_cat = self.open_input("lens_tomography_catalog", wrapper=True) photo_cat = self.open_input(cat_name, wrapper=True) nbin_lens = tomo_cat.read_nbin() self.config["nbin_lens"] = nbin_lens # Other config info. pixel_scheme = choose_pixelization(**self.config) block_size = self.config["block_size"] if block_size == 0: block_size = "auto" # Lazily open input data sets ra = da.from_array(photo_cat.file[f"{cat_group}/ra"], block_size) block_size = ra.chunksize dec = da.from_array(photo_cat.file[f"{cat_group}/dec"], block_size) weight = da.from_array(tomo_cat.file["tomography/lens_weight"], block_size) tomo_bin = da.from_array(tomo_cat.file["tomography/bin"], block_size) # Build a "master" coverage map using all the ra and dec values in the catalog # This way we can use the same coverage mask for all the maps cov_map = make_coverage_map(ra, dec, pixel_scheme) # bins to generate maps for bins = list(range(nbin_lens)) + ["2D"] maps = {} # Generate the maps with dask. This is lazy until we do da.compute for b in bins: lens_map_results = make_dask_lens_maps( ra, dec, weight, tomo_bin, b, pixel_scheme, cov_map, sentinel=0.0 ) maps[f"ngal_{b}"] = lens_map_results["count_map"] maps[f"weighted_ngal_{b}"] = lens_map_results["weight_map"] # Actually run everything (maps,) = da.compute(maps) # convert sparse_map arrays into healsparse map objects maps_hsp = {} for name, map in maps.items(): maps_hsp[name] = hsp.HealSparseMap( cov_map=cov_map, sparse_map=map, nside_sparse=cov_map.nside_sparse, sentinel=0.0, ) # collate metadata metadata = {key: self.config[key] for key in map_config_options} metadata["nbin"] = nbin_lens metadata["nbin_lens"] = nbin_lens metadata.update(pixel_scheme.metadata) # Save all the maps with self.open_output("lens_maps", wrapper=True) as out: for name, map in maps_hsp.items(): out.write_map(name, map, metadata) out.file["maps"].attrs.update(metadata)
[docs] class TXExternalLensMaps(TXLensMaps): """ Make tomographic lens number count maps from external data Same as TXLensMaps except it reads from an external lens catalog. """ name = "TXExternalLensMaps" inputs = [ ("lens_catalog", HDFFile), ("lens_tomography_catalog", TomographyCatalog), ] def ra_dec_inputs(self): return "lens_catalog", "lens"
[docs] class TXDensityMaps(PipelineStage): """ Convert galaxy count maps to overdensity delta maps delta = ngal / (weight * <ngal>/<weight>) - 1 This has to be separate from the lens mappers above because it requires the mask, which is created elsewhere (right now in masks.py) """ name = "TXDensityMaps" parallel = False inputs = [ ("lens_maps", MapsFile), ("mask", MapsFile), ] outputs = [ ("density_maps", MapsFile), ] config_options = {"mask_threshold": StageParameter(float, 0.0, msg="Threshold for masking pixels")} def run(self): import healpy import healsparse as hsp # Read the mask and set all pixels below the threshold to 0 with self.open_input("mask", wrapper=True) as f: mask = f.read_mask( thresh=self.config["mask_threshold"], degrade_nside=self.config["nside"] ) # identify unmasked pixels pix = mask.valid_pixels # Read the count maps with self.open_input("lens_maps", wrapper=True) as f: meta = dict(f.file["maps"].attrs) assert meta["nside"] == self.config["nside"] nbin_lens = meta["nbin_lens"] ngal_maps = [f.read_map(f"weighted_ngal_{b}") for b in range(nbin_lens)] # Convert count maps into density maps density_maps = [] for i, ng in enumerate(ngal_maps): # In order to interpret the empty pixels as 0 the sentinel must be 0. assert ng.sentinel == 0.0 # calculate mean of ng and mean of mask mu_n = np.mean(ng[pix]) mu_w = np.mean(mask[pix]) # make a new map for density delta_map = hsp.HealSparseMap.make_empty( ng.nside_coverage, ng.nside_sparse, np.float64 ) delta_map[pix] = (ng[pix] / mask[pix]) / (mu_n / mu_w) density_maps.append(delta_map) # write output with self.open_output("density_maps", wrapper=True) as f: # create group and save metadata there too. f.file.create_group("maps") f.file["maps"].attrs.update(meta) # save each density map for i, rho in enumerate(density_maps): f.write_map(f"delta_{i}", rho, meta)