"""Module with core functionality for a single pipeline stage """
import pathlib
import os
import sys
from textwrap import dedent
import shutil
import cProfile
import pdb
import datetime
import warnings
import socket
from abc import abstractmethod
from . import errors
from .monitor import MemoryMonitor
from .config import StageParameter, StageConfig, cast_to_streamable
from .utils import activate_tracing
from . import file_types
# Identifiers for the parallelism mode of a stage. One of these is stored in
# PipelineStage._parallel and compared against by is_parallel / is_mpi / is_dask.
SERIAL = "serial"
MPI_PARALLEL = "mpi"
DASK_PARALLEL = "dask"
# Prefix put on the file name of an output while it is still being written
# (see get_output); finalize moves the file to the un-prefixed name.
IN_PROGRESS_PREFIX = "inprogress_"
class PipelineStage:
"""A PipelineStage implements a single calculation step within a wider pipeline.
Each different type of analysis stage is represented by a subclass of this
base class. The base class handles the connection between different pipeline
stages, and the execution of the stages within a workflow system (parsl),
potentially in parallel (MPI).
An instance of one of these classes represents an actual run of the stage,
with the required inputs, outputs, and configuration specified.
See documentation pages for more details.
"""
parallel = True
dask_parallel = False
config_options = {}
doc = ""
allow_reload = False
def __init__(self, args, comm=None, aliases=None):
    """Build a stage from its inputs, outputs and configuration.

    ``args`` is a dict or namespace that should provide:

    - input paths (required)
    - a config path (required)
    - output paths (optional but usual)
    - any additional configuration not specified elsewhere

    Input and output paths map tags to paths. Tags are the strings that
    come first in each item of the subclass's ``inputs`` and ``outputs``
    attributes, e.g. for a subclass with::

        inputs = [('eggs', TextFile)]
        outputs = [('spam', TextFile)]

    ``args`` could contain::

        {'eggs': 'inputs/eggs.txt',
         'spam': 'outputs/spam.txt'}

    If ``spam`` is not given it defaults to "./spam.txt".

    The entry ``config`` should be the path of a YAML configuration file,
    e.g. ``{'config': '/path/to/config.yml'}``.

    Configuration variables declared on the class are looked up first in
    ``args``, then in the config file, and finally fall back to their
    default value. A variable declared with only a type (e.g. ``int``)
    has no default, and it is an error not to specify it somewhere.

    The ``execute`` class method can build and run a stage in one go,
    adding extras such as profiling and debugging tools.

    Parameters
    ----------
    args: dict or namespace
        Specification of input and output paths and any missing config options
    comm: MPI communicator
        (default is None) An MPI comm object to use in preference to COMM_WORLD
    aliases: dict
        Mapping of tags to new tags
    """
    # Every instance starts out serial; setup_mpi may change that later.
    self._parallel, self._comm = SERIAL, None
    self._size, self._rank = 1, 0
    self.dask_client = None

    # Configuration and I/O book-keeping, filled in by the calls below.
    self._configs = StageConfig(**self.config_options)
    self._inputs = None
    self._outputs = None
    self._io_checked = False
    self._aliases = {} if aliases is None else aliases

    self.load_configs(args)
    if comm is not None:
        self.setup_mpi(comm)
    self.check_io()
@classmethod
def make_stage(cls, **kwargs):
"""Make a stage of a particular type"""
kwcopy = kwargs.copy()
kwcopy.setdefault("config", None)
comm = kwcopy.pop("comm", None)
name = kwcopy.get("name", None)
aliases = kwcopy.pop("aliases", {})
for input_ in cls.inputs:
kwcopy.setdefault(input_[0], "None")
if name is not None:
for output_ in cls.outputs: # pylint: disable=no-member
outtag = output_[0]
aliases[outtag] = f"{outtag}_{name}"
stage = cls(kwcopy, comm=comm, aliases=aliases)
return stage
def get_aliases(self):
"""Returns the dictionary of aliases used to remap inputs and outputs
in the case that we want to have multiple instance of this class in the pipeline"""
return self._aliases
def get_aliased_tag(self, tag):
"""Returns the possibly remapped value for an input or output tag
Parameter
---------
tag : `str`
The input or output tag we are checking
Returns
-------
aliased_tag : `str`
The aliases version of the tag
"""
aliases = self.get_aliases()
return aliases.get(tag, tag)
@abstractmethod
def run(self): # pragma: no cover
"""Run the stage and return the execution status.
Subclasses must implemented this method.
"""
raise NotImplementedError("run")
def validate(self):
"""Check that the inputs actually have the data needed for execution,
This is called before the run method. It is an optional stage, meant
for checking that the input to the stage is actual in the form and
shape needed before an expensive run is executed."""
pass
def load_configs(self, args):
"""
Load the configuraiton
Parameters
----------
args: dict or namespace
Specification of input and output paths and any missing config options
"""
if not isinstance(args, dict):
args = vars(args)
# We alwys assume the config arg exists, whether it is in input_tags or not
if "config" not in args: # pragma: no cover
raise ValueError("The argument --config was missing on the command line.")
_name = args.get("name")
if _name is not None:
self._configs.name = _name
# First, we extract configuration information from a combination of
# command line arguments and optional 'config' file
self._inputs = dict(config=args["config"])
try:
self.read_config(args)
except Exception as error:
error_class = type(error)
msg = str(error)
raise error_class(f"Error configuring {self.instance_name}: {msg}")
def check_io(self, args=None):
"""
Check the inputs and outputs.
This function is seperate so that when Stages are configured interactively after
construction then can invove this
Parameters
----------
args: dict or namespace
Specification of input and output paths and any missing config options
"""
# We first check for missing input files, that's a show stopper
if self._io_checked: # pragma: no cover
return
if args is None: # pragma: no cover
args = self.config
missing_inputs = []
for x in self.input_tags():
val = args.get(x)
aliased_tag = self.get_aliased_tag(x)
if val is None:
val = args.get(aliased_tag)
if val is None: # pragma: no cover
missing_inputs.append(f"--{x}")
else:
self._inputs[aliased_tag] = val
if missing_inputs: # pragma: no cover
missing_inputs = " ".join(missing_inputs)
raise ValueError(
f"""
{self.instance_name} Missing these names on the command line:
Input names: {missing_inputs}"""
)
# We prefer to receive explicit filenames for the outputs but will
# tolerate missing output filenames and will default to tag name in
# current folder.
self._outputs = {}
for i, x in enumerate(self.output_tags()):
aliased_tag = self.get_aliased_tag(x)
if args.get(x) is None:
ftype = self.outputs[i][1] # pylint: disable=no-member
self._outputs[aliased_tag] = ftype.make_name(aliased_tag)
else:
self._outputs[aliased_tag] = args[x]
self._io_checked = True
def setup_mpi(self, comm=None):
    """
    Set up the MPI interface.

    Parameters
    ----------
    comm: MPI communicator
        (default is None) An MPI comm object to use in preference to COMM_WORLD
    """
    use_mpi = self.config.get("mpi", False)

    if use_mpi:  # pragma: no cover
        try:
            # This isn't a ceci dependency, so give a sensible error message if not installed.
            import mpi4py.MPI
        except ImportError:
            print("ERROR: Using --mpi option requires mpi4py to be installed.")
            raise

    # For scripting and testing an MPI communicator (or anything with the
    # same API) may be passed in directly; it overrides the --mpi flag.
    active_comm = comm
    if active_comm is None and use_mpi:  # pragma: no cover
        active_comm = mpi4py.MPI.COMM_WORLD

    if active_comm is None:
        self._parallel = SERIAL
        self._comm = None
        self._size = 1
        self._rank = 0
    else:
        self._parallel = MPI_PARALLEL
        self._comm = active_comm
        self._size = self._comm.Get_size()
        self._rank = self._comm.Get_rank()

    # A subclass that has enabled dask is marked as such even though it was
    # launched under MPI; this stops various MPI-specific things later on.
    if self.dask_parallel and (self._parallel == MPI_PARALLEL):
        self._parallel = DASK_PARALLEL
