The Base Stage Class

All TXPipe stages inherit from this base class. It adds only a little to the parent ceci PipelineStage class.

class txpipe.base_stage.PipelineStage(args, comm=None, aliases=None)

The parent class for all TXPipe stages

This stage should not be used directly (hence the name)

Running and Configuration

run()

The main method that does the stage's work. It must be implemented by every TXPipe stage subclass.

config

The configuration dictionary for this stage, aggregating command line options and optional configuration file.

Options specified in the subclass variable config_options are read from the configuration file, command line, or make_stage choices, and stored in this dictionary.

Reading and Writing Inputs and Outputs

open_input(tag, wrapper=False, **kwargs)

Find and open an input file with the given tag, in read-only mode.

For general files this will simply return a standard python file object.

For specialized file types like FITS or HDF5 it will return a more specific object - see the types.py file for more info.

Parameters:
  • tag (str) – Tag as listed in self.inputs

  • wrapper (bool) – Whether to return an underlying file object (False) or a data type instance (True)

  • **kwargs (dict) – Extra arguments to pass to the file class constructor

Returns:

obj – The opened file or object

Return type:

file or object

open_output(tag, wrapper=False, **kwargs)

Find and open an output file with the given tag, in write mode.

If final_name is True, the file is opened using its final target output name. Otherwise inprogress_ is prepended to the file name. This means that if the final file exists, we know the stage that wrote it has completed.

If wrapper is True this will return an instance of the class of the file as specified in the cls.outputs. Otherwise it will return an open file object (standard python one or something more specialized).

Parameters:
  • tag (str) – Tag as listed in self.outputs

  • wrapper (bool) – Whether to return an underlying file object (False) or a data type instance (True)

  • final_name (bool) – Default=False. Whether to save to the final output name (True) or to a temporary name prefixed with inprogress_ (False).

  • **kwargs – Extra args are passed on to the file’s class constructor, most notably “parallel” for parallel HDF writing.

Returns:

obj – The opened file or object

Return type:

file or object

iterate_fits(tag, hdunum, cols, chunk_rows, parallel=True)

Loop through chunks of the input data from a FITS file with the given tag

TODO: add ceci tests of this function

Parameters:
  • tag (str) – The tag from the inputs list to use

  • hdunum (int) – The extension number to read

  • cols (list) – The columns to read

  • chunk_rows (int) – Number of rows to read and return at once

  • parallel (bool) – Whether to split up data among processes (parallel=True) or give all processes all data (parallel=False). Default = True.

Returns:

it – Iterator yielding (int, int, array) tuples of (start, end, data), where data is a structured array.

Return type:

iterator
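The chunking contract of iterate_fits can be illustrated without a FITS file. This is a minimal sketch assuming the selected columns are already held in a NumPy structured array; iterate_structured_chunks is a hypothetical helper, not part of TXPipe. The real method reads each chunk from the FITS extension on demand and can split chunks between processes, whereas this sketch behaves like parallel=False.

```python
import numpy as np


def iterate_structured_chunks(data, cols, chunk_rows):
    """Yield (start, end, chunk) tuples over a structured array.

    Hypothetical stand-in for iterate_fits with parallel=False: every chunk
    is yielded, and each chunk is a structured array with only `cols`.
    """
    n_rows = len(data)
    for start in range(0, n_rows, chunk_rows):
        end = min(start + chunk_rows, n_rows)
        # Indexing with a list of field names selects just those columns
        yield start, end, data[cols][start:end]


# Toy catalog with three columns; only two are requested below
catalog = np.zeros(10, dtype=[("ra", "f8"), ("dec", "f8"), ("mag", "f4")])
catalog["ra"] = np.arange(10)

for start, end, chunk in iterate_structured_chunks(catalog, ["ra", "dec"], 4):
    print(start, end, chunk["ra"])
```

The final chunk is shorter when the row count is not a multiple of chunk_rows, so code consuming the iterator should use start and end rather than assuming a fixed chunk size.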

iterate_hdf(tag, group_name, cols, chunk_rows, parallel=True, longest=False)

Loop through chunks of the input data from an HDF5 file with the given tag.

All the selected columns must have the same length, unless longest is True.

Parameters:
  • tag (str) – The tag from the inputs list to use

  • group_name (str) – The group within the HDF5 file to use, looked up as file[group_name]

  • cols (list) – The columns to read

  • chunk_rows (int) – Number of rows to read and return at once

  • parallel (bool) – Whether to split up data among processes (parallel=True) or give all processes all data (parallel=False). Default = True.

  • longest (bool) – Whether to allow mixed-length arrays and keep going until the longest array is completed, returning empty arrays for the shorter ones. Default = False.

Returns:

it – Iterator yielding (int, int, dict) tuples of (start, end, data)

Return type:

iterator
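The behaviour of the longest option can be sketched with plain NumPy arrays standing in for the datasets of an HDF5 group. iterate_column_chunks is a hypothetical helper that mimics iterate_hdf with parallel=False; it is an illustration under those assumptions, not TXPipe's implementation.

```python
import numpy as np


def iterate_column_chunks(columns, chunk_rows, longest=False):
    """Yield (start, end, data) tuples where data maps column name -> array slice.

    Hypothetical stand-in for iterate_hdf with parallel=False. `columns` is a
    dict of 1D arrays playing the role of datasets in an HDF5 group.
    """
    lengths = {name: len(arr) for name, arr in columns.items()}
    if longest:
        n_rows = max(lengths.values())
    else:
        if len(set(lengths.values())) != 1:
            raise ValueError(f"Columns have different lengths: {lengths}")
        n_rows = next(iter(lengths.values()))

    for start in range(0, n_rows, chunk_rows):
        end = min(start + chunk_rows, n_rows)
        # Slicing past the end of a shorter array yields a short or empty array
        yield start, end, {name: arr[start:end] for name, arr in columns.items()}


cols = {"ra": np.arange(5.0), "flag": np.arange(3)}
for start, end, data in iterate_column_chunks(cols, 2, longest=True):
    print(start, end, {k: len(v) for k, v in data.items()})
```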

combined_iterators(rows, *inputs, parallel=True, longest=False)

Iterate through multiple files at the same time.

If you have several HDF5 files whose selected columns all have the same length, you can use this method to iterate through them all at once and combine the data from all of them into a single dictionary.

Parameters:
  • rows (int) – The number of rows to read in each chunk

  • *inputs (list) – A list of (tag, group, cols) triples, one for each file to read. In each case tag is the input file name tag, group is the group within the HDF5 file to read, and cols is a list of columns to read from that group. Specify multiple triples to read from multiple files.

  • parallel (bool) – Whether to split up data among processes (parallel=True) or give all processes all data (parallel=False). Default = True.

  • longest (bool) – Whether to allow mixed-length arrays and keep going until the longest array is completed, as in iterate_hdf. Default = False.

Returns:

it – Iterator yielding (int, int, dict) tuples of (start, end, data)

Return type:

iterator
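Conceptually, combined_iterators advances several chunk iterators in lock-step and merges each step into one dictionary. The sketch below assumes each source is a dict of equal-length NumPy arrays standing in for one (tag, group, cols) triple; chunk_dict and combine_chunks are hypothetical names, and the rule that a later source overwrites a duplicate column name is an assumption of this sketch.

```python
import numpy as np


def chunk_dict(columns, chunk_rows):
    """Yield (start, end, {name: slice}) over equal-length arrays (hypothetical helper)."""
    n_rows = len(next(iter(columns.values())))
    for start in range(0, n_rows, chunk_rows):
        end = min(start + chunk_rows, n_rows)
        yield start, end, {name: arr[start:end] for name, arr in columns.items()}


def combine_chunks(chunk_rows, *sources):
    """Iterate several column sets together, merging each chunk into one dict.

    Hypothetical stand-in for combined_iterators with parallel=False.
    """
    iterators = [chunk_dict(src, chunk_rows) for src in sources]
    for pieces in zip(*iterators):
        start, end, _ = pieces[0]
        merged = {}
        for s, e, data in pieces:
            # Sources of different lengths disagree on the final chunk's range
            if (s, e) != (start, end):
                raise ValueError("inputs do not have the same length")
            # If two sources share a column name, the later source wins here
            merged.update(data)
        yield start, end, merged


shear = {"g1": np.arange(6), "g2": np.arange(6) * 2}
photo = {"mag_i": np.arange(6) + 20}
for start, end, data in combine_chunks(4, shear, photo):
    print(start, end, sorted(data))
```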

Finding Input and Output File Names and Types

You can often just use the input and output methods above rather than finding file paths directly, but if for any reason you need to find the actual file paths you can use these methods.

get_input(tag)

Return the path of an input file with the given tag, which can be aliased.

Parameters:

tag (str) – Tag as listed in self.inputs

Returns:

path – The path to the input file

Return type:

str

get_output(tag, final_name=False)

Return the path of an output file with the given tag, which may already be aliased.

If final_name is False, a temporary name prefixed with inprogress_ is used instead; the file is moved to its final name at the end.

Parameters:
  • tag (str) – Tag as listed in self.outputs

  • final_name (bool) – Default=False. Whether to save to the final name.

Returns:

path – The path to the output file

Return type:

str
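The inprogress_ convention described above can be sketched with the standard library. inprogress_path and finalize are hypothetical helpers, and the assumption that the prefix applies only to the file name (not the directory) follows the wording of open_output; the actual move is performed by the pipeline machinery, not by code like this.

```python
import os
import tempfile


def inprogress_path(final_path):
    """Return the temporary path used while an output is being written.

    Hypothetical helper: prepends "inprogress_" to the file name only,
    leaving the directory unchanged.
    """
    directory, filename = os.path.split(final_path)
    return os.path.join(directory, "inprogress_" + filename)


def finalize(final_path):
    """Move a completed temporary file to its final name (hypothetical helper)."""
    os.replace(inprogress_path(final_path), final_path)


with tempfile.TemporaryDirectory() as outdir:
    final = os.path.join(outdir, "summary.txt")
    with open(inprogress_path(final), "w") as f:
        f.write("data")
    # Until finalize() runs, only the inprogress_ file exists
    print(os.path.exists(final))  # False
    finalize(final)
    print(os.path.exists(final))  # True
```

Because the rename happens only after writing finishes, the presence of the final file is a reliable signal that the output is complete.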

get_input_type(tag)

Return the file type class of an input file with the given tag.

Parameters:

tag (str) – The tag of the input file

Returns:

ftype – The file type class

Return type:

FileType

get_output_type(tag)

Return the file type class of an output file with the given tag.

Parameters:

tag (str) – The tag of the output file

Returns:

ftype – The file type class

Return type:

FileType

classmethod output_tags()

Return the list of output tags produced by this stage.

Returns:

out_tags – The list of output tags

Return type:

list[str]

classmethod input_tags()

Return the list of input tags required by this stage.

Returns:

in_tags – The list of input tags

Return type:

list[str]
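The tag and type lookups above can be pictured with a toy class. This sketch assumes, as in ceci, that a stage declares its inputs and outputs as lists of (tag, file type) pairs; ToyStage, ToyHDF5File and ToyTextFile are hypothetical stand-ins, not TXPipe classes, and the real methods also handle aliases.

```python
class ToyFileType:
    """Hypothetical placeholder for a file type class."""
    suffix = None


class ToyHDF5File(ToyFileType):
    suffix = "hdf5"


class ToyTextFile(ToyFileType):
    suffix = "txt"


class ToyStage:
    """Hypothetical stage declaring inputs and outputs as (tag, type) pairs."""

    inputs = [("shear_catalog", ToyHDF5File), ("photometry_catalog", ToyHDF5File)]
    outputs = [("summary", ToyTextFile)]

    @classmethod
    def input_tags(cls):
        return [tag for tag, _ in cls.inputs]

    @classmethod
    def output_tags(cls):
        return [tag for tag, _ in cls.outputs]

    def get_input_type(self, tag):
        # Look the tag up in the declared inputs; an unknown tag is an error
        for t, ftype in self.inputs:
            if t == tag:
                return ftype
        raise ValueError(f"Tag {tag} is not a known input")

    def get_output_type(self, tag):
        for t, ftype in self.outputs:
            if t == tag:
                return ftype
        raise ValueError(f"Tag {tag} is not a known output")


print(ToyStage.input_tags(), ToyStage.output_tags())
```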

get_config_dict(ignore=None, reduce_config=False)

Return the current configuration as a dict.

Parameters:
  • ignore (dict or None) – Global parameters not to write

  • reduce_config (bool) – If True, reduce the configuration by removing the inputs, outputs and global parameters

Returns:

out_dict – The configuration

Return type:

dict

Parallelization Methods

is_mpi()

Check if the stage is being run under MPI.

Returns:

True if the stage is being run under MPI

Return type:

bool

is_dask()

Check if the stage is being run in parallel with Dask.

Returns:

True if the stage is being run under Dask

Return type:

bool

split_tasks_by_rank(tasks)

Iterate through a list of items, yielding the ones this process is responsible for.

Tasks are allocated in a round-robin way.

Parameters:

tasks (iterable) – Tasks to split up
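Round-robin allocation means item i goes to process i mod size. In this minimal sketch, split_tasks is a hypothetical function that takes rank and size as explicit arguments; in the stage method they come from the stage's own parallel setup.

```python
def split_tasks(tasks, rank, size):
    """Yield the tasks assigned to process `rank` out of `size`, round-robin.

    Hypothetical stand-in for split_tasks_by_rank with explicit rank/size.
    """
    for i, task in enumerate(tasks):
        if i % size == rank:
            yield task


# With 3 processes, each rank's share of seven tasks:
for rank in range(3):
    print(rank, list(split_tasks("abcdefg", rank, 3)))
# 0 ['a', 'd', 'g']
# 1 ['b', 'e']
# 2 ['c', 'f']
```

Round-robin keeps the shares balanced to within one item and needs no communication, since every process can compute its own share from the same task list.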

map_tasks_by_rank(function, inputs, allgather=False)

Run a function over a series of inputs, in parallel

This mirrors the map function, and returns the equivalent of [function(input) for input in inputs], but executes in parallel.

Parameters:
  • function (Callable) – Function to be run on each item in inputs

  • inputs (Iterable) – Any sequence of inputs, which should be the same on all processes, or at least have the same length: inputs not assigned to this process are ignored, so dummy values can be used for them.

  • allgather (bool) – Whether to give all ranks the results (True) or just the root process (False). Default = False.

Returns:

results – A list of the results of calling the function on each input, in the same order as the input tasks

Return type:

list
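The contract of map_tasks_by_rank, returning the same list as [function(input) for input in inputs], can be simulated serially. map_tasks_simulated is a hypothetical function: it loops over fake ranks, each evaluating its round-robin share, then reassembles the results in input order. How the real method gathers results between processes is not shown here.

```python
def map_tasks_simulated(function, inputs, size):
    """Serially simulate map_tasks_by_rank over `size` processes (hypothetical)."""
    inputs = list(inputs)
    partial = {}
    for rank in range(size):
        # Each simulated rank only evaluates the inputs assigned to it
        for i in range(rank, len(inputs), size):
            partial[i] = function(inputs[i])
    # "Gather": reassemble the results in the original input order
    return [partial[i] for i in range(len(inputs))]


print(map_tasks_simulated(lambda x: x * x, [1, 2, 3, 4, 5], size=2))
# [1, 4, 9, 16, 25]
```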

data_ranges_by_rank(n_rows, chunk_rows, parallel=True)

Split a number of rows by process.

Given a total number of rows to read and a chunk size, yield the ranges within them that this process should handle.

Parameters:
  • n_rows (int) – Total number of rows to split up

  • chunk_rows (int) – Size of each chunk to be read.

  • parallel (bool) – Whether to split data by rank or just give all processes all data. Default=True

Returns:

start, end – The start and end of the range of rows to be read by this process

Return type:

tuple
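A sketch of how row ranges might be dealt out, assuming chunks are assigned round-robin in the same way as split_tasks_by_rank. data_ranges is a hypothetical function with explicit rank and size arguments; the last chunk is truncated at n_rows.

```python
def data_ranges(n_rows, chunk_rows, rank=0, size=1, parallel=True):
    """Yield (start, end) row ranges handled by process `rank` of `size`.

    Hypothetical sketch: chunks are assigned round-robin; with parallel=False
    every process gets every chunk.
    """
    n_chunks = (n_rows + chunk_rows - 1) // chunk_rows  # ceiling division
    for i in range(n_chunks):
        if parallel and i % size != rank:
            continue
        start = i * chunk_rows
        end = min(start + chunk_rows, n_rows)
        yield start, end


print(list(data_ranges(10, 3, rank=0, size=2)))  # [(0, 3), (6, 9)]
print(list(data_ranges(10, 3, rank=1, size=2)))  # [(3, 6), (9, 10)]
```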

Diagnostic Methods

time_stamp(tag)

Print a time stamp with an optional tag.

Parameters:

tag (str) – Additional info to print in the output line. Default is empty.

memory_report(tag=None)

Print a report about memory currently available on the node the process is running on.

Parameters:

tag (str) – Additional info to print in the output line. Default is empty.

Introspection Methods

These methods are sometimes useful for finding out which stages are available in a pipeline, or when subclassing a stage.

classmethod get_stage(name, module_name=None)

Return the PipelineStage subclass with the given name.

This is used so that we do not need a new entry point __main__ function for each new stage - instead we can just use a single one which can query which class it should be using based on the name.

If module_name is provided, this will import that module in order to load the required class.

Returns:

cls – The corresponding subclass

Return type:

class
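A name-based lookup like get_stage is commonly built on a registry of subclasses. The sketch below records each subclass as it is defined using __init_subclass__; StageBase and TXToyStage are hypothetical names, and this is one possible approach rather than necessarily how ceci implements it. Importing module_name first ensures the classes defined in that module have been registered.

```python
import importlib


class StageBase:
    """Hypothetical base class that records every subclass by name."""

    _registry = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        StageBase._registry[cls.__name__] = cls

    @classmethod
    def get_stage(cls, name, module_name=None):
        # Importing the module runs its class definitions, registering them
        if module_name is not None:
            importlib.import_module(module_name)
        try:
            return cls._registry[name]
        except KeyError:
            raise ValueError(f"Unknown stage {name!r}") from None


class TXToyStage(StageBase):
    pass


print(StageBase.get_stage("TXToyStage"))
```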

classmethod get_module()

Return the dotted name of the Python module containing the current subclass.

For example, suppose a PipelineStage subclass “Baz” is defined in a module called “bar” in a package called “foo”, i.e. in /path/to/foo/bar.py.

Then calling Baz.get_module() will return “foo.bar”.

We use this later to construct command lines like “python -m foo Baz”

Returns:

module – The module containing this class.

Return type:

str
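Python already records the defining module of every class in its standard __module__ attribute, so a method like this can be sketched in one line; we assume the real method does something equivalent. Baz and command_line are hypothetical, with command_line taking the top-level package from the dotted module name to build the “python -m foo Baz” form mentioned above.

```python
class Baz:
    """Hypothetical stage class used to illustrate get_module."""

    @classmethod
    def get_module(cls):
        # __module__ is the dotted name of the defining module,
        # e.g. "foo.bar" for a class defined in /path/to/foo/bar.py
        return cls.__module__


def command_line(stage_cls):
    """Build a "python -m <package> <StageName>" command (hypothetical helper)."""
    package = stage_cls.get_module().split(".")[0]
    return f"python -m {package} {stage_cls.__name__}"


print(Baz.get_module())
print(command_line(Baz))
```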

classmethod describe_configuration()