from .base_stage import PipelineStage
from .data_types import Directory, ShearCatalog, HDFFile, PNGFile, TomographyCatalog, TextFile
from .shear_calibration import (
Calibrator,
MeanShearInBins,
metadetect_variants,
band_variants,
)
from .utils.fitting import fit_straight_line
from .utils import import_dask, read_shear_catalog_type
from .plotting import manual_step_histogram
import numpy as np
from ceci.config import StageParameter
[docs]
class TXDiagnosticQuantiles(PipelineStage):
"""
Measure quantiles of various values in the shear catalog.
This uses a library called "Distogram" which builds a
histogram that it gradually updates, tweaking the edges
as it goes along. This means we don't have to load
the full data into memory ever.
The algorithm used can be noisy if there are large outliers in
the data, but after selection cuts are made this seems to be okay here,
with all the quantiles for the base quantities (T, s2n, T_psf, g1_psf) within
10% of the true quantile and the majority within 1%. For our purposes
(defining bins for diagonstics) this is fine.
"""
name = "TXDiagnosticQuantiles"
dask_parallel = True
inputs = [
("shear_catalog", ShearCatalog),
("shear_tomography_catalog", TomographyCatalog),
]
outputs = [
("shear_catalog_quantiles", HDFFile),
]
config_options = {
"nbins": StageParameter(int, 20, msg="Number of quantile bins to compute."),
"chunk_rows": StageParameter(int, 0, msg="Number of rows to process in each chunk (0 means auto)."),
"bands": StageParameter(str, "riz", msg="Bands to use for diagnostics."),
}
def run(self):
_, da = import_dask()
# Configuration parameters
chunk_rows = self.config["chunk_rows"]
nedge = self.config["nbins"] + 1
if chunk_rows == 0:
chunk_rows = "auto"
with self.open_input("shear_catalog", wrapper=True) as f:
group = f.get_primary_catalog_group()
# We canonicalise the names here
col_names = {
"psf_g1": f"{group}/psf_g1",
"psf_T_mean": f"{group}/psf_T_mean",
"s2n": f"{group}/s2n",
"T": f"{group}/T",
}
for band in self.config["bands"]:
col_names[f"mag_{band}"] = f"{group}/mag_{band}"
# We ask for quantiles at these points
quantiles = np.linspace(0, 1, nedge, endpoint=True)
percentiles = quantiles * 100
with self.open_input("shear_catalog") as f, self.open_input("shear_tomography_catalog") as g:
# We will be checking if the source is in a tomographic bin
# and doing quantiles only of selected obejcts (in any bin)
bins = da.from_array(g["tomography/bin"], chunks=chunk_rows)
selected = bins >= 0
# We now build up the quantile values
quantile_values = {}
for new_name, old_name in col_names.items():
# Create dask arrays of the columns. This loads them lazily,
# so no data is actually loaded here. Only when the "compute"
# method is called below does anything actually happen.
col = da.from_array(f[old_name], chunks=chunk_rows)
# Ask dask to compute the percentiles of this column.
# Again, it will not actually do anything until the "compute"
# method is called below. When that happens, it will
# chunk up the data and calculate the percentiles in parallel.
quantile_values[new_name] = da.percentile(col[selected], percentiles)
# Now ask dask to actually do the calculations
(quantile_values,) = da.compute(quantile_values)
# Open the output file and save the results
with self.open_output("shear_catalog_quantiles") as f:
# put everything in a group called "quantiles"
g = f.create_group("quantiles")
# Save the quantile points
g.create_dataset("quantiles", data=quantiles)
# Save the quantile values themselves
for name, quantile_values in quantile_values.items():
g.create_dataset(name, data=quantile_values)
[docs]
class TXSourceDiagnosticPlots(PipelineStage):
"""
Make diagnostic plots of the shear catalog
This includes both tomographic and 2D measurements, and includes
the PSF as a function of various quantities
"""
name = "TXSourceDiagnosticPlots"
inputs = [
("shear_catalog", ShearCatalog),
("shear_tomography_catalog", TomographyCatalog),
("shear_catalog_quantiles", HDFFile),
]
outputs = [
("g_psf_T", PNGFile),
("g_psf_g", PNGFile),
("g1_hist", PNGFile),
("g2_hist", PNGFile),
("g_snr", PNGFile),
("g_T", PNGFile),
("g_colormag", PNGFile),
("source_snr_hist", PNGFile),
("source_mag_hist", PNGFile),
("response_hist", PNGFile),
("g_psf_T_out", TextFile),
("g_psf_g_out", TextFile),
("g_snr_out", TextFile),
("g_T_out", TextFile),
]
config_options = {
"chunk_rows": StageParameter(int, 100000, msg="Number of rows to process in each chunk."),
"delta_gamma": StageParameter(float, 0.02, msg="Delta gamma value for metacal response calculation."),
"nbins": StageParameter(int, 20, msg="Number of bins for histograms."),
"g_min": StageParameter(float, -0.03, msg="Minimum g value for plots."),
"g_max": StageParameter(float, 0.05, msg="Maximum g value for plots."),
"psfT_min": StageParameter(float, 0.04, msg="Minimum PSF T value for plots."),
"psfT_max": StageParameter(float, 0.36, msg="Maximum PSF T value for plots."),
"T_min": StageParameter(float, 0.04, msg="Minimum T value for plots."),
"T_max": StageParameter(float, 4.0, msg="Maximum T value for plots."),
"s2n_min": StageParameter(float, 10, msg="Minimum S/N value for plots."),
"s2n_max": StageParameter(float, 300, msg="Maximum S/N value for plots."),
"psf_unit_conv": StageParameter(bool, False, msg="Whether to convert PSF units."),
"bands": StageParameter(str, "riz", msg="Bands to use for diagnostics."),
}
def run(self):
# PSF tests
import matplotlib
matplotlib.use("agg")
# this also sets self.config["shear_catalog_type"]
cat_type = read_shear_catalog_type(self)
# Collect together all the methods on this class called self.plot_*
# They are all expected to be python coroutines - generators that
# use the yield feature to pause and wait for more input.
# We instantiate them all here
plotters = [getattr(self, f)() for f in dir(self) if f.startswith("plot_")]
# Start off each of the plotters. This will make them all run up to the
# first yield statement, then pause and wait for the first chunk of data
for plotter in plotters:
plotter.send(None)
# Create an iterator for reading through the input data.
# This method automatically splits up data among the processes,
# so the plotters should handle this.
chunk_rows = self.config["chunk_rows"]
bands = self.config["bands"]
if self.rank == 0:
print("Catalog type = ", cat_type)
if cat_type == "metacal":
shear_cols = [
f"psf_g1",
f"psf_g2",
f"psf_T_mean",
"g1",
"g1_1p",
"g1_2p",
"g1_1m",
"g1_2m",
"g2",
"g2_1p",
"g2_2p",
"g2_1m",
"g2_2m",
"s2n",
"T",
"T_1p",
"T_2p",
"T_1m",
"T_2m",
"s2n_1p",
"s2n_2p",
"s2n_1m",
"s2n_2m",
"weight",
] + [f"mag_{b}" for b in bands]
elif cat_type == "metadetect":
# g1, g2, T, psf_g1, psf_g2, T, s2n, weight, magnitudes
shear_cols = metadetect_variants(
"g1",
"g2",
"T",
"psf_g1",
"psf_g2",
"psf_T_mean",
"s2n",
"weight",
)
shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="metadetect")
else:
shear_cols = [
"dec",
"psf_g1",
"psf_g2",
"g1",
"g2",
"psf_T_mean",
"s2n",
"T",
"weight",
"m",
] + [f"mag_{b}" for b in self.config["bands"]]
shear_tomo_cols = ["bin"]
if self.config["shear_catalog_type"] == "metacal":
more_iters = ["shear_tomography_catalog", "response", ["R_gamma"]]
else:
more_iters = []
it = self.combined_iterators(
chunk_rows,
"shear_catalog",
"shear",
shear_cols,
"shear_tomography_catalog",
"tomography",
shear_tomo_cols,
*more_iters,
longest=True,
)
# Now loop through each chunk of input data, one at a time.
# Each time we get a new segment of data, which goes to all the plotters
for start, end, data in it:
print(f"Read data {start} - {end}")
# This causes each data = yield statement in each plotter to
# be given this data chunk as the variable data.
for plotter in plotters:
plotter.send(data)
# Tell all the plotters to finish, collect together results from the different
# processors, and make their final plots. Plotters need to respond
# to the None input and
for plotter in plotters:
try:
plotter.send(None)
except StopIteration:
pass
def get_bin_edges(self, col):
"""
Get the bin edges for a given column from the quantiles file
"""
with self.open_input("shear_catalog_quantiles") as f:
edges = f[f"quantiles/{col}"][:]
return edges
def plot_psf_shear(self):
# mean shear in bins of PSF
print("Making PSF shear plot")
import matplotlib.pyplot as plt
from scipy import stats
delta_gamma = self.config["delta_gamma"]
psf_g_edges = self.get_bin_edges("psf_g1")
shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else ""
p1 = MeanShearInBins(
f"{shear_prefix}psf_g1",
psf_g_edges,
delta_gamma,
cut_source_bin=True,
shear_catalog_type=self.config["shear_catalog_type"],
)
p2 = MeanShearInBins(
f"{shear_prefix}psf_g2",
psf_g_edges,
delta_gamma,
cut_source_bin=True,
shear_catalog_type=self.config["shear_catalog_type"],
)
psf_g_mid = 0.5 * (psf_g_edges[1:] + psf_g_edges[:-1])
while True:
data = yield
if data is None:
break
p1.add_data(data)
p2.add_data(data)
mu1, mean11, mean12, std11, std12 = p1.collect(self.comm)
mu2, mean21, mean22, std21, std22 = p2.collect(self.comm)
if self.rank != 0:
return
fig = self.open_output("g_psf_g", wrapper=True)
# Include a small shift to be able to see the g1 / g2 points on the plot
dx = 0.1 * (psf_g_edges[1] - psf_g_edges[0])
idx = np.where(np.isfinite(mu1))[0]
slope11, intercept11, mc_cov = fit_straight_line(mu1[idx], mean11[idx], std11[idx])
std_err11 = mc_cov[0, 0] ** 0.5
line11 = slope11 * (mu1) + intercept11
slope12, intercept12, mc_cov = fit_straight_line(mu1[idx], mean12[idx], std12[idx])
std_err12 = mc_cov[0, 0] ** 0.5
line12 = slope12 * (mu1) + intercept12
slope21, intercept21, mc_cov = fit_straight_line(mu2[idx], mean21[idx], std21[idx])
std_err21 = mc_cov[0, 0] ** 0.5
line21 = slope21 * (mu2) + intercept21
slope22, intercept22, mc_cov = fit_straight_line(mu2[idx], mean22[idx], y_err=std22)
std_err22 = mc_cov[0, 0] ** 0.5
line22 = slope22 * (mu2) + intercept22
plt.subplot(2, 1, 1)
plt.plot(mu1, line11, color="red", label=r"$m=%.2e \pm %.2e$" % (slope11, std_err11))
plt.plot(mu1, line12, color="blue", label=r"$m=%.2e \pm %.2e$" % (slope12, std_err12))
plt.plot(mu1, [0] * len(line11), color="black")
plt.errorbar(mu1 + dx, mean11, std11, label="g1", fmt="s", markersize=5, color="red")
plt.errorbar(mu1 - dx, mean12, std12, label="g2", fmt="o", markersize=5, color="blue")
plt.xlabel("PSF g1")
plt.ylabel("Mean g")
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(mu2, line21, color="red", label=r"$m=%.2e \pm %.2e$" % (slope21, std_err21))
plt.plot(mu2, line22, color="blue", label=r"$m=%.2e \pm %.2e$" % (slope22, std_err22))
plt.plot(mu2, [0] * len(line22), color="black")
plt.errorbar(mu2 + dx, mean21, std21, label="g1", fmt="s", markersize=5, color="red")
plt.errorbar(mu2 - dx, mean22, std22, label="g2", fmt="o", markersize=5, color="blue")
plt.xlabel("PSF g2")
plt.ylabel("Mean g")
plt.legend()
plt.tight_layout()
# This also saves the figure
fig.close()
f = self.open_output("g_psf_g_out")
data = [mu1, mu2, mean11, mean12, mean21, mean22, std11, std12, std21, std22, line11, line12, line21, line22]
f.write("".join([str(i) + "\n" for i in data]))
f.close()
def plot_psf_size_shear(self):
# mean shear in bins of PSF
print("making shear psf size plot")
import matplotlib.pyplot as plt
from scipy import stats
delta_gamma = self.config["delta_gamma"]
psf_T_edges = self.get_bin_edges("psf_T_mean")
shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else ""
binnedShear = MeanShearInBins(
f"{shear_prefix}psf_T_mean",
psf_T_edges,
delta_gamma,
cut_source_bin=True,
shear_catalog_type=self.config["shear_catalog_type"],
psf_unit_conv=self.config["psf_unit_conv"],
)
while True:
data = yield
if data is None:
break
binnedShear.add_data(data)
mu, mean1, mean2, std1, std2 = binnedShear.collect(self.comm)
if self.rank != 0:
return
dx = 0.05 * (psf_T_edges[1] - psf_T_edges[0])
idx = np.where(np.isfinite(mu))[0]
slope1, intercept1, cov1 = fit_straight_line(mu[idx], mean1[idx], std1[idx])
std_err1 = cov1[0, 0] ** 0.5
line1 = slope1 * mu + intercept1
slope2, intercept2, cov2 = fit_straight_line(mu[idx], mean2[idx], std2[idx])
std_err2 = cov2[0, 0] ** 0.5
line2 = slope2 * mu + intercept2
fig = self.open_output("g_psf_T", wrapper=True)
plt.plot(mu, line1, color="red", label=r"$m=%.2e \pm %.2e$" % (slope1, std_err1))
plt.plot(mu, line2, color="blue", label=r"$m=%.2e \pm %.2e$" % (slope2, std_err2))
plt.plot(mu, [0] * len(mu), color="black")
plt.errorbar(mu + dx, mean1, std1, label="g1", fmt="s", markersize=5, color="red")
plt.errorbar(mu - dx, mean2, std2, label="g2", fmt="o", markersize=5, color="blue")
plt.xlabel("PSF $T$")
plt.ylabel("Mean g")
plt.legend(loc="best")
plt.tight_layout()
fig.close()
f = self.open_output("g_psf_T_out")
data = [mu, mean1, mean2, std1, std2, line1, line2]
f.write("".join([str(i) + "\n" for i in data]))
f.close()
def plot_snr_shear(self):
# mean shear in bins of snr
print("Making mean shear SNR plot")
import matplotlib.pyplot as plt
from scipy import stats
# Parameters of the binning in SNR
shear_catalog_type = self.config["shear_catalog_type"]
shear_prefix = "00/" if shear_catalog_type == "metadetect" else ""
delta_gamma = self.config["delta_gamma"]
snr_edges = self.get_bin_edges("s2n")
# This class includes all the cutting and calibration, both for
# estimator and selection biases
binnedShear = MeanShearInBins(
f"{shear_prefix}s2n",
snr_edges,
delta_gamma,
cut_source_bin=True,
shear_catalog_type=shear_catalog_type,
)
while True:
# This happens when we have loaded a new data chunk
data = yield
# Indicates the end of the data stream
if data is None:
break
binnedShear.add_data(data)
mu, mean1, mean2, std1, std2 = binnedShear.collect(self.comm)
if self.rank != 0:
return
# Get the error on the mean
dx = 0.05 * (snr_edges[1] - snr_edges[0])
idx = np.where(np.isfinite(mu))[0]
slope1, intercept1, mc_cov = fit_straight_line(mu[idx], mean1[idx], std1[idx])
std_err1 = mc_cov[0, 0] ** 0.5
line1 = slope1 * mu + intercept1
slope2, intercept2, mc_cov = fit_straight_line(mu[idx], mean2[idx], std2[idx])
std_err2 = mc_cov[0, 0] ** 0.5
line2 = slope2 * mu + intercept2
fig = self.open_output("g_snr", wrapper=True)
plt.plot(mu, line1, color="red", label=r"$m=%.2e \pm %.2e$" % (slope1, std_err1))
plt.plot(mu, line2, color="blue", label=r"$m=%.2e \pm %.2e$" % (slope2, std_err2))
plt.plot(mu, [0] * len(mu), color="black")
plt.errorbar(mu + dx, mean1, std1, label="g1", fmt="s", markersize=5, color="red")
plt.errorbar(mu - dx, mean2, std2, label="g2", fmt="o", markersize=5, color="blue")
plt.xlabel("SNR")
plt.ylabel("Mean g")
plt.legend()
plt.tight_layout()
fig.close()
f = self.open_output("g_snr_out")
data = [mu, mean1, mean2, std1, std2, line1, line2]
f.write("".join([str(i) + "\n" for i in data]))
f.close()
def plot_size_shear(self):
# mean shear in bins of galaxy size
print("Making mean shear galaxy size plot")
import matplotlib.pyplot as plt
from scipy import stats
shear_catalog_type = self.config["shear_catalog_type"]
shear_prefix = "00/" if shear_catalog_type == "metadetect" else ""
delta_gamma = self.config["delta_gamma"]
T_edges = self.get_bin_edges("T")
binnedShear = MeanShearInBins(
f"{shear_prefix}T",
T_edges,
delta_gamma,
cut_source_bin=True,
shear_catalog_type=shear_catalog_type,
psf_unit_conv=self.config["psf_unit_conv"],
)
while True:
# This happens when we have loaded a new data chunk
data = yield
# Indicates the end of the data stream
if data is None:
break
binnedShear.add_data(data)
mu, mean1, mean2, std1, std2 = binnedShear.collect(self.comm)
if self.rank != 0:
return
dx = 0.05 * (T_edges[1] - T_edges[0])
idx = np.where(np.isfinite(mu))[0]
slope1, intercept1, mc_cov = fit_straight_line(mu[idx], mean1[idx], y_err=std1[idx])
std_err1 = mc_cov[0, 0] ** 0.5
line1 = slope1 * mu + intercept1
slope2, intercept2, mc_cov = fit_straight_line(mu[idx], mean2[idx], y_err=std2[idx])
std_err2 = mc_cov[0, 0] ** 0.5
line2 = slope2 * mu + intercept2
fig = self.open_output("g_T", wrapper=True)
plt.plot(mu, line1, color="red", label=r"$m=%.2e \pm %.2e$" % (slope1, std_err1))
plt.plot(mu, line2, color="blue", label=r"$m=%.2e \pm %.2e$" % (slope2, std_err2))
plt.plot(mu, [0] * len(mu), color="black")
plt.errorbar(mu + dx, mean1, std1, label="g1", fmt="s", markersize=5, color="red")
plt.errorbar(mu - dx, mean2, std2, label="g2", fmt="o", markersize=5, color="blue")
plt.xlabel("galaxy size T")
plt.ylabel("Mean g")
plt.legend()
plt.tight_layout()
fig.close()
f = self.open_output("g_T_out")
data = [mu, mean1, mean2, std1, std2, line1, line2]
f.write("".join([str(i) + "\n" for i in data]))
f.close()
def plot_mag_shear(self):
# mean shear in bins of magnitude
print("Making mean shear band magnitude plot")
import matplotlib.pyplot as plt
from scipy import stats
shear_catalog_type = self.config["shear_catalog_type"]
shear_prefix = "00/" if shear_catalog_type == "metadetect" else ""
delta_gamma = self.config["delta_gamma"]
nbins = self.config["nbins"]
stat = {}
binnedShear = {}
for band in self.config["bands"]:
m_edges = self.get_bin_edges(f"mag_{band}")
binnedShear[f"{band}"] = MeanShearInBins(
f"{shear_prefix}mag_{band}",
m_edges,
delta_gamma,
cut_source_bin=True,
shear_catalog_type=self.config["shear_catalog_type"],
)
while True:
# This happens when we have loaded a new data chunk
data = yield
# Indicates the end of the data stream
if data is None:
break
for band in self.config["bands"]:
binnedShear[f"{band}"].add_data(data)
for band in self.config["bands"]:
(
stat[f"mu_{band}"],
stat[f"mean1_{band}"],
stat[f"mean2_{band}"],
stat[f"std1_{band}"],
stat[f"std2_{band}"],
) = binnedShear[f"{band}"].collect(self.comm)
if self.rank != 0:
return
for band in self.config["bands"]:
dx = 0.05 * (m_edges[1] - m_edges[0])
idx = np.where(np.isfinite(stat[f"mu_{band}"]))[0]
stat[f"slope1_{band}"], stat[f"intercept1_{band}"], stat[f"mc_cov_{band}"] = fit_straight_line(
stat[f"mu_{band}"][idx], stat[f"mean1_{band}"][idx], y_err=stat[f"std1_{band}"][idx]
)
stat[f"std_err1_{band}"] = stat[f"mc_cov_{band}"][0, 0] ** 0.5
stat[f"line1_{band}"] = stat[f"slope1_{band}"] * stat[f"mu_{band}"] + stat[f"intercept1_{band}"]
stat[f"slope2_{band}"], stat[f"intercept2_{band}"], stat[f"mc_cov_{band}"] = fit_straight_line(
stat[f"mu_{band}"][idx], stat[f"mean2_{band}"][idx], y_err=stat[f"std2_{band}"][idx]
)
stat[f"std_err2_{band}"] = stat[f"mc_cov_{band}"][0, 0] ** 0.5
stat[f"line2_{band}"] = stat[f"slope2_{band}"] * stat[f"mu_{band}"] + stat[f"intercept2_{band}"]
fig = self.open_output("g_colormag", wrapper=True)
for band, clr1, clr2 in zip(
self.config["bands"], ["maroon", "firebrick", "red"], ["darkblue", "royalblue", "deepskyblue"]
):
plt.subplot(2, 1, 1)
plt.plot(
stat[f"mu_{band}"],
stat[f"line1_{band}"],
color=clr1,
label=r"$m=%.2e \pm %.2e$" % (stat[f"slope1_{band}"], stat[f"std_err1_{band}"]),
)
plt.plot(stat[f"mu_{band}"], [0] * len(stat[f"mu_{band}"]), color="black")
plt.errorbar(
stat[f"mu_{band}"] + dx,
stat[f"mean1_{band}"],
stat[f"std1_{band}"],
label=f"{band}-band",
fmt="s",
markersize=5,
color=clr1,
)
plt.ylabel("Mean g1")
plt.xlabel("magnitude")
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(
stat[f"mu_{band}"],
stat[f"line2_{band}"],
color=clr2,
label=r"$m=%.2e \pm %.2e$" % (stat[f"slope2_{band}"], stat[f"std_err2_{band}"]),
)
plt.plot(stat[f"mu_{band}"], [0] * len(stat[f"mu_{band}"]), color="black")
plt.errorbar(
stat[f"mu_{band}"] - dx,
stat[f"mean2_{band}"],
stat[f"std2_{band}"],
label=f"{band}-band",
fmt="o",
markersize=5,
color=clr2,
)
plt.ylabel("Mean g2")
plt.xlabel("magnitude")
plt.legend()
plt.tight_layout()
fig.close()
def plot_g_histogram(self):
from parallel_statistics import ParallelHistogram
print("plotting histogram")
import matplotlib.pyplot as plt
from scipy import stats
cat_type = self.config["shear_catalog_type"]
delta_gamma = self.config["delta_gamma"]
bins = 20
edges = np.linspace(-1, 1, bins + 1)
mids = 0.5 * (edges[1:] + edges[:-1])
width = edges[1:] - edges[:-1]
# Calibrate everything in the 2D bin
_, cal = Calibrator.load(self.get_input("shear_tomography_catalog"))
H1 = ParallelHistogram(edges)
H2 = ParallelHistogram(edges)
H1_weighted = ParallelHistogram(edges)
H2_weighted = ParallelHistogram(edges)
while True:
data = yield
if data is None:
break
qual_cut = data["bin"] != -1
if cat_type == "metacal":
g1 = data["g1"]
g2 = data["g2"]
w = data["weight"]
elif cat_type == "metadetect":
g1 = data["00/g1"]
g2 = data["00/g2"]
w = data["00/weight"]
elif cat_type == "lensfit":
dec = data["dec"]
g1 = data["g1"]
g2 = data["g2"]
w = data["weight"]
else:
g1 = data["g1"]
g2 = data["g2"]
c1 = data["c1"]
c2 = data["c2"]
w = data["weight"]
if cat_type == "metacal" or cat_type == "metadetect":
g1, g2 = cal.apply(g1, g2)
elif cat_type == "lensfit":
# In KiDS, the additive bias is calculated and removed per North and South field
# therefore, we add dec to split data into these fields.
# You can choose not to by setting dec_cut = 90 in the config, for example.
g1, g2 = cal.apply(g1, g2, dec)
else:
g1, g2 = cal.apply(g1, g2, c1, c2)
H1.add_data(g1)
H2.add_data(g2)
H1_weighted.add_data(g1, w)
H2_weighted.add_data(g2, w)
count1 = H1.collect(self.comm)
count2 = H2.collect(self.comm)
weight1 = H1_weighted.collect(self.comm)
weight2 = H2_weighted.collect(self.comm)
if self.rank != 0:
return
for i, count, weight in [(1, count1, weight1), (2, count2, weight2)]:
with self.open_output(f"g{i}_hist", wrapper=True) as fig:
plt.bar(
mids,
count,
width=width,
align="center",
color="lightblue",
label="Unweighted",
)
plt.bar(
mids,
weight,
width=width,
align="center",
color="none",
edgecolor="red",
label="Weighted",
)
plt.xlabel(f"g{i}")
plt.ylabel("Count")
plt.ylim(0, 1.1 * max(count1))
plt.legend()
def plot_snr_histogram(self):
from parallel_statistics import ParallelMeanVariance
print("plotting snr histogram")
import matplotlib.pyplot as plt
delta_gamma = self.config["delta_gamma"]
shear_catalog_type = self.config["shear_catalog_type"]
shear_prefix = "00/" if shear_catalog_type == "metadetect" else ""
bins = 10
edges = np.logspace(1, 3, bins + 1)
mids = 0.5 * (edges[1:] + edges[:-1])
calc1 = ParallelMeanVariance(bins)
while True:
data = yield
if data is None:
break
qual_cut = data["bin"] != -1
b1 = np.digitize(data[f"{shear_prefix}s2n"][qual_cut], edges) - 1
for i in range(bins):
w = np.where(b1 == i)
# Do more things here to establish
calc1.add_data(i, data[f"{shear_prefix}s2n"][qual_cut][w])
count1, mean1, var1 = calc1.collect(self.comm, mode="gather")
if self.rank != 0:
return
std1 = np.sqrt(var1 / count1)
fig = self.open_output("source_snr_hist", wrapper=True)
plt.bar(
mids,
count1,
width=edges[1:] - edges[:-1],
edgecolor="black",
align="center",
color="blue",
)
plt.ticklabel_format(style="sci", axis="y", scilimits=(0, 0))
plt.xscale("log")
plt.xlabel("log(snr)")
plt.ylabel(r"$N_{galaxies}$")
plt.ylim(0, 1.1 * max(count1))
fig.close()
def plot_response_histograms(self):
import matplotlib.pyplot as plt
if self.comm:
import mpi4py.MPI
size = 10
# This seems to be a reasonable range, though there are samples
# with extremely high values
edges = np.linspace(-3, 3, size + 1)
mid = 0.5 * (edges[1:] + edges[:-1])
width = edges[1] - edges[0]
cat_type = self.config["shear_catalog_type"]
if cat_type == "metacal":
# count of objects
counts = np.zeros((2, 2, size))
# make a separate histogram of the shear-sample-selected
# objects
counts_s = np.zeros((2, 2, size))
else:
# count of objects
counts = np.zeros((size))
# make a separate histogram of the shear-sample-selected
# objects
counts_s = np.zeros((size))
# Main loop
while True:
data = yield
if data is None:
break
if cat_type == "metadetect":
# No per-object R values in metadetect
continue
# check if selected for any source bin
in_shear_sample = data["bin"] != -1
if cat_type == "metacal":
B = np.digitize(data["R_gamma"], edges) - 1
# loop through this chunk of data.
for s, b in zip(in_shear_sample, B):
# for each element in the 2x2 matrix
for i in range(2):
for j in range(2):
bij = b[i, j]
# this will naturally filter out
# the nans
if (bij >= 0) and (bij < size):
counts[i, j, bij] += 1
if s:
counts_s[i, j, bij] += 1
elif cat_type == "lensfit":
B = np.digitize(data["m"], edges) - 1
# loop through this chunk of data.
for s, b in zip(in_shear_sample, B):
if (b >= 0) and (b < size):
counts[b] += 1
if s:
counts_s[b] += 1
else:
B = np.digitize(data["R"], edges) - 1
# loop through this chunk of data.
for s, b in zip(in_shear_sample, B):
if (b >= 0) and (b < size):
counts[b] += 1
if s:
counts_s[b] += 1
# Sum from all processors and then non-root ones return
if self.comm is not None:
if self.rank == 0:
self.comm.Reduce(mpi4py.MPI.IN_PLACE, counts)
self.comm.Reduce(mpi4py.MPI.IN_PLACE, counts_s)
else:
self.comm.Reduce(counts, None)
self.comm.Reduce(counts_s, None)
# only root process makes plots
return
fig = self.open_output("response_hist", wrapper=True, figsize=(10, 5))
plt.subplot(1, 2, 1)
if cat_type == "metacal":
manual_step_histogram(edges, counts[0, 0], label="R00", color="#1f77b4")
manual_step_histogram(edges, counts[1, 1], label="R11", color="#ff7f0e")
manual_step_histogram(edges, counts[0, 1], label="R01", color="#2ca02c")
manual_step_histogram(edges, counts[1, 0], label="R10", color="#d62728")
elif cat_type == "metadetect":
plt.text(
0.5,
0.5,
"This plot is intentionally left blank",
horizontalalignment="center",
)
counts[:] = 0.909
else:
manual_step_histogram(edges, counts, label="R", color="#1f77b4")
plt.ylim(0, counts.max() * 1.1)
plt.xlabel("Response")
plt.ylabel("Count")
plt.title("All flag=0")
plt.subplot(1, 2, 2)
if cat_type == "metacal":
manual_step_histogram(edges, counts_s[0, 0], label="R00", color="#1f77b4")
manual_step_histogram(edges, counts_s[1, 1], label="R11", color="#ff7f0e")
manual_step_histogram(edges, counts_s[0, 1], label="R01", color="#2ca02c")
manual_step_histogram(edges, counts_s[1, 0], label="R10", color="#d62728")
elif cat_type == "metadetect":
plt.text(
0.5,
0.5,
"This plot is intentionally left blank",
horizontalalignment="center",
)
counts_s[:] = 0.909
else:
manual_step_histogram(edges, counts_s, label="R", color="#1f77b4")
plt.ylim(0, counts_s.max() * 1.1)
plt.xlabel("R_gamma")
plt.ylabel("Count")
plt.title("Source sample")
plt.legend()
plt.tight_layout()
fig.close()
def plot_mag_histograms(self):
if self.comm:
import mpi4py.MPI
# mean shear in bins of PSF
print("Making mag histogram")
import matplotlib.pyplot as plt
size = 10
mag_min = 20
mag_max = 30
edges = np.linspace(mag_min, mag_max, size + 1)
mid = 0.5 * (edges[1:] + edges[:-1])
width = edges[1] - edges[0]
bands = self.config["bands"]
shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else ""
nband = len(bands)
full_hists = [np.zeros(size, dtype=int) for b in bands]
source_hists = [np.zeros(size, dtype=int) for b in bands]
while True:
data = yield
if data is None:
break
for b, h1, h2 in zip(bands, full_hists, source_hists):
b1 = np.digitize(data[f"{shear_prefix}mag_{b}"], edges) - 1
for i in range(size):
w = b1 == i
count = w.sum()
h1[i] += count
w &= data["bin"] >= 0
count = w.sum()
h2[i] += count
if self.comm is not None:
full_hists = reduce(self.comm, full_hists)
source_hists = reduce(self.comm, source_hists)
if self.rank == 0:
fig = self.open_output("source_mag_hist", wrapper=True, figsize=(4, nband * 3))
for i, (b, h1, h2) in enumerate(zip(bands, full_hists, source_hists)):
plt.subplot(nband, 1, i + 1)
plt.bar(mid, h1, width=width, fill=False, label="Complete", edgecolor="r")
plt.bar(mid, h2, width=width, fill=True, label="WL Source", color="g")
plt.xlabel(f"Mag {b}")
plt.ylabel("N")
if i == 0:
plt.legend()
plt.tight_layout()
fig.close()
[docs]
class TXLensDiagnosticPlots(PipelineStage):
"""
Make diagnostic plots of the lens catalog
Currently this consists only of histograms of SNR and mag.
"""
name = "TXLensDiagnosticPlots"
# This tells ceci to launch under dask:
dask_parallel = True
inputs = [
("photometry_catalog", HDFFile),
("lens_tomography_catalog", TomographyCatalog),
]
outputs = [
("lens_snr_hist", PNGFile),
("lens_mag_hist", PNGFile),
]
config_options = {
"block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)."),
"delta_gamma": StageParameter(float, 0.02, msg="Delta gamma value for metacal response calculation."),
"mag_min": StageParameter(float, 18, msg="Minimum magnitude for plots."),
"mag_max": StageParameter(float, 28, msg="Maximum magnitude for plots."),
"snr_min": StageParameter(float, 5, msg="Minimum S/N for plots."),
"snr_max": StageParameter(float, 200, msg="Maximum S/N for plots."),
"bands": StageParameter(str, "ugrizy", msg="Bands to use for diagnostics."),
}
def run(self):
# PSF tests
import matplotlib
matplotlib.use("agg")
files, data, nbin = self.load_data()
self.plot_snr_histograms(data, nbin)
self.plot_mag_histograms(data, nbin)
# These need to stay open until the end
for f in files:
f.close()
def load_data(self):
_, da = import_dask()
bands = self.config["bands"]
# These need to stay open until dask has finished with them.:
f = self.open_input("photometry_catalog")
g = self.open_input("lens_tomography_catalog")
# read nbin from metadata
nbin = g["tomography"].attrs["nbin"]
# Default to the automatic value but expose as an option
block = self.config["block_size"]
if block == 0:
block = "auto"
# Load data columns, lazily for dask
data = {}
for b in bands:
data[f"mag_{b}"] = da.from_array(f[f"photometry/mag_{b}"], block)
# all the blocks must be the same size. If the columns are of different
# types then dask can sometimes select different block sizes for each column
# which can cause problems. So we force them to be the same size.
if block == "auto":
block = data[f"mag_{b}"].chunksize
data[f"snr_{b}"] = da.from_array(f[f"photometry/snr_{b}"], block)
data["bin"] = da.from_array(g["tomography/bin"], block)
# Avoid recomputing selections in each histogram by doing it externally here
data["sel"] = da.nonzero(data["bin"] >= 0)
for i in range(nbin):
data[f"sel{i}"] = da.nonzero(data["bin"] == i)
# Return the open files so they stay open until dask has finished with them.
# and also the dict of lazy columns and the bin info
return [f, g], data, nbin
def plot_snr_histograms(self, data, nbin):
smin = self.config["snr_min"]
smax = self.config["snr_max"]
bins = np.geomspace(smin, smax, 50)
xlog = True
self.plot_histograms(data, nbin, "snr", xlog, bins)
def plot_mag_histograms(self, data, nbin):
# Histogram ranges are read from configuration choices
mmin = self.config["mag_min"]
mmax = self.config["mag_max"]
# let's do this in 0.5 mag bins
bins = np.arange(mmin, mmax + 1e-6, 0.5)
xlog = False
self.plot_histograms(data, nbin, "mag", xlog, bins)
def plot_histograms(self, data, nbin, name, xlog, bins):
dask, da = import_dask()
import matplotlib.pyplot as plt
bands = self.config["bands"]
nband = len(bands)
hists = {}
# Do a different set of histograms for each band
for i, b in enumerate(bands):
sel = data["sel"]
# first do the global non-tomographic version (all selected objects)
hists[b, -1] = da.histogram(data[f"{name}_{b}"][sel], bins=bins)
# and also loop through tomo bins
for j in range(nbin):
sel = data[f"sel{j}"]
hists[b, j] = da.histogram(data[f"{name}_{b}"][sel], bins=bins)
# Launch actual dask computations so we have the data ready to plot
# Doing these all at once seems to be faster
print(f"Beginning {name} histogram compute")
(hists,) = dask.compute(hists)
print("Done")
# Now make all the panels. The open_output method returns an object that
# can be used as a context manager and will be saved automatically at the end to the
# right location
figsize = (4 * nband, 4 * (nbin + 1))
with self.open_output(f"lens_{name}_hist", wrapper=True, figsize=figsize) as fig:
axes = fig.file.subplots(nbin + 1, nband, squeeze=False)
for i, b in enumerate(bands):
for j in range(-1, nbin):
bj = j if j >= 0 else "2D"
heights, edges = hists[b, j]
ax = axes[j + 1, i]
# This is my function to plot a line histogram from pre-computed edges and heights
manual_step_histogram(edges, heights, ax=ax)
if xlog:
ax.set_xscale("log")
ax.set_xlabel(f"Bin {bj} {b}-{name}")
axes[0, 0].set_ylabel("Count")
fig.file.tight_layout()
def reduce(comm, H):
H2 = []
rank = comm.Get_rank()
for h in H:
if rank == 0:
hsum = np.zeros_like(h)
else:
hsum = None
comm.Reduce(h, hsum)
H2.append(hsum)
return H2