# Registries filled in by __init_subclass__. Each value is a
# (class, path of the defining file) tuple.
# Complete stages, keyed by the stage's ``name`` attribute.
pipeline_stages = {}
# Stages lacking inputs, outputs or a concrete run method, keyed by class name.
incomplete_pipeline_stages = {}
def __init_subclass__(cls, **kwargs):
"""
Python 3.6+ provides a facility to automatically
call a method (this one) whenever a new subclass
is defined. In this case we use that feature to keep
track of all available pipeline stages, each of which is
defined by a class.
"""
super().__init_subclass__(**kwargs)
# This is a hacky way of finding the file
# where our stage was defined
filename = sys.modules[cls.__module__].__file__
stage_is_complete = (
hasattr(cls, "inputs")
and hasattr(cls, "outputs")
and not getattr(cls.run, "__isabstractmethod__", False)
)
# If there isn't an explicit name already then set it here.
# by default use the class name.
if not hasattr(cls, "name"): # pragma: no cover
cls.name = cls.__name__
if cls.name is None: # pragma: no cover
cls.name = cls.__name__
if stage_is_complete:
# Deal with duplicated class names
if cls.name in cls.pipeline_stages and not cls.allow_reload:
other = cls.pipeline_stages[cls.name][1]
raise errors.DuplicateStageName(
"You created two pipeline stages with the"
f"name {cls.name}.\nOne was in {filename}\nand the "
f"other in {other}\nYou can either change the class "
"name or explicitly put a variable 'name' in the top"
"level of the class."
)
# Check for "config" in the inputs list - this is implicit
for name, _ in cls.inputs:
if name == "config":
raise errors.ReservedNameError(
"An input called 'config' is implicit in each pipeline "
"stage and should not be added explicitly. Please update "
f"your pipeline stage called {cls.name} to remove/rename "
"the input called 'config'."
)
# Check if user has over-written the config variable.
# Quite a common error I make myself.
if not isinstance(cls.config, property):
raise errors.ReservedNameError(
"You have a class variable called 'config', which "
"is reserved in ceci for its own configuration. "
"You may have meant to specify config_options?"
)
# Find the absolute path to the class defining the file
path = pathlib.Path(filename).resolve()
# Add a description of the parameters to the end of the docstring
# If no config options are specified, omit this.
if stage_is_complete and cls.config_options:
config_text = cls._describe_configuration_text()
if cls.__doc__ is None:
cls.__doc__ = f"Stage {cls.name}\n\nParameters\n----------\n{config_text}"
else:
# strip any existing configuration text from parent classes that is at the end of the doctring
cls.__doc__ = cls.__doc__.split("Parameters")[0]
cls.__doc__ += f"\n\nParameters\n----------\n{config_text}"
# Register the class
if stage_is_complete:
cls.pipeline_stages[cls.name] = (cls, path)
else:
cls.incomplete_pipeline_stages[cls.__name__] = (cls, path)
#############################################
# Life cycle-related methods and properties.
#############################################
[docs]
@classmethod
def get_stage(cls, name, module_name=None):
"""
Return the PipelineStage subclass with the given name.
This is used so that we do not need a new entry point __main__ function
for each new stage - instead we can just use a single one which can query
which class it should be using based on the name.
If module_name is provided, this will import that module
in order to load the required class.
Returns
-------
cls: class
The corresponding subclass
"""
stage = cls.pipeline_stages.get(name)
if stage is None:
if module_name:
__import__(module_name)
stage = cls.pipeline_stages.get(name)
# If not found, then check for incomplete stages
if stage is None:
if name in cls.incomplete_pipeline_stages:
raise errors.IncompleteStage(
f"The stage {name} is not completely written. "
"Stages must specify 'inputs', 'outputs' as class variables "
f"and a 'run' method.\n{name} might be unfinished, or it might "
"be intended as a base for other classes and not to be run."
)
raise errors.StageNotFound(f"Unknown stage '{name}'")
return stage[0]
[docs]
@classmethod
def get_module(cls):
"""
Return the path to the python package containing the current sub-class
If we have a PipelineStage subclass defined in a module called "bar", in
a package called "foo" e.g.:
/path/to/foo/bar.py <-- contains subclass "Baz"
Then calling Baz.get_module() will return "foo.bar".
We use this later to construct command lines like "python -m foo Baz"
Returns
-------
module: str
The module containing this class.
"""
return cls.pipeline_stages[cls.name][0].__module__
[docs]
@classmethod
def describe_configuration(cls):
    """Print the description of this stage's configuration options, inputs and outputs."""
    description = cls._describe_configuration_text()
    print(description)
