
In my application I have implemented a very crude workflow made up of five different "processing units". The code is currently structured like this:

def run(self, result_first_step=None, result_second_step=None):

    config = read_workflow_config("config.ini")

    if config.first_step:
        result_first_step = run_process_1()

    if config.second_step and result_first_step is not None:
        result_second_step = run_process_2(result_first_step)
    else:
        raise Exception("Missing required data")

    if config.third_step:
        result_third_step = run_process_3(result_first_step, result_second_step)
    else:
        result_third_step = None

    collect_results(result_first_step, result_second_step, result_third_step)

and so on. The code works, but it's hardly maintainable and quite fragile (the actual processing is a lot more complex than this simplified example). So I've been thinking of adopting another strategy, i.e. building a proper workflow with:

  • Short-circuiting: I can give the starting process no data, or data of two different types; in the latter case the workflow short-circuits and skips some of the processing (sketched below)
  • Common objects: things like the configuration should be available to all units
  • Conditions: depending on the configuration, some steps may be skipped
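
To make this concrete, here is a rough sketch of the kind of API I have in mind (the Workflow class and all the names in it are placeholders of mine, not an existing library):

class Workflow:
    """Hypothetical sketch: steps share a config object, can be
    disabled by configuration, and are skipped when their result
    was supplied up front (short-circuiting)."""

    def __init__(self, config):
        self.config = config         # common object, visible to every step
        self.steps = []              # (name, func) pairs, run in order

    def add_step(self, name, func):
        self.steps.append((name, func))

    def run(self, **precomputed):
        results = dict(precomputed)  # short-circuit: seed already-known results
        for name, func in self.steps:
            if not self.config.get(name, True):
                continue             # condition: this step is disabled in config
            if name not in results:  # skip steps whose result was given
                results[name] = func(results, self.config)
        return results

Here config is just a dict, and e.g. run(first_step=data) would skip the first unit entirely.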

Is there a Python library available to perform this kind of workflow, or should I roll my own? I've been trying pyutilib.workflow, but it doesn't properly support a common configuration object short of passing it around to all workers (tedious).

Note: this is for a library / command-line application, so web-based workflow solutions are not appropriate.

– Einar
  • Have you tried googling this question? What was wrong with what you found? – Marcin Mar 09 '12 at 17:13
  • The way you've written it, it looks like you can't `run_process_2` unless you've already `run_process_1`. Is that true? – Katriel Mar 09 '12 at 17:13
  • Indeed, I will adjust it to show better what I have in mind. EDIT: changed example showing how one could short-circuit. – Einar Mar 09 '12 at 17:16
  • 1
    @Marcin It's not the first time I googled for this answer, and most solutions are either over-engineered, web based (a no no) or don't provide what I need. – Einar Mar 09 '12 at 18:42
  • @Einar It would be helpful if you explained what is wrong with the existing solutions individually. – Marcin Mar 09 '12 at 18:51
  • http://stackoverflow.com/questions/704834/does-anyone-know-about-workflow-frameworks-libraries-in-python, this might provide some insight. – John Mar 09 '12 at 18:51

2 Answers


There's quite a range of approaches to pipelines in Python, from half a page to ...
Here's the main idea: at the top, put all the step definitions in a dict;
then pipeline.run( "C A T" ) runs the steps C, A, T in order.

class Pipelinesimple:
    """p = Pipelinesimple( funcdict );  p.run( "C A T", X ) = C(X) | A | T

    funcdict = dict( A = Afunc, B = Bfunc ... Z = Zfunc )
    pipeline = Pipelinesimple( funcdict )
    cat = pipeline.run( "C A T", X )  # C(X) | A | T, i.e. T( A( C(X) ))
    dog = pipeline.run( "D O G", X, **kw )  # D(X, **kw) | O(**kw) | G(**kw)
    """

    def __init__( self, funcdict ):
        self.funcdict = funcdict  # funcs or functors of X

    def run( self, steps, X, **commonargs ):
        """ steps "C A T" or ["C", "A", "T"]
            all funcs( X, **commonargs )
        """
        if isinstance( steps, str ):  # basestring on Python 2
            steps = steps.split()  # "C A T" -> ["C", "A", "T"]
        for step in steps:
            func = self.funcdict[step]  # look up each step's function by name
            # if X is None: ...
            X = func( X, **commonargs )
        return X
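
A minimal usage sketch (the three step functions are toy examples of mine, not part of the recipe):

def C( X, **kw ):  return X + 1
def A( X, **kw ):  return X * 2
def T( X, **kw ):  return X - 3

pipeline = Pipelinesimple( dict( C=C, A=A, T=T ))
print( pipeline.run( "C A T", 10 ))  # T( A( C(10) )) = ((10 + 1) * 2) - 3 = 19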

Next, there are several ways of giving different parameters to the different steps.

One way is to parse a multiline string such as

""" C  ca=5  cb=6 ...
    A  aa=1 ...
    T  ...
"""

Another is to take a list of functions / function names / param dicts, like

pipeline.run( ["C", dict(ca=5, cb=6), lambda ..., "T", dict(ta=3) ])

A third is to split params like "A__aa B__ba ..." the way sklearn.pipeline.Pipeline does. (That's geared to machine learning, but you can copy the pipeline parts.)
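
Routing such double-underscore parameters to their steps could look roughly like this (a sketch of mine, not sklearn's actual code):

def split_params( **kw ):
    """ A__aa=1, B__ba=2  ->  {"A": {"aa": 1}, "B": {"ba": 2}} """
    per_step = {}
    for key, value in kw.items():
        step, _, param = key.partition( "__" )  # "A__aa" -> ("A", "aa")
        per_step.setdefault( step, {} )[param] = value
    return per_step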

Each of these has fairly clear pros and cons.

A large community of talented people can come up with a dozen prototype solutions to a problem [pipelines] very quickly.
But reducing the dozen to two or three takes forever.

Whichever way you choose, provide a way of logging all the parameters of a run.

See also:
  • FilterPype
  • nipype

– denis

You could make the run method into a generator:

def run(self):
    result_first_step = run_process_1()
    yield result_first_step
    result_second_step = run_process_2(result_first_step)
    yield result_second_step
    result_third_step = run_process_3(result_first_step, result_second_step)
    collect_results(result_first_step, result_second_step, result_third_step)
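
The caller then decides how far the workflow runs; a minimal usage sketch, assuming an object obj with this run method:

gen = obj.run()
first = next(gen)   # runs process 1
second = next(gen)  # runs process 2
for _ in gen:       # exhausting the generator runs process 3
    pass            # and collects the results

This lets the caller stop after any intermediate result instead of always running the whole chain.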
– Roland Smith