mspasspy.workflow

mspasspy.workflow.sliding_window_pipeline(dlist, processing_function, dask_client, sliding_window_size='auto', task_per_worker=2, completion_function=None, accumulator=None, pfunc_args=None, pfunc_kwargs=None, cfunc_args=None, cfunc_kwargs=None, a_args=None, a_kwargs=None, verbose=False)[source]

Run a processing function and optional completion function on an iterable input using the sliding window of Futures algorithm.

Embarrassingly parallel workflows can often be reduced to a single function that chains a sequence of algorithms together. In dask terminology that workflow defines a pipeline. Such a workflow can also be done with a sequence of dask/spark map operators chained together. Experience has shown, however, that a large fraction of workflows using MsPASS fail with memory faults in dask when implemented with the dask bag map method. The reason seems to be that the problematic workflows reduce to a sequence of three fundamentally different operations: (1) read seismic data defined by the tiny inputs of a single document or a python dictionary defining a query, (2) run a sequence of processing functions on the data created by each document or query, and (3) save the result using the MongoDB API of MsPASS. The problem this seems to cause is that steps (1) and (3) are tiny compared to what step (2) handles. Dask then has to handle the situation where a tiny input bloats by many orders of magnitude. The default configuration for dask is now known to handle that situation very badly. With map operators we have observed it consistently try to handle too many items at once because it treats each map step as a task. We have seen many examples where that bloats memory use and aborts the job with a memory fault. Worse, memory faults often produce mysterious errors because the failure is not necessarily in the worker task being run; e.g., a common problem is a mysterious crash of the MongoDB server when workers are run on the same node as the server.

This function was developed to handle the memory problem with the map-reduce operators. We know of two common situations where this function should be used instead of the standard map operators of the map-reduce paradigm: (1) atomic processing of very large (10^5 or more) data sets, or (2) handling large ensembles that push worker memory limits. Guidelines on how to estimate memory use can be found in the MsPASS User Manual.

The way this algorithm controls memory use is by limiting the scheduler to only process the amount of data defined by the “sliding_window_size” argument. The function uses dask Futures (note Futures are supported only in dask, so this function cannot be used with a pyspark workflow) and submits and runs only sliding_window_size instances of the task defined by the “processing_function” argument. The default behavior runs approximately 2 instances of the function per worker. The number is “approximate” because the scheduler algorithm often queues up more instances on startup and the load is unbalanced until all workers are assigned data to process. That means for the default with task_per_worker=2, a rough measure of the minimum memory size per worker is c + a*D, where c is an estimate of the base memory use by each worker process, D is the nominal maximum size of data objects created and used in the processing function, and a is some multiplier larger than 2. In my (glp) experience a safe guess for a is about 2 or 3. If running on a single node, a=1 can still work provided (D+c)*N_workers is less than the memory size on the worker node available to workers; that is, memory not needed by the operating system and other MsPASS services that may or may not be running on the same node. See the MsPASS User Manual for more guidance on this issue.
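The memory formula above can be turned into a quick back-of-the-envelope calculation. This is only an illustrative sketch, not part of the MsPASS API; the function name and the example sizes are hypothetical.

```python
def worker_memory_estimate(c_bytes, d_bytes, a=2.5):
    """Rough minimum memory per worker, in bytes, from the
    c + a*D rule of thumb: c is base worker memory, D the
    nominal maximum data object size, and a a safety
    multiplier (2 to 3 suggested above)."""
    return c_bytes + a * d_bytes

GB = 1024**3
# e.g. a 1 GB base worker footprint processing 4 GB ensembles
estimate = worker_memory_estimate(1 * GB, 4 * GB)
print(estimate / GB)  # 11.0
```

If the result times the number of workers exceeds the memory available on a node, reduce task_per_worker or the ensemble size.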

The other element of this function is the optional “completion_function” argument. That function is run on the output each worker returns from an instance of the processing function. By default the output of the completion function replaces that of the processing function, yielding a list of the same size as the input dlist. For large numbers of inputs such a giant list can be problematic for a variety of reasons. For that reason this function has an optional accumulator argument. As the name implies, it should be an accumulation function that creates a summary of the outputs or some useful combination of them (e.g. a stack of data). That function must have at least two required arguments and they must match this somewhat rigid expectation:

arg0 - must be the current value being accumulated. The function must also treat arg0==None as a signal that the accumulated result is to be initialized; that is required the first time the function is called.

arg1 - is the current value to be merged (accumulated) into the value of arg0.

Each call to accumulator should return a value that is the same type as arg0. That and the None initialization allows the accumulator to be stateless. Optional *args and **kwargs values can be passed by the optional a_args and a_kwargs parameters.
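A minimal example of a function meeting that contract is sketched below. It is illustrative only; the name count_successes is hypothetical, and it assumes a processing function that returns a boolean success flag (e.g. the default return of a save operation).

```python
def count_successes(accum, result):
    """Accumulator contract: arg0 (accum) is the running value,
    with None meaning "initialize"; arg1 (result) is the new
    value to merge; the return matches the type of arg0."""
    if accum is None:  # first call: initialize the running value
        accum = 0
    return accum + (1 if result else 0)

# Simulate how the pipeline folds a stream of processing-function
# outputs through the accumulator:
total = None
for r in [True, False, True, True]:
    total = count_successes(total, r)
print(total)  # 3
```

Because the None case initializes the running value, the accumulator itself can remain stateless.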

The function supports two types of completion functions controlled by the boolean argument “completion_is_accumulator”. When True the completion function is expected to be an accumulator. In that mode the completion function should still return its current result, but the result will be overwritten as each Future completes. The function then returns the final value.

The function is made generic at the cost of having a rather complicated API. That is summarized here, but readers who are not intimately familiar with the way “*args” and “**kwargs” are used in python are advised to consult the MsPASS User Manual and online sources.

Parameters:
  • dlist – any iterable with components matching the requirements of arg0 of processing_function; commonly a list of queries or MongoDB documents. Note that although a MongoDB CommandCursor should theoretically work, we do not recommend using one for this argument because it is subject to timeout errors if the processing function runs for a significant time.

  • processing_function – symbol defining the function object (usually just the function name) to run on each component of dlist. If the function has required arguments you must define the related argument pfunc_args. If it requires kwarg parameters you must also define pfunc_kwargs.

  • pfunc_args – iterable container defining the list of values for required arguments to the function defined as processing_function. Set to None (default) if the function has no required arguments beyond arg0.

  • pfunc_kwargs – python dictionary of keyword arguments to pass to processing_function. Set to None (default) if the function does not require kwargs.

  • dask_client – instance of a dask.distributed.Client that defines the cluster on which processing_function is to run. In MsPASS the normal practice is to fetch this with the MsPASS client get_scheduler method.

  • sliding_window_size – size of the sliding window submit buffer. Must be either the unique string “auto” or an integer. If set to “auto” (default) the sliding window size is set to the value of task_per_worker times the number of workers dask_client reports as running when this function is first called. Note that this works well on HPC systems but can fail in cloud environments if dask is running with dynamic scheduling; in that case set the sliding window size manually with an integer value.

  • task_per_worker – determines the size of the sliding window when sliding_window_size=”auto”. Ignored if the sliding window size is set manually with an integer value.

  • completion_function – a function object or None. If None (default) the output of each future is simply appended to a list. When defined, the result returned by each submit is processed with this function and the completion function output replaces that of the processing function in the result.

  • accumulator – optional accumulator function applied to the output of the completion function. In most cases the large list returned by this function, with or without a completion function, ranges from awkward to catastrophic to deal with. Further, there are algorithms where the completion function needs to act as a Reduce operator (an efficient way to do that for large data sets). You can use this optional function to perform an accumulation operation. The function requires two arguments: arg0 must be the previous value handle, and arg1 the current value being handled. It must also return the same type as the arg0 it receives. You should think of the operation as a generalization of arg0 += arg1. It is ESSENTIAL that this function handle the case of arg0 set to None, treating that as an initializer and returning a valid initial value.

  • cfunc_args – comparable to pfunc_args but for the completion function. Set to None (default) if the completion function does not have any required arguments.

  • cfunc_kwargs – comparable to pfunc_kwargs but for the completion function. Set to None (default) if the completion_function does not require kwargs.

  • a_args – comparable to pfunc_args but for the accumulator function. Set to None (default) if the accumulator function does not have any required arguments.

  • a_kwargs – comparable to pfunc_kwargs but for the accumulator function. Set to None (default) if the accumulator does not require kwargs.

  • verbose – boolean controlling output. When False (default) the function itself is totally silent. When set True it prints a message for each submit and each return. Not recommended for workflows that use a large number of submits as the output can easily get huge.
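To make the sliding-window-of-futures pattern concrete, the toy sketch below reproduces its control flow with the standard library's concurrent.futures instead of dask. It is NOT the MsPASS implementation; the function name sliding_window and all inputs are hypothetical, and a real run would use dask_client.submit with a dask.distributed.Client.

```python
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

def sliding_window(dlist, processing_function, executor, window_size,
                   accumulator=None):
    """Keep at most window_size tasks in flight; as each future
    completes, fold its result and submit the next input."""
    inputs = iter(dlist)
    in_flight = set()
    results = []
    accum = None
    for _ in range(window_size):  # prime the window
        try:
            in_flight.add(executor.submit(processing_function, next(inputs)))
        except StopIteration:
            break
    while in_flight:
        done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
        for future in done:
            r = future.result()
            if accumulator is None:
                results.append(r)        # default: build a list
            else:
                accum = accumulator(accum, r)  # fold into running value
            try:  # refill the window so only window_size tasks run
                in_flight.add(
                    executor.submit(processing_function, next(inputs)))
            except StopIteration:
                pass
    return accum if accumulator is not None else results

with ThreadPoolExecutor(max_workers=2) as executor:
    total = sliding_window(
        range(10),
        lambda x: x * x,
        executor,
        window_size=4,
        accumulator=lambda a, r: (a or 0) + r,
    )
print(total)  # 285
```

The key property, as in the MsPASS function, is that no more than window_size data objects are ever live at once, which is what bounds worker memory regardless of the size of dlist.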

Returns:

When completion_function is not defined (default) this function will return a list of the values obtained by passing each component of dlist through the processing function. Be warned that this list can get huge if the data set is large and processing_function returns anything but a tiny datum. The definitive use of this function in MsPASS is a processing function that runs Database.save_data and returns the default output of that method; that is easily handled because the default output is a list of boolean values. If, however, the same function is run with return_data=True, a memory overflow in the caller is likely unless the entire output of the processing fits in the memory space of the caller. When a completion function is used the return is the return of the completion function. A complexity is that if the accumulator function is defined the return is NOT a list but the accumulated output of that function.