@classmethod
def _describe_configuration_text(cls):
s = []
if cls.config_options is None:
return "<This class has no configuration options>"
for name, val in cls.config_options.items():
if isinstance(val, StageConfig):
val = val[name]
if isinstance(val, StageParameter):
s.append(f"{name}: {val.numpy_style_help_text()}")
elif isinstance(val, type):
s.append(f"{name}: {val.__name__}] (required)")
else:
s.append(f"{name}: {type(val).__name__}] (default={val})")
for input_ in cls.inputs:
s.append(f"{input_[0]}: {input_[1].__name__} (INPUT)")
for output_ in cls.outputs:
s.append(f"{output_[0]}: {output_[1].__name__} (OUTPUT)")
return '\n\n'.join(s)
@classmethod
def usage(cls): # pragma: no cover
"""
Print a usage message.
"""
names = []
docs = []
for name, (stage, _) in cls.pipeline_stages.items():
# find the first non-empty doc line, if there is one.
try:
doc_lines = [s.strip() for s in stage.__doc__.split("\n")]
doc_lines = [d for d in doc_lines if d]
doc = doc_lines[0]
except (AttributeError, IndexError):
doc = ""
# cut off any very long lines
if len(doc) > 100:
doc = doc[:100] + " ..."
# print the text
names.append(name)
docs.append(doc)
# Make it look like a nice table by finding the maximum
# length of the names, so that all the docs line up
n = max(len(name) for name in names) + 1
stage_texts = [f"- {name:{n}} - {d}" for name, d in zip(names, docs)]
stage_text = "\n".join(stage_texts)
try:
module = cls.get_module().split(".")[0]
except: # pylint: disable=bare-except
module = "<module_name>"
sys.stderr.write(
f"""
Usage: python -m {module} <stage_name> <stage_arguments>
If no stage_arguments are given then usage information
for the chosen stage will be given.
I currently know about these stages:
{stage_text}
"""
)
@classmethod
def main(cls):
"""
Create an instance of this stage and execute it with
inputs and outputs taken from the command line
"""
try:
stage_name = sys.argv[1]
except IndexError: # pragma: no cover
cls.usage()
return 1
if stage_name in ["--help", "-h"] and len(sys.argv) == 2: # pragma: no cover
cls.usage()
return 1
if stage_name.find(".") >= 0:
tokens = stage_name.split(".")
module_name = ".".join(tokens[:-1])
stage_name = tokens[-1]
else:
module_name = None
stage = cls.get_stage(stage_name, module_name)
args = stage.parse_command_line()
stage.execute(args)
return 0
@classmethod
def parse_command_line(cls, cmd=None):
"""Set up and argument parser and parse the command line
Parameters
----------
cmd : str or None
The command line to part (if None this will use the system arguments)
Returns
-------
args : Namespace
The resulting Mapping of arguement to values
"""
import argparse
parser = argparse.ArgumentParser(description=f"Run pipeline stage {cls.name}")
parser.add_argument("stage_name")
for conf, def_val in cls.config_options.items():
if isinstance(def_val, StageParameter):
opt_type = def_val.dtype
def_val = def_val.default
else:
opt_type = def_val if isinstance(def_val, type) else type(def_val)
if opt_type == bool:
parser.add_argument(f"--{conf}", action="store_const", const=True)
parser.add_argument(
f"--no-{conf}", dest=conf, action="store_const", const=False
)
elif opt_type == list:
if not def_val:
out_type = str
else:
out_type = (
def_val[0] if isinstance(def_val[0], type) else type(def_val[0])
)
if out_type is str: # pragma: no cover
parser.add_argument(
f"--{conf}", type=lambda string: string.split(",")
)
elif out_type is int: # pragma: no cover
parser.add_argument(
f"--{conf}",
type=lambda string: [int(i) for i in string.split(",")],
)
elif out_type is float:
parser.add_argument(
f"--{conf}",
type=lambda string: [float(i) for i in string.split(",")],
)
else: # pragma: no cover
raise NotImplementedError(
"Only handles str, int and float list arguments"
)
else: # pragma: no cover
parser.add_argument(f"--{conf}", type=opt_type)
for inp in cls.input_tags():
parser.add_argument(f"--{inp}")
for out in cls.output_tags():
parser.add_argument(f"--{out}")
parser.add_argument(
"--name",
action="store",
default=cls.name,
type=str,
help="Rename the stage",
)
parser.add_argument("--config")
if cls.parallel:
parser.add_argument(
"--mpi", action="store_true", help="Set up MPI parallelism"
)
parser.add_argument(
"--pdb", action="store_true", help="Run under the python debugger"
)
parser.add_argument(
"--cprofile",
action="store",
default="",
type=str,
help="Profile the stage using the python cProfile tool",
)
parser.add_argument(
"--memmon",
type=int,
default=0,
help="Report memory use. Argument gives interval in seconds between reports",
)
parser.add_argument(
"--trace",
action="store_true",
help="Enable sending a signal to the process that prints a trace wherever it is",
)
# Error message we will return if --mpi used on a non-supported
# stage.
mpi_err = (
"Error: you used the --mpi flag (or set MPI parallelism options) "
f"for the stage {cls.name}, but that stage cannot be run in parallel."
)
if cmd is None:
if ("--mpi" in sys.argv) and not cls.parallel:
raise ValueError(mpi_err)
ret_args = parser.parse_args()
else:
if ("--mpi" in cmd) and not cls.parallel:
raise ValueError(mpi_err)
ret_args = parser.parse_args(cmd)
return ret_args
@classmethod
def execute(cls, args, comm=None):
    """
    Create an instance of this stage and run it
    with the specified inputs and outputs.

    This is called by the main method. The sequence is: construct the
    stage, set up MPI, optionally start dask, profiling, memory monitoring
    and tracing, then call validate, run and finalize, and report timings.

    Parameters
    ----------
    args: namespace
        The argparse namespace for this subclass. Besides the stage
        options, its cprofile, memmon, trace and pdb attributes are read.
    comm: MPI communicator
        (default is None) Passed on to setup_mpi.
    """
    # Create the stage instance. Running under dask this only
    # actually needs to happen for one process, but it's not a major
    # overhead and lets us do a whole bunch of other setup above
    stage = cls(args)
    stage.setup_mpi(comm)

    # This happens before dask is initialized
    start_time = datetime.datetime.now()
    if stage.rank == 0:
        start_time_text = start_time.isoformat(" ")
        print(f"Executing stage: {cls.name} @ {start_time_text}")

    if stage.is_dask():
        is_client = stage.start_dask()
        # worker and scheduler stages do not execute the
        # run method under dask
        if not is_client:
            return

    # Optional diagnostics; each one is undone further below.
    if args.cprofile:  # pragma: no cover
        profile = cProfile.Profile()
        profile.enable()

    if args.memmon:  # pragma: no cover
        monitor = MemoryMonitor.start_in_thread(interval=args.memmon)

    if args.trace:
        activate_tracing(stage._rank)

    # Run the validation step and re-raise any error it reports.
    # Only the root process prints the explanation.
    try:
        stage.validate()
    except Exception as error:
        if stage.rank==0:
            print(f"Looks like there is an validation error in: {cls.name}",
                  "the input data for this stage did not pass the checks implemented on it.")
            print(error)
        raise

    try:
        stage.run()
    except Exception as error:  # pragma: no cover
        if args.pdb:
            # With --pdb the error is not re-raised: after the debugger
            # session, execution carries on to the finalization below.
            print(
                "There was an exception - starting python debugger because you ran with --pdb"
            )
            print(error)
            pdb.post_mortem()
        else:
            if stage.rank == 0:
                end_time = datetime.datetime.now()
                end_time_text = end_time.isoformat(" ")
                minutes = (end_time - start_time).total_seconds() / 60
                print(
                    f"Stage failed: {cls.name} @ {end_time_text} after {minutes:.2f} minutes"
                )
            raise
    finally:
        # Runs whether or not the stage succeeded.
        if args.memmon:  # pragma: no cover
            monitor.stop()
        if stage.is_dask():
            stage.stop_dask()

    # The default finalization renames any output files to their
    # final location, but subclasses can override to do other things too
    try:
        stage.finalize()
    except Exception as error:  # pragma: no cover
        if args.pdb:
            print(
                "There was an exception in the finalization - starting python debugger because you ran with --pdb"
            )
            print(error)
            pdb.post_mortem()
        else:
            raise

    if args.cprofile:  # pragma: no cover
        profile.disable()
        profile.dump_stats(args.cprofile)
        profile.print_stats("cumtime")

    # Under dask the root process has gone off to become the scheduler,
    # and process 1 becomes the client which runs this code
    # and gets to this point
    if stage.rank == 0 or stage.is_dask():
        end_time = datetime.datetime.now()
        end_time_text = end_time.isoformat(" ")
        minutes = (end_time - start_time).total_seconds() / 60
        print(
            f"Stage complete: {cls.name} @ {end_time_text} took {minutes:.2f} minutes"
        )
