I have a process that builds a data pipeline from a collection of modular functions.
One of the functions is a switch, which is supposed to allow different functions to be executed depending on the content of the data.
In the code below, this is the switcheroo function, which performs a test (via dictionary lookup) and attempts to plug the appropriate function into the generator pipeline.
def switcheroo(x, field, s_dict, default):
    for content in x:
        yield next(s_dict.get(content[field], default)([content]))
I can run the code below with some success, and doing so generates 3 files - however, the test_a.txt file should contain two results. Instead, it contains only one, because the save function/generator is losing its place and being re-evaluated from scratch each time it's called - in this case, reopening the file from scratch.
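That re-evaluation is ordinary generator behaviour rather than anything specific to save: each call to a generator function returns a brand-new generator object, so any setup code before the loop runs again. A minimal illustration (gen here is a stand-in for save, not part of the pipeline):

```python
def gen(xs):
    print("setup runs")   # stands in for save()'s open(filename, "w")
    for x in xs:
        yield x

# switcheroo calls the looked-up function anew for every record, which is
# equivalent to this: two calls, two independent generators, and the
# setup line executes both times.
next(gen(["Tom"]))      # prints: setup runs
next(gen(["Steve"]))    # prints: setup runs
```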
If I run a similar pipeline without the switcheroo, the save generator retains its internal state and saves multiple rows to a file.
I've tried alternate ways of composing the switcheroo function, but my constraint is that I need to compose it, along with whatever other functions I have, into a single pipeline generator that will be iterated over at run-time.
Another constraint is that I'd like to maintain the modularity of all my functions, such that they can be composed in any order.
from collections import OrderedDict
from functools import partial, reduce

data_source = [OrderedDict({"id": "1", "name": "Tom", "sync": "a"}),
               OrderedDict({"id": "2", "name": "Steve", "sync": "a"}),
               OrderedDict({"id": "3", "name": "Ulrich", "sync": "b"}),
               OrderedDict({"id": "4", "name": "Victor", "sync": "b"}),
               OrderedDict({"id": "5", "name": "Wolfgang", "sync": "c"}),
               OrderedDict({"id": "6", "name": "Xavier", "sync": "c"}),
               OrderedDict({"id": "7", "name": "Yves", "sync": "c"}),
               OrderedDict({"id": "8", "name": "Zaphod", "sync": "d"}),
               OrderedDict({"id": "9", "name": "Albert", "sync": "d"})]

def od_to_str(od):
    return ",".join(str(v) for v in od.values())

def filt(x, field, filt):
    for content in x:
        if content[field] in filt:
            yield content

def save(x, filename):
    with open(filename, "w") as out:
        for content in x:
            out.write(od_to_str(content) + "\n")
            yield content

p_save_a = partial(save, filename="test_a.txt")
p_save_b = partial(save, filename="test_b.txt")
p_save_c = partial(save, filename="test_c.txt")
p_save_d = partial(save, filename="test_d.txt")

switch_d = {"a": p_save_a,
            "b": p_save_b,
            "c": p_save_c,
            "d": p_save_d}

def switcheroo(x, field, s_dict, default):
    for content in x:
        yield next(s_dict.get(content[field], default)([content]))

p_filt = partial(filt, field="name", filt=["Tom", "Steve", "Victor", "Xavier"])
p_switcheroo = partial(switcheroo, field="sync", s_dict=switch_d, default=lambda x: x)

dsc = [d.copy() for d in data_source]
iterator = (d for d in dsc)

def compose(*functions):
    return lambda x: reduce(lambda acc, f: f(acc), functions, x)

funcs = [p_filt, p_switcheroo]
pipeline = compose(*funcs)

for result in pipeline(iterator):
    print(result)
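As a sanity check, the compose helper itself is fine - the bug is confined to switcheroo. It folds the functions left to right, so the first function in funcs becomes the innermost pipeline stage. The stages below are illustrative only, not part of the pipeline:

```python
from functools import reduce

def compose(*functions):
    # same helper as above, accumulator renamed for readability
    return lambda x: reduce(lambda acc, f: f(acc), functions, x)

double = lambda x: x * 2   # illustrative stages only
inc = lambda x: x + 1

assert compose(double, inc)(3) == 7   # inc(double(3))
assert compose(inc, double)(3) == 8   # double(inc(3))
```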
For reference, the above generates 3 files, the contents of which should be:
test_a.txt
1,Tom,a
2,Steve,a
test_b.txt
4,Victor,b
test_c.txt
6,Xavier,c
However, in test_a.txt there is only
2,Steve,a
on account of the save function being evaluated from the start twice: it recreates the file from scratch each time a record is piped into it. So the "Tom" record is saved, but the "Steve" record overwrites it when the file-setup code is rerun.
What I want is for that initial setup to run one time only, as per the normal generator/yield pattern.
I could open the file in "append" mode, but I'm more interested in preserving the generator pattern than in the details of the serialisation - I want to end up with a working pattern that I can confidently apply to an arbitrary collection of modular components arranged in different combinations. I've managed that as far as a linear pipeline goes, but this branching functionality, where the pipeline becomes tree-like, is throwing a spanner into the works.
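One shape I've been considering (a sketch only; feeder and the per-branch buffers are new helpers, and it assumes each branch function takes an iterable, as save does): have switcheroo instantiate each branch generator exactly once and feed records to it through a small per-branch buffer, so a branch keeps its internal state - the open file - between records:

```python
from functools import partial

def save(x, filename):
    # unchanged from the pipeline above: the file-setup code before the
    # loop runs only once per generator *instance*
    with open(filename, "w") as out:
        for content in x:
            out.write(",".join(str(v) for v in content.values()) + "\n")
            yield content

def switcheroo(x, field, s_dict, default):
    # Build each branch generator exactly once, fed from a per-branch buffer,
    # so a branch retains its internal state between records.
    buffers = {key: [] for key in s_dict}

    def feeder(buf):
        # endless source: hands the branch whatever was buffered for it
        while True:
            yield buf.pop()

    branches = {key: fn(feeder(buffers[key])) for key, fn in s_dict.items()}
    try:
        for content in x:
            key = content[field]
            if key in branches:
                buffers[key].append(content)
                yield next(branches[key])
            else:
                # default returns an iterable; take its first item
                yield next(iter(default([content])))
    finally:
        for gen in branches.values():
            gen.close()   # unwinds save()'s with-block, closing its file
```

Composed exactly as before via partial(switcheroo, field="sync", s_dict=switch_d, default=lambda x: x), test_a.txt should then receive both the "Tom" and "Steve" rows, because p_save_a is instantiated once and merely resumed for each record.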