Adding new pipeline stages
You can easily add new stages to TXPipe to calculate new observables or tests, or to modify existing steps.
Nothing in TXPipe is set in stone, so if you need to reconfigure other parts of the pipeline to make things work then please get in touch on the slack channel #desc-3x2pt to discuss it.
Planning your changes
First, plan how your work fits in with existing TXPipe stages. Does it replace an existing stage, or maybe several of them? Or is it a new addition?
Consider how your work should be split into different stages - what intermediate computations are there in the pipeline? The decision is often a trade-off between the need for repeated I/O and flexibility and ease of re-running things.
If you are important data from outside the pipeline, consider whether the process that generates that data could be a pipeline stage too, to allow people to re-run it easily when new data is available.
Have a look at flow charts for the existing pipelines, like the example pipeline here, to help you decide - look at the inputs that are available from existing code, for example.
Where to put your new work
If your new work is part of the core mission of TXPipe - computing clustering and lensing 2pt functions and associated information - then make new files for your code inside the txpipe/
directory. If it is an extension project, make a new directory in txpipe/extensions
.
It’s encouraged to put the bulk of your code in an external library and write a fairly simple pipeline stage in TXPipe.
Writing your stage
Setting up your class
The template below is an example of a new pipeline stage.
from .base_stage import PipelineStage
from .data_types import HDFFile
import numpy as np
class TXYourStageName(PipelineStage):
name = "TXYourStageName"
inputs = [
("some_input_tag", HDFFile),
# ...
]
outputs = [
("some_input_tag", HDFFile),
# ...
]
config_options = {
"name_of_option": "default_value",
# or for things without a default, specify the type,
# e.g. int, float, str, [float]
# the latter is for a list of floats.
"name_of_required_param": int,
}
def run(self):
# import any other required modules at the top
# and then put the rest of your code here
This class will inherit lots of behaviour from its parent PipelineStage class, which tell it how to connect to other pipeline stages, how to run, and the facilities described below.
The name of the class and the attribute name
should be the same, and be descriptive and clear. For core TXPipe modules it should start TX
; for extensions you can choose your own prefix.
You need to decide on the inputs and outputs for the file, and give them tags and types.
For inputs, search the page on current TXPipe files.
For each output, you can choose a tag, which will determine the name of the output file, and choose a file type from the various classes in the data_types page in the stages listing for details.
Using configuration parameters
When you run a stage, a dictionary called self.config
will be created with all the configuration information in it. The dict is populated with this priority:
Parameters set on the command line (top priority)
Parameters set in the config file
Parameter defaults in the class
Reading input data
You should always use TXPipe to find the paths to input data files. You can also use it to load data from them if you prefer - that’s especially helpful when running in parallel, since there are tools for that.
self.get_input(tag)
- returns the path to a file. Tag is one of the tags you listed in the inputs
field in your classs. The method returns a string.
self.open_input(tag, wrapper=False)
- returns an open handle to the named input tag. If wrapper
is False then this method will return a low-level object, such as an open python file
object, h5py.File
, or fitsio.FITS
, for example. It’s usually better to set wrapper=True
, in which case you get an instance of the class named in the inputs list. You can always access the underlying file object with obj.file
. See the data_types page in the stages list for the methods these classes have.
self.iterate_hdf(tag, group_name, cols, chunk_rows)
and self.iterate_fits(tag, hdunum, cols, chunk_rows)
- use these to make an iterator that you can use in a for
loop to read chunks for data at a time from the chosen file; it yields a tuple of start index, end index, and a data dict. This will also read in parallel (see below) when running under MPI.
it = self.iterate_hdf("shear_catalog", "shear", ["ra", "dec"], 100_000)
for start, end, data in it:
print(f"Read data from {start} - {end}")
ra = data["ra"]
dec = data["dec"]
...
self.combined_iterators(self, rows, *inputs)
- combines several calls to iterate_hdf
together to pull columns from different files or groups. Yields the same tuple as iterate_hdf
, with all the data combined into one dict. For example:
it = self.combined_iterators(100_000,
"shear_catalog", "shear", ["ra", "dec"],
"shear_tomography_catalog", "tomography", ["source_bin"],
)
for start, end, data in it:
print(f"Read data from {start} - {end}")
ra = data["ra"]
dec = data["dec"]
source_bin = data["source_bin"]
...
Writing output data
As with input files, you should use parent methods to find paths for and open output files. Unlike with inputs, though, you are strongly encouraged to use wrapper=True
, since this also automatically saves a wide range of provenance data in the output file.
self.get_output(tag)
- return the path to the file. Not preferred - use the next method instead, as noted above.
self.open_output(tag, wrapper=False)
- return an open file object or data file instance. It is preferred to set wrapper=True
and use the object returned.
Running and testing your stage
First, in txpipe/__init__.py
, import your new stage(s) from your python module(s). This lets TXPipe know about the new modules.
Then you can run a stage and get a list of options with:
python -m txpipe TXYourStageName --help
It will tell you the options you can specify to set input paths. Output paths are optional, and if left out the outputs will be put in your current directory.
Parallelizing your stage
It is relatively easy to parallelize TXPipe stages using either MPI (with mpi4py) or Dask.
TXPipe stages are assumed to be parallel by default unless you set parallel = False
in the class (alongside inputs
, etc.). They will use MPI by default; to use dask
, set dask_parallel = True
.
You can run a stage in parallel on the command line using (on local machines) mpirun -n <number-of-processes> python -m txpipe TXYourStageName --mpi ...
followed by other options.
Parallelizing at NERSC
The NERSC computers are particularly for parallel TXPipe because they can store files to make them accessible quickly from multiple processes at the same time. Run the command stripe_large
on a directory before copying any files in it to enable this; it makes a big difference.
Parallelizing with mpi4py
In the MPI model, all the different processes run the same program. Each process, though, has an index number, called the rank, which tells it which processes it is. The different processes then decide which data to process, or what to do, based on their rank.
The rank zero process is usually called the root process and is often in charge of tasks that should only be performed by one process.
All TXPipe stage instances have these three attributes:
* self.size
- the number of processes
* self.rank
- the index of the process, ranging from 0 .. self.size - 1
* self.comm
- an mpi4py
communicator object. Methods on this object like send
, recv
, bcast
can be used to communicate between processes. This will be set to None
if the stage was not run in parallel.
You can use these; don’t forget to check if self.comm is not None
in case the stage has been run in serial (non-parallel) mode.
See mpi4py and MPI documentation to learn more about using MPI in TXPipe. The Parallel Statistics Library may also be useful.
Parallel I/O with mpi4py
Reading data in parallel is usually straightforward - multiple processes can always open the same file for reading at the same time.
The iterate_hdf
and related methods described above all operate in parallel by default - if you call them from an MPI process then each process will load different chunks of data, and get different sets of start
and end
indices.
Parallel writing is more subtle, and requires more coordination. It is only supported for HDF5 files. You can pass the keyword parallel=True
to the open_output
method to return a file ready for parallel writing. Then every process can write to the file, provided that they don’t write to the same part of it (it’s often useful to use the same start/end indices here).
Parallelizing with dask
You can use the Dask library as an alternative to HDF5 by putting dask_parallel = True
at the top of the class. In this model only one process actually runs the code. One more is reserved as a work manager, and then the rest are all workers that tasks are automatically sent out to. You can then use dask’s extensive library of numpy-like tools to do calculations.