def finalize(self):
"""Finalize the stage, moving all its outputs to their final locations."""
# Synchronize files so that everything is closed
if self.is_mpi(): # pragma: no cover
self.comm.Barrier()
# Move files to their final path
# Only the root process moves things, except under dask it is
# process 1, which is the only process that reaches this point
# (as noted above)
if (self.rank == 0) or self.is_dask():
for tag in self.output_tags():
# find the old and new names
self._finalize_tag(tag)
def _finalize_tag(self, tag):
"""Finalize the data for a particular tag.
This can be overridden by sub-classes for more complicated behavior
"""
aliased_tag = self.get_aliased_tag(tag)
temp_name = self.get_output(aliased_tag)
final_name = self.get_output(aliased_tag, final_name=True)
# it's not an error here if the path does not exist,
# because that will be handled later.
if pathlib.Path(temp_name).exists():
# replace directories, rather than nesting more results
if pathlib.Path(final_name).is_dir(): # pragma: no cover
shutil.rmtree(final_name)
shutil.move(temp_name, final_name)
else: # pragma: no cover
sys.stderr.write(
f"NOTE/WARNING: Expected output file {final_name} was not generated.\n"
)
return final_name
#############################################
# Parallelism-related methods and properties.
#############################################
@property
def rank(self):
"""The rank of this process under MPI (0 if not running under MPI)"""
return self._rank
@property
def size(self):
"""The number or processes under MPI (1 if not running under MPI)"""
return self._size
@property
def comm(self):
"""The MPI communicator object (None if not running under MPI)"""
return self._comm
def is_parallel(self):
    """
    Check if the stage is being run in any parallel mode.

    Returns
    -------
    bool
        True unless the stage is running in serial mode
    """
    mode = self._parallel
    return mode != SERIAL
[docs]
def is_mpi(self):
    """
    Check if the stage is being run in MPI mode.

    Returns
    -------
    bool
        True if the parallelism mode of the stage is MPI
    """
    mode = self._parallel
    return mode == MPI_PARALLEL
[docs]
def is_dask(self):
    """
    Check if the stage is being run in parallel with Dask.

    Returns
    -------
    bool
        True if the parallelism mode of the stage is Dask
    """
    mode = self._parallel
    return mode == DASK_PARALLEL
def start_dask(self):
    """
    Prepare dask to run under MPI.

    After calling this method only a single process, MPI rank 1, is meant
    to continue executing the stage code.

    Returns
    -------
    is_client: bool
        The value returned by dask_mpi.initialize; when it is true this
        process has also connected a dask client as ``self.dask_client``.

    Raises
    ------
    ImportError
        If dask, dask_mpi or dask.distributed cannot be imported.
    ValueError
        If fewer than three processes are available.
    """
    # Using the programmatic dask configuration system does not seem to
    # work. Presumably the loggers have already been created by the time
    # we modify the config. Doing it with env vars seems to work. A value
    # already set by the user is kept, otherwise we only want error logs.
    os.environ.setdefault("DASK_LOGGING__DISTRIBUTED", "error")

    try:
        import dask
        import dask_mpi
        import dask.distributed
    except ImportError:  # pragma: no cover
        print(
            "ERROR: Using --mpi option on stages that use dask requires "
            "dask[distributed] and dask_mpi to be installed."
        )
        raise

    if self.size < 3:  # pragma: no cover
        raise ValueError(
            "Dask requires at least three processes. One becomes a scheduler "
            "process, one is a client that runs the code, and more are required "
            "as worker processes."
        )

    # This requires my fork until/unless they merge the PR, to allow
    # us to pass in these two arguments. In vanilla dask-mpi sys.exit
    # is called at the end of the event loop without returning to us.
    # After this point only a single process, MPI rank 1,
    # should continue to execute code. The others enter an event
    # loop and return with is_client=False, which we return here
    # to tell the caller that they should not run everything.
    is_client = dask_mpi.initialize(comm=self.comm, exit=False)
    if not is_client:
        return is_client

    # Connect this local process to remote workers.
    self.dask_client = dask.distributed.Client()
    # I don't yet know how to see this dashboard link at nersc
    print(f"Started dask. Diagnostics at {self.dask_client.dashboard_link}")
    return is_client
def stop_dask(self):
"""
End the dask event loop
"""
self.dask_client.retire_workers()
self.dask_client.shutdown()
[docs]
def split_tasks_by_rank(self, tasks):
"""Iterate through a list of items, yielding ones this process is responsible for/
Tasks are allocated in a round-robin way.
Parameters
----------
tasks: iterable
Tasks to split up
"""
for i, task in enumerate(tasks):
if i % self.size == self.rank:
yield task
[docs]
def map_tasks_by_rank(self, function, inputs, allgather=False):
"""Run a function over a series of inputs, in parallel
This mirrors the map function, and returns the equivalent of
[function(input) for input in inputs], but executes in parallel.
Parameters
----------
function: Callable
Function to be run on each item in inputs
inputs: Iterable
Any sequence of inputs, which should be the same
on all processes. Or at least the same length:
inputs not assigned to this process are ignored so
you could get away with a dummy input for them.
allgather: bool
Whether to give all ranks the results (True) or just the
root process (False). Default = False.
Returns
-------
results: list
A list of the results of calling the function on each input,
in the same order as the input tasks
"""
results = []
# We keep track of the number of inputs manually rather
# than calling len(inputs) because this allows inputs to
# be an iterator.
n = 0
for i, inp in enumerate(inputs):
n += 1
if i % self.size == self.rank:
results.append(function(inp))
# If this is running in serial then the above just functions
# like a basic map or list comprehension.
if self.comm is not None:
# Collate result as a list-of-lists, one sub-list for
# each process
if allgather:
collected_results = self.comm.allgather(results)
else:
collected_results = self.comm.gather(results)
if self.rank != 0:
return
# convert the list-of-lists back into a single list
# of results, returning to the original ordering.
# The round-robin way we allocated them in the first
# place is reversed by this.
results = []
for i in range(n):
j = i % self.size
k = i // self.size
results.append(collected_results[j][k])
return results
[docs]
def data_ranges_by_rank(self, n_rows, chunk_rows, parallel=True):
"""Split a number of rows by process.
Given a total number of rows to read and a chunk size, yield
the ranges within them that this process should handle.
Parameters
----------
n_rows: int
Total number of rows to split up
chunk_rows: int
Size of each chunk to be read.
Parallel: bool
Whether to split data by rank or just give all procs all data.
Default=True
Returns
-------
start, end: tuple
The start and end of the range of rows to be read by this process
"""
n_chunks = n_rows // chunk_rows
if n_chunks * chunk_rows < n_rows: # pragma: no cover
n_chunks += 1
if parallel:
it = self.split_tasks_by_rank(range(n_chunks))
else:
it = range(n_chunks)
for i in it:
start = i * chunk_rows
end = min((i + 1) * chunk_rows, n_rows)
yield start, end
##################################################
# Input and output-related methods and properties.
##################################################
[docs]
def get_output(self, tag, final_name=False):
"""
Return the path of an output file with the given tag,
which can be aliased already.
If final_name is False then use a temporary name - file will
be moved to its final name at the end. The temporary name
is prefixed with `inprogress_`.
Parameters
----------
tag: str
Tag as listed in self.outputs
final_name: bool
Default=False. Whether to save to the final name.
Returns
-------
path: str
The path to the output file
"""
tag = self.get_aliased_tag(tag)
path = self._outputs[tag]
# If not the final version, add a tag at the start of the filename
if not final_name:
p = pathlib.Path(path)
p = p.parent / (IN_PROGRESS_PREFIX + p.name)
path = str(p)
return path
def open_output(
    self, tag, wrapper=False, final_name=False, **kwargs
):  # pragma: no cover
    """
    Find and open an output file with the given tag, in write mode.

    With final_name=True the file is opened under its final target name.
    Otherwise `inprogress_` is prepended to the file name, so that a file
    existing under its final name is known to be complete.

    With wrapper=True an instance of the file class listed for the tag in
    cls.outputs is returned. Otherwise the ``file`` attribute of that
    instance is returned (a standard python file object or something more
    specialized).

    Parameters
    ----------
    tag: str
        Tag as listed in self.outputs
    wrapper: bool
        Whether to return an underlying file object (False) or a data type instance (True)
    final_name: bool
        Default=False. Whether to open the file under its final name.
    **kwargs:
        Extra args are passed on to the file's class constructor. The
        entry ``parallel`` is consumed here and requests parallel writing.

    Returns
    -------
    obj: file or object
        The opened file or object

    Raises
    ------
    RuntimeError
        If parallel HDF5 writing is requested but h5py is not MPI-enabled.
    """
    path = self.get_output(tag, final_name=final_name)
    output_class = self.get_output_type(tag)

    # HDF files and directory outputs can be opened for parallel writing
    # under MPI. The extra flags are only added if we have been told to
    # open in parallel and are actually running under MPI.
    run_parallel = kwargs.pop("parallel", False) and self.is_mpi()
    if run_parallel:
        if issubclass(output_class, file_types.Directory):
            kwargs["parallel"] = True
            kwargs["comm"] = self.comm
        else:
            # Otherwise must be HDF5. Ideally we would check more carefully
            # here, but I don't want to mess up any RAIL HDF stuff by accident.
            kwargs["driver"] = "mpio"
            kwargs["comm"] = self.comm

            # XXX: This is also not a dependency, but it should be.
            # Or even better would be to make it a dependency of descformats
            # where it is actually used.
            import h5py

            if not h5py.get_config().mpi:
                print(
                    dedent(
                        """\
                        Your h5py installation is not MPI-enabled.
                        Options include:
                        1) Set nprocess to 1 for all stages
                        2) Upgrade h5py to use mpi. See instructions here:
                        http://docs.h5py.org/en/latest/build.html#custom-installation
                        Note: If using conda, the most straightforward way is to enable it is
                        conda install -c spectraldns h5py-parallel
                        """
                    )
                )
                raise RuntimeError("h5py module is not MPI-enabled.")

    # Open the object representing the file
    obj = output_class(path, "w", **kwargs)
    return obj if wrapper else obj.file
