
I've a process that builds a data pipeline from a collection of modular functions.

One of the functions is a switch, which is supposed to allow different functions to be executed depending on the content of the data.

In the code below, this is the switcheroo function, which performs a test (via dictionary lookup) and attempts to plug an appropriate function into the generator pipeline.

def switcheroo(x, field, s_dict, default):
    for content in x:
        yield next(s_dict.get(content[field], default)([content]))

I can run the code below with some success, and doing so generates 3 files. However, the test_a.txt file should contain two results; instead it only contains one, because the save function/generator is losing its place and being re-evaluated from scratch each time it's called - in this case reopening the file from scratch.

If I run a similar pipeline without the switcheroo, the save generator retains its internal state, and saves multiple rows to a file.

I've tried alternative ways of composing the switcheroo function, but my constraint is that I need to compose it, along with whatever other functions I have, into a single pipeline generator which will be iterated over at run-time.

Another constraint is that I'd like to maintain the modularity of all my functions, so that they can be composed in any order.


from collections import OrderedDict
from functools import partial, reduce

data_source = [ OrderedDict({"id" : "1", "name" : "Tom",      "sync" : "a" }),
                OrderedDict({"id" : "2", "name" : "Steve",    "sync" : "a" }),
                OrderedDict({"id" : "3", "name" : "Ulrich",   "sync" : "b" }),
                OrderedDict({"id" : "4", "name" : "Victor",   "sync" : "b" }),
                OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c" }),
                OrderedDict({"id" : "6", "name" : "Xavier",   "sync" : "c" }),
                OrderedDict({"id" : "7", "name" : "Yves",     "sync" : "c" }),
                OrderedDict({"id" : "8", "name" : "Zaphod",   "sync" : "d" }),
               OrderedDict({ "id" : "9", "name" : "Albert",   "sync" : "d" })]


def od_to_str(od):
    return ",".join((str(v) for v in od.values()))

def filt(x, field, filt):
    for content in x:
        if content[field] in filt:
            yield content

def save(x, filename):
    with open(filename, "w") as out:
        for content in x:
            out.write(od_to_str(content)+"\n")
            yield content

p_save_a = partial(save, filename="test_a.txt")
p_save_b = partial(save, filename="test_b.txt")
p_save_c = partial(save, filename="test_c.txt")
p_save_d = partial(save, filename="test_d.txt")

switch_d = { "a" : p_save_a, 
             "b" : p_save_b, 
             "c" : p_save_c, 
             "d" : p_save_d, 
           }

def switcheroo(x, field, s_dict, default):
    for content in x:
        yield next(s_dict.get(content[field], default)([content]))

p_filt=partial(filt, field="name", filt=["Tom", "Steve", "Victor", "Xavier"])
p_switcheroo = partial(switcheroo, field="sync", s_dict=switch_d, default=lambda x : x)

dsc=[d.copy() for d in data_source.copy()]
iterator=(d for d in dsc)

def compose(*functions):
    return lambda x: reduce(lambda λ, f : f(λ), functions, x)

funcs = [p_filt, p_switcheroo]
pipeline=compose(*funcs)

for result in pipeline(iterator):
    print (result)

For reference, the above generates 3 files, the contents of which should be:

test_a.txt

1,Tom,a
2,Steve,a

test_b.txt

4,Victor,b

test_c.txt

6,Xavier,c

However, in test_a.txt there is only

2,Steve,a

This is on account of the save function being evaluated from the start twice, i.e. it recreates the file from scratch each time a record is piped into it. So the "Tom" record is saved, but the "Steve" record overwrites it when the file-setup code is rerun.

What I want is for this initial setup to happen one time only, as per the normal generator/yield pattern.

I could set the file up to be opened in "append" mode, but I'm more interested in preserving the generator pattern than in the details of the serialisation - I want to end up with a working pattern that I can confidently apply to an arbitrary collection of modular components arranged in different combinations. I've managed that as far as a linear pipeline goes, but this branching functionality, where the pipeline becomes tree-like, is throwing a spanner into the works.
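For concreteness, the append-mode workaround I mention above would look roughly like the sketch below (save_append is just an illustrative name, not part of my pipeline). It sidesteps the overwriting, but as said it doesn't really preserve the generator pattern I'm after:

def save_append(x, filename):
    # Same as save above, except the file is opened in append mode, so a
    # freshly re-created generator no longer clobbers earlier rows (the file
    # does, however, also accumulate rows across separate runs).
    with open(filename, "a") as out:
        for content in x:
            out.write(od_to_str(content)+"\n")
            yield content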

Thomas Kimber
  • I think `switch_d` needs to be a `dict` of active generators, not generator functions. This would require some changes to `save`, as it would receive the next value for `content` via the generator's `send` method rather than taking an iterable argument. – chepner Jun 04 '19 at 13:30
  • Nice idea - I'll look in that direction - there must be an established pattern for doing something like this. – Thomas Kimber Jun 04 '19 at 14:21
  • Another idea would be for `save` to take an open file handle as an argument, rather than a file name, or to open the file in append mode. That might be sufficient, without having to keep long-lived generators around. – chepner Jun 04 '19 at 14:28
  • So that would solve the immediate problem, but doing so locks-in the constraint that any function that appears within (or after) such a branching function has to avoid using state-aware generators. That may be a constraint I will have to deal with, but it would be extremely useful if there were a technique that allowed me to take advantage of generator state-persistence within this pipeline pattern. – Thomas Kimber Jun 04 '19 at 15:05
  • @ThomasKimber, can you show the expected output? – Mulan Jun 04 '19 at 21:34
  • @user633183 Sure done - see edited question body - thanks. – Thomas Kimber Jun 05 '19 at 08:21
  • I endorse @chepner's answer, you need to keep active generators and use `send` to feed them the next value, and catch a `StopIteration` when they're done. (more info on send: https://stackoverflow.com/questions/19302530/python-generator-send-function-purpose) – BlackBear Jun 05 '19 at 08:37
  • OK, so I can see how that might work - but that breaks the modularity where I need two different save functions, one to work on a regular generator basis, and the other on a `send` basis. Further (and this may be more style over substance) what I want to be able to do is compose a single iterable generator function that will represent arbitrarily complex flows. I don't think functional composition works with send, since sending data requires you to know the identity of the function to which you are sending. In this case, the receiving function is the one immediately enclosing the previous one. – Thomas Kimber Jun 05 '19 at 10:23
  • For more context - see this earlier question that gives some more detail about how I'm working towards a pipeline build by composing modular functions by adopting an iterator pattern. https://stackoverflow.com/questions/56431391/transmit-parameter-to-the-inner-most-call-of-a-composed-function – Thomas Kimber Jun 05 '19 at 10:29
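A minimal sketch of the send-based approach chepner and BlackBear describe above, assuming save is rewritten as a coroutine (the names save_coro, passthrough and switcheroo_send are illustrative only, and od_to_str is the helper from the question):

def save_coro(filename):
    # save rewritten as a coroutine: primed once, then fed records via send().
    with open(filename, "w") as out:
        record = yield
        while True:
            out.write(od_to_str(record)+"\n")
            record = yield record      # hand the record back as the result of send()

def passthrough():
    # default branch: a coroutine that simply echoes records back.
    record = yield
    while True:
        record = yield record

def switcheroo_send(x, field, s_dict, default):
    # s_dict and default now hold zero-argument coroutine factories,
    # e.g. partial(save_coro, filename="test_a.txt") and passthrough.
    live = {}                          # one long-lived, primed coroutine per field value
    for content in x:
        key = content[field]
        if key not in live:
            coro = s_dict.get(key, default)()
            next(coro)                 # prime the coroutine up to its first yield
            live[key] = coro
        yield live[key].send(content)

This keeps one open file per sync value for the whole run, but, as noted in the comments, it forces a send-aware save and so trades away some of the modularity the question is after.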

2 Answers


Building on chepner’s comment:

def dup(src):  # to allow "peeking"
  for x in src:
    yield x
    yield x

def switcheroo(x, field, s_dict, default):
  # Make generators that share x:
  x = dup(x)
  d = {k: v(x) for k, v in s_dict.items()}
  for content in x:  # peek
    yield next(d.get(content[field], default))

Note that the default must now be a generator itself, and that each generator produced from s_dict must be well behaved in the sense of reading exactly once from its input stream before each yield. One could use itertools.tee instead of dup; while less of a hack, it wouldn’t relax any assumptions.
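To make the peek-then-consume mechanism concrete, here is a small self-contained illustration (the variable names are illustrative only): dup yields every item twice, so the loop in switcheroo can inspect the first copy while the selected branch generator consumes the second.

def dup(src):
    for item in src:
        yield item
        yield item

stream = dup(iter([1, 2, 3]))
for peeked in stream:          # first copy: inspected to choose a branch
    consumed = next(stream)    # second copy: consumed by the chosen generator
    assert peeked == consumed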

Davis Herring
  • Thanks for this - I really like the `dup` trick there - it's the simplest "peek" I've seen to date. Let me think about situations where the "well behaved" assumption might not hold and what would happen if that wasn't the case... – Thomas Kimber Jun 06 '19 at 17:04

Hmm.

Your code doesn't work because you're trying to shoehorn iteration into something more; it needs some push.

Consider what happens when switcheroo hits the same save* function twice: it calls that function twice, creating two generators, each of which opens the file for writing and overwrites the previous one. Yes, you could store the generators, but then you hit a second problem - you have already passed in the data that p_save_a iterates over ([content]). You can't extend that with a simple list object when you encounter the second p_save_a call; you need a wrapper.

If you want something that works, try this:

def switcheroo(x, field, s_dict, default):
    iterators = {}

    class placeholder:
        # A tiny buffer that can be appended to while it is being iterated over.
        def __init__(self):
            self.values = []
        def append(self, v):
            self.values.append(v)
        def __iter__(self):
            return self
        def __next__(self):
            if not self.values:
                raise StopIteration
            return self.values.pop(0)

    for content in x:
        val = content[field]
        try:
            gen, gen_val = iterators[val]
        except KeyError:
            # First time this value is seen: create the buffer and a long-lived
            # generator reading from it, and keep both for later records.
            gen_val = placeholder()
            gen = s_dict.get(val, default)(gen_val)
            iterators[val] = gen, gen_val
        gen_val.append(content)
        yield next(gen)

The placeholder class acts as a proxy communication channel - a vector that can be extended while it is being iterated over. Of course this is terrible performance-wise, but it is only a proof of concept.

Radosław Cybulski
  • Beautiful - it works a dream - thankyou very much - and the explanation has helped too - I'm torn between marking this and @Davis Herring 's answer, which also does the trick. Thankyou once again. – Thomas Kimber Jun 06 '19 at 17:00
  • For this case, a single value suffices rather than a list, which should perform a little better. – Davis Herring Jun 06 '19 at 23:50
  • @DavisHerring is correct. I used a list as a general example; if there's a guarantee that there will be at most one object, using a single value is the way to go. – Radosław Cybulski Jun 07 '19 at 19:59
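A rough sketch of the single-value variant discussed in the two comments above (illustrative names, not from either answer): since switcheroo appends exactly one record before each next(), the buffer never holds more than one item, so a one-slot holder can stand in for the list.

def switcheroo(x, field, s_dict, default):
    class OneSlot:
        # Holds at most one pending record; switcheroo refills it before every next().
        def __init__(self):
            self.value, self.full = None, False
        def push(self, v):
            self.value, self.full = v, True
        def __iter__(self):
            return self
        def __next__(self):
            if not self.full:
                raise StopIteration
            self.full = False
            return self.value

    slots = {}
    for content in x:
        key = content[field]
        if key not in slots:
            slot = OneSlot()
            slots[key] = (s_dict.get(key, default)(slot), slot)
        gen, slot = slots[key]
        slot.push(content)
        yield next(gen)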