
I'm refactoring a data-processing pipeline in Python.

Assume we have a series of generator functions that we want to chain into a data processing pipeline.

Example:

#!/usr/bin/python

def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

res = foo3(foo2(foo1(range(0, 5))))

for i in res:
    print(i)

Output:

foo3:11
foo3:12
foo3:13
foo3:14
foo3:15

I do not think foo3(foo2(foo1(range(0, 5)))) is a Pythonic way to achieve my pipeline goal, especially when the number of stages in the pipeline is large.

I wish I could chain the calls the way jQuery does, something like:

range(0, 5).foo1().foo2().foo3()

Or maybe

l = [range(0, 5), foo1, foo2, foo3]
res = runner.run(l)

But I'm new to generators and couldn't find a way to achieve this.

Any help will be welcome.

xuanyue

  • Maybe with [itertools.accumulate](https://docs.python.org/dev/library/itertools.html#itertools.accumulate) or [functools.reduce](https://docs.python.org/3.3/library/functools.html#functools.reduce)? (both are part of the standard library) – sascha Aug 04 '16 at 00:38
  • This sounds like some XY-problem. If you are doing number crunching on arrays/lists consider using numpy/pandas. – MSeifert Aug 04 '16 at 00:39
  • maxymoo's answer is possibly the best, but you can also abuse operator overloading in devious ways (this is frowned upon in Python); see this for inspiration: http://stackoverflow.com/questions/33658355/piping-output-from-one-function-to-another-using-python-infix-syntax/33661327#33661327 – Paulo Scardine Aug 04 '16 at 01:07

7 Answers

---

I sometimes like to use a left fold (called reduce in Python) for this type of situation:

from functools import reduce

def pipeline(*steps):
    # Fold left: steps[0] is the data source; each later step is a
    # callable applied to the result so far.
    return reduce(lambda x, y: y(x), steps)

res = pipeline(range(0, 5), foo1, foo2, foo3)

Or even better:

def compose(*funcs):
    # Left-to-right composition: compose(f, g, h)(x) == h(g(f(x)))
    return lambda x: reduce(lambda f, g: g(f), funcs, x)

p = compose(foo1, foo2, foo3)
res = p(range(0, 5))
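
If some stages need extra arguments, as raised in the comments below, functools.partial can pre-bind them so that each stage again takes a single iterable. A minimal sketch building on the pipeline helper above; the scale stage and its factor parameter are hypothetical, not from the question:

from functools import partial

def scale(g, factor):
    # Hypothetical stage that needs its own parameter.
    for i in g:
        yield i * factor

res = pipeline(range(0, 5), foo1, partial(scale, factor=2), foo3)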
maxymoo
  • Apart from being less efficient than the chained `foo(foo1(...))`, this isn't exactly what I would call more readable, even if there are lots of functions. – MSeifert Aug 04 '16 at 00:45
  • Well, it's a matter of taste; you could define an alias for this like @John1024 did, but for me I just recognise the pattern as a pipeline and focus on the list of functions as the meaningful part. The main thing is that they're pipelined left-to-right instead of right-to-left, as in the function composition in the question. – maxymoo Aug 04 '16 at 00:55
  • @MSeifert - how is this less efficient? It strikes me as identically efficient. – Robᵩ Aug 04 '16 at 01:00
  • @MSeifert: now imagine when you have 10-20 different generators with names that are actually descriptive. – chiffa Aug 04 '16 at 01:18
  • Yes, the efficiency doesn't suffer if one operates on largish data sets. I don't think the answer is wrong or inadvisable; I just wanted to point out that _in my opinion_ something like `foo1(foo2(...))` is more readable in the long run. – MSeifert Aug 04 '16 at 01:47
  • What if you want to pass in arguments to one of the generators? – CpILL Apr 18 '19 at 01:55
  • @CpILL Just use Python's `partial(func, arg1='something', arg2=69)`! – CpILL May 28 '19 at 00:44
  • All these solutions require the generator to be iterated over in the receiving function. Is there a way to pop the generator out? – Summer-Sky May 12 '20 at 07:15
  • How would you handle async functions since lambdas can't be async? – CpILL Jul 17 '23 at 05:26

---

> I do not think foo3(foo2(foo1(range(0, 5)))) is a Pythonic way to achieve my pipeline goal, especially when the number of stages in the pipeline is large.

There is a fairly trivial and, in my opinion, clear way of chaining generators: assign the result of each to a variable, where each variable can have a descriptive name.

range_iter = range(0, 5)
foo1_iter = foo1(range_iter)
foo2_iter = foo2(foo1_iter)
foo3_iter = foo3(foo2_iter)

for i in foo3_iter:
    print(i)

I prefer this to something that uses a higher-order function, e.g. a reduce or similar:

  • In my real cases, each foo* generator function often needs its own extra parameters, which is tricky if using a reduce (see the sketch after this list).

  • In my real cases, the steps in the pipeline are not dynamic at runtime: it seems a bit odd/unexpected (to me) to have a pattern that seems more appropriate for a dynamic case.

  • It's a bit inconsistent with how regular functions are typically written, where each is called explicitly and the result of each is passed to the call of the next. Yes, there is a bit of duplication, but I'm happy with "calling a function" being duplicated since (to me) it's really clear.

  • No need for an import: it uses core language features.
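
As an example of the parameter case from the first bullet, a minimal sketch; the scale stage and its factor argument are hypothetical, not from the question:

def scale(g, factor):
    # Hypothetical stage that needs its own parameter.
    for i in g:
        yield i * factor

range_iter = range(0, 5)
foo1_iter = foo1(range_iter)
scaled_iter = scale(foo1_iter, factor=2)  # extra argument passed directly
foo3_iter = foo3(scaled_iter)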

Michal Charemza

---

Although this implementation requires some overhead, I prefer to use the `>>` operator for chaining pipeline steps, similar to how tasks are arranged into a DAG in Airflow.

def foo1(g):
    for i in g:
        yield i + 1


def foo2(g):
    for i in g:
        yield 10 + i


def foo3(g):
    for i in g:
        yield "foo3:" + str(i)


def print_loop(g):
    for i in g:
        print(i)


class PipelineOperator:
    def __init__(self, task):
        self.task = task

    def __rrshift__(self, x):
        # Reflected >>: invoked as x >> self when x doesn't handle >> itself.
        return self.task(x)


foo1_t = PipelineOperator(foo1)
foo2_t = PipelineOperator(foo2)
foo3_t = PipelineOperator(foo3)
print_loop_t = PipelineOperator(print_loop)

(range(0, 5) >> foo1_t >> foo2_t >> foo3_t >> print_loop_t)
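
Because __rrshift__ is the reflected form of >>, neither range objects nor generators need to know anything about PipelineOperator. As a small variation (not in the original answer), the class also works as a decorator, which avoids the separate *_t names; the double stage is hypothetical:

@PipelineOperator
def double(g):
    # Hypothetical stage, wrapped at definition time.
    for i in g:
        yield i * 2

range(0, 5) >> double >> print_loop_t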
Matthew Thomas

---

Following up on your `runner.run` approach, let's define this utility function:

def recur(ops):
    # ops[-1] is the data source; each earlier entry wraps the result of
    # the rest, reproducing foo3(foo2(foo1(range(0, 5)))).
    return ops[0](recur(ops[1:])) if len(ops) > 1 else ops[0]

As an example:

>>> ops = foo3, foo2, foo1, range(0, 5)
>>> list( recur(ops) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']

Alternative: source-first ordering (as in your runner.run example)

def backw(ops):
    # Same idea with the source first: ops == [source, step1, step2, ...]
    return ops[-1](backw(ops[:-1])) if len(ops) > 1 else ops[0]

For example:

>>> list( backw([range(0, 5), foo1, foo2, foo3]) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
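
Since backw takes the source-first list from the question, the desired runner.run(l) spelling is only a thin alias over it (a sketch; the runner class is hypothetical):

class runner:
    run = staticmethod(backw)

>>> list( runner.run([range(0, 5), foo1, foo2, foo3]) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']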
John1024

---

Here is another approach, in case the functions in your example are one-off (single-use) functions. Descriptive variable names and generator expressions can be helpful for small operations.

>>> g = range(0, 5)
>>> foo1 = (x+1 for x in g)
>>> foo2 = (x+10 for x in foo1)
>>> foo3 = ('foo3:' + str(x) for x in foo2)
>>> for x in foo3:
...     print(x)
...
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
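
The same chain can also be written as a single nested expression, though the named version above reads better as the number of stages grows; a sketch of the equivalent:

res = ('foo3:' + str(x) for x in (x + 10 for x in (x + 1 for x in range(0, 5))))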
Jay Rajput

---

You can compose curried generator functions using PyMonad:

# Assumed imports; module paths are per PyMonad 1.x and may differ in
# later versions of the library:
from pymonad.Reader import curry
from pymonad.Maybe import Just
from typing import Iterable, Sized


def main():
    odds = list * \
         non_divisibles(2) * \
         lengths * \
         Just(["1", "22", "333", "4444", "55555"])
    print(odds.getValue())    # prints [1, 3, 5]


@curry
def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

Another alternative is to start with a Monad and compose the generators using fmap calls; this syntax will be familiar to Java 8 Stream users:

# (Imports as in the previous snippet.)
def main():
    odds = Just(["1", "22", "333", "4444", "55555"]) \
        .fmap(lengths) \
        .fmap(non_divisibles(2)) \
        .fmap(list) \
        .getValue()
    print(odds)   # prints [1, 3, 5]


def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

Note that a function only needs the @curry decorator when it is partially applied (here non_divisibles(2)); lengths is passed to fmap as-is. The entire chain of transformations is not evaluated until the terminal getValue() call.

Albert Gevorgyan

---

For future readers: another solution that is very Pythonic (IMHO):

steps = [foo1, foo2, foo3]

res = range(0, 5)
for step in steps:
    res = step(res)  # each step wraps the previous iterator

for i in res:
    print(i)

foo3:11
foo3:12
foo3:13
foo3:14
foo3:15

This does essentially the same thing as functools.reduce in maxymoo's answer; the laziness of generators allows this simple formulation without functools.

examiner
  • This works great and reads well for non-lazy functions with side-effects in a pipeline too. Including `from functools import partial` and passing in additional custom parameters as necessary worked for me. – Wassadamo May 04 '22 at 23:39