@classmethod
def inputs_(cls):
"""
Return the dict mapping input tags to file names.
Returns
-------
in_dict : dict[str:str]
"""
return cls.inputs # pylint: disable=no-member
@classmethod
def outputs_(cls):
"""
Return the dict mapping output tags to file names.
Returns
-------
out_dict : dict[str:str]
"""
return cls.outputs # pylint: disable=no-member
[docs]
def get_output_type(self, tag):
"""
Return the file type class of an output file with the given tag.
Parameters
----------
tag : str
The tag of the output file
Returns
-------
ftype : FileType
The file type class
"""
tag = self.get_aliased_tag(tag)
for t, dt in self.outputs_():
t = self.get_aliased_tag(t)
if t == tag:
return dt
raise ValueError(f"Tag {tag} is not a known output") # pragma: no cover
##################################################
# Configuration-related methods and properties.
##################################################
@property
def instance_name(self):
"""Return the name associated to this particular instance of this stage"""
return self._configs.get("name", self.name)
@property
def config(self):
"""
The configuration dictionary for this stage, aggregating command
line options and optional configuration file.
Options specified in the subclass variable `config_options` are
read from the configuration file, command line, or `make_stage` choices,
and stored in this dictionary.
"""
return self._configs
def read_config(self, args):
    """
    Gather the configuration for this stage and pass it to ``self._configs``.

    The "config" input may be a dict that is already loaded, a path to a
    YAML file, or None. From it two sections are merged, later ones
    overriding earlier ones:

    1. the "global" section, inherited by every stage
    2. the section named after this stage instance (``self.instance_name``)

    The merged dict is handed, together with ``args``, to
    ``self._configs.set_config``, which is responsible for combining it
    with default values and command line options.

    In case a mandatory argument (argument with no default) is missing,
    an exception is raised. Note that we recognize arguments with no
    default as the ones where self.config_options holds a type instead
    of a value.

    Parameters
    ----------
    args : dict or namespace
        Command line style options, forwarded untouched to ``set_config``
    """
    config_file = self.get_input("config")

    # This is all the config information in the file, including
    # things for other stages
    if isinstance(config_file, dict):
        overall_config = config_file
    elif config_file is not None:
        # Imported lazily so that yaml is only needed when a file is read
        import yaml

        with open(config_file) as _config_file:
            # An empty YAML document loads as None rather than a dict
            overall_config = yaml.safe_load(_config_file) or {}
    else:
        overall_config = {}

    # The user can define global options that are inherited by
    # all the other sections if not already specified there.
    # Work on a copy: updating the "global" section in place would leak
    # this stage's options into a dict that the caller may share with
    # other stages.  A section present but empty (null) counts as {}.
    input_config = dict(overall_config.get("global") or {})

    # This is just the config info in the file for this stage.
    # It may be incomplete - there may be things specified on the
    # command line instead, or just using their default values
    stage_config = overall_config.get(self.instance_name) or {}
    input_config.update(stage_config)

    self._configs.set_config(input_config, args)
[docs]
def get_config_dict(self, ignore=None, reduce_config=False):
    """Write the current configuration to a dict

    Every value that is kept is converted with ``cast_to_streamable``.
    Input tags whose value is None or the string 'None' are always left
    out, whatever the value of ``reduce_config``.

    Parameters
    ----------
    ignore : dict or None
        Global parameters not to write.  Only consulted when
        ``reduce_config`` is true; an entry is dropped when its key is in
        ``ignore`` and its value equals the one stored there.

    reduce_config : bool
        If true, reduce the configuration by parsing out the inputs,
        outputs, the "config" entry and the matching global params

    Returns
    -------
    out_dict : dict
        The configuration
    """
    if reduce_config:
        skipped_keys = self.input_tags() + self.output_tags() + ["config"]
    else:
        skipped_keys = []
    skipped_globals = ignore or {}

    result = {}
    for name, value in self.config.items():
        matches_global = (
            reduce_config
            and name in skipped_globals
            and skipped_globals[name] == value
        )
        if matches_global or (reduce_config and name in skipped_keys):
            continue
        unset_input = name in self.input_tags() and value in [None, 'None']
        if unset_input:
            continue
        result[name] = cast_to_streamable(value)
    return result
def find_inputs(self, pipeline_files):
    """Find and return all the inputs associated to this stage in the FileManager

    Each declared input tag is aliased with ``get_aliased_tag`` and looked
    up in ``pipeline_files`` under that aliased tag.

    These are returned as a dictionary of aliased tag : path pairs
    """
    aliased_tags = (self.get_aliased_tag(tag) for tag, _ in self.inputs_())
    return {aliased: pipeline_files[aliased] for aliased in aliased_tags}
def find_outputs(self, outdir):
    """Find and return all the outputs associated to this stage

    For every declared output the aliased tag is looked up in
    ``self._outputs``; if it is not registered there yet, a name made by
    the file type's ``make_name`` is stored first.  The stored name is
    then prefixed with ``outdir``.

    These are returned as a dictionary of aliased tag : path pairs
    """
    located = {}
    for tag, file_type in self.outputs_():
        alias = self.get_aliased_tag(tag)
        if alias not in self._outputs:  # pragma: no cover
            self._outputs[alias] = file_type.make_name(alias)
        located[alias] = f"{outdir}/{self._outputs[alias]}"
    return located
def print_io(self, stream=None):
    """Print out the tags, paths and types for all the inputs and outputs of this stage

    One line is written per input and per output, with the columns
    tag : aliased tag : file type : path.  Input paths are read from
    ``self._inputs`` under the plain tag, output paths from
    ``self._outputs`` under the aliased tag.

    Parameters
    ----------
    stream : file-like or None
        Object with a ``write`` method to print to.  If None (the default)
        the current ``sys.stdout`` is used, looked up at call time so that
        a redirected standard output is respected.
    """
    if stream is None:
        stream = sys.stdout

    stream.write("Inputs--------\n")
    for tag, ftype in self.inputs_():
        aliased_tag = self.get_aliased_tag(tag)
        stream.write(
            f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._inputs[tag]}\n"
        )
    stream.write("Outputs--------\n")
    for tag, ftype in self.outputs_():
        aliased_tag = self.get_aliased_tag(tag)
        stream.write(
            f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._outputs[aliased_tag]}\n"
        )
def already_finished(self):
    """Print a notice that this stage is skipped because its outputs already exist"""
    notice = f"Skipping stage {self.instance_name} because its outputs exist already"
    print(notice)
[docs]
def iterate_fits(
    self, tag, hdunum, cols, chunk_rows, parallel=True
):  # pragma: no cover
    """
    Loop through chunks of the input data from a FITS file with the given tag

    The row ranges to visit are supplied by ``data_ranges_by_rank``.

    TODO: add ceci tests of this functions

    Parameters
    ----------
    tag: str
        The tag from the inputs list to use

    hdunum: int
        The extension number to read

    cols: list
        The columns to read

    chunk_rows: int
        Number of rows to read and return at once

    parallel: bool
        Whether to split up data among processes (parallel=True) or give
        all processes all data (parallel=False).  Default = True.

    Returns
    -------
    it: iterator
        Iterator yielding (int, int, array) tuples of (start, end, data)
        data is what the extension's ``read_columns`` returns for rows
        start..end-1 (a structured array).
    """
    extension = self.open_input(tag)[hdunum]
    total_rows = extension.get_nrows()
    row_ranges = self.data_ranges_by_rank(total_rows, chunk_rows, parallel=parallel)
    for first, last in row_ranges:
        yield first, last, extension.read_columns(cols, rows=range(first, last))
[docs]
def iterate_hdf(
    self, tag, group_name, cols, chunk_rows, parallel=True, longest=False
):
    """
    Loop through chunks of the input data from an HDF5 file with the given tag.

    All the selected columns must have the same length, unless
    ``longest=True`` is given.  The row ranges to visit are supplied by
    ``data_ranges_by_rank``.

    Parameters
    ----------
    tag: str
        The tag from the inputs list to use

    group_name: str
        The group within the HDF5 file to use, looked up as
        file[group_name]

    cols: list
        The columns to read

    chunk_rows: int
        Number of rows to read and return at once

    parallel: bool
        Whether to split up data among processes (parallel=True) or give
        all processes all data (parallel=False).  Default = True.

    longest: bool
        Whether to allow mixed length arrays and keep going until the longest
        array is completed.  Shorter columns are simply sliced past their
        end, so they come back short or empty for the later chunks.

    Returns
    -------
    it: iterator
        Iterator yielding (int, int, dict) tuples of (start, end, data)
        where data maps each column name to its [start:end] slice

    Raises
    ------
    ValueError
        If no columns are requested, or if the columns differ in length
        and ``longest`` is false
    """
    hdf = self.open_input(tag)
    group = hdf[group_name]

    # The number of rows to iterate over is the length of the longest column
    lengths = [len(group[col]) for col in cols]
    if not lengths:
        # Without any column there is no length to iterate over
        raise ValueError(
            f"No columns were requested from file {tag} group {group_name}"
        )
    n = max(lengths)

    # Check all the columns are the same length
    if not longest and any(length != n for length in lengths):
        raise ValueError(
            f"Different columns among {cols} in file {tag} group {group_name} "
            "are different sizes - if this is acceptable set longest=True"
        )

    # Iterate through the data providing chunks
    for start, end in self.data_ranges_by_rank(n, chunk_rows, parallel=parallel):
        data = {col: group[col][start:end] for col in cols}
        yield start, end, data
def combined_iterators(self, rows, *inputs, parallel=True):
    """
    Iterate through multiple files at the same time.

    If you have several HDF files with some columns of the same length
    then you can use this method to iterate through them all at once,
    and combine the data from all of them into a single dictionary.

    Each file is read with ``iterate_hdf`` and the resulting iterators
    are advanced in lockstep.

    Parameters
    ----------
    rows: int
        The number of rows to read in each chunk

    *inputs: list
        A flat sequence of tag, group, cols values, three per file to
        read, e.g. ``combined_iterators(rows, tag1, group1, cols1, tag2,
        group2, cols2)``.  In each case tag is the input file name tag,
        group is the group within the HDF5 file to read, and cols is a
        list of columns to read from that group.

    parallel: bool
        Whether to split up data among processes (parallel=True) or give
        all processes all data (parallel=False).  Default = True.

    Returns
    -------
    it: iterator
        Iterator yielding (int, int, dict) tuples of (start, end, data).
        start and end are those reported by the last file listed.

    Raises
    ------
    ValueError
        If the number of positional inputs is not a multiple of three
    """
    if len(inputs) % 3 != 0:
        raise ValueError(
            "Arguments to combined_iterators should be in threes: "
            "tag, group, value"
        )

    # De-interleave the flat argument list into its three strands
    tags = inputs[0::3]
    sections = inputs[1::3]
    column_lists = inputs[2::3]
    iterators = [
        self.iterate_hdf(tag, section, cols, rows, parallel=parallel)
        for tag, section, cols in zip(tags, sections, column_lists)
    ]

    for chunks in zip(*iterators):
        merged = {}
        for start, end, chunk in chunks:
            merged.update(chunk)
        yield start, end, merged
################################
# Pipeline-related methods
################################
@classmethod
def generate_command(
    cls, inputs, config, outputs, aliases=None, instance_name=None
):
    """
    Generate a command line that will run the stage

    The command has the form ``python3 -m <module> <stage> <flags>``,
    where the flags are one ``--tag=path`` per declared input, an optional
    ``--name``, ``--config``, and one ``--tag=path`` per declared output.

    Parameters
    ----------
    inputs: dict
        Maps aliased input tags to their locations

    config: str
        Value passed on through the ``--config`` flag

    outputs: dict
        Maps aliased output tags to their locations

    aliases: dict or None
        Maps tags to aliased tags; tags not listed are used as they are

    instance_name: str or None
        If given and different from the class ``name``, it is passed on
        through the ``--name`` flag

    Returns
    -------
    cmd: str
        The command line

    Raises
    ------
    ValueError
        If an input or output location is missing
    """
    module = cls.get_module().split(".")[0]
    if sys.modules[module].__file__:
        # Regular module, stage will be imported with module
        flags = [f"{cls.name}"]
    else:
        # Namespace module, use 'ceci' to get the main
        # and specify the full path
        flags = [f"{cls.get_module()}.{cls.name}"]
        module = "ceci"

    aliases = aliases or {}

    def location_flags(declared, locations, kind):
        # Build the --tag=path flags for the declared inputs or outputs
        found = []
        for tag, _ in declared:
            aliased_tag = aliases.get(tag, tag)
            try:
                fpath = locations[aliased_tag]
            except KeyError as msg:  # pragma: no cover
                raise ValueError(
                    f"Missing {kind} location {aliased_tag} {str(locations)}"
                ) from msg
            found.append(f"--{tag}={fpath}")
        return found

    flags.extend(location_flags(cls.inputs_(), inputs, "input"))

    if instance_name is not None and instance_name != cls.name:
        flags.append(f"--name={instance_name}")
    flags.append(f"--config={config}")

    flags.extend(location_flags(cls.outputs_(), outputs, "output"))

    # We just return this, instead of wrapping it in a
    # parsl job
    joined_flags = " ".join(flags)
    return f"python3 -m {module} {joined_flags}"
def time_stamp(self, tag=""):
    """
    Print a time stamp with an optional tag.

    The line has the form ``Process <rank>: <tag> <time>``, where the
    time is the current local time, and standard output is flushed
    afterwards.

    Parameters
    ----------
    tag: str
        Additional info to print in the output line.  Default is empty.
    """
    now = datetime.datetime.now()
    # flush right away so the stamp is not held back in an output buffer
    print(f"Process {self.rank}: {tag} {now}", flush=True)
def memory_report(self, tag=None):
    """
    Print a report about memory currently available
    on the node the process is running on.

    The line shows the current time, the process rank, the optional tag,
    the host name, and the available and total memory as reported by
    ``psutil.virtual_memory()``, both divided by 1024**3.

    Parameters
    ----------
    tag: str
        Additional info to print in the output line. Default is empty.
    """
    import psutil

    now = datetime.datetime.now()

    # The different types of memory are really fiddly and don't
    # correspond to how you usually imagine.  The simplest thing
    # to report here is just how much memory is left on the machine.
    memory = psutil.virtual_memory()
    unit = 1024**3
    avail = memory.available / unit
    total = memory.total / unit

    label = "" if tag is None else f" {tag}:"

    # This gives you the name of the host.  At NERSC that is the node name
    host = socket.gethostname()

    # Print message
    print(
        f"{now}: Process {self.rank}:{label} Remaining memory on {host} {avail:.1f} GB / {total:.1f} GB"
    )
    sys.stdout.flush()