Source code for ceci.stage

"""Module with core functionality for a single pipeline stage """

import pathlib
import os
import sys
from textwrap import dedent
import shutil
import cProfile
import pdb
import datetime
import warnings
import socket

from abc import abstractmethod
from . import errors
from .monitor import MemoryMonitor
from .config import StageParameter, StageConfig, cast_to_streamable
from .utils import activate_tracing
from . import file_types

# Values stored in PipelineStage._parallel to record how a stage is being run.
SERIAL = "serial"
MPI_PARALLEL = "mpi"
DASK_PARALLEL = "dask"

# Prefix put on the file name of an output while it is still being written
# (see PipelineStage.get_output).
IN_PROGRESS_PREFIX = "inprogress_"


class PipelineStage:
    """A PipelineStage implements a single calculation step within a wider pipeline.

    Each different type of analysis stage is represented by a subclass of this
    base class.  The base class handles the connection between different pipeline
    stages, and the execution of the stages within a workflow system (parsl),
    potentially in parallel (MPI).

    An instance of one of these classes represents an actual run of the stage,
    with the required inputs, outputs, and configuration specified.

    See documentation pages for more details.

    """

    # Whether the stage can be run in parallel; parse_command_line only
    # offers the --mpi flag when this is True.
    parallel = True
    # When True, setup_mpi switches a run that has an MPI communicator
    # into dask mode.
    dask_parallel = False
    # Configuration options of the stage; unpacked into StageConfig in __init__.
    config_options = {}
    # NOTE(review): not referenced in the visible code - presumably a short
    # description of the stage; confirm.
    doc = ""
    # When True, __init_subclass__ does not complain if a stage with the same
    # name has already been registered.
    allow_reload = False

    def __init__(self, args, comm=None, aliases=None):
        """Build a stage from its input paths, output paths and configuration.

        ``args`` is a dict or namespace that should provide:

        - input paths (required)
        - the config path (required)
        - output paths (optional but usual)
        - any configuration option not specified elsewhere

        Input and output paths map tags to paths.  Tags are strings: the
        first element of each item in the subclass's ``inputs`` and
        ``outputs`` attributes.  For example, for a subclass with::

            inputs = [('eggs', TextFile)]
            outputs = [('spam', TextFile)]

        the args could contain::

            {'eggs': 'inputs/eggs.txt',
             'spam': 'outputs/spam.txt'}

        If ``spam`` is not specified it will default to "./spam.txt".

        The key "config" should map to the path of a YAML config file,
        e.g. ``{'config': '/path/to/config.yml'}``.

        Config variables listed in the class's configuration options are
        looked up first in args, then in the config file, and finally in
        their default value.  An option with only a type (like ``int``) and
        no default must be specified somewhere, otherwise it is an error.

        The ``execute`` method instantiates and runs the class in one go,
        with extras such as profiling and debugging tools.

        Parameters
        ----------
        args: dict or namespace
            Specification of input and output paths and any missing config options
        comm: MPI communicator
            (default is None) An MPI comm object to use in preference to COMM_WORLD
        aliases: dict
            Mapping of tags to new tags
        """
        # Configuration container built from the class-level options
        self._configs = StageConfig(**self.config_options)

        # Input/output maps are filled in by load_configs and check_io
        self._inputs = None
        self._outputs = None
        self._io_checked = False

        # Start out serial; setup_mpi overrides these when a comm is given
        self._parallel = SERIAL
        self._comm = None
        self._size = 1
        self._rank = 0
        self.dask_client = None

        self._aliases = {} if aliases is None else aliases

        self.load_configs(args)
        if comm is not None:
            self.setup_mpi(comm)

        self.check_io()


    @classmethod
    def make_stage(cls, **kwargs):
        """Make a stage of a particular type from keyword arguments.

        The keywords ``comm`` and ``aliases`` are removed and passed to the
        constructor separately; everything else becomes the ``args``
        dictionary of the new stage.  ``config`` defaults to None and every
        input tag that is not supplied defaults to the string "None".

        If ``name`` is given, every output tag is aliased to
        ``"{tag}_{name}"``.

        Parameters
        ----------
        **kwargs
            Paths, configuration options, and optionally ``comm``,
            ``aliases`` and ``name``.

        Returns
        -------
        stage : PipelineStage
            The newly constructed stage
        """
        kwcopy = kwargs.copy()
        kwcopy.setdefault("config", None)
        comm = kwcopy.pop("comm", None)
        name = kwcopy.get("name", None)
        # Work on a private copy so the dictionary handed in by the caller
        # is not modified when output aliases are added below.  An explicit
        # aliases=None is treated like "no aliases".
        aliases = dict(kwcopy.pop("aliases", None) or {})
        for input_ in cls.inputs:  # pylint: disable=no-member
            kwcopy.setdefault(input_[0], "None")
        if name is not None:
            for output_ in cls.outputs:  # pylint: disable=no-member
                outtag = output_[0]
                aliases[outtag] = f"{outtag}_{name}"
        return cls(kwcopy, comm=comm, aliases=aliases)

    def get_aliases(self):
        """Return the alias dictionary of this stage.

        The dictionary remaps input and output tags, which is what allows
        several instances of the same class to live in one pipeline.
        """
        alias_map = self._aliases
        return alias_map

    def get_aliased_tag(self, tag):
        """Look up the name a tag has been remapped to, if any.

        Parameters
        ----------
        tag : `str`
            The input or output tag to look up

        Returns
        -------
        aliased_tag : `str`
            The alias registered for the tag, or the tag itself when no
            alias has been registered
        """
        remap = self.get_aliases()
        if tag in remap:
            return remap[tag]
        return tag

    @abstractmethod
    def run(self):  # pragma: no cover
        """Carry out the work of the stage.

        This base version only raises; every runnable subclass has to
        provide its own implementation.
        """
        message = "run"
        raise NotImplementedError(message)

    def validate(self):
        """Optional hook for checking the inputs before an expensive run.

        It is called before the run method.  Subclasses can override it to
        verify that the input data has the form and shape the stage needs;
        the base implementation does nothing.
        """
        return None

    def load_configs(self, args):
        """
        Load the configuration

        Parameters
        ----------
        args: dict or namespace
            Specification of input and output paths and any missing config options

        Raises
        ------
        ValueError
            If ``args`` has no "config" entry.
        Exception
            Any error raised while reading the configuration is re-raised
            as the same exception type with the stage name prepended to the
            message.  If that type cannot be built from a single message a
            RuntimeError is raised instead.  The original error is always
            attached as the cause.
        """
        if not isinstance(args, dict):
            args = vars(args)

        # We always assume the config arg exists, whether it is in input_tags or not
        if "config" not in args:  # pragma: no cover
            raise ValueError("The argument --config was missing on the command line.")

        _name = args.get("name")
        if _name is not None:
            self._configs.name = _name

        # First, we extract configuration information from a combination of
        # command line arguments and optional 'config' file
        self._inputs = dict(config=args["config"])
        try:
            self.read_config(args)
        except Exception as error:
            msg = f"Error configuring {self.instance_name}: {error}"
            # Some exception classes (e.g. UnicodeDecodeError) cannot be
            # constructed from one string; do not let that hide the real
            # problem behind an unrelated TypeError.
            try:
                wrapped = type(error)(msg)
            except TypeError:
                wrapped = RuntimeError(msg)
            raise wrapped from error

    def check_io(self, args=None):
        """
        Check the inputs and outputs.
        This function is seperate so that when Stages are configured interactively after
        construction then can invove this

        Parameters
        ----------
        args: dict or namespace
            Specification of input and output paths and any missing config options
        """

        # We first check for missing input files, that's a show stopper
        if self._io_checked:  # pragma: no cover
            return
        if args is None:  # pragma: no cover
            args = self.config

        missing_inputs = []
        for x in self.input_tags():
            val = args.get(x)
            aliased_tag = self.get_aliased_tag(x)

            if val is None:
                val = args.get(aliased_tag)

            if val is None:  # pragma: no cover
                missing_inputs.append(f"--{x}")
            else:
                self._inputs[aliased_tag] = val
        if missing_inputs:  # pragma: no cover
            missing_inputs = "  ".join(missing_inputs)
            raise ValueError(
                f"""

{self.instance_name} Missing these names on the command line:
    Input names: {missing_inputs}"""
            )

        # We prefer to receive explicit filenames for the outputs but will
        # tolerate missing output filenames and will default to tag name in
        # current folder.
        self._outputs = {}
        for i, x in enumerate(self.output_tags()):
            aliased_tag = self.get_aliased_tag(x)
            if args.get(x) is None:
                ftype = self.outputs[i][1]  # pylint: disable=no-member
                self._outputs[aliased_tag] = ftype.make_name(aliased_tag)
            else:
                self._outputs[aliased_tag] = args[x]
        self._io_checked = True

    def setup_mpi(self, comm=None):
        """
        Work out how this stage is parallelised and record it.

        Sets the parallel mode, communicator, size and rank of the stage.

        Parameters
        ----------
        comm: MPI communicator
            (default is None) An MPI comm object to use in preference to COMM_WORLD
        """
        use_mpi_flag = self.config.get("mpi", False)

        if use_mpi_flag:  # pragma: no cover
            # mpi4py isn't a ceci dependency, so give a sensible error
            # message if it is not installed.
            try:
                import mpi4py.MPI
            except ImportError:
                print("ERROR: Using --mpi option requires mpi4py to be installed.")
                raise

        # For scripting and testing an MPI communicator (or anything with
        # the same API) may be passed in directly; it takes precedence over
        # the --mpi flag, which otherwise selects COMM_WORLD.
        if comm is None and use_mpi_flag:  # pragma: no cover
            chosen_comm = mpi4py.MPI.COMM_WORLD
        else:
            chosen_comm = comm

        if chosen_comm is None:
            self._parallel = SERIAL
            self._comm = None
            self._size = 1
            self._rank = 0
        else:
            self._parallel = MPI_PARALLEL
            self._comm = chosen_comm
            self._size = self._comm.Get_size()
            self._rank = self._comm.Get_rank()

        # A subclass that has enabled dask is marked as such when running
        # under MPI.  This stops various MPI-specific things happening later.
        if self.dask_parallel and (self._parallel == MPI_PARALLEL):
            self._parallel = DASK_PARALLEL

    # Registry of complete stages, filled in by __init_subclass__:
    # stage name -> (class, resolved path of the file defining it).
    pipeline_stages = {}
    # Same layout, keyed by class __name__, for subclasses that lack
    # inputs, outputs, or a concrete run method.
    incomplete_pipeline_stages = {}

    def __init_subclass__(cls, **kwargs):
        """
        Register every newly defined subclass.

        Python 3.6+ calls this method automatically whenever a new subclass
        is defined.  We use that to keep track of all available pipeline
        stages, each of which is defined by a class.

        A subclass counts as complete when it has ``inputs``, ``outputs``
        and a non-abstract ``run``.  Complete stages are stored in
        ``pipeline_stages`` under their name, all others in
        ``incomplete_pipeline_stages`` under their class name.

        Raises
        ------
        errors.DuplicateStageName
            If a complete stage with the same name already exists and the
            class does not set ``allow_reload``.
        errors.ReservedNameError
            If the class lists an input called "config", or replaces the
            ``config`` property with something else.
        """
        super().__init_subclass__(**kwargs)

        # This is a hacky way of finding the file
        # where our stage was defined
        filename = sys.modules[cls.__module__].__file__

        stage_is_complete = (
            hasattr(cls, "inputs")
            and hasattr(cls, "outputs")
            and not getattr(cls.run, "__isabstractmethod__", False)
        )

        # If there isn't an explicit name already (missing or None) then
        # set it here; by default use the class name.
        if getattr(cls, "name", None) is None:  # pragma: no cover
            cls.name = cls.__name__

        if stage_is_complete:
            # Deal with duplicated class names
            if cls.name in cls.pipeline_stages and not cls.allow_reload:
                other = cls.pipeline_stages[cls.name][1]
                raise errors.DuplicateStageName(
                    "You created two pipeline stages with the "
                    f"name {cls.name}.\nOne was in {filename}\nand the "
                    f"other in {other}\nYou can either change the class "
                    "name or explicitly put a variable 'name' in the top "
                    "level of the class."
                )

            # Check for "config" in the inputs list - this is implicit
            for name, _ in cls.inputs:
                if name == "config":
                    raise errors.ReservedNameError(
                        "An input called 'config' is implicit in each pipeline "
                        "stage and should not be added explicitly.  Please update "
                        f"your pipeline stage called {cls.name} to remove/rename "
                        "the input called 'config'."
                    )

        # Check if user has over-written the config variable.
        # Quite a common error I make myself.
        if not isinstance(cls.config, property):
            raise errors.ReservedNameError(
                "You have a class variable called 'config', which "
                "is reserved in ceci for its own configuration. "
                "You may have meant to specify config_options?"
            )
        # Find the absolute path to the file defining the class
        path = pathlib.Path(filename).resolve()

        # Add a description of the parameters to the end of the docstring
        # If no config options are specified, omit this.
        if stage_is_complete and cls.config_options:
            config_text = cls._describe_configuration_text()
            if cls.__doc__ is None:
                cls.__doc__ = f"Stage {cls.name}\n\nParameters\n----------\n{config_text}"
            else:
                # Strip any existing configuration text inherited from
                # parent classes.  Everything from the first occurrence of
                # the word "Parameters" onwards is dropped.
                cls.__doc__ = cls.__doc__.split("Parameters")[0]
                cls.__doc__ += f"\n\nParameters\n----------\n{config_text}"

        # Register the class
        if stage_is_complete:
            cls.pipeline_stages[cls.name] = (cls, path)
        else:
            cls.incomplete_pipeline_stages[cls.__name__] = (cls, path)

    #############################################
    # Life cycle-related methods and properties.
    #############################################

[docs] @classmethod def get_stage(cls, name, module_name=None): """ Return the PipelineStage subclass with the given name. This is used so that we do not need a new entry point __main__ function for each new stage - instead we can just use a single one which can query which class it should be using based on the name. If module_name is provided, this will import that module in order to load the required class. Returns ------- cls: class The corresponding subclass """ stage = cls.pipeline_stages.get(name) if stage is None: if module_name: __import__(module_name) stage = cls.pipeline_stages.get(name) # If not found, then check for incomplete stages if stage is None: if name in cls.incomplete_pipeline_stages: raise errors.IncompleteStage( f"The stage {name} is not completely written. " "Stages must specify 'inputs', 'outputs' as class variables " f"and a 'run' method.\n{name} might be unfinished, or it might " "be intended as a base for other classes and not to be run." ) raise errors.StageNotFound(f"Unknown stage '{name}'") return stage[0]
[docs] @classmethod def get_module(cls): """ Return the path to the python package containing the current sub-class If we have a PipelineStage subclass defined in a module called "bar", in a package called "foo" e.g.: /path/to/foo/bar.py <-- contains subclass "Baz" Then calling Baz.get_module() will return "foo.bar". We use this later to construct command lines like "python -m foo Baz" Returns ------- module: str The module containing this class. """ return cls.pipeline_stages[cls.name][0].__module__
[docs] @classmethod def describe_configuration(cls): print(cls._describe_configuration_text())
@classmethod def _describe_configuration_text(cls): s = [] if cls.config_options is None: return "<This class has no configuration options>" for name, val in cls.config_options.items(): if isinstance(val, StageConfig): val = val[name] if isinstance(val, StageParameter): s.append(f"{name}: {val.numpy_style_help_text()}") elif isinstance(val, type): s.append(f"{name}: {val.__name__}] (required)") else: s.append(f"{name}: {type(val).__name__}] (default={val})") for input_ in cls.inputs: s.append(f"{input_[0]}: {input_[1].__name__} (INPUT)") for output_ in cls.outputs: s.append(f"{output_[0]}: {output_[1].__name__} (OUTPUT)") return '\n\n'.join(s) @classmethod def usage(cls): # pragma: no cover """ Print a usage message. """ names = [] docs = [] for name, (stage, _) in cls.pipeline_stages.items(): # find the first non-empty doc line, if there is one. try: doc_lines = [s.strip() for s in stage.__doc__.split("\n")] doc_lines = [d for d in doc_lines if d] doc = doc_lines[0] except (AttributeError, IndexError): doc = "" # cut off any very long lines if len(doc) > 100: doc = doc[:100] + " ..." # print the text names.append(name) docs.append(doc) # Make it look like a nice table by finding the maximum # length of the names, so that all the docs line up n = max(len(name) for name in names) + 1 stage_texts = [f"- {name:{n}} - {d}" for name, d in zip(names, docs)] stage_text = "\n".join(stage_texts) try: module = cls.get_module().split(".")[0] except: # pylint: disable=bare-except module = "<module_name>" sys.stderr.write( f""" Usage: python -m {module} <stage_name> <stage_arguments> If no stage_arguments are given then usage information for the chosen stage will be given. 
I currently know about these stages: {stage_text} """ ) @classmethod def main(cls): """ Create an instance of this stage and execute it with inputs and outputs taken from the command line """ try: stage_name = sys.argv[1] except IndexError: # pragma: no cover cls.usage() return 1 if stage_name in ["--help", "-h"] and len(sys.argv) == 2: # pragma: no cover cls.usage() return 1 if stage_name.find(".") >= 0: tokens = stage_name.split(".") module_name = ".".join(tokens[:-1]) stage_name = tokens[-1] else: module_name = None stage = cls.get_stage(stage_name, module_name) args = stage.parse_command_line() stage.execute(args) return 0 @classmethod def parse_command_line(cls, cmd=None): """Set up and argument parser and parse the command line Parameters ---------- cmd : str or None The command line to part (if None this will use the system arguments) Returns ------- args : Namespace The resulting Mapping of arguement to values """ import argparse parser = argparse.ArgumentParser(description=f"Run pipeline stage {cls.name}") parser.add_argument("stage_name") for conf, def_val in cls.config_options.items(): if isinstance(def_val, StageParameter): opt_type = def_val.dtype def_val = def_val.default else: opt_type = def_val if isinstance(def_val, type) else type(def_val) if opt_type == bool: parser.add_argument(f"--{conf}", action="store_const", const=True) parser.add_argument( f"--no-{conf}", dest=conf, action="store_const", const=False ) elif opt_type == list: if not def_val: out_type = str else: out_type = ( def_val[0] if isinstance(def_val[0], type) else type(def_val[0]) ) if out_type is str: # pragma: no cover parser.add_argument( f"--{conf}", type=lambda string: string.split(",") ) elif out_type is int: # pragma: no cover parser.add_argument( f"--{conf}", type=lambda string: [int(i) for i in string.split(",")], ) elif out_type is float: parser.add_argument( f"--{conf}", type=lambda string: [float(i) for i in string.split(",")], ) else: # pragma: no cover raise 
NotImplementedError( "Only handles str, int and float list arguments" ) else: # pragma: no cover parser.add_argument(f"--{conf}", type=opt_type) for inp in cls.input_tags(): parser.add_argument(f"--{inp}") for out in cls.output_tags(): parser.add_argument(f"--{out}") parser.add_argument( "--name", action="store", default=cls.name, type=str, help="Rename the stage", ) parser.add_argument("--config") if cls.parallel: parser.add_argument( "--mpi", action="store_true", help="Set up MPI parallelism" ) parser.add_argument( "--pdb", action="store_true", help="Run under the python debugger" ) parser.add_argument( "--cprofile", action="store", default="", type=str, help="Profile the stage using the python cProfile tool", ) parser.add_argument( "--memmon", type=int, default=0, help="Report memory use. Argument gives interval in seconds between reports", ) parser.add_argument( "--trace", action="store_true", help="Enable sending a signal to the process that prints a trace wherever it is", ) # Error message we will return if --mpi used on a non-supported # stage. mpi_err = ( "Error: you used the --mpi flag (or set MPI parallelism options) " f"for the stage {cls.name}, but that stage cannot be run in parallel." ) if cmd is None: if ("--mpi" in sys.argv) and not cls.parallel: raise ValueError(mpi_err) ret_args = parser.parse_args() else: if ("--mpi" in cmd) and not cls.parallel: raise ValueError(mpi_err) ret_args = parser.parse_args(cmd) return ret_args @classmethod def execute(cls, args, comm=None): """ Create an instance of this stage and run it with the specified inputs and outputs. This is calld by the main method. Parameters ---------- args: namespace The argparse namespace for this subclass. """ # Create the stage instance. 
Running under dask this only # actually needs to happen for one process, but it's not a major # overhead and lets us do a whole bunch of other setup above stage = cls(args) stage.setup_mpi(comm) # This happens before dask is initialized start_time = datetime.datetime.now() if stage.rank == 0: start_time_text = start_time.isoformat(" ") print(f"Executing stage: {cls.name} @ {start_time_text}") if stage.is_dask(): is_client = stage.start_dask() # worker and scheduler stages do not execute the # run method under dask if not is_client: return if args.cprofile: # pragma: no cover profile = cProfile.Profile() profile.enable() if args.memmon: # pragma: no cover monitor = MemoryMonitor.start_in_thread(interval=args.memmon) if args.trace: activate_tracing(stage._rank) # Now we try to see if the validation step has been changed, # if it has then we will run the validation step, and raise any errors try: stage.validate() except Exception as error: if stage.rank==0: print(f"Looks like there is an validation error in: {cls.name}", "the input data for this stage did not pass the checks implemented on it.") print(error) raise try: stage.run() except Exception as error: # pragma: no cover if args.pdb: print( "There was an exception - starting python debugger because you ran with --pdb" ) print(error) pdb.post_mortem() else: if stage.rank == 0: end_time = datetime.datetime.now() end_time_text = end_time.isoformat(" ") minutes = (end_time - start_time).total_seconds() / 60 print( f"Stage failed: {cls.name} @ {end_time_text} after {minutes:.2f} minutes" ) raise finally: if args.memmon: # pragma: no cover monitor.stop() if stage.is_dask(): stage.stop_dask() # The default finalization renames any output files to their # final location, but subclasses can override to do other things too try: stage.finalize() except Exception as error: # pragma: no cover if args.pdb: print( "There was an exception in the finalization - starting python debugger because you ran with --pdb" ) print(error) 
pdb.post_mortem() else: raise if args.cprofile: # pragma: no cover profile.disable() profile.dump_stats(args.cprofile) profile.print_stats("cumtime") # Under dask the # the root process has gone off to become the scheduler, # and process 1 becomes the client which runs this code # and gets to this point if stage.rank == 0 or stage.is_dask(): end_time = datetime.datetime.now() end_time_text = end_time.isoformat(" ") minutes = (end_time - start_time).total_seconds() / 60 print( f"Stage complete: {cls.name} @ {end_time_text} took {minutes:.2f} minutes" ) def finalize(self): """Finalize the stage, moving all its outputs to their final locations.""" # Synchronize files so that everything is closed if self.is_mpi(): # pragma: no cover self.comm.Barrier() # Move files to their final path # Only the root process moves things, except under dask it is # process 1, which is the only process that reaches this point # (as noted above) if (self.rank == 0) or self.is_dask(): for tag in self.output_tags(): # find the old and new names self._finalize_tag(tag) def _finalize_tag(self, tag): """Finalize the data for a particular tag. This can be overridden by sub-classes for more complicated behavior """ aliased_tag = self.get_aliased_tag(tag) temp_name = self.get_output(aliased_tag) final_name = self.get_output(aliased_tag, final_name=True) # it's not an error here if the path does not exist, # because that will be handled later. if pathlib.Path(temp_name).exists(): # replace directories, rather than nesting more results if pathlib.Path(final_name).is_dir(): # pragma: no cover shutil.rmtree(final_name) shutil.move(temp_name, final_name) else: # pragma: no cover sys.stderr.write( f"NOTE/WARNING: Expected output file {final_name} was not generated.\n" ) return final_name ############################################# # Parallelism-related methods and properties. 
############################################# @property def rank(self): """The rank of this process under MPI (0 if not running under MPI)""" return self._rank @property def size(self): """The number or processes under MPI (1 if not running under MPI)""" return self._size @property def comm(self): """The MPI communicator object (None if not running under MPI)""" return self._comm def is_parallel(self): """ Returns True if the code is being run in parallel. Right now is_parallel() will return the same value as is_mpi(), but that may change in future if we implement other forms of parallelization. """ return self._parallel != SERIAL
[docs] def is_mpi(self): """ Check if the stage is being run under MPI. Returns ------- bool True if the stage is being run under MPI """ return self._parallel == MPI_PARALLEL
[docs] def is_dask(self): """ Check if the stage is being run in parallel with Dask. Returns ------- bool True if the stage is being run under MPI """ return self._parallel == DASK_PARALLEL
def start_dask(self): """ Prepare dask to run under MPI. After calling this method only a single process, MPI rank 1 will continue to exeute code """ # using the programmatic dask configuration system # does not seem to work. Presumably the loggers have already # been created by the time we modify the config. Doing it with # env vars seems to work. If the user has already set this then # we use that value. Otherwise we only want error logs key = "DASK_LOGGING__DISTRIBUTED" os.environ[key] = os.environ.get(key, "error") try: import dask import dask_mpi import dask.distributed except ImportError: # pragma: no cover print( "ERROR: Using --mpi option on stages that use dask requires " "dask[distributed] and dask_mpi to be installed." ) raise if self.size < 3: # pragma: no cover raise ValueError( "Dask requires at least three processes. One becomes a scheduler " "process, one is a client that runs the code, and more are required " "as worker processes." ) # This requires my fork until/unless they merge the PR, to allow # us to pass in these two arguments. In vanilla dask-mpi sys.exit # is called at the end of the event loop without returning to us. # After this point only a single process, MPI rank 1, # should continue to exeute code. The others enter an event # loop and return with is_client=False, which we return here # to tell the caller that they should not run everything. is_client = dask_mpi.initialize(comm=self.comm, exit=False) if is_client: # Connect this local process to remote workers. self.dask_client = dask.distributed.Client() # I don't yet know how to see this dashboard link at nersc print(f"Started dask. Diagnostics at {self.dask_client.dashboard_link}") return is_client def stop_dask(self): """ End the dask event loop """ self.dask_client.retire_workers() self.dask_client.shutdown()
[docs] def split_tasks_by_rank(self, tasks): """Iterate through a list of items, yielding ones this process is responsible for/ Tasks are allocated in a round-robin way. Parameters ---------- tasks: iterable Tasks to split up """ for i, task in enumerate(tasks): if i % self.size == self.rank: yield task
[docs] def map_tasks_by_rank(self, function, inputs, allgather=False): """Run a function over a series of inputs, in parallel This mirrors the map function, and returns the equivalent of [function(input) for input in inputs], but executes in parallel. Parameters ---------- function: Callable Function to be run on each item in inputs inputs: Iterable Any sequence of inputs, which should be the same on all processes. Or at least the same length: inputs not assigned to this process are ignored so you could get away with a dummy input for them. allgather: bool Whether to give all ranks the results (True) or just the root process (False). Default = False. Returns ------- results: list A list of the results of calling the function on each input, in the same order as the input tasks """ results = [] # We keep track of the number of inputs manually rather # than calling len(inputs) because this allows inputs to # be an iterator. n = 0 for i, inp in enumerate(inputs): n += 1 if i % self.size == self.rank: results.append(function(inp)) # If this is running in serial then the above just functions # like a basic map or list comprehension. if self.comm is not None: # Collate result as a list-of-lists, one sub-list for # each process if allgather: collected_results = self.comm.allgather(results) else: collected_results = self.comm.gather(results) if self.rank != 0: return # convert the list-of-lists back into a single list # of results, returning to the original ordering. # The round-robin way we allocated them in the first # place is reversed by this. results = [] for i in range(n): j = i % self.size k = i // self.size results.append(collected_results[j][k]) return results
[docs] def data_ranges_by_rank(self, n_rows, chunk_rows, parallel=True): """Split a number of rows by process. Given a total number of rows to read and a chunk size, yield the ranges within them that this process should handle. Parameters ---------- n_rows: int Total number of rows to split up chunk_rows: int Size of each chunk to be read. Parallel: bool Whether to split data by rank or just give all procs all data. Default=True Returns ------- start, end: tuple The start and end of the range of rows to be read by this process """ n_chunks = n_rows // chunk_rows if n_chunks * chunk_rows < n_rows: # pragma: no cover n_chunks += 1 if parallel: it = self.split_tasks_by_rank(range(n_chunks)) else: it = range(n_chunks) for i in it: start = i * chunk_rows end = min((i + 1) * chunk_rows, n_rows) yield start, end
    ##################################################
    # Input and output-related methods and properties.
    ##################################################

[docs] def get_input(self, tag): """ Return the path of an input file with the given tag, which can be aliased. Parameters ---------- tag: str Tag as listed in self.outputs Returns ------- path: str The path to the output file """ tag = self.get_aliased_tag(tag) return self._inputs[tag]
[docs] def get_output(self, tag, final_name=False): """ Return the path of an output file with the given tag, which can be aliased already. If final_name is False then use a temporary name - file will be moved to its final name at the end. The temporary name is prefixed with `inprogress_`. Parameters ---------- tag: str Tag as listed in self.outputs final_name: bool Default=False. Whether to save to the final name. Returns ------- path: str The path to the output file """ tag = self.get_aliased_tag(tag) path = self._outputs[tag] # If not the final version, add a tag at the start of the filename if not final_name: p = pathlib.Path(path) p = p.parent / (IN_PROGRESS_PREFIX + p.name) path = str(p) return path
[docs] def open_input(self, tag, wrapper=False, **kwargs): """ Find and open an input file with the given tag, in read-only mode. For general files this will simply return a standard python file object. For specialized file types like FITS or HDF5 it will return a more specific object - see the types.py file for more info. Parameters ---------- tag: str Tag as listed in self.inputs wrapper: bool Whether to return an underlying file object (False) or a data type instance (True) **kwargs: dict Extra arguments to pass to the file class constructor Returns ------- obj: file or object The opened file or object """ path = self.get_input(tag) input_class = self.get_input_type(tag) obj = input_class(path, "r", **kwargs) if wrapper: # pragma: no cover return obj return obj.file
def open_output(
    self, tag, wrapper=False, final_name=False, **kwargs
):  # pragma: no cover
    """
    Find and open an output file with the given tag, in write mode.

    If final_name is True then they will be opened using their final
    target output name.  Otherwise we will prepend `inprogress_` to their
    file name. This means we know that if the final file exists then it
    is completed.

    If wrapper is True this will return an instance of the class
    of the file as specified in the cls.outputs.  Otherwise it will
    return an open file object (standard python one or something more
    specialized).

    Parameters
    ----------
    tag: str
        Tag as listed in self.outputs

    wrapper: bool
        Whether to return an underlying file object (False) or a data
        type instance (True)

    final_name: bool
        Default=False. Whether to open the file under its final name
        instead of the temporary `inprogress_` name.

    **kwargs:
        Extra args are passed on to the file's class constructor.
        A ``parallel`` entry is consumed here and only takes effect
        when ``self.is_mpi()`` is true.

    Returns
    -------
    obj: file or object
        The opened file or object

    Raises
    ------
    RuntimeError
        If parallel HDF5 output is requested but h5py reports that it
        was not built with MPI support.
    """
    path = self.get_output(tag, final_name=final_name)
    output_class = self.get_output_type(tag)

    # HDF files and directory outputs can be opened for parallel writing
    # under MPI.  This checks if:
    # - we have been told to open in parallel
    # - we are actually running under MPI
    # and adds the flags required if all these are true
    run_parallel = kwargs.pop("parallel", False) and self.is_mpi()
    if run_parallel and issubclass(output_class, file_types.Directory):
        kwargs["parallel"] = True
        kwargs["comm"] = self.comm
    # otherwise must be HDF5. Ideally we would check more carefully
    # here, but I don't want to mess up any RAIL HDF stuff by accident.
    elif run_parallel:
        kwargs["driver"] = "mpio"
        kwargs["comm"] = self.comm

        # XXX: This is also not a dependency, but it should be.
        # Or even better would be to make it a dependency of descformats
        # where it is actually used.
        import h5py

        if not h5py.get_config().mpi:
            print(
                dedent(
                    """\
                Your h5py installation is not MPI-enabled.
                Options include:
                  1) Set nprocess to 1 for all stages
                  2) Upgrade h5py to use mpi.  See instructions here:
                     http://docs.h5py.org/en/latest/build.html#custom-installation

                Note: If using conda, the most straightforward way is to enable it is
                    conda install -c spectraldns h5py-parallel
                """
                )
            )
            raise RuntimeError("h5py module is not MPI-enabled.")

    # Return an opened object representing the file
    obj = output_class(path, "w", **kwargs)
    if wrapper:
        return obj
    return obj.file


@classmethod
def inputs_(cls):
    """
    Return the class-level ``inputs`` declaration of this stage.

    Elsewhere in this class the result is iterated as
    ``(tag, file type)`` pairs.

    Returns
    -------
    inputs : iterable of (tag, file type) pairs
    """
    return cls.inputs  # pylint: disable=no-member


@classmethod
def outputs_(cls):
    """
    Return the class-level ``outputs`` declaration of this stage.

    Elsewhere in this class the result is iterated as
    ``(tag, file type)`` pairs.

    Returns
    -------
    outputs : iterable of (tag, file type) pairs
    """
    return cls.outputs  # pylint: disable=no-member
@classmethod
def output_tags(cls):
    """
    List the tags of every output declared by this stage.

    Each entry of ``cls.outputs_()`` is unpacked as a ``(tag, type)``
    pair and only the tag is kept, in declaration order.

    Returns
    -------
    out_tags : list[str]
        The list of output tags
    """
    declared = cls.outputs_()
    return [name for name, _ftype in declared]
@classmethod
def input_tags(cls):
    """
    List the tags of every input declared by this stage.

    Each entry of ``cls.inputs_()`` is unpacked as a ``(tag, type)``
    pair and only the tag is kept, in declaration order.

    Returns
    -------
    in_tags : list[str]
        The list of input tags
    """
    declared = cls.inputs_()
    return [name for name, _ftype in declared]
def get_input_type(self, tag):
    """
    Look up the file type class declared for an input tag.

    Both the requested tag and each declared tag are passed through the
    alias lookup before being compared.

    Parameters
    ----------
    tag : str
        The tag of the input file

    Returns
    -------
    ftype : FileType
        The file type class

    Raises
    ------
    ValueError
        If no declared input matches the (aliased) tag
    """
    wanted = self.get_aliased_tag(tag)
    for declared, ftype in self.inputs_():
        if self.get_aliased_tag(declared) == wanted:
            return ftype
    raise ValueError(f"Tag {wanted} is not a known input")  # pragma: no cover
def get_output_type(self, tag):
    """
    Look up the file type class declared for an output tag.

    Both the requested tag and each declared tag are passed through the
    alias lookup before being compared.

    Parameters
    ----------
    tag : str
        The tag of the output file

    Returns
    -------
    ftype : FileType
        The file type class

    Raises
    ------
    ValueError
        If no declared output matches the (aliased) tag
    """
    wanted = self.get_aliased_tag(tag)
    for declared, ftype in self.outputs_():
        if self.get_aliased_tag(declared) == wanted:
            return ftype
    raise ValueError(f"Tag {wanted} is not a known output")  # pragma: no cover
##################################################
# Configuration-related methods and properties.
##################################################


@property
def instance_name(self):
    """Return the name associated to this particular instance of this stage"""
    return self._configs.get("name", self.name)


@property
def config(self):
    """
    The configuration dictionary for this stage, aggregating command
    line options and optional configuration file.

    Options specified in the subclass variable `config_options` are
    read from the configuration file, command line, or `make_stage`
    choices, and stored in this dictionary.
    """
    return self._configs


def read_config(self, args):
    """
    Assemble the configuration for this stage and store it.

    The "config" input may be a path to a YAML file, an already-loaded
    dict, or None.  From it, the options in the "global" section are
    taken first and then overridden by the section named after this
    stage instance.  The merged options are handed to
    ``self._configs.set_config`` together with ``args``, which is where
    defaults, command line values and missing mandatory options are
    resolved.

    Note that we recognize arguments with no default as the ones where
    self.config_options holds a type instead of a value.

    Parameters
    ----------
    args : dict or namespace
        The arguments the stage was constructed with; passed through
        unchanged to ``set_config``.
    """
    config_file = self.get_input("config")

    # This is all the config information in the file, including
    # things for other stages
    if isinstance(config_file, dict):
        overall_config = config_file
    elif config_file is not None:
        # Imported lazily: only a file-based config needs a YAML parser
        import yaml

        with open(config_file) as _config_file:
            # An empty YAML document loads as None
            overall_config = yaml.safe_load(_config_file) or {}
    else:
        overall_config = {}

    # The user can define global options that are inherited by
    # all the other sections if not already specified there.
    # Copy them: updating the section in place would leak this stage's
    # options into a config dict shared with other stages.
    input_config = dict(overall_config.get("global") or {})

    # This is just the config info in the file for this stage.
    # It may be incomplete - there may be things specified on the
    # command line instead, or just using their default values
    stage_config = overall_config.get(self.instance_name) or {}
    input_config.update(stage_config)

    self._configs.set_config(input_config, args)
def get_config_dict(self, ignore=None, reduce_config=False):
    """Write the current configuration to a dict

    Input tags whose value is None or the string 'None' are always
    left out.  Every value that is kept is converted with
    ``cast_to_streamable``.

    Parameters
    ----------
    ignore : dict or None
        Global parameters not to write.  Only consulted when
        ``reduce_config`` is true, and an entry is only dropped when its
        value here equals the configured value.
    reduce_config : bool
        If true, reduce the configuration by parsing out the inputs,
        outputs and global params

    Returns
    -------
    out_dict : dict
        The configuration
    """
    ignore = ignore or {}

    # Computed once up front rather than on every loop iteration
    input_tags = set(self.input_tags())
    if reduce_config:
        ignore_keys = input_tags | set(self.output_tags()) | {"config"}
    else:
        ignore_keys = set()

    out_dict = {}
    for key, val in self.config.items():
        if reduce_config:
            # Globals are only skipped when this stage did not override them
            if key in ignore and ignore[key] == val:
                continue
            if key in ignore_keys:
                continue
        # Inputs that were never set carry no information
        if key in input_tags and val in [None, 'None']:
            continue
        out_dict[key] = cast_to_streamable(val)
    return out_dict
def find_inputs(self, pipeline_files):
    """Find and return all the inputs associated to this stage in the FileManager

    Parameters
    ----------
    pipeline_files : mapping
        Maps aliased tags to paths; indexed once per declared input

    Returns
    -------
    ret_dict : dict
        aliased tag : path pairs for every declared input
    """
    ret_dict = {}
    for tag, _ in self.inputs_():
        aliased_tag = self.get_aliased_tag(tag)
        ret_dict[aliased_tag] = pipeline_files[aliased_tag]
    return ret_dict


def find_outputs(self, outdir):
    """Find and return all the outputs associated to this stage

    An output with no entry in ``self._outputs`` yet is given one, built
    by the file type's ``make_name``.

    Parameters
    ----------
    outdir : str
        Directory prepended to every output name

    Returns
    -------
    ret_dict : dict
        aliased tag : path pairs for every declared output
    """
    ret_dict = {}
    for tag, ftype in self.outputs_():
        aliased_tag = self.get_aliased_tag(tag)
        if aliased_tag not in self._outputs:  # pragma: no cover
            self._outputs[aliased_tag] = ftype.make_name(aliased_tag)
        ret_dict[aliased_tag] = f"{outdir}/{self._outputs[aliased_tag]}"
    return ret_dict


def print_io(self, stream=sys.stdout):
    """Print out the tags, paths and types for all the inputs and outputs of this stage

    Parameters
    ----------
    stream : object with a ``write`` method
        Where to write the report.  Defaults to standard output.
    """
    stream.write("Inputs--------\n")
    for tag, ftype in self.inputs_():
        aliased_tag = self.get_aliased_tag(tag)
        # Paths are stored under the aliased tag, as in get_input
        stream.write(
            f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._inputs[aliased_tag]}\n"
        )
    stream.write("Outputs--------\n")
    for tag, ftype in self.outputs_():
        aliased_tag = self.get_aliased_tag(tag)
        stream.write(
            f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._outputs[aliased_tag]}\n"
        )


def already_finished(self):
    """Print a warning that a stage is being skipped"""
    print(f"Skipping stage {self.instance_name} because its outputs exist already")
def iterate_fits(
    self, tag, hdunum, cols, chunk_rows, parallel=True
):  # pragma: no cover
    """
    Step through an input FITS file chunk by chunk.

    The extension ``hdunum`` of the file opened for ``tag`` is read in
    row ranges produced by ``data_ranges_by_rank``.

    TODO: add ceci tests of this functions

    Parameters
    ----------
    tag: str
        The tag from the inputs list to use

    hdunum: int
        The extension number to read

    cols: list
        The columns to read

    chunk_rows: int
        Number of rows to read and return at once

    parallel: bool
        Whether to split up data among processes (parallel=True) or give
        all processes all data (parallel=False).  Default = True.

    Returns
    -------
    it: iterator
        Iterator yielding (int, int, array) tuples of (start, end, data)
        data is a structured array.
    """
    hdu = self.open_input(tag)[hdunum]
    total_rows = hdu.get_nrows()
    row_ranges = self.data_ranges_by_rank(total_rows, chunk_rows, parallel=parallel)
    for first, last in row_ranges:
        yield first, last, hdu.read_columns(cols, rows=range(first, last))
def iterate_hdf(
    self, tag, group_name, cols, chunk_rows, parallel=True, longest=False
):
    """
    Loop through chunks of the input data from an HDF5 file with the given tag.

    All the selected columns must have the same length, unless
    ``longest`` is set.

    Parameters
    ----------
    tag: str
        The tag from the inputs list to use

    group_name: str
        The group within the HDF5 file to use, looked up as file[group_name]

    cols: list
        The columns to read

    chunk_rows: int
        Number of rows to read and return at once

    parallel: bool
        Whether to split up data among processes (parallel=True) or give
        all processes all data (parallel=False).  Default = True.

    longest: bool
        Whether to allow mixed length arrays and keep going until the
        longest array is completed.  Shorter columns are simply sliced
        past their end for the later chunks.

    Returns
    -------
    it: iterator
        Iterator yielding (int, int, dict) tuples of (start, end, data)

    Raises
    ------
    ValueError
        If the columns have different lengths and ``longest`` is False
    """
    hdf = self.open_input(tag)
    group = hdf[group_name]

    # Iteration always runs to the end of the longest column
    lengths = [len(group[col]) for col in cols]
    n = max(lengths)

    # Check all the columns are the same length
    if not longest and any(length != n for length in lengths):
        raise ValueError(
            f"Different columns among {cols} in file {tag} group {group_name} "
            "are different sizes - if this is acceptable set longest=True"
        )

    # Iterate through the data providing chunks
    for start, end in self.data_ranges_by_rank(n, chunk_rows, parallel=parallel):
        data = {col: group[col][start:end] for col in cols}
        yield start, end, data
def combined_iterators(self, rows, *inputs, parallel=True):
    """
    Iterate through multiple files at the same time.

    If you have several HDF files with some columns of the same length
    then you can use this method to iterate through them all at once,
    and combine the data from all of them into a single dictionary.

    Parameters
    ----------
    rows: int
        The number of rows to read in each chunk

    *inputs:
        Flat positional arguments in groups of three:
        ``tag, group, cols, tag, group, cols, ...``.
        In each group tag is the input file name tag, group is the
        group within the HDF5 file to read, and cols is a list of
        columns to read from that group.  Repeat the three arguments to
        read from multiple files.

    parallel: bool
        Whether to split up data among processes (parallel=True) or give
        all processes all data (parallel=False).  Default = True.

    Returns
    -------
    it: iterator
        Iterator yielding (int, int, dict) tuples of (start, end, data).
        start and end are those reported by the last file; columns with
        the same name in several files are overwritten by later files.

    Raises
    ------
    ValueError
        If the number of positional inputs is not a multiple of three
    """
    if len(inputs) % 3 != 0:
        raise ValueError(
            "Arguments to combined_iterators should be in threes: "
            "tag, group, value"
        )

    # Regroup the flat argument list into (tag, group, cols) triples
    triples = zip(inputs[0::3], inputs[1::3], inputs[2::3])
    iterators = [
        self.iterate_hdf(tag, section, cols, rows, parallel=parallel)
        for tag, section, cols in triples
    ]

    # Advance all files in lockstep; stops when the shortest is exhausted
    for chunks in zip(*iterators):
        data = {}
        for start, end, chunk in chunks:
            data.update(chunk)
        yield start, end, data


################################
# Pipeline-related methods
################################


@classmethod
def generate_command(
    cls, inputs, config, outputs, aliases=None, instance_name=None
):
    """
    Generate a command line that will run the stage

    Parameters
    ----------
    inputs: dict
        Maps (aliased) input tags to paths
    config: str
        Value for the ``--config`` flag
    outputs: dict
        Maps (aliased) output tags to paths
    aliases: dict or None
        Maps tags to their aliases; a tag without an entry is its own
        alias
    instance_name: str or None
        If given and different from the class name, passed as ``--name``

    Returns
    -------
    cmd: str
        The ``python3 -m ...`` command line

    Raises
    ------
    ValueError
        If an input or output location is missing
    """
    module = cls.get_module().split(".")[0]
    if sys.modules[module].__file__:
        # Regular module, stage will be imported with module
        flags = [f"{cls.name}"]
    else:
        # Namespace module, use 'ceci' to get the main
        # and specify the full path
        flags = [f"{cls.get_module()}.{cls.name}"]
        module = "ceci"

    aliases = aliases or {}

    # Flags carry the un-aliased tag; locations are found by aliased tag
    for tag, _ in cls.inputs_():
        aliased_tag = aliases.get(tag, tag)
        try:
            fpath = inputs[aliased_tag]
        except KeyError as msg:  # pragma: no cover
            raise ValueError(
                f"Missing input location {aliased_tag} {str(inputs)}"
            ) from msg
        flags.append(f"--{tag}={fpath}")

    if instance_name is not None and instance_name != cls.name:
        flags.append(f"--name={instance_name}")

    flags.append(f"--config={config}")

    for tag, _ in cls.outputs_():
        aliased_tag = aliases.get(tag, tag)
        try:
            fpath = outputs[aliased_tag]
        except KeyError as msg:  # pragma: no cover
            raise ValueError(
                f"Missing output location {aliased_tag} {str(outputs)}"
            ) from msg
        flags.append(f"--{tag}={fpath}")

    flag_text = " ".join(flags)

    # We just return this, instead of wrapping it in a
    # parsl job
    return f"python3 -m {module} {flag_text}"


def time_stamp(self, tag=""):
    """
    Print a time stamp with an optional tag.

    Parameters
    ----------
    tag: str
        Additional info to print in the output line.  Default is empty.
    """
    t = datetime.datetime.now()
    print(f"Process {self.rank}: {tag} {t}")
    sys.stdout.flush()


def memory_report(self, tag=None):
    """
    Print a report about memory currently available
    on the node the process is running on.

    Parameters
    ----------
    tag: str
        Additional info to print in the output line.  Default is empty.
    """
    import psutil

    t = datetime.datetime.now()

    # The different types of memory are really fiddly and don't
    # correspond to how you usually imagine.  The simplest thing
    # to report here is just how much memory is left on the machine.
    mem = psutil.virtual_memory()
    avail = mem.available / 1024**3
    total = mem.total / 1024**3

    if tag is None:
        tag = ""
    else:
        tag = f" {tag}:"

    # This gives you the name of the host.  At NERSC that is the node name
    host = socket.gethostname()

    # Print message
    print(
        f"{t}: Process {self.rank}:{tag} Remaining memory on {host} {avail:.1f} GB / {total:.1f} GB"
    )
    sys.stdout